Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
fetch: only send changed wants and haves
Merged fintohaps opened 2 years ago

This change limits the amount of wants and haves data that is sent to the serving side of a fetch.

When a RefsAt announcement is sent, the fetching peer can calculate the difference between the rad/sigrefs they’re aware of – if it exists – and the newly advertised rad/sigrefs. This means they can efficiently ask for the wants and haves of references that have changed – saving some data sent over to the serving side.

10 files changed +416 -65 0f0f9ff9 3ad2b443
modified radicle-fetch/src/lib.rs
@@ -20,7 +20,7 @@ pub use state::{FetchLimit, FetchResult};
pub use transport::Transport;

use radicle::crypto::PublicKey;
-
use radicle::storage::refs::RefsAt;
+
use radicle::storage::refs::SignedRefsUpdate;
use radicle::storage::ReadRepository as _;
use state::FetchState;
use thiserror::Error;
@@ -54,7 +54,7 @@ pub fn pull<S>(
    handle: &mut Handle<S>,
    limit: FetchLimit,
    remote: PublicKey,
-
    refs_at: Option<Vec<RefsAt>>,
+
    refs_at: Option<Vec<SignedRefsUpdate>>,
) -> Result<FetchResult, Error>
where
    S: transport::ConnectionStream,
modified radicle-fetch/src/sigrefs.rs
@@ -1,7 +1,8 @@
use std::collections::{BTreeMap, BTreeSet};
-
use std::ops::{Deref, Not as _};
+
use std::ops::{Deref, DerefMut, Not as _};

-
pub use radicle::storage::refs::SignedRefsAt;
+
use radicle::storage::refs;
+
pub use radicle::storage::refs::{DiffedRefs, SignedRefsAt};
pub use radicle::storage::{git::Validation, Validations};
use radicle::{crypto::PublicKey, storage::ValidateRepository};

@@ -150,6 +151,67 @@ impl<'a> IntoIterator for &'a RemoteRefs {
    }
}

+
/// A set of [`DiffedRefs`] per remote `PublicKey`.
+
///
+
/// To construct use [`DiffedRefs::load`].
+
#[derive(Clone, Debug, Default)]
+
pub struct RemoteDiffedRefs(BTreeMap<PublicKey, DiffedRefs>);
+

+
impl RemoteDiffedRefs {
+
    /// Given a set of [`refs::RefsUpdate`]s, compute its
+
    /// [`DiffedRefs`] and use its [`refs::RefsUpdate::remote`] as the
+
    /// key for the `RemoteDiffedRefs` entry.
+
    ///
+
    /// If the `remote` is in the `may` set, then it is allowed to
+
    /// fail and will not be inserted in the set iff it does fail to
+
    /// load.
+
    ///
+
    /// If the `remote` is in the `must` set, then this method will
+
    /// fail iff loading the `DiffedRefs` fails.
+
    pub(crate) fn load<S>(
+
        cached: &Cached<S>,
+
        updates: Vec<refs::SignedRefsUpdate>,
+
        Select { must, may }: Select,
+
    ) -> Result<Self, error::Load> {
+
        updates
+
            .into_iter()
+
            .try_fold(Self::default(), |mut refs, update| {
+
                match cached.load_diffed_refs(&update) {
+
                    Ok(diff) => {
+
                        refs.insert(update.remote, diff);
+
                        Ok(refs)
+
                    }
+
                    Err(e) if must.contains(&update.remote) => Err(e),
+
                    Err(_) if may.contains(&update.remote) => Ok(refs),
+
                    Err(e) => Err(e),
+
                }
+
            })
+
    }
+
}
+

+
impl Deref for RemoteDiffedRefs {
+
    type Target = BTreeMap<PublicKey, DiffedRefs>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl DerefMut for RemoteDiffedRefs {
+
    fn deref_mut(&mut self) -> &mut Self::Target {
+
        &mut self.0
+
    }
+
}
+

+
impl<'a> IntoIterator for &'a RemoteDiffedRefs {
+
    type Item = <&'a BTreeMap<PublicKey, DiffedRefs> as IntoIterator>::Item;
+
    type IntoIter = <&'a BTreeMap<PublicKey, DiffedRefs> as IntoIterator>::IntoIter;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.0.iter()
+
    }
+
}
+

pub struct Select<'a> {
    pub must: &'a BTreeSet<PublicKey>,
    pub may: &'a BTreeSet<PublicKey>,
