Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Use refs cache to decide on fetching
Merged did:key:z6MksFqX...wzpT opened 2 years ago

When deciding whether or not to fetch a repo based on a refs announcement, we need to know whether our signed refs are stale or not, compared to the ones in the announcement. This can be quite expensive as it requires multiple reads from Git’s ODB, which often results in packfile loading.

To remedy this, we introduce a refs cache in our node database that is consulted when an announcement is received.

We also remove the logic that generates info messages, since such messages are rarely needed and the logic introduced more complexity.

Additionally, we revert commit 3ad2b4431f4406fbb96c30d1edc77c5d969f17cb, which introduced additional logic to minimize wants and haves, since it potentially also increases the load on the ODB.

18 files changed +472 -582 345fa57b 256c620d
modified radicle-fetch/src/lib.rs
@@ -20,7 +20,7 @@ pub use state::{FetchLimit, FetchResult};
pub use transport::Transport;

use radicle::crypto::PublicKey;
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::ReadRepository as _;
use state::FetchState;
use thiserror::Error;
@@ -54,7 +54,7 @@ pub fn pull<S>(
    handle: &mut Handle<S>,
    limit: FetchLimit,
    remote: PublicKey,
-
    refs_at: Option<Vec<SignedRefsUpdate>>,
+
    refs_at: Option<Vec<RefsAt>>,
) -> Result<FetchResult, Error>
where
    S: transport::ConnectionStream,
modified radicle-fetch/src/sigrefs.rs
@@ -1,10 +1,8 @@
use std::collections::{BTreeMap, BTreeSet};
-
use std::ops::{Deref, DerefMut, Not as _};
+
use std::ops::{Deref, Not as _};

-
pub use radicle::storage::refs::{DiffedRefs, SignedRefsAt};
+
pub use radicle::storage::refs::SignedRefsAt;
pub use radicle::storage::{git::Validation, Validations};
-

-
use radicle::storage::refs;
use radicle::{crypto::PublicKey, storage::ValidateRepository};

use crate::state::Cached;
@@ -152,67 +150,6 @@ impl<'a> IntoIterator for &'a RemoteRefs {
    }
}

-
/// A set of [`DiffedRefs`] per remote `PublicKey`.
-
///
-
/// To construct use [`DiffedRefs::load`].
-
#[derive(Clone, Debug, Default)]
-
pub struct RemoteDiffedRefs(BTreeMap<PublicKey, DiffedRefs>);
-

-
impl RemoteDiffedRefs {
-
    /// Given a set of [`refs::RefsUpdate`]s, compute its
-
    /// [`DiffedRefs`] and use its [`refs::RefsUpdate::remote`] as the
-
    /// key for the `RemoteDiffedRefs` entry.
-
    ///
-
    /// If the `remote` is in the `may` set, then it is allowed to
-
    /// fail and will not be inserted in the set iff it does fail to
-
    /// load.
-
    ///
-
    /// If the `remote` is in the `must` set, then this method will
-
    /// fail iff loading the `DiffedRefs` fails.
-
    pub(crate) fn load<S>(
-
        cached: &Cached<S>,
-
        updates: Vec<refs::SignedRefsUpdate>,
-
        Select { must, may }: Select,
-
    ) -> Result<Self, error::Load> {
-
        updates
-
            .into_iter()
-
            .try_fold(Self::default(), |mut refs, update| {
-
                match cached.load_diffed_refs(&update) {
-
                    Ok(diff) => {
-
                        refs.insert(update.remote, diff);
-
                        Ok(refs)
-
                    }
-
                    Err(e) if must.contains(&update.remote) => Err(e),
-
                    Err(_) if may.contains(&update.remote) => Ok(refs),
-
                    Err(e) => Err(e),
-
                }
-
            })
-
    }
-
}
-

-
impl Deref for RemoteDiffedRefs {
-
    type Target = BTreeMap<PublicKey, DiffedRefs>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
-
    }
-
}
-

-
impl DerefMut for RemoteDiffedRefs {
-
    fn deref_mut(&mut self) -> &mut Self::Target {
-
        &mut self.0
-
    }
-
}
-

-
impl<'a> IntoIterator for &'a RemoteDiffedRefs {
-
    type Item = <&'a BTreeMap<PublicKey, DiffedRefs> as IntoIterator>::Item;
-
    type IntoIter = <&'a BTreeMap<PublicKey, DiffedRefs> as IntoIterator>::IntoIter;
-

-
    fn into_iter(self) -> Self::IntoIter {
-
        self.0.iter()
-
    }
-
}
-

pub struct Select<'a> {
    pub must: &'a BTreeSet<PublicKey>,
    pub may: &'a BTreeSet<PublicKey>,
modified radicle-fetch/src/stage.rs
@@ -15,18 +15,17 @@
//!      `rad/sigrefs`, for each configured namespace, i.e. followed
//!      and delegate peers if the scope is "followed" and all peers is the
//!      scope is all.
-
//!   3. [`DataRefs`]/[`DiffedRefs`]: fetches the `Oid`s for each
-
//!      reference listed in the `rad/sigrefs` for each fetched peer
-
//!      in the [`SpecialRefs`] stage. Additionally, any references
-
//!      that have been removed from `rad/sigrefs` are marked for
-
//!      deletion.
+
//!   3. [`DataRefs`]: fetches the `Oid`s for each reference listed in
+
//!      the `rad/sigrefs` for each fetched peer in the
+
//!      [`SpecialRefs`] stage. Additionally, any references that have
+
//!      been removed from `rad/sigrefs` are marked for deletion.
//!
//! ### Pull
//!
//! A `pull` is split into two stages:
//!
//!   1. [`SpecialRefs`]: see above.
-
//!   2. [`DataRefs`]/[`DiffedRefs`]: see above.
+
//!   2. [`DataRefs`]: see above.

use std::collections::{BTreeMap, BTreeSet, HashSet};

@@ -381,7 +380,7 @@ impl ProtocolStage for SigrefsAt {
}

/// The [`ProtocolStage`] for fetching data refs from the set of
-
/// `remotes`.
+
/// remotes in `trusted`.
///
/// All refs that are listed in the `remotes` sigrefs are checked
/// against our refdb/odb to build a set of `wants` and `haves`. The
@@ -494,112 +493,6 @@ impl ProtocolStage for DataRefs {
    }
}

-
/// The [`ProtocolStage`] that is similar to [`DataRefs`], however it
-
/// is aware that it is an update of `rad/sigrefs`. This means that it
-
/// will only compute `wants` and `haves` based on any modified
-
/// `Oid`s.
-
///
-
/// All refs and objects are prepared for updating as per usual, since
-
/// we keep track of in-memory references for validation.
-
#[derive(Debug)]
-
pub struct DiffedRefs {
-
    /// The node that is being fetched from.
-
    pub remote: PublicKey,
-
    /// The set of signed references from each remote that was
-
    /// fetched.
-
    pub remotes: sigrefs::RemoteDiffedRefs,
-
    /// The data limit for this stage of fetching.
-
    pub limit: u64,
-
}
-