modified radicle-fetch/src/stage.rs
@@ -15,17 +15,18 @@
//!      `rad/sigrefs`, for each configured namespace, i.e. followed
//!      and delegate peers if the scope is "followed" and all peers is the
//!      scope is all.
-
//!   3. [`DataRefs`]: fetches the `Oid`s for each reference listed in
-
//!      the `rad/sigrefs` for each fetched peer in the
-
//!      [`SpecialRefs`] stage. Additionally, any references that have
-
//!      been removed from `rad/sigrefs` are marked for deletion.
+
//!   3. [`DataRefs`]/[`DiffedRefs`]: fetches the `Oid`s for each
+
//!      reference listed in the `rad/sigrefs` for each fetched peer
+
//!      in the [`SpecialRefs`] stage. Additionally, any references
+
//!      that have been removed from `rad/sigrefs` are marked for
+
//!      deletion.
//!
//! ### Pull
//!
//! A `pull` is split into two stages:
//!
//!   1. [`SpecialRefs`]: see above.
-
//!   2. [`DataRefs`]: see above.
+
//!   2. [`DataRefs`]/[`DiffedRefs`]: see above.

use std::collections::{BTreeMap, BTreeSet, HashSet};

@@ -380,7 +381,7 @@ impl ProtocolStage for SigrefsAt {
}

/// The [`ProtocolStage`] for fetching data refs from the set of
-
/// remotes in `trusted`.
+
/// `remotes`.
///
/// All refs that are listed in the `remotes` sigrefs are checked
/// against our refdb/odb to build a set of `wants` and `haves`. The
@@ -493,6 +494,112 @@ impl ProtocolStage for DataRefs {
    }
}

+
/// The [`ProtocolStage`] that is similar to [`DataRefs`], however it
+
/// is aware that it is an update of `rad/sigrefs`. This means that it
+
/// will only compute `wants` and `haves` based on any modified
+
/// `Oid`s.
+
///
+
/// All refs and objects are prepared for updating as per usual, since
+
/// we keep track of in-memory references for validation.
+
#[derive(Debug)]
+
pub struct DiffedRefs {
+
    /// The node that is being fetched from.
+
    pub remote: PublicKey,
+
    /// The set of signed references from each remote that was
+
    /// fetched.
+
    pub remotes: sigrefs::RemoteDiffedRefs,
+
    /// The data limit for this stage of fetching.
+
    pub limit: u64,
+
}
+

+
impl ProtocolStage for DiffedRefs {
+
    fn ls_refs(&self) -> Option<NonEmpty<BString>> {
+
        None
+
    }
+

+
    fn ref_filter(&self, _r: Ref) -> Option<ReceivedRef> {
+
        None
+
    }
+

+
    fn pre_validate(&self, _refs: &[ReceivedRef]) -> Result<(), error::Layout> {
+
        Ok(())
+
    }
+

+
    fn wants_haves(
+
        &self,
+
        refdb: &Repository,
+
        _refs: &[ReceivedRef],
+
    ) -> Result<WantsHaves, error::WantsHaves> {
+
        let mut wants_haves = WantsHaves::default();
+

+
        for (remote, refs) in &self.remotes {
+
            wants_haves.add(
+
                refdb,
+
                refs.iter().filter_map(|(refname, up)| {
+
                    let refname = Qualified::from_refstr(refname)
+
                        .map(|refname| refname.with_namespace(Component::from(remote)))?;
+
                    let tip = up.modified()?;
+
                    Some((refname, *tip))
+
                }),
+
            )?;
+
        }
+

+
        Ok(wants_haves)
+
    }
+

+
    fn prepare_updates<'a>(
+
        &self,
+
        _s: &FetchState,
+
        _repo: &Repository,
+
        _refs: &'a [ReceivedRef],
+
    ) -> Result<Updates<'a>, error::Prepare> {
+
        use radicle::storage::refs::Update::{Added, Changed, Deleted, Same};
+

+
        let mut updates = Updates::default();
+
        let prefix_rad = refname!("refs/rad");
+

+
        for (remote, refs) in &self.remotes {
+
            for (name, up) in refs.iter() {
+
                let is_refs_rad = name.starts_with(prefix_rad.as_str());
+
                let tracking: Namespaced<'_> = Qualified::from_refstr(name)
+
                    .and_then(|q| refs::ReceivedRefname::remote(*remote, q).to_namespaced())
+
                    .expect("we checked sigrefs well-formedness in wants_refs already");
+
                match up {
+
                    Added { oid } | Changed { oid } => updates.add(
+
                        *remote,
+
                        Update::Direct {
+
                            name: tracking,
+
                            target: *oid,
+
                            no_ff: Policy::Allow,
+
                        },
+
                    ),
+
                    Deleted { oid } if !is_refs_rad => updates.add(
+
                        *remote,
+
                        Update::Prune {
+
                            name: tracking,
+
                            prev: either::Left(*oid),
+
                        },
+
                    ),
+
                    // N.b. create an update for this reference so
+
                    // that the in-memory refdb is updated.
+
                    Same { oid } => updates.add(
+
                        *remote,
+
                        Update::Direct {
+
                            name: tracking,
+
                            target: *oid,
+
                            no_ff: Policy::Allow,
+
                        },
+
                    ),
+
                    // N.b. `refs/rad` is not subject to pruning.
+
                    Deleted { .. } => continue,
+
                }
+
            }
+
        }
+

+
        Ok(updates)
+
    }
+
}
+

// N.b. the `delegates` are the delegates of the repository, with the
// potential removal of the local peer in the case of a `pull`.
fn special_refs_updates<'a>(
modified radicle-fetch/src/state.rs
@@ -8,7 +8,7 @@ use radicle::identity::{Doc, DocError};

use radicle::prelude::Verified;
use radicle::storage;
-
use radicle::storage::refs::{RefsAt, SignedRefs};
+
use radicle::storage::refs::{RefsAt, SignedRefs, SignedRefsUpdate};
use radicle::storage::{
    git::Validation, Remote, RemoteId, RemoteRepository, Remotes, ValidateRepository, Validations,
};
@@ -273,18 +273,23 @@ impl FetchState {
        Ok(fetched)
    }

-
    /// Fetch the set of special refs, depending on `refs_at`.
+
    /// Fetch the set of special refs, depending on `refs_updates`.
    ///
-
    /// If `refs_at` is `Some`, then run the [`SigrefsAt`] stage,
+
    /// If `refs_updates` is `Some`, then run the [`SigrefsAt`] stage,
    /// which specifically fetches `rad/sigrefs` which are listed in
-
    /// `refs_at`.
+
    /// `refs_at`. It then runs the [`DiffedRefs`] stage to fetch the
+
    /// difference between old references listed existing
+
    /// `rad/sigrefs` and the new `rad/sigrefs` fetched in the
+
    /// previous step.
    ///
-
    /// If `refs_at` is `None`, then run the [`SpecialRefs`] stage,
-
    /// which fetches `rad/sigrefs` and `rad/id` from all tracked and
-
    /// delegate peers (scope dependent).
+
    /// If `refs_updates` is `None`, then run the [`SpecialRefs`]
+
    /// stage, which fetches `rad/sigrefs` and `rad/id` from all
+
    /// tracked and delegate peers (scope dependent). It then runs the
+
    /// [`DataRefs`] stage to fetch the references listed in the
+
    /// `rad/sigrefs` fetched in the previous step.
    ///
-
    /// The resulting [`sigrefs::RemoteRefs`] will be the set of
-
    /// `rad/sigrefs` of the fetched remotes.
+
    /// The resulting `Vec<PublicKey>` will be the set of fetched