-
impl ProtocolStage for DiffedRefs {
-
    fn ls_refs(&self) -> Option<NonEmpty<BString>> {
-
        None
-
    }
-

-
    fn ref_filter(&self, _r: Ref) -> Option<ReceivedRef> {
-
        None
-
    }
-

-
    fn pre_validate(&self, _refs: &[ReceivedRef]) -> Result<(), error::Layout> {
-
        Ok(())
-
    }
-

-
    fn wants_haves(
-
        &self,
-
        refdb: &Repository,
-
        _refs: &[ReceivedRef],
-
    ) -> Result<WantsHaves, error::WantsHaves> {
-
        let mut wants_haves = WantsHaves::default();
-

-
        for (remote, refs) in &self.remotes {
-
            wants_haves.add(
-
                refdb,
-
                refs.iter().filter_map(|(refname, up)| {
-
                    let refname = Qualified::from_refstr(refname)
-
                        .map(|refname| refname.with_namespace(Component::from(remote)))?;
-
                    let tip = up.modified()?;
-
                    Some((refname, *tip))
-
                }),
-
            )?;
-
        }
-

-
        Ok(wants_haves)
-
    }
-

-
    fn prepare_updates<'a>(
-
        &self,
-
        _s: &FetchState,
-
        _repo: &Repository,
-
        _refs: &'a [ReceivedRef],
-
    ) -> Result<Updates<'a>, error::Prepare> {
-
        use radicle::storage::refs::Update::{Added, Changed, Deleted, Same};
-

-
        let mut updates = Updates::default();
-
        let prefix_rad = refname!("refs/rad");
-

-
        for (remote, refs) in &self.remotes {
-
            for (name, up) in refs.iter() {
-
                let is_refs_rad = name.starts_with(prefix_rad.as_str());
-
                let tracking: Namespaced<'_> = Qualified::from_refstr(name)
-
                    .and_then(|q| refs::ReceivedRefname::remote(*remote, q).to_namespaced())
-
                    .expect("we checked sigrefs well-formedness in wants_refs already");
-
                match up {
-
                    Added { oid } | Changed { oid } => updates.add(
-
                        *remote,
-
                        Update::Direct {
-
                            name: tracking,
-
                            target: *oid,
-
                            no_ff: Policy::Allow,
-
                        },
-
                    ),
-
                    Deleted { oid } if !is_refs_rad => updates.add(
-
                        *remote,
-
                        Update::Prune {
-
                            name: tracking,
-
                            prev: either::Left(*oid),
-
                        },
-
                    ),
-
                    // N.b. create an update for this reference so
-
                    // that the in-memory refdb is updated.
-
                    Same { oid } => updates.add(
-
                        *remote,
-
                        Update::Direct {
-
                            name: tracking,
-
                            target: *oid,
-
                            no_ff: Policy::Allow,
-
                        },
-
                    ),
-
                    // N.b. `refs/rad` is not subject to pruning.
-
                    Deleted { .. } => continue,
-
                }
-
            }
-
        }
-

-
        Ok(updates)
-
    }
-
}
-

// N.b. the `delegates` are the delegates of the repository, with the
// potential removal of the local peer in the case of a `pull`.
fn special_refs_updates<'a>(
modified radicle-fetch/src/state.rs
@@ -8,7 +8,7 @@ use radicle::identity::{Doc, DocError};

use radicle::prelude::Verified;
use radicle::storage;
-
use radicle::storage::refs::{RefsAt, SignedRefs, SignedRefsUpdate};
+
use radicle::storage::refs::{RefsAt, SignedRefs};
use radicle::storage::{
    git::Validation, Remote, RemoteId, RemoteRepository, Remotes, ValidateRepository, Validations,
};
@@ -273,23 +273,18 @@ impl FetchState {
        Ok(fetched)
    }

-
    /// Fetch the set of special refs, depending on `refs_updates`.
+
    /// Fetch the set of special refs, depending on `refs_at`.
    ///
-
    /// If `refs_updates` is `Some`, then run the [`SigrefsAt`] stage,
+
    /// If `refs_at` is `Some`, then run the [`SigrefsAt`] stage,
    /// which specifically fetches `rad/sigrefs` which are listed in
-
    /// `refs_at`. It then runs the [`DiffedRefs`] stage to fetch the
-
    /// difference between old references listed existing
-
    /// `rad/sigrefs` and the new `rad/sigrefs` fetched in the
-
    /// previous step.
+
    /// `refs_at`.
    ///
-
    /// If `refs_updates` is `None`, then run the [`SpecialRefs`]
-
    /// stage, which fetches `rad/sigrefs` and `rad/id` from all
-
    /// tracked and delegate peers (scope dependent). It then runs the
-
    /// [`DataRefs`] stage to fetch the references listed in the
-
    /// `rad/sigrefs` fetched in the previous step.
+
    /// If `refs_at` is `None`, then run the [`SpecialRefs`] stage,
+
    /// which fetches `rad/sigrefs` and `rad/id` from all tracked and
+
    /// delegate peers (scope dependent).
    ///
-
    /// The resulting `Vec<PublicKey>` will be the set of fetched
-
    /// remotes.
+
    /// The resulting [`sigrefs::RemoteRefs`] will be the set of