+
    /// remotes.
    fn run_special_refs<S>(
        &mut self,
        handle: &mut Handle<S>,
@@ -292,18 +297,19 @@ impl FetchState {
        delegates: BTreeSet<PublicKey>,
        limit: &FetchLimit,
        remote: PublicKey,
-
        refs_at: Option<Vec<RefsAt>>,
-
    ) -> Result<sigrefs::RemoteRefs, error::Protocol>
+
        refs_updates: Option<Vec<SignedRefsUpdate>>,
+
    ) -> Result<Vec<PublicKey>, error::Protocol>
    where
        S: transport::ConnectionStream,
    {
-
        match refs_at {
-
            Some(refs_at) => {
-
                let (must, may): (BTreeSet<PublicKey>, BTreeSet<PublicKey>) = refs_at
+
        match refs_updates {
+
            Some(refs_updates) => {
+
                let (must, may): (BTreeSet<PublicKey>, BTreeSet<PublicKey>) = refs_updates
                    .iter()
                    .map(|refs_at| refs_at.remote)
                    .partition(|id| delegates.contains(id));

+
                let refs_at = refs_updates.iter().map(RefsAt::from).collect();
                let sigrefs_at = stage::SigrefsAt {
                    remote,
                    delegates,
@@ -314,14 +320,22 @@ impl FetchState {
                log::trace!(target: "fetch", "{sigrefs_at:?}");
                self.run_stage(handle, handshake, &sigrefs_at)?;

-
                let signed_refs = sigrefs::RemoteRefs::load(
+
                let remotes = sigrefs::RemoteDiffedRefs::load(
                    &self.as_cached(handle),
+
                    refs_updates,
                    sigrefs::Select {
                        must: &must,
                        may: &may,
                    },
                )?;
-
                Ok(signed_refs)
+
                let fetched = remotes.keys().copied().collect();
+
                let refs = stage::DiffedRefs {
+
                    remote,
+
                    remotes,
+
                    limit: limit.refs,
+
                };
+
                self.run_stage(handle, handshake, &refs)?;
+
                Ok(fetched)
            }
            None => {
                let followed = handle.allowed();
@@ -336,7 +350,7 @@ impl FetchState {
                log::trace!(target: "fetch", "{special_refs:?}");
                let fetched = self.run_stage(handle, handshake, &special_refs)?;

-
                let signed_refs = sigrefs::RemoteRefs::load(
+
                let remotes = sigrefs::RemoteRefs::load(
                    &self.as_cached(handle),
                    sigrefs::Select {
                        must: &delegates,
@@ -347,7 +361,14 @@ impl FetchState {
                            .collect(),
                    },
                )?;
-
                Ok(signed_refs)
+
                let fetched = remotes.keys().copied().collect();
+
                let refs = stage::DataRefs {
+
                    remote,
+
                    remotes,
+
                    limit: limit.refs,
+
                };
+
                self.run_stage(handle, handshake, &refs)?;
+
                Ok(fetched)
            }
        }
    }
@@ -374,7 +395,7 @@ impl FetchState {
        handshake: &handshake::Outcome,
        limit: FetchLimit,
        remote: PublicKey,
-
        refs_at: Option<Vec<RefsAt>>,
+
        refs_at: Option<Vec<SignedRefsUpdate>>,
    ) -> Result<FetchResult, error::Protocol>
    where
        S: transport::ConnectionStream,
@@ -413,7 +434,7 @@ impl FetchState {

        log::trace!(target: "fetch", "Identity delegates {delegates:?}");

-
        let signed_refs = self.run_special_refs(
+
        let fetched = self.run_special_refs(
            handle,
            handshake,
            delegates.clone(),
@@ -423,21 +444,8 @@ impl FetchState {
        )?;
        log::debug!(
            target: "fetch",
-
            "Fetched rad/sigrefs for {} remotes ({}ms)",
-
            signed_refs.len(),
-
            start.elapsed().as_millis()
-
        );
-

-
        let data_refs = stage::DataRefs {
-
            remote,
-
            remotes: signed_refs,
-
            limit: limit.refs,
-
        };
-
        self.run_stage(handle, handshake, &data_refs)?;
-
        log::debug!(
-
            target: "fetch",
            "Fetched data refs for {} remotes ({}ms)",
-
            data_refs.remotes.len(),
+
            fetched.len(),
            start.elapsed().as_millis()
        );

@@ -459,7 +467,6 @@ impl FetchState {
        // added to `warnings`.
        let mut warnings = sigrefs::Validations::default();
        let mut failures = sigrefs::Validations::default();
-
        let signed_refs = data_refs.remotes;

        // We may prune fetched remotes, so we keep track of
        // non-pruned, fetched remotes here.
@@ -467,7 +474,7 @@ impl FetchState {

        // TODO(finto): this might read better if it got its own
        // private function.
-
        for remote in signed_refs.keys() {
+
        for remote in &fetched {
            if handle.is_blocked(remote) {
                continue;
            }
@@ -637,6 +644,13 @@ impl<'a, S> Cached<'a, S> {
        }
    }

+
    pub fn load_diffed_refs(
+
        &self,
+
        update: &SignedRefsUpdate,
+
    ) -> Result<storage::refs::DiffedRefs, sigrefs::error::Load> {
+
        update.difference(&self.handle.repo)
+
    }
+

    #[allow(dead_code)]
    pub(crate) fn inspect(&self) {
        self.state.refs.inspect()
modified radicle-node/src/service.rs
@@ -30,6 +30,7 @@ use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{ConnectOptions, Penalty, Severity};
+
use radicle::storage::refs::SignedRefsUpdate;
use radicle::storage::RepositoryError;

use crate::crypto;
@@ -250,7 +251,7 @@ struct FetchState {
    /// Node we're fetching from.
    from: NodeId,
    /// What refs we're fetching.
-
    refs_at: Vec<RefsAt>,
+
    refs_at: Vec<SignedRefsUpdate>,
    /// Channels waiting for fetch results.
    subscribers: Vec<chan::Sender<FetchResult>>,
}
@@ -272,7 +273,7 @@ struct QueuedFetch {
    /// Peer being fetched from.
    from: NodeId,
    /// Refs being fetched.
-
    refs_at: Vec<RefsAt>,
+
    refs_at: Vec<SignedRefsUpdate>,
    /// Result channel.
    channel: Option<chan::Sender<FetchResult>>,
}
@@ -806,7 +807,7 @@ where
        &mut self,
        rid: RepoId,
        from: NodeId,
-
        refs: NonEmpty<RefsAt>,
+
        refs: NonEmpty<SignedRefsUpdate>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) {
@@ -828,7 +829,7 @@ where
        &mut self,
        rid: RepoId,
        from: NodeId,
-
        refs_at: Vec<RefsAt>,
+
        refs_at: Vec<SignedRefsUpdate>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) {
@@ -883,7 +884,7 @@ where
        &mut self,
        rid: RepoId,
        from: &NodeId,
-
        refs_at: Vec<RefsAt>,
+
        refs_at: Vec<SignedRefsUpdate>,
        timeout: time::Duration,
    ) -> Result<&mut FetchState, TryFetchError> {
        let from = *from;
@@ -1043,6 +1044,7 @@ where
                .seed_policy(&rid)
                .expect("Service::dequeue_fetch: error accessing repo seeding configuration");

+
            let refs_at = refs_at.iter().map(RefsAt::from).collect();
            match self.refs_status_of(rid, refs_at, &repo_entry.scope) {
                Ok(status) => {
                    if let Some(refs) = NonEmpty::from_vec(status.fresh) {
modified radicle-node/src/service/io.rs
@@ -2,7 +2,7 @@ use std::collections::VecDeque;
use std::time;

use log::*;
-
use radicle::storage::refs::RefsAt;
+
use radicle::storage::refs::SignedRefsUpdate;

use crate::prelude::*;
use crate::service::session::Session;
@@ -30,7 +30,7 @@ pub enum Io {
        /// Namespaces being fetched.
        namespaces: Namespaces,
        /// If the node is fetching specific `rad/sigrefs`.
-
        refs_at: Option<Vec<RefsAt>>,
+
        refs_at: Option<Vec<SignedRefsUpdate>>,
        /// Fetch timeout.
        timeout: time::Duration,
    },
@@ -115,7 +115,7 @@ impl Outbox {
        remote: &mut Session,
        rid: RepoId,
        namespaces: Namespaces,
-
        refs_at: Vec<RefsAt>,
+
        refs_at: Vec<SignedRefsUpdate>,
        timeout: time::Duration,
    ) {
        remote.fetching(rid);
modified radicle-node/src/service/message.rs
@@ -1,7 +1,7 @@
use std::{fmt, io, mem};

use radicle::git;
-
use radicle::storage::refs::RefsAt;
+
use radicle::storage::refs::{RefsAt, SignedRefsUpdate};
use radicle::storage::ReadRepository;

use crate::crypto;
@@ -170,7 +170,7 @@ pub struct RefsAnnouncement {
pub struct RefsStatus {
    /// The `rad/sigrefs` was missing or it's ahead of the local
    /// `rad/sigrefs`.
-
    pub fresh: Vec<RefsAt>,
+
    pub fresh: Vec<SignedRefsUpdate>,
    /// The `rad/sigrefs` has been seen before.
    pub stale: Vec<RefsAt>,
}
@@ -188,10 +188,18 @@ impl RefsStatus {
            // announcement "fresh", since we obviously don't
            // have the refs.
            Err(e) if e.is_not_found() => {
+
                let fresh = refs
+
                    .into_iter()
+
                    .map(|RefsAt { remote, at }| SignedRefsUpdate {
+
                        remote,
+
                        old: None,
+
                        new: at,
+
                    })
+
                    .collect();
                return Ok(RefsStatus {
-
                    fresh: refs.clone(),
+
                    fresh,
                    stale: Vec::new(),
-
                })
+
                });
            }
            Err(e) => return Err(e),
            Ok(r) => r,
@@ -212,12 +220,20 @@ impl RefsStatus {
        match RefsAt::new(repo, theirs.remote) {
            Ok(ours) => {
                if Self::is_fresh(repo, theirs.at, ours.at)? {
-
                    self.fresh.push(theirs);
+
                    self.fresh.push(SignedRefsUpdate {
+
                        remote: theirs.remote,
+
                        old: Some(ours.at),
+
                        new: theirs.at,
+
                    });
                } else {
                    self.stale.push(theirs);
                }
            }
-
            Err(e) if git::is_not_found_err(&e) => self.fresh.push(theirs),
+
            Err(e) if git::is_not_found_err(&e) => self.fresh.push(SignedRefsUpdate {
+
                remote: theirs.remote,
+
                old: None,
+
                new: theirs.at,
+
            }),
            Err(e) => {
                log::warn!(
                    target: "service",
modified radicle-node/src/worker.rs
@@ -13,7 +13,7 @@ use crossbeam_channel as chan;
use radicle::identity::RepoId;
use radicle::node::notifications;
use radicle::prelude::NodeId;
-
use radicle::storage::refs::RefsAt;
+
use radicle::storage::refs::SignedRefsUpdate;
use radicle::storage::{ReadRepository, ReadStorage};
use radicle::{cob, crypto, Storage};
use radicle_fetch::FetchLimit;
@@ -106,7 +106,7 @@ pub enum FetchRequest {
        /// Remote peer we are interacting with.
        remote: NodeId,
        /// If this fetch is for a particular set of `rad/sigrefs`.
-
        refs_at: Option<Vec<RefsAt>>,
+
        refs_at: Option<Vec<SignedRefsUpdate>>,
        /// Fetch timeout.
        timeout: time::Duration,
    },
@@ -285,7 +285,7 @@ impl Worker {
        &mut self,
        rid: RepoId,
        remote: NodeId,
-
        refs_at: Option<Vec<RefsAt>>,
+
        refs_at: Option<Vec<SignedRefsUpdate>>,
        channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
        mut cache: cob::cache::StoreWriter,
modified radicle-node/src/worker/fetch.rs
@@ -7,7 +7,7 @@ use localtime::LocalTime;

use radicle::crypto::PublicKey;
use radicle::prelude::RepoId;
-
use radicle::storage::refs::RefsAt;
+
use radicle::storage::refs::SignedRefsUpdate;
use radicle::storage::{
    ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, WriteRepository as _,
};
@@ -67,7 +67,7 @@ impl Handle {
        cache: &mut cob::cache::StoreWriter,
        limit: FetchLimit,
        remote: PublicKey,
-
        refs_at: Option<Vec<RefsAt>>,
+
        refs_at: Option<Vec<SignedRefsUpdate>>,
    ) -> Result<FetchResult, error::Fetch> {
        let (result, notifs) = match self {
            Self::Clone { mut handle, tmp } => {
modified radicle/src/storage/refs.rs
@@ -437,6 +437,156 @@ impl Deref for SignedRefsAt {
    }
}

+
/// A snapshot of a `rad/sigrefs` update.
+
///
+
/// If `old` is `None` then this is a newly seen `rad/sigrefs`,
+
/// otherwise it is an update from `old` to `new`.
+
///
+
/// The [`RefsUpdate::difference`] method will compute the difference
+
/// between the two `Oid`s, returning [`DiffedRefs`].
+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct SignedRefsUpdate {
+
    /// The remote namespace of the `rad/sigrefs`.
+
    pub remote: RemoteId,
+
    /// The commit SHA of the previously seen `rad/sigrefs`.
+
    pub old: Option<Oid>,
+
    /// The commit SHA of the newly seen `rad/sigrefs`.
+
    pub new: Oid,
+
}
+

+
/// A difference update between two references in a `rad/sigrefs`
+
/// payload.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub enum Update {
+
    /// The reference was added and points to `oid`.
+
    Added { oid: Oid },
+
    /// The reference was changed to point to `oid`.
+
    Changed { oid: Oid },
+
    /// The reference was deleted and used to point to `oid`.
+
    Deleted { oid: Oid },
+
    /// The reference was not changed, and still points to `oid`.
+
    Same { oid: Oid },
+
}
+

+
impl Update {
+
    /// Return the `oid` of the update if it was `Added` or `Changed`.
+
    pub fn modified(&self) -> Option<&Oid> {
+
        match self {
+
            Update::Added { oid } => Some(oid),
+
            Update::Changed { oid } => Some(oid),
+
            Update::Deleted { .. } | Update::Same { .. } => None,
+
        }
+
    }
+

+
    /// Return the `oid` of the update.
+
    fn oid(&self) -> &Oid {
+
        match self {
+
            Update::Added { oid } => oid,
+
            Update::Changed { oid } => oid,
+
            Update::Deleted { oid } => oid,
+
            Update::Same { oid } => oid,
+
        }
+
    }
+
}
+

+
/// The set of [`Update`]s for a given [`git::RefString`].
+
///
+
/// To construct a `DiffedRefs` use [`RefsUpdate::difference`].
+
#[derive(Clone, Debug, Default)]
+
pub struct DiffedRefs(BTreeMap<git::RefString, Update>);
+

+
impl Deref for DiffedRefs {
+
    type Target = BTreeMap<git::RefString, Update>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl DerefMut for DiffedRefs {
+
    fn deref_mut(&mut self) -> &mut Self::Target {
+
        &mut self.0
+
    }
+
}
+

+
impl SignedRefsUpdate {
+
    /// Get the difference between [`RefsUpdate::old`], if it exists,
+
    /// and [`RefsUpdate::new`].
+
    ///
+
    /// If `old` is `None`, then all `Updates`s are `Added`.
+
    ///
+
    /// Otherwise, they are computed to be:
+
    ///     - `Added` if the reference only exists in `new`.
+
    ///     - `Changed` if the reference exists in `old` and `new`,
+
    ///       and the `oid`s are different.
+
    ///    - `Deleted` if the reference only exists in `old`.
+
    ///    - `Same` if the reference exists in `old` and `new`, and
+
    ///      the `oid`s are the same.
+
    pub fn difference<S>(&self, repo: &S) -> Result<DiffedRefs, Error>
+
    where
+
        S: ReadRepository,
+
    {
+
        let new = self.load_new(repo)?;
+
        let mut refs = DiffedRefs(
+
            new.iter()
+
                .map(|(refname, oid)| (refname.clone(), Update::Added { oid: *oid }))
+
                .collect::<BTreeMap<_, _>>(),
+
        );
+

+
        match self.load_old(repo)? {
+
            None => Ok(refs),
+
            Some(old) => {
+
                for (refname, old) in old.sigrefs.iter() {
+
                    refs.entry(refname.clone())
+
                        .and_modify(|update| {
+
                            // N.b. we rely on the fact that is always the
+
                            // newly added Oid.
+
                            let new = update.oid();
+
                            if new == old {
+
                                *update = Update::Same { oid: *new };
+
                            } else {
+
                                *update = Update::Changed { oid: *update.oid() };
+
                            }
+
                        })
+
                        .or_insert(Update::Deleted { oid: *old });
+
                }
+
                Ok(refs)
+
            }
+
        }
+
    }
+

+
    fn load_old<S>(&self, repo: &S) -> Result<Option<SignedRefsAt>, Error>
+
    where
+
        S: ReadRepository,
+
    {
+
        self.old
+
            .map(|at| {
+
                RefsAt {
+
                    remote: self.remote,
+
                    at,
+
                }
+
                .load(repo)
+
            })
+
            .transpose()
+
    }
+

+
    fn load_new<S>(&self, repo: &S) -> Result<SignedRefsAt, Error>
+
    where
+
        S: ReadRepository,
+
    {
+
        RefsAt::from(self).load(repo)
+
    }
+
}
+

+
impl From<&SignedRefsUpdate> for RefsAt {
+
    fn from(up: &SignedRefsUpdate) -> Self {
+
        Self {
+
            remote: up.remote,
+
            at: up.new,
+
        }
+
    }
+
}
+

pub mod canonical {
    use super::*;