+
    /// `rad/sigrefs` of the fetched remotes.
    fn run_special_refs<S>(
        &mut self,
        handle: &mut Handle<S>,
@@ -297,19 +292,18 @@ impl FetchState {
        delegates: BTreeSet<PublicKey>,
        limit: &FetchLimit,
        remote: PublicKey,
-
        refs_updates: Option<Vec<SignedRefsUpdate>>,
-
    ) -> Result<Vec<PublicKey>, error::Protocol>
+
        refs_at: Option<Vec<RefsAt>>,
+
    ) -> Result<sigrefs::RemoteRefs, error::Protocol>
    where
        S: transport::ConnectionStream,
    {
-
        match refs_updates {
-
            Some(refs_updates) => {
-
                let (must, may): (BTreeSet<PublicKey>, BTreeSet<PublicKey>) = refs_updates
+
        match refs_at {
+
            Some(refs_at) => {
+
                let (must, may): (BTreeSet<PublicKey>, BTreeSet<PublicKey>) = refs_at
                    .iter()
                    .map(|refs_at| refs_at.remote)
                    .partition(|id| delegates.contains(id));

-
                let refs_at = refs_updates.iter().map(RefsAt::from).collect();
                let sigrefs_at = stage::SigrefsAt {
                    remote,
                    delegates,
@@ -320,22 +314,14 @@ impl FetchState {
                log::trace!(target: "fetch", "{sigrefs_at:?}");
                self.run_stage(handle, handshake, &sigrefs_at)?;

-
                let remotes = sigrefs::RemoteDiffedRefs::load(
+
                let signed_refs = sigrefs::RemoteRefs::load(
                    &self.as_cached(handle),
-
                    refs_updates,
                    sigrefs::Select {
                        must: &must,
                        may: &may,
                    },
                )?;
-
                let fetched = remotes.keys().copied().collect();
-
                let refs = stage::DiffedRefs {
-
                    remote,
-
                    remotes,
-
                    limit: limit.refs,
-
                };
-
                self.run_stage(handle, handshake, &refs)?;
-
                Ok(fetched)
+
                Ok(signed_refs)
            }
            None => {
                let followed = handle.allowed();
@@ -350,7 +336,7 @@ impl FetchState {
                log::trace!(target: "fetch", "{special_refs:?}");
                let fetched = self.run_stage(handle, handshake, &special_refs)?;

-
                let remotes = sigrefs::RemoteRefs::load(
+
                let signed_refs = sigrefs::RemoteRefs::load(
                    &self.as_cached(handle),
                    sigrefs::Select {
                        must: &delegates,
@@ -361,14 +347,7 @@ impl FetchState {
                            .collect(),
                    },
                )?;
-
                let fetched = remotes.keys().copied().collect();
-
                let refs = stage::DataRefs {
-
                    remote,
-
                    remotes,
-
                    limit: limit.refs,
-
                };
-
                self.run_stage(handle, handshake, &refs)?;
-
                Ok(fetched)
+
                Ok(signed_refs)
            }
        }
    }
@@ -395,7 +374,7 @@ impl FetchState {
        handshake: &handshake::Outcome,
        limit: FetchLimit,
        remote: PublicKey,
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Protocol>
    where
        S: transport::ConnectionStream,
@@ -434,7 +413,7 @@ impl FetchState {

        log::trace!(target: "fetch", "Identity delegates {delegates:?}");

-
        let fetched = self.run_special_refs(
+
        let signed_refs = self.run_special_refs(
            handle,
            handshake,
            delegates.clone(),
@@ -444,8 +423,21 @@ impl FetchState {
        )?;
        log::debug!(
            target: "fetch",
-
            "Fetched data refs for {} remote(s) ({}ms)",
-
            fetched.len(),
+
            "Fetched data for {} remote(s) ({}ms)",
+
            signed_refs.len(),
+
            start.elapsed().as_millis()
+
        );
+

+
        let data_refs = stage::DataRefs {
+
            remote,
+
            remotes: signed_refs,
+
            limit: limit.refs,
+
        };
+
        self.run_stage(handle, handshake, &data_refs)?;
+
        log::debug!(
+
            target: "fetch",
+
            "Fetched data refs for {} remotes ({}ms)",
+
            data_refs.remotes.len(),
            start.elapsed().as_millis()
        );

@@ -467,6 +459,7 @@ impl FetchState {
        // added to `warnings`.
        let mut warnings = sigrefs::Validations::default();
        let mut failures = sigrefs::Validations::default();
+
        let signed_refs = data_refs.remotes;

        // We may prune fetched remotes, so we keep track of
        // non-pruned, fetched remotes here.
@@ -474,7 +467,7 @@ impl FetchState {

        // TODO(finto): this might read better if it got its own
        // private function.
-
        for remote in &fetched {
+
        for remote in signed_refs.keys() {
            if handle.is_blocked(remote) {
                continue;
            }
@@ -644,13 +637,6 @@ impl<'a, S> Cached<'a, S> {
        }
    }

-
    pub fn load_diffed_refs(
-
        &self,
-
        update: &SignedRefsUpdate,
-
    ) -> Result<storage::refs::DiffedRefs, sigrefs::error::Load> {
-
        update.difference(&self.handle.repo)
-
    }
-

    #[allow(dead_code)]
    pub(crate) fn inspect(&self) {
        self.state.refs.inspect()
modified radicle-node/src/runtime.rs
@@ -157,7 +157,8 @@ impl Runtime {
        let policy = config.policy;

        log::info!(target: "node", "Opening node database..");
-
        let mut db: service::Stores<_> = home.database_mut()?.into();
+
        let db = home.database_mut()?;
+
        let mut stores: service::Stores<_> = db.clone().into();

        log::info!(target: "node", "Opening policy database..");
        let policies = home.policies_mut()?;
@@ -206,13 +207,13 @@ impl Runtime {
                .expect("Runtime::init: unable to solve proof-of-work puzzle")
        };

-
        if config.connect.is_empty() && db.addresses().is_empty()? {
+
        if config.connect.is_empty() && stores.addresses().is_empty()? {
            log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");

            for (alias, addr) in config.network.bootstrap() {
                let (id, addr) = addr.into();

-
                db.addresses_mut().insert(
+
                stores.addresses_mut().insert(
                    &id,
                    radicle::node::Features::SEED,
                    alias,
@@ -221,14 +222,14 @@ impl Runtime {
                    [node::KnownAddress::new(addr, address::Source::Bootstrap)],
                )?;
            }
-
            log::info!(target: "node", "{} nodes added to address book", db.addresses().len()?);
+
            log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
        }

        let emitter: Emitter<Event> = Default::default();
        let mut service = service::Service::new(
            config.clone(),
            clock,
-
            db,
+
            stores,
            storage.clone(),
            policies,
            signer.clone(),
@@ -264,6 +265,7 @@ impl Runtime {
            handle.clone(),
            notifications,
            cobs_cache,
+
            db,
            worker::Config {
                capacity: config.workers,
                storage: storage.clone(),
modified radicle-node/src/service.rs
@@ -26,11 +26,12 @@ use radicle::node::address;
use radicle::node::address::Store as _;
use radicle::node::address::{AddressBook, KnownAddress};
use radicle::node::config::PeerConfig;
+
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{ConnectOptions, Penalty, Severity};
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::{Inventory, RepositoryError};

use crate::crypto;
@@ -44,13 +45,12 @@ use crate::node::{
use crate::prelude::*;
use crate::runtime::Emitter;
use crate::service::gossip::Store as _;
-
use crate::service::message::{Announcement, AnnouncementMessage, Info, Ping};
-
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
+
use crate::service::message::{
+
    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
+
};
use crate::service::policy::{store::Write, Policy, Scope};
use crate::storage;
-
use crate::storage::refs::RefsAt;
-
use crate::storage::ReadRepository;
-
use crate::storage::{Namespaces, ReadStorage};
+
use crate::storage::{refs::RefsAt, Namespaces, ReadRepository, ReadStorage};
use crate::worker::fetch;
use crate::worker::FetchError;
use crate::Link;
@@ -64,7 +64,7 @@ pub use radicle::node::policy::config as policy;

use self::io::Outbox;
use self::limitter::RateLimiter;
-
use self::message::{InventoryAnnouncement, RefsStatus};
+
use self::message::InventoryAnnouncement;
use self::policy::NamespacesError;

/// How often to run the "idle" task.
@@ -159,7 +159,10 @@ pub enum Error {
}

/// A store for all node data.
-
pub trait Store: address::Store + gossip::Store + routing::Store + seed::Store {}
+
pub trait Store:
+
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
+
{
+
}

impl Store for node::Database {}

@@ -251,7 +254,7 @@ struct FetchState {
    /// Node we're fetching from.
    from: NodeId,
    /// What refs we're fetching.
-
    refs_at: Vec<SignedRefsUpdate>,
+
    refs_at: Vec<RefsAt>,
    /// Channels waiting for fetch results.
    subscribers: Vec<chan::Sender<FetchResult>>,
}
@@ -273,7 +276,7 @@ struct QueuedFetch {
    /// Peer being fetched from.
    from: NodeId,
    /// Refs being fetched.
-
    refs_at: Vec<SignedRefsUpdate>,
+
    refs_at: Vec<RefsAt>,
    /// Result channel.
    channel: Option<chan::Sender<FetchResult>>,
}
@@ -325,6 +328,16 @@ where
    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }
+

+
    /// Get the database as a refs db.
+
    pub fn refs(&self) -> &impl node::refs::Store {
+
        &self.0
+
    }
+

+
    /// Get the database as a refs db, mutably.
+
    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
+
        &mut self.0
+
    }
}

impl<D> From<D> for Stores<D> {
@@ -767,7 +780,7 @@ where
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
-
            Command::AnnounceRefs(id, resp) => match self.announce_refs(id, [self.node_id()]) {
+
            Command::AnnounceRefs(id, resp) => match self.announce_own_refs(id) {
                Ok(refs) => match refs.as_slice() {
                    &[refs] => {
                        resp.send(refs).ok();
@@ -809,7 +822,7 @@ where
        &mut self,
        rid: RepoId,
        from: NodeId,
-
        refs: NonEmpty<SignedRefsUpdate>,
+
        refs: NonEmpty<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) {
@@ -831,7 +844,7 @@ where
        &mut self,
        rid: RepoId,
        from: NodeId,
-
        refs_at: Vec<SignedRefsUpdate>,
+
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) {
@@ -886,7 +899,7 @@ where
        &mut self,
        rid: RepoId,
        from: &NodeId,
-
        refs_at: Vec<SignedRefsUpdate>,
+
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) -> Result<&mut FetchState, TryFetchError> {
        let from = *from;
@@ -1051,7 +1064,6 @@ where
                .seed_policy(&rid)
                .expect("Service::dequeue_fetch: error accessing repo seeding configuration");

-
            let refs_at = refs_at.iter().map(RefsAt::from).collect();
            match self.refs_status_of(rid, refs_at, &repo_entry.scope) {
                Ok(status) => {
                    if let Some(refs) = NonEmpty::from_vec(status.fresh) {
@@ -1398,8 +1410,12 @@ where
                        info!(target: "service", "Routing table updated for {} with seed {announcer}", message.rid);
                    }
                }
+
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
+
                    debug!(target: "service", "Skipping fetch, empty refs announcement for {}", message.rid);
+
                    return Ok(false);
+
                };

-
                // Update sync status for this repo.
+
                // Update sync status of announcer for this repo.
                if let Some(refs) = message.refs.iter().find(|r| &r.remote == self.nid()) {
                    match self.db.seeds_mut().synced(
                        &message.rid,
@@ -1430,71 +1446,29 @@ where
                let repo_entry = self.policies.seed_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo seeding configuration",
                );
-
                if repo_entry.policy == Policy::Allow {
-
                    let (fresh, stale) = match self.refs_status_of(
-
                        message.rid,
-
                        message.refs.clone().into(),
-
                        &repo_entry.scope,
-
                    ) {
-
                        Ok(RefsStatus { fresh, stale }) => (fresh, stale),
-
                        Err(e) => {
-
                            error!(target: "service", "Failed to check refs status: {e}");
-
                            return Ok(relay);
-
                        }
-
                    };
-

-
                    // Refs can be relayed by peers who don't have the data in storage,
-
                    // therefore we only check whether we are connected to the *announcer*,
-
                    // which is required by the protocol to only announce refs it has.
-
                    if let Some(remote) = self.sessions.get(announcer).cloned() {
-
                        // If the relayer is also the origin of the message, we inform it
-
                        // about any refs that are already in sync (stale).
-
                        if relayer == announcer {
-
                            // If the stale refs contain refs announced by the peer, let it know
-
                            // that we're already in sync.
-
                            if let Some(at) = stale
-
                                .iter()
-
                                .find(|refs| refs.remote == remote.id)
-
                                .copied()
-
                                .map(|RefsAt { at, .. }| at)
-
                            {
-
                                debug!(
-
                                    target: "service", "Refs of {} already synced for {} at {at}",
-
                                    remote.id,
-
                                    message.rid,
-
                                );
-
                                self.outbox.write(
-
                                    &remote,
-
                                    Info::RefsAlreadySynced {
-
                                        rid: message.rid,
-
                                        at,
-
                                    }
-
                                    .into(),
-
                                );
-
                            }
-
                        }
-
                        // Finally, if there's anything to fetch, we fetch it from the
-
                        // remote.
-
                        if let Some(fresh) = NonEmpty::from_vec(fresh) {
-
                            self.fetch_refs_at(message.rid, remote.id, fresh, FETCH_TIMEOUT, None);
-
                        } else {
-
                            debug!(target: "service", "Skipping fetch, all refs of {} are already in local storage", message.rid);
-
                        }
-
                    } else {
-
                        trace!(
-
                            target: "service",
-
                            "Skipping fetch of {}, no sessions connected to {announcer}",
-
                            message.rid
-
                        );
-
                    }
-
                    return Ok(relay);
-
                } else {
+
                if repo_entry.policy != Policy::Allow {
                    debug!(
                        target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded",
                        message.rid
                    );
+
                    return Ok(false);
                }
+
                // Refs can be relayed by peers who don't have the data in storage,
+
                // therefore we only check whether we are connected to the *announcer*,
+
                // which is required by the protocol to only announce refs it has.
+
                let Some(remote) = self.sessions.get(announcer).cloned() else {
+
                    trace!(
+
                        target: "service",
+
                        "Skipping fetch of {}, no sessions connected to {announcer}",
+
                        message.rid
+
                    );
+
                    return Ok(relay);
+
                };
+
                // Finally, if there's anything to fetch, we fetch it from the remote.
+
                self.fetch_refs_at(message.rid, remote.id, refs, FETCH_TIMEOUT, None);
+

+
                return Ok(relay);
            }
            AnnouncementMessage::Node(
                ann @ NodeAnnouncement {
@@ -1549,6 +1523,7 @@ where

    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
        match info {
+
            // Nb. We don't currently send this message.
            Info::RefsAlreadySynced { rid, at } => {
                debug!(target: "service", "Refs already synced for {rid} by {remote}");
                self.emitter.emit(Event::RefsSynced {
@@ -1562,39 +1537,6 @@ where
        Ok(())
    }

-
    /// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
-
    fn refs_status_of(
-
        &self,
-
        rid: RepoId,
-
        refs: Vec<RefsAt>,
-
        scope: &policy::Scope,
-
    ) -> Result<RefsStatus, Error> {
-
        let mut refs = RefsStatus::new(rid, refs, &self.storage)?;
-

-
        // First, check the freshness.
-
        if refs.fresh.is_empty() {
-
            return Ok(refs);
-
        }
-

-
        // Second, check the scope.
-
        match scope {
-
            policy::Scope::All => Ok(refs),
-
            policy::Scope::Followed => {
-
                match self.policies.namespaces_for(&self.storage, &rid) {
-
                    Ok(Namespaces::All) => Ok(refs),
-
                    Ok(Namespaces::Followed(mut followed)) => {
-
                        // Get the set of followed nodes except self.
-
                        followed.remove(self.nid());
-
                        refs.fresh.retain(|r| followed.contains(&r.remote));
-

-
                        Ok(refs)
-
                    }
-
                    Err(e) => Err(e.into()),
-
                }
-
            }
-
        }
-
    }
-

    pub fn handle_message(
        &mut self,
        remote: &NodeId,
@@ -1720,6 +1662,37 @@ where
        Ok(())
    }

+
    /// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
+
    fn refs_status_of(
+
        &self,
+
        rid: RepoId,
+
        refs: Vec<RefsAt>,
+
        scope: &policy::Scope,
+
    ) -> Result<RefsStatus, Error> {
+
        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
+
        // First, check the freshness.
+
        if refs.fresh.is_empty() {
+
            return Ok(refs);
+
        }
+
        // Second, check the scope.
+
        match scope {
+
            policy::Scope::All => Ok(refs),
+
            policy::Scope::Followed => {
+
                match self.policies.namespaces_for(&self.storage, &rid) {
+
                    Ok(Namespaces::All) => Ok(refs),
+
                    Ok(Namespaces::Followed(mut followed)) => {
+
                        // Get the set of followed nodes except self.
+
                        followed.remove(self.nid());
+
                        refs.fresh.retain(|r| followed.contains(&r.remote));
+

+
                        Ok(refs)
+
                    }
+
                    Err(e) => Err(e.into()),
+
                }
+
            }
+
        }
+
    }
+

    /// Set of initial messages to send to a peer.
    fn initial(&self, _link: Link) -> Vec<Message> {
        let filter = self.filter();
@@ -1858,7 +1831,32 @@ where
        Ok((msg.signed(&self.signer), refs.into()))
    }

-
    /// Announce local refs for given id.
+
    /// Announce our own refs for the given repo.
+
    fn announce_own_refs(&mut self, rid: RepoId) -> Result<Vec<RefsAt>, Error> {
+
        let refs = self.announce_refs(rid, [self.node_id()])?;
+

+
        // Update refs database with our signed refs branches.
+
        // This isn't strictly necessary for now, as we only use the database for fetches, and
+
        // we don't fetch our own refs that are announced, but it's for good measure.
+
        if let &[r] = refs.as_slice() {
+
            let now = self.local_time();
+

+
            if let Err(e) =
+
                self.database_mut()
+
                    .refs_mut()
+
                    .set(&rid, &r.remote, &SIGREFS_BRANCH, r.at, now)
+
            {
+
                error!(
+
                    target: "service",
+
                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
+
                    r.remote
+
                );
+
            }
+
        }
+
        Ok(refs)
+
    }
+

+
    /// Announce local refs for given repo.
    fn announce_refs(
        &mut self,
        rid: RepoId,
@@ -1882,6 +1880,7 @@ where
                error!(target: "service", "Error updating sync status for local node: {e}");
            }
        }
+

        self.outbox.announce(
            ann,
            peers.filter(|p| {
@@ -1890,7 +1889,6 @@ where
            }),
            self.db.gossip_mut(),
        );
-

        Ok(refs)
    }

modified radicle-node/src/service/io.rs
@@ -2,7 +2,7 @@ use std::collections::VecDeque;
use std::time;

use log::*;
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::RefsAt;

use crate::prelude::*;
use crate::service::session::Session;
@@ -30,7 +30,7 @@ pub enum Io {
        /// Namespaces being fetched.
        namespaces: Namespaces,
        /// If the node is fetching specific `rad/sigrefs`.
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
        timeout: time::Duration,
    },
@@ -126,7 +126,7 @@ impl Outbox {
        remote: &mut Session,
        rid: RepoId,
        namespaces: Namespaces,
-
        refs_at: Vec<SignedRefsUpdate>,
+
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) {
        remote.fetching(rid);
modified radicle-node/src/service/message.rs
@@ -1,8 +1,7 @@
use std::{fmt, io, mem};

use radicle::git;
-
use radicle::storage::refs::{RefsAt, SignedRefsUpdate};
-
use radicle::storage::ReadRepository;
+
use radicle::storage::refs::RefsAt;

use crate::crypto;
use crate::identity::RepoId;
@@ -12,7 +11,6 @@ use crate::prelude::BoundedVec;
use crate::service::filter::Filter;
use crate::service::{Link, NodeId, Timestamp};
use crate::storage;
-
use crate::storage::ReadStorage;
use crate::wire;

/// Maximum number of addresses which can be announced to other nodes.
@@ -170,7 +168,7 @@ pub struct RefsAnnouncement {
pub struct RefsStatus {
    /// The `rad/sigrefs` was missing or it's ahead of the local
    /// `rad/sigrefs`.
-
    pub fresh: Vec<SignedRefsUpdate>,
+
    pub fresh: Vec<RefsAt>,
    /// The `rad/sigrefs` has been seen before.
    pub stale: Vec<RefsAt>,
}
@@ -178,86 +176,49 @@ pub struct RefsStatus {
impl RefsStatus {
    /// Get the set of `fresh` and `stale` `RefsAt`'s for the given
    /// announcement.
-
    pub fn new<S: ReadStorage>(
+
    ///
+
    /// Nb. We use the refs database as a cache for quick lookups. This does *not* check
+
    /// for ancestry matches, since we don't cache the whole history (only the tips).
+
    /// This, however, is not a problem because the signed refs branch is fast-forward only,
+
    /// and old refs announcements will be discarded due to their lower timestamps.
+
    pub fn new<D: node::refs::Store>(
        rid: RepoId,
        refs: Vec<RefsAt>,
-
        storage: S,
+
        db: &D,
    ) -> Result<RefsStatus, storage::Error> {
-
        let repo = match storage.repository(rid) {
-
            // If the repo doesn't exist, we consider this
-
            // announcement "fresh", since we obviously don't
-
            // have the refs.
-
            Err(e) if e.is_not_found() => {
-
                let fresh = refs
-
                    .into_iter()
-
                    .map(|RefsAt { remote, at }| SignedRefsUpdate {
-
                        remote,
-
                        old: None,
-
                        new: at,
-
                    })
-
                    .collect();
-
                return Ok(RefsStatus {
-
                    fresh,
-
                    stale: Vec::new(),
-
                });
-
            }
-
            Err(e) => return Err(e),
-
            Ok(r) => r,
-
        };
-

        let mut status = RefsStatus::default();
        for theirs in refs.iter() {
-
            status.insert(*theirs, &repo)?;
+
            status.insert(&rid, *theirs, db)?;
        }
        Ok(status)
    }

-
    fn insert<S: ReadRepository>(
+
    fn insert<D: node::refs::Store>(
        &mut self,
+
        repo: &RepoId,
        theirs: RefsAt,
-
        repo: &S,
+
        db: &D,
    ) -> Result<(), storage::Error> {
-
        match RefsAt::new(repo, theirs.remote) {
-
            Ok(ours) => {
-
                if Self::is_fresh(repo, theirs.at, ours.at)? {
-
                    self.fresh.push(SignedRefsUpdate {
-
                        remote: theirs.remote,
-
                        old: Some(ours.at),
-
                        new: theirs.at,
-
                    });
+
        match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
+
            Ok(Some((ours, _))) => {
+
                if theirs.at != ours {
+
                    self.fresh.push(theirs);
                } else {
                    self.stale.push(theirs);
                }
            }
-
            Err(e) if git::is_not_found_err(&e) => self.fresh.push(SignedRefsUpdate {
-
                remote: theirs.remote,
-
                old: None,
-
                new: theirs.at,
-
            }),
+
            Ok(None) => {
+
                self.fresh.push(theirs);
+
            }
            Err(e) => {
                log::warn!(
                    target: "service",
-
                    "failed to load 'refs/namespaces/{}/rad/sigrefs': {e}", theirs.remote
-
                )
+
                    "Error getting cached ref of {repo} for refs status: {e}"
+
                );
            }
        }
        Ok(())
    }
-

-
    /// If `theirs` is not the same as `ours` and we have not seen
-
    /// `theirs` before, i.e. it's not a previous `rad/sigrefs`, then
-
    /// we can consider `theirs` a fresh update.
-
    fn is_fresh<S: ReadRepository>(
-
        repo: &S,
-
        theirs: git::Oid,
-
        ours: git::Oid,
-
    ) -> Result<bool, git::ext::Error> {
-
        if repo.contains(theirs)? {
-
            Ok(theirs != ours && !repo.is_ancestor_of(theirs, ours)?)
-
        } else {
-
            Ok(true)
-
        }
-
    }
}

/// Node announcing its inventory to the network.
modified radicle-node/src/tests.rs
@@ -10,6 +10,7 @@ use crossbeam_channel as chan;
use netservices::Direction as Link;
use radicle::identity::Visibility;
use radicle::node::address::Store;
+
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
use radicle::storage::refs::RefsAt;
@@ -29,6 +30,7 @@ use crate::service::ServiceState as _;
use crate::service::*;
use crate::storage::git::transport::{local, remote};
use crate::storage::git::Storage;
+
use crate::storage::refs::SIGREFS_BRANCH;
use crate::storage::ReadStorage;
use crate::test::arbitrary;
use crate::test::assert_matches;
@@ -1431,10 +1433,16 @@ fn test_queued_fetch_from_ann_same_rid() {
        .join(git::refname!("refs/sigrefs"));

    // Finish the 1st fetch.
+
    // Ensure the ref is in the storage and cache.
    alice.storage_mut().repo_mut(&rid).remotes.insert(
        carol.id(),
        carol.signed_refs_at(arbitrary::gen::<Refs>(1), oid),
    );
+
    alice
+
        .database_mut()
+
        .refs_mut()
+
        .set(&rid, &carol.id, &SIGREFS_BRANCH, oid, LocalTime::now())
+
        .unwrap();
    alice.fetched(
        rid,
        bob.id,
modified radicle-node/src/worker.rs
@@ -13,7 +13,7 @@ use crossbeam_channel as chan;
use radicle::identity::RepoId;
use radicle::node::notifications;
use radicle::prelude::NodeId;
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadRepository, ReadStorage};
use radicle::{cob, crypto, Storage};
use radicle_fetch::FetchLimit;
@@ -106,7 +106,7 @@ pub enum FetchRequest {
        /// Remote peer we are interacting with.
        remote: NodeId,
        /// If this fetch is for a particular set of `rad/sigrefs`.
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
        timeout: time::Duration,
    },
@@ -180,6 +180,7 @@ struct Worker {
    policies: policy::Config<policy::store::Read>,
    notifications: notifications::StoreWriter,
    cache: cob::cache::StoreWriter,
+
    db: radicle::node::Database,
}

impl Worker {
@@ -200,13 +201,7 @@ impl Worker {
        } = task;
        let remote = fetch.remote();
        let channels = channels::ChannelsFlush::new(self.handle.clone(), channels, remote, stream);
-
        let result = self._process(
-
            fetch,
-
            stream,
-
            channels,
-
            self.notifications.clone(),
-
            self.cache.clone(),
-
        );
+
        let result = self._process(fetch, stream, channels, self.notifications.clone());

        log::trace!(target: "worker", "Sending response back to service..");

@@ -229,7 +224,6 @@ impl Worker {
        stream: StreamId,
        mut channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
-
        cache: cob::cache::StoreWriter,
    ) -> FetchResult {
        match fetch {
            FetchRequest::Initiator {
@@ -240,7 +234,7 @@ impl Worker {
                timeout: _timeout,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
-
                let result = self.fetch(rid, remote, refs_at, channels, notifs, cache);
+
                let result = self.fetch(rid, remote, refs_at, channels, notifs);
                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote } => {
@@ -294,10 +288,9 @@ impl Worker {
        &mut self,
        rid: RepoId,
        remote: NodeId,
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
        channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
-
        mut cache: cob::cache::StoreWriter,
    ) -> Result<fetch::FetchResult, FetchError> {
        let FetchConfig {
            limit,
@@ -309,6 +302,7 @@ impl Worker {
        let allowed = radicle_fetch::Allowed::from_config(rid, &self.policies)?;
        let blocked = radicle_fetch::BlockList::from_config(&self.policies)?;

+
        let mut cache = self.cache.clone();
        let handle = fetch::Handle::new(
            rid,
            *local,
@@ -318,7 +312,15 @@ impl Worker {
            channels,
            notifs,
        )?;
-
        let result = handle.fetch(rid, &self.storage, &mut cache, *limit, remote, refs_at)?;
+
        let result = handle.fetch(
+
            rid,
+
            &self.storage,
+
            &mut cache,
+
            &mut self.db,
+
            *limit,
+
            remote,
+
            refs_at,
+
        )?;

        if let Err(e) = garbage::collect(&self.storage, rid, *expiry) {
            // N.b. ensure that `git gc` works in debug mode.
@@ -343,6 +345,7 @@ impl Pool {
        handle: Handle,
        notifications: notifications::StoreWriter,
        cache: cob::cache::StoreWriter,
+
        db: radicle::node::Database,
        config: Config,
    ) -> Result<Self, policy::Error> {
        let mut pool = Vec::with_capacity(config.capacity);
@@ -361,6 +364,7 @@ impl Pool {
                policies,
                notifications: notifications.clone(),
                cache: cache.clone(),
+
                db: db.clone(),
            };
            let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());

modified radicle-node/src/worker/fetch.rs
@@ -7,7 +7,7 @@ use localtime::LocalTime;

use radicle::crypto::PublicKey;
use radicle::prelude::RepoId;
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::{
    ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, WriteRepository as _,
};
@@ -62,14 +62,15 @@ impl Handle {
        }
    }

-
    pub fn fetch(
+
    pub fn fetch<D: node::refs::Store>(
        self,
        rid: RepoId,
        storage: &Storage,
        cache: &mut cob::cache::StoreWriter,
+
        refsdb: &mut D,
        limit: FetchLimit,
        remote: PublicKey,
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Fetch> {
        let (result, clone, notifs) = match self {
            Self::Clone { mut handle, tmp } => {
@@ -123,6 +124,7 @@ impl Handle {
                }

                cache_cobs(&rid, &applied.updated, &repo, cache)?;
+
                cache_refs(&rid, &applied.updated, refsdb)?;

                Ok(FetchResult {
                    updated: applied.updated,
@@ -203,6 +205,45 @@ fn notify(
    Ok(())
}

+
/// Cache certain ref updates in our database.
+
fn cache_refs<D>(repo: &RepoId, refs: &[RefUpdate], db: &mut D) -> Result<(), node::refs::Error>
+
where
+
    D: node::refs::Store,
+
{
+
    let time = LocalTime::now();
+

+
    for r in refs {
+
        let name = r.name();
+
        let (namespace, qualified) = match radicle::git::parse_ref_namespaced(name) {
+
            Err(e) => {
+
                log::error!(target: "worker", "Git reference is invalid: {name:?}: {e}");
+
                log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
+
                break;
+
            }
+
            Ok((n, q)) => (n, q),
+
        };
+
        if qualified != *git::refs::storage::SIGREFS_BRANCH {
+
            // Only cache `rad/sigrefs`.
+
            continue;
+
        }
+
        log::trace!(target: "node", "Updating cache for {name} in {repo}");
+

+
        let result = match r {
+
            RefUpdate::Updated { new, .. } => db.set(repo, &namespace, &qualified, *new, time),
+
            RefUpdate::Created { oid, .. } => db.set(repo, &namespace, &qualified, *oid, time),
+
            RefUpdate::Deleted { .. } => db.delete(repo, &namespace, &qualified),
+
            RefUpdate::Skipped { .. } => continue,
+
        };
+

+
        if let Err(e) = result {
+
            log::error!(target: "worker", "Error updating git refs cache for {name:?}: {e}");
+
            log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
+
            break;
+
        }
+
    }
+
    Ok(())
+
}
+

/// Write new `RefUpdate`s that are related a `Patch` or an `Issue`
/// COB to the COB cache.
fn cache_cobs<S, C>(
modified radicle-node/src/worker/fetch/error.rs
@@ -17,6 +17,8 @@ pub enum Fetch {
    StorageCopy(#[from] io::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
+
    #[error(transparent)]
+
    RefsDb(#[from] radicle::node::refs::Error),
    #[error("validation of storage repository failed")]
    Validation,
    #[error(transparent)]
modified radicle/src/node.rs
@@ -8,6 +8,7 @@ pub mod db;
pub mod events;
pub mod notifications;
pub mod policy;
+
pub mod refs;
pub mod routing;
pub mod seed;

modified radicle/src/node/db.rs
@@ -8,6 +8,7 @@
//! The database schema is contained within the first migration. See [`version`], [`bump`] and
//! [`migrate`] for how this works.
use std::path::Path;
+
use std::sync::Arc;
use std::{fmt, time};

use sqlite as sql;
@@ -38,8 +39,9 @@ pub enum Error {
}

/// A file-backed database storing information about the network.
+
#[derive(Clone)]
pub struct Database {
-
    pub db: sql::ConnectionThreadSafe,
+
    pub db: Arc<sql::ConnectionThreadSafe>,
}

impl fmt::Debug for Database {
@@ -50,7 +52,7 @@ impl fmt::Debug for Database {

impl From<sql::ConnectionThreadSafe> for Database {
    fn from(db: sql::ConnectionThreadSafe) -> Self {
-
        Self { db }
+
        Self { db: Arc::new(db) }
    }
}

@@ -65,7 +67,7 @@ impl Database {
        db.execute(Self::PRAGMA)?;
        migrate(&db)?;

-
        Ok(Self { db })
+
        Ok(Self { db: Arc::new(db) })
    }

    /// Same as [`Self::open`], but in read-only mode. This is useful to have multiple
@@ -78,7 +80,7 @@ impl Database {
        db.set_busy_timeout(DB_READ_TIMEOUT.as_millis() as usize)?;
        db.execute(Self::PRAGMA)?;

-
        Ok(Self { db })
+
        Ok(Self { db: Arc::new(db) })
    }

    /// Create a new in-memory database.
@@ -87,7 +89,7 @@ impl Database {
        db.execute(Self::PRAGMA)?;
        migrate(&db)?;

-
        Ok(Self { db })
+
        Ok(Self { db: Arc::new(db) })
    }

    /// Get the database version. This is updated on schema changes.
modified radicle/src/node/db/schema.sql
@@ -96,3 +96,24 @@ create table if not exists "repo-sync-status" (
  unique ("repo", "node")
  --
) strict;
+

+
-- Git refs cache.
+
create table if not exists "refs" (
+
  -- Repository ID.
+
  "repo"                 text      not null,
+
  -- Ref namespace (NID).
+
  --
+
  -- Nb. We don't use a foreign key constraint because we can't guarantee
+
  -- that we'll have received a node announcement from this node.
+
  "namespace"            text      not null,
+
  -- Ref name (qualified).
+
  "ref"                  text      not null,
+
  -- Ref OID.
+
  "oid"                  text      not null,
+
  -- When this entry was created or updated.
+
  "timestamp"            integer   not null,
+
  --
+
  unique ("repo", "namespace", "ref")
+
  --
+
) strict;
+

added radicle/src/node/refs.rs
@@ -0,0 +1,2 @@
+
//! Cache of git references, backed by the node database (see [`store::Store`]).
pub mod store;
pub use store::{Error, Store};
added radicle/src/node/refs/store.rs
@@ -0,0 +1,182 @@
+
#![allow(clippy::type_complexity)]
+
use std::num::TryFromIntError;
+
use std::str::FromStr;
+

+
use localtime::LocalTime;
+
use sqlite as sql;
+
use thiserror::Error;
+

+
use crate::git::{Oid, Qualified};
+
use crate::node::Database;
+
use crate::node::NodeId;
+
use crate::prelude::RepoId;
+

+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// An Internal error.
+
    #[error("internal error: {0}")]
+
    Internal(#[from] sql::Error),
+
    /// Timestamp error.
+
    #[error("invalid timestamp: {0}")]
+
    Timestamp(#[from] TryFromIntError),
+
}
+

+
/// Refs store.
///
/// Used to cache git references.
pub trait Store {
    /// Insert or update the cached `oid` for `refname` under `namespace` in `repo`.
    ///
    /// Returns whether the store was modified.
    fn set(
        &mut self,
        repo: &RepoId,
        namespace: &NodeId,
        refname: &Qualified,
        oid: Oid,
        timestamp: LocalTime,
    ) -> Result<bool, Error>;

    /// Get the cached OID and timestamp for `refname` under `namespace` in
    /// `repo`, if any.
    fn get(
        &self,
        repo: &RepoId,
        namespace: &NodeId,
        refname: &Qualified,
    ) -> Result<Option<(Oid, LocalTime)>, Error>;

    /// Delete the cached entry for `refname` under `namespace` in `repo`.
    ///
    /// Returns whether an entry was deleted.
    fn delete(&self, repo: &RepoId, namespace: &NodeId, refname: &Qualified)
        -> Result<bool, Error>;
}
+

+
impl Store for Database {
+
    fn set(
+
        &mut self,
+
        repo: &RepoId,
+
        namespace: &NodeId,
+
        refname: &Qualified,
+
        oid: Oid,
+
        timestamp: LocalTime,
+
    ) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO `refs` (repo, namespace, ref, oid, timestamp)
+
             VALUES (?1, ?2, ?3, ?4, ?5)
+
             ON CONFLICT DO UPDATE
+
             SET oid = ?4, timestamp = ?5
+
             WHERE timestamp < ?5 AND oid <> ?4",
+
        )?;
+
        stmt.bind((1, repo))?;
+
        stmt.bind((2, namespace))?;
+
        stmt.bind((3, refname.to_string().as_str()))?;
+
        stmt.bind((4, oid.to_string().as_str()))?;
+
        stmt.bind((5, i64::try_from(timestamp.as_millis())?))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    fn get(
+
        &self,
+
        repo: &RepoId,
+
        namespace: &NodeId,
+
        refname: &Qualified,
+
    ) -> Result<Option<(Oid, LocalTime)>, Error> {
+
        let mut stmt = self.db.prepare(
+
            "SELECT oid, timestamp FROM refs WHERE repo = ?1 AND namespace = ?2 AND ref = ?3",
+
        )?;
+

+
        stmt.bind((1, repo))?;
+
        stmt.bind((2, namespace))?;
+
        stmt.bind((3, refname.to_string().as_str()))?;
+

+
        if let Some(Ok(row)) = stmt.into_iter().next() {
+
            let oid = row.try_read::<&str, _>("oid")?;
+
            let oid = Oid::from_str(oid).map_err(|e| {
+
                Error::Internal(sql::Error {
+
                    code: None,
+
                    message: Some(format!("sql: invalid oid '{oid}': {e}")),
+
                })
+
            })?;
+
            let timestamp = row.try_read::<i64, _>("timestamp")?;
+
            let timestamp = LocalTime::from_millis(timestamp as u128);
+

+
            Ok(Some((oid, timestamp)))
+
        } else {
+
            Ok(None)
+
        }
+
    }
+

+
    fn delete(
+
        &self,
+
        repo: &RepoId,
+
        namespace: &NodeId,
+
        refname: &Qualified,
+
    ) -> Result<bool, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("DELETE FROM refs WHERE repo = ?1 AND namespace = ?2 AND ref = ?3")?;
+

+
        stmt.bind((1, repo))?;
+
        stmt.bind((2, namespace))?;
+
        stmt.bind((3, refname.to_string().as_str()))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+
}
+

+
#[cfg(test)]
mod test {
    use super::*;
    use crate::git::qualified;
    use crate::test::arbitrary;
    use localtime::{LocalDuration, LocalTime};

    // Round-trip: a set entry is readable, deleting it removes it, and
    // deleting a missing entry reports `false`.
    #[test]
    fn test_set_and_delete() {
        let mut db = Database::memory().unwrap();
        let oid = arbitrary::oid();

        let repo = arbitrary::gen::<RepoId>(1);
        let namespace = arbitrary::gen::<NodeId>(1);
        let refname = qualified!("refs/heads/master");
        let timestamp = LocalTime::now();

        assert!(db.set(&repo, &namespace, &refname, oid, timestamp).unwrap());
        assert!(db.get(&repo, &namespace, &refname).unwrap().is_some());
        assert!(db.delete(&repo, &namespace, &refname).unwrap());
        assert!(db.get(&repo, &namespace, &refname).unwrap().is_none());
        // Second delete finds nothing to remove.
        assert!(!db.delete(&repo, &namespace, &refname).unwrap());
    }

    // Upsert semantics: re-setting the same OID at the same timestamp is a
    // no-op (`false`), while a newer timestamp with a different OID updates
    // the entry.
    #[test]
    fn test_set_and_get() {
        let mut db = Database::memory().unwrap();
        let oid1 = arbitrary::oid();
        let oid2 = arbitrary::oid();

        assert_ne!(oid1, oid2);

        let repo = arbitrary::gen::<RepoId>(1);
        let namespace = arbitrary::gen::<NodeId>(1);
        let refname = qualified!("refs/heads/master");
        let mut timestamp = LocalTime::now();

        // Empty cache: nothing to get yet.
        assert_eq!(db.get(&repo, &namespace, &refname).unwrap(), None);
        assert!(db
            .set(&repo, &namespace, &refname, oid1, timestamp)
            .unwrap());
        assert_eq!(
            db.get(&repo, &namespace, &refname).unwrap(),
            Some((oid1, timestamp))
        );
        // Identical OID + timestamp: the `WHERE` guard makes this a no-op.
        assert!(!db
            .set(&repo, &namespace, &refname, oid1, timestamp)
            .unwrap());
        timestamp.elapse(LocalDuration::from_millis(1));

        // Newer timestamp + different OID: the row is updated.
        assert!(db
            .set(&repo, &namespace, &refname, oid2, timestamp)
            .unwrap());
        assert_eq!(
            db.get(&repo, &namespace, &refname).unwrap(),
            Some((oid2, timestamp))
        );
    }
}
modified radicle/src/storage/refs.rs
@@ -437,156 +437,6 @@ impl Deref for SignedRefsAt {
    }
}

-
/// A snapshot of a `rad/sigrefs` update.
-
///
-
/// If `old` is `None` then this is a newly seen `rad/sigrefs`,
-
/// otherwise it is an update from `old` to `new`.
-
///
-
/// The [`SignedRefsUpdate::difference`] method will compute the difference
-
/// between the two `Oid`s, returning [`DiffedRefs`].
-
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
-
pub struct SignedRefsUpdate {
-
    /// The remote namespace of the `rad/sigrefs`.
-
    pub remote: RemoteId,
-
    /// The commit SHA of the previously seen `rad/sigrefs`.
-
    pub old: Option<Oid>,
-
    /// The commit SHA of the newly seen `rad/sigrefs`.
-
    pub new: Oid,
-
}
-

-
/// A difference update between two references in a `rad/sigrefs`
-
/// payload.
-
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
-
pub enum Update {
-
    /// The reference was added and points to `oid`.
-
    Added { oid: Oid },
-
    /// The reference was changed to point to `oid`.
-
    Changed { oid: Oid },
-
    /// The reference was deleted and used to point to `oid`.
-
    Deleted { oid: Oid },
-
    /// The reference was not changed, and still points to `oid`.
-
    Same { oid: Oid },
-
}
-

-
impl Update {
-
    /// Return the `oid` of the update if it was `Added` or `Changed`.
-
    pub fn modified(&self) -> Option<&Oid> {
-
        match self {
-
            Update::Added { oid } => Some(oid),
-
            Update::Changed { oid } => Some(oid),
-
            Update::Deleted { .. } | Update::Same { .. } => None,
-
        }
-
    }
-

-
    /// Return the `oid` of the update.
-
    fn oid(&self) -> &Oid {
-
        match self {
-
            Update::Added { oid } => oid,
-
            Update::Changed { oid } => oid,
-
            Update::Deleted { oid } => oid,
-
            Update::Same { oid } => oid,
-
        }
-
    }
-
}
-

-
/// The set of [`Update`]s for a given [`git::RefString`].
-
///
-
/// To construct a `DiffedRefs` use [`SignedRefsUpdate::difference`].
-
#[derive(Clone, Debug, Default)]
-
pub struct DiffedRefs(BTreeMap<git::RefString, Update>);
-

-
impl Deref for DiffedRefs {
-
    type Target = BTreeMap<git::RefString, Update>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
-
    }
-
}
-

-
impl DerefMut for DiffedRefs {
-
    fn deref_mut(&mut self) -> &mut Self::Target {
-
        &mut self.0
-
    }
-
}
-

-
impl SignedRefsUpdate {
-
    /// Get the difference between [`SignedRefsUpdate::old`], if it exists,
-
    /// and [`SignedRefsUpdate::new`].
-
    ///
-
    /// If `old` is `None`, then all `Updates`s are `Added`.
-
    ///
-
    /// Otherwise, they are computed to be:
-
    ///    - `Added` if the reference only exists in `new`.
-
    ///    - `Changed` if the reference exists in `old` and `new`,
-
    ///       and the `oid`s are different.
-
    ///    - `Deleted` if the reference only exists in `old`.
-
    ///    - `Same` if the reference exists in `old` and `new`, and
-
    ///      the `oid`s are the same.
-
    pub fn difference<S>(&self, repo: &S) -> Result<DiffedRefs, Error>
-
    where
-
        S: ReadRepository,
-
    {
-
        let new = self.load_new(repo)?;
-
        let mut refs = DiffedRefs(
-
            new.iter()
-
                .map(|(refname, oid)| (refname.clone(), Update::Added { oid: *oid }))
-
                .collect::<BTreeMap<_, _>>(),
-
        );
-

-
        match self.load_old(repo)? {
-
            None => Ok(refs),
-
            Some(old) => {
-
                for (refname, old) in old.sigrefs.iter() {
-
                    refs.entry(refname.clone())
-
                        .and_modify(|update| {
-
                            // N.b. we rely on the fact that is always the
-
                            // newly added Oid.
-
                            let new = update.oid();
-
                            if new == old {
-
                                *update = Update::Same { oid: *new };
-
                            } else {
-
                                *update = Update::Changed { oid: *update.oid() };
-
                            }
-
                        })
-
                        .or_insert(Update::Deleted { oid: *old });
-
                }
-
                Ok(refs)
-
            }
-
        }
-
    }
-

-
    fn load_old<S>(&self, repo: &S) -> Result<Option<SignedRefsAt>, Error>
-
    where
-
        S: ReadRepository,
-
    {
-
        self.old
-
            .map(|at| {
-
                RefsAt {
-
                    remote: self.remote,
-
                    at,
-
                }
-
                .load(repo)
-
            })
-
            .transpose()
-
    }
-

-
    fn load_new<S>(&self, repo: &S) -> Result<SignedRefsAt, Error>
-
    where
-
        S: ReadRepository,
-
    {
-
        RefsAt::from(self).load(repo)
-
    }
-
}
-

-
impl From<&SignedRefsUpdate> for RefsAt {
-
    fn from(up: &SignedRefsUpdate) -> Self {
-
        Self {
-
            remote: up.remote,
-
            at: up.new,
-
        }
-
    }
-
}
-

pub mod canonical {
    use super::*;