Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Use refs cache to decide on fetching
cloudhead committed 2 years ago
commit 256c620d31fae417f95f9a3aca4539072c6e1971
parent 345fa57b6429b31dedcf337c4b1ca21bd84ecdd5
18 files changed +472 -582
modified radicle-fetch/src/lib.rs
@@ -20,7 +20,7 @@ pub use state::{FetchLimit, FetchResult};
pub use transport::Transport;

use radicle::crypto::PublicKey;
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::ReadRepository as _;
use state::FetchState;
use thiserror::Error;
@@ -54,7 +54,7 @@ pub fn pull<S>(
    handle: &mut Handle<S>,
    limit: FetchLimit,
    remote: PublicKey,
-
    refs_at: Option<Vec<SignedRefsUpdate>>,
+
    refs_at: Option<Vec<RefsAt>>,
) -> Result<FetchResult, Error>
where
    S: transport::ConnectionStream,
modified radicle-fetch/src/sigrefs.rs
@@ -1,10 +1,8 @@
use std::collections::{BTreeMap, BTreeSet};
-
use std::ops::{Deref, DerefMut, Not as _};
+
use std::ops::{Deref, Not as _};

-
pub use radicle::storage::refs::{DiffedRefs, SignedRefsAt};
+
pub use radicle::storage::refs::SignedRefsAt;
pub use radicle::storage::{git::Validation, Validations};
-

-
use radicle::storage::refs;
use radicle::{crypto::PublicKey, storage::ValidateRepository};

use crate::state::Cached;
@@ -152,67 +150,6 @@ impl<'a> IntoIterator for &'a RemoteRefs {
    }
}

-
/// A set of [`DiffedRefs`] per remote `PublicKey`.
-
///
-
/// To construct use [`DiffedRefs::load`].
-
#[derive(Clone, Debug, Default)]
-
pub struct RemoteDiffedRefs(BTreeMap<PublicKey, DiffedRefs>);
-

-
impl RemoteDiffedRefs {
-
    /// Given a set of [`refs::RefsUpdate`]s, compute its
-
    /// [`DiffedRefs`] and use its [`refs::RefsUpdate::remote`] as the
-
    /// key for the `RemoteDiffedRefs` entry.
-
    ///
-
    /// If the `remote` is in the `may` set, then it is allowed to
-
    /// fail and will not be inserted in the set iff it does fail to
-
    /// load.
-
    ///
-
    /// If the `remote` is in the `must` set, then this method will
-
    /// fail iff loading the `DiffedRefs` fails.
-
    pub(crate) fn load<S>(
-
        cached: &Cached<S>,
-
        updates: Vec<refs::SignedRefsUpdate>,
-
        Select { must, may }: Select,
-
    ) -> Result<Self, error::Load> {
-
        updates
-
            .into_iter()
-
            .try_fold(Self::default(), |mut refs, update| {
-
                match cached.load_diffed_refs(&update) {
-
                    Ok(diff) => {
-
                        refs.insert(update.remote, diff);
-
                        Ok(refs)
-
                    }
-
                    Err(e) if must.contains(&update.remote) => Err(e),
-
                    Err(_) if may.contains(&update.remote) => Ok(refs),
-
                    Err(e) => Err(e),
-
                }
-
            })
-
    }
-
}
-

-
impl Deref for RemoteDiffedRefs {
-
    type Target = BTreeMap<PublicKey, DiffedRefs>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
-
    }
-
}
-

-
impl DerefMut for RemoteDiffedRefs {
-
    fn deref_mut(&mut self) -> &mut Self::Target {
-
        &mut self.0
-
    }
-
}
-

-
impl<'a> IntoIterator for &'a RemoteDiffedRefs {
-
    type Item = <&'a BTreeMap<PublicKey, DiffedRefs> as IntoIterator>::Item;
-
    type IntoIter = <&'a BTreeMap<PublicKey, DiffedRefs> as IntoIterator>::IntoIter;
-

-
    fn into_iter(self) -> Self::IntoIter {
-
        self.0.iter()
-
    }
-
}
-

pub struct Select<'a> {
    pub must: &'a BTreeSet<PublicKey>,
    pub may: &'a BTreeSet<PublicKey>,
modified radicle-fetch/src/stage.rs
@@ -15,18 +15,17 @@
//!      `rad/sigrefs`, for each configured namespace, i.e. followed
//!      and delegate peers if the scope is "followed" and all peers is the
//!      scope is all.
-
//!   3. [`DataRefs`]/[`DiffedRefs`]: fetches the `Oid`s for each
-
//!      reference listed in the `rad/sigrefs` for each fetched peer
-
//!      in the [`SpecialRefs`] stage. Additionally, any references
-
//!      that have been removed from `rad/sigrefs` are marked for
-
//!      deletion.
+
//!   3. [`DataRefs`]: fetches the `Oid`s for each reference listed in
+
//!      the `rad/sigrefs` for each fetched peer in the
+
//!      [`SpecialRefs`] stage. Additionally, any references that have
+
//!      been removed from `rad/sigrefs` are marked for deletion.
//!
//! ### Pull
//!
//! A `pull` is split into two stages:
//!
//!   1. [`SpecialRefs`]: see above.
-
//!   2. [`DataRefs`]/[`DiffedRefs`]: see above.
+
//!   2. [`DataRefs`]: see above.

use std::collections::{BTreeMap, BTreeSet, HashSet};

@@ -381,7 +380,7 @@ impl ProtocolStage for SigrefsAt {
}

/// The [`ProtocolStage`] for fetching data refs from the set of
-
/// `remotes`.
+
/// remotes in `trusted`.
///
/// All refs that are listed in the `remotes` sigrefs are checked
/// against our refdb/odb to build a set of `wants` and `haves`. The
@@ -494,112 +493,6 @@ impl ProtocolStage for DataRefs {
    }
}

-
/// The [`ProtocolStage`] that is similar to [`DataRefs`], however it
-
/// is aware that it is an update of `rad/sigrefs`. This means that it
-
/// will only compute `wants` and `haves` based on any modified
-
/// `Oid`s.
-
///
-
/// All refs and objects are prepared for updating as per usual, since
-
/// we keep track of in-memory references for validation.
-
#[derive(Debug)]
-
pub struct DiffedRefs {
-
    /// The node that is being fetched from.
-
    pub remote: PublicKey,
-
    /// The set of signed references from each remote that was
-
    /// fetched.
-
    pub remotes: sigrefs::RemoteDiffedRefs,
-
    /// The data limit for this stage of fetching.
-
    pub limit: u64,
-
}
-

-
impl ProtocolStage for DiffedRefs {
-
    fn ls_refs(&self) -> Option<NonEmpty<BString>> {
-
        None
-
    }
-

-
    fn ref_filter(&self, _r: Ref) -> Option<ReceivedRef> {
-
        None
-
    }
-

-
    fn pre_validate(&self, _refs: &[ReceivedRef]) -> Result<(), error::Layout> {
-
        Ok(())
-
    }
-

-
    fn wants_haves(
-
        &self,
-
        refdb: &Repository,
-
        _refs: &[ReceivedRef],
-
    ) -> Result<WantsHaves, error::WantsHaves> {
-
        let mut wants_haves = WantsHaves::default();
-

-
        for (remote, refs) in &self.remotes {
-
            wants_haves.add(
-
                refdb,
-
                refs.iter().filter_map(|(refname, up)| {
-
                    let refname = Qualified::from_refstr(refname)
-
                        .map(|refname| refname.with_namespace(Component::from(remote)))?;
-
                    let tip = up.modified()?;
-
                    Some((refname, *tip))
-
                }),
-
            )?;
-
        }
-

-
        Ok(wants_haves)
-
    }
-

-
    fn prepare_updates<'a>(
-
        &self,
-
        _s: &FetchState,
-
        _repo: &Repository,
-
        _refs: &'a [ReceivedRef],
-
    ) -> Result<Updates<'a>, error::Prepare> {
-
        use radicle::storage::refs::Update::{Added, Changed, Deleted, Same};
-

-
        let mut updates = Updates::default();
-
        let prefix_rad = refname!("refs/rad");
-

-
        for (remote, refs) in &self.remotes {
-
            for (name, up) in refs.iter() {
-
                let is_refs_rad = name.starts_with(prefix_rad.as_str());
-
                let tracking: Namespaced<'_> = Qualified::from_refstr(name)
-
                    .and_then(|q| refs::ReceivedRefname::remote(*remote, q).to_namespaced())
-
                    .expect("we checked sigrefs well-formedness in wants_refs already");
-
                match up {
-
                    Added { oid } | Changed { oid } => updates.add(
-
                        *remote,
-
                        Update::Direct {
-
                            name: tracking,
-
                            target: *oid,
-
                            no_ff: Policy::Allow,
-
                        },
-
                    ),
-
                    Deleted { oid } if !is_refs_rad => updates.add(
-
                        *remote,
-
                        Update::Prune {
-
                            name: tracking,
-
                            prev: either::Left(*oid),
-
                        },
-
                    ),
-
                    // N.b. create an update for this reference so
-
                    // that the in-memory refdb is updated.
-
                    Same { oid } => updates.add(
-
                        *remote,
-
                        Update::Direct {
-
                            name: tracking,
-
                            target: *oid,
-
                            no_ff: Policy::Allow,
-
                        },
-
                    ),
-
                    // N.b. `refs/rad` is not subject to pruning.
-
                    Deleted { .. } => continue,
-
                }
-
            }
-
        }
-

-
        Ok(updates)
-
    }
-
}
-

// N.b. the `delegates` are the delegates of the repository, with the
// potential removal of the local peer in the case of a `pull`.
fn special_refs_updates<'a>(
modified radicle-fetch/src/state.rs
@@ -8,7 +8,7 @@ use radicle::identity::{Doc, DocError};

use radicle::prelude::Verified;
use radicle::storage;
-
use radicle::storage::refs::{RefsAt, SignedRefs, SignedRefsUpdate};
+
use radicle::storage::refs::{RefsAt, SignedRefs};
use radicle::storage::{
    git::Validation, Remote, RemoteId, RemoteRepository, Remotes, ValidateRepository, Validations,
};
@@ -273,23 +273,18 @@ impl FetchState {
        Ok(fetched)
    }

-
    /// Fetch the set of special refs, depending on `refs_updates`.
+
    /// Fetch the set of special refs, depending on `refs_at`.
    ///
-
    /// If `refs_updates` is `Some`, then run the [`SigrefsAt`] stage,
+
    /// If `refs_at` is `Some`, then run the [`SigrefsAt`] stage,
    /// which specifically fetches `rad/sigrefs` which are listed in
-
    /// `refs_at`. It then runs the [`DiffedRefs`] stage to fetch the
-
    /// difference between old references listed existing
-
    /// `rad/sigrefs` and the new `rad/sigrefs` fetched in the
-
    /// previous step.
+
    /// `refs_at`.
    ///
-
    /// If `refs_updates` is `None`, then run the [`SpecialRefs`]
-
    /// stage, which fetches `rad/sigrefs` and `rad/id` from all
-
    /// tracked and delegate peers (scope dependent). It then runs the
-
    /// [`DataRefs`] stage to fetch the references listed in the
-
    /// `rad/sigrefs` fetched in the previous step.
+
    /// If `refs_at` is `None`, then run the [`SpecialRefs`] stage,
+
    /// which fetches `rad/sigrefs` and `rad/id` from all tracked and
+
    /// delegate peers (scope dependent).
    ///
-
    /// The resulting `Vec<PublicKey>` will be the set of fetched
-
    /// remotes.
+
    /// The resulting [`sigrefs::RemoteRefs`] will be the set of
+
    /// `rad/sigrefs` of the fetched remotes.
    fn run_special_refs<S>(
        &mut self,
        handle: &mut Handle<S>,
@@ -297,19 +292,18 @@ impl FetchState {
        delegates: BTreeSet<PublicKey>,
        limit: &FetchLimit,
        remote: PublicKey,
-
        refs_updates: Option<Vec<SignedRefsUpdate>>,
-
    ) -> Result<Vec<PublicKey>, error::Protocol>
+
        refs_at: Option<Vec<RefsAt>>,
+
    ) -> Result<sigrefs::RemoteRefs, error::Protocol>
    where
        S: transport::ConnectionStream,
    {
-
        match refs_updates {
-
            Some(refs_updates) => {
-
                let (must, may): (BTreeSet<PublicKey>, BTreeSet<PublicKey>) = refs_updates
+
        match refs_at {
+
            Some(refs_at) => {
+
                let (must, may): (BTreeSet<PublicKey>, BTreeSet<PublicKey>) = refs_at
                    .iter()
                    .map(|refs_at| refs_at.remote)
                    .partition(|id| delegates.contains(id));

-
                let refs_at = refs_updates.iter().map(RefsAt::from).collect();
                let sigrefs_at = stage::SigrefsAt {
                    remote,
                    delegates,
@@ -320,22 +314,14 @@ impl FetchState {
                log::trace!(target: "fetch", "{sigrefs_at:?}");
                self.run_stage(handle, handshake, &sigrefs_at)?;

-
                let remotes = sigrefs::RemoteDiffedRefs::load(
+
                let signed_refs = sigrefs::RemoteRefs::load(
                    &self.as_cached(handle),
-
                    refs_updates,
                    sigrefs::Select {
                        must: &must,
                        may: &may,
                    },
                )?;
-
                let fetched = remotes.keys().copied().collect();
-
                let refs = stage::DiffedRefs {
-
                    remote,
-
                    remotes,
-
                    limit: limit.refs,
-
                };
-
                self.run_stage(handle, handshake, &refs)?;
-
                Ok(fetched)
+
                Ok(signed_refs)
            }
            None => {
                let followed = handle.allowed();
@@ -350,7 +336,7 @@ impl FetchState {
                log::trace!(target: "fetch", "{special_refs:?}");
                let fetched = self.run_stage(handle, handshake, &special_refs)?;

-
                let remotes = sigrefs::RemoteRefs::load(
+
                let signed_refs = sigrefs::RemoteRefs::load(
                    &self.as_cached(handle),
                    sigrefs::Select {
                        must: &delegates,
@@ -361,14 +347,7 @@ impl FetchState {
                            .collect(),
                    },
                )?;
-
                let fetched = remotes.keys().copied().collect();
-
                let refs = stage::DataRefs {
-
                    remote,
-
                    remotes,
-
                    limit: limit.refs,
-
                };
-
                self.run_stage(handle, handshake, &refs)?;
-
                Ok(fetched)
+
                Ok(signed_refs)
            }
        }
    }
@@ -395,7 +374,7 @@ impl FetchState {
        handshake: &handshake::Outcome,
        limit: FetchLimit,
        remote: PublicKey,
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Protocol>
    where
        S: transport::ConnectionStream,
@@ -434,7 +413,7 @@ impl FetchState {

        log::trace!(target: "fetch", "Identity delegates {delegates:?}");

-
        let fetched = self.run_special_refs(
+
        let signed_refs = self.run_special_refs(
            handle,
            handshake,
            delegates.clone(),
@@ -444,8 +423,21 @@ impl FetchState {
        )?;
        log::debug!(
            target: "fetch",
-
            "Fetched data refs for {} remote(s) ({}ms)",
-
            fetched.len(),
+
            "Fetched data for {} remote(s) ({}ms)",
+
            signed_refs.len(),
+
            start.elapsed().as_millis()
+
        );
+

+
        let data_refs = stage::DataRefs {
+
            remote,
+
            remotes: signed_refs,
+
            limit: limit.refs,
+
        };
+
        self.run_stage(handle, handshake, &data_refs)?;
+
        log::debug!(
+
            target: "fetch",
+
            "Fetched data refs for {} remotes ({}ms)",
+
            data_refs.remotes.len(),
            start.elapsed().as_millis()
        );

@@ -467,6 +459,7 @@ impl FetchState {
        // added to `warnings`.
        let mut warnings = sigrefs::Validations::default();
        let mut failures = sigrefs::Validations::default();
+
        let signed_refs = data_refs.remotes;

        // We may prune fetched remotes, so we keep track of
        // non-pruned, fetched remotes here.
@@ -474,7 +467,7 @@ impl FetchState {

        // TODO(finto): this might read better if it got its own
        // private function.
-
        for remote in &fetched {
+
        for remote in signed_refs.keys() {
            if handle.is_blocked(remote) {
                continue;
            }
@@ -644,13 +637,6 @@ impl<'a, S> Cached<'a, S> {
        }
    }

-
    pub fn load_diffed_refs(
-
        &self,
-
        update: &SignedRefsUpdate,
-
    ) -> Result<storage::refs::DiffedRefs, sigrefs::error::Load> {
-
        update.difference(&self.handle.repo)
-
    }
-

    #[allow(dead_code)]
    pub(crate) fn inspect(&self) {
        self.state.refs.inspect()
modified radicle-node/src/runtime.rs
@@ -157,7 +157,8 @@ impl Runtime {
        let policy = config.policy;

        log::info!(target: "node", "Opening node database..");
-
        let mut db: service::Stores<_> = home.database_mut()?.into();
+
        let db = home.database_mut()?;
+
        let mut stores: service::Stores<_> = db.clone().into();

        log::info!(target: "node", "Opening policy database..");
        let policies = home.policies_mut()?;
@@ -206,13 +207,13 @@ impl Runtime {
                .expect("Runtime::init: unable to solve proof-of-work puzzle")
        };

-
        if config.connect.is_empty() && db.addresses().is_empty()? {
+
        if config.connect.is_empty() && stores.addresses().is_empty()? {
            log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");

            for (alias, addr) in config.network.bootstrap() {
                let (id, addr) = addr.into();

-
                db.addresses_mut().insert(
+
                stores.addresses_mut().insert(
                    &id,
                    radicle::node::Features::SEED,
                    alias,
@@ -221,14 +222,14 @@ impl Runtime {
                    [node::KnownAddress::new(addr, address::Source::Bootstrap)],
                )?;
            }
-
            log::info!(target: "node", "{} nodes added to address book", db.addresses().len()?);
+
            log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
        }

        let emitter: Emitter<Event> = Default::default();
        let mut service = service::Service::new(
            config.clone(),
            clock,
-
            db,
+
            stores,
            storage.clone(),
            policies,
            signer.clone(),
@@ -264,6 +265,7 @@ impl Runtime {
            handle.clone(),
            notifications,
            cobs_cache,
+
            db,
            worker::Config {
                capacity: config.workers,
                storage: storage.clone(),
modified radicle-node/src/service.rs
@@ -26,11 +26,12 @@ use radicle::node::address;
use radicle::node::address::Store as _;
use radicle::node::address::{AddressBook, KnownAddress};
use radicle::node::config::PeerConfig;
+
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{ConnectOptions, Penalty, Severity};
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::{Inventory, RepositoryError};

use crate::crypto;
@@ -44,13 +45,12 @@ use crate::node::{
use crate::prelude::*;
use crate::runtime::Emitter;
use crate::service::gossip::Store as _;
-
use crate::service::message::{Announcement, AnnouncementMessage, Info, Ping};
-
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
+
use crate::service::message::{
+
    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
+
};
use crate::service::policy::{store::Write, Policy, Scope};
use crate::storage;
-
use crate::storage::refs::RefsAt;
-
use crate::storage::ReadRepository;
-
use crate::storage::{Namespaces, ReadStorage};
+
use crate::storage::{refs::RefsAt, Namespaces, ReadRepository, ReadStorage};
use crate::worker::fetch;
use crate::worker::FetchError;
use crate::Link;
@@ -64,7 +64,7 @@ pub use radicle::node::policy::config as policy;

use self::io::Outbox;
use self::limitter::RateLimiter;
-
use self::message::{InventoryAnnouncement, RefsStatus};
+
use self::message::InventoryAnnouncement;
use self::policy::NamespacesError;

/// How often to run the "idle" task.
@@ -159,7 +159,10 @@ pub enum Error {
}

/// A store for all node data.
-
pub trait Store: address::Store + gossip::Store + routing::Store + seed::Store {}
+
pub trait Store:
+
    address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
+
{
+
}

impl Store for node::Database {}

@@ -251,7 +254,7 @@ struct FetchState {
    /// Node we're fetching from.
    from: NodeId,
    /// What refs we're fetching.
-
    refs_at: Vec<SignedRefsUpdate>,
+
    refs_at: Vec<RefsAt>,
    /// Channels waiting for fetch results.
    subscribers: Vec<chan::Sender<FetchResult>>,
}
@@ -273,7 +276,7 @@ struct QueuedFetch {
    /// Peer being fetched from.
    from: NodeId,
    /// Refs being fetched.
-
    refs_at: Vec<SignedRefsUpdate>,
+
    refs_at: Vec<RefsAt>,
    /// Result channel.
    channel: Option<chan::Sender<FetchResult>>,
}
@@ -325,6 +328,16 @@ where
    pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
        &mut self.0
    }
+

+
    /// Get the database as a refs db.
+
    pub fn refs(&self) -> &impl node::refs::Store {
+
        &self.0
+
    }
+

+
    /// Get the database as a refs db, mutably.
+
    pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
+
        &mut self.0
+
    }
}

impl<D> From<D> for Stores<D> {
@@ -767,7 +780,7 @@ where
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
-
            Command::AnnounceRefs(id, resp) => match self.announce_refs(id, [self.node_id()]) {
+
            Command::AnnounceRefs(id, resp) => match self.announce_own_refs(id) {
                Ok(refs) => match refs.as_slice() {
                    &[refs] => {
                        resp.send(refs).ok();
@@ -809,7 +822,7 @@ where
        &mut self,
        rid: RepoId,
        from: NodeId,
-
        refs: NonEmpty<SignedRefsUpdate>,
+
        refs: NonEmpty<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) {
@@ -831,7 +844,7 @@ where
        &mut self,
        rid: RepoId,
        from: NodeId,
-
        refs_at: Vec<SignedRefsUpdate>,
+
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) {
@@ -886,7 +899,7 @@ where
        &mut self,
        rid: RepoId,
        from: &NodeId,
-
        refs_at: Vec<SignedRefsUpdate>,
+
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) -> Result<&mut FetchState, TryFetchError> {
        let from = *from;
@@ -1051,7 +1064,6 @@ where
                .seed_policy(&rid)
                .expect("Service::dequeue_fetch: error accessing repo seeding configuration");

-
            let refs_at = refs_at.iter().map(RefsAt::from).collect();
            match self.refs_status_of(rid, refs_at, &repo_entry.scope) {
                Ok(status) => {
                    if let Some(refs) = NonEmpty::from_vec(status.fresh) {
@@ -1398,8 +1410,12 @@ where
                        info!(target: "service", "Routing table updated for {} with seed {announcer}", message.rid);
                    }
                }
+
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
+
                    debug!(target: "service", "Skipping fetch, empty refs announcement for {}", message.rid);
+
                    return Ok(false);
+
                };

-
                // Update sync status for this repo.
+
                // Update sync status of announcer for this repo.
                if let Some(refs) = message.refs.iter().find(|r| &r.remote == self.nid()) {
                    match self.db.seeds_mut().synced(
                        &message.rid,
@@ -1430,71 +1446,29 @@ where
                let repo_entry = self.policies.seed_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo seeding configuration",
                );
-
                if repo_entry.policy == Policy::Allow {
-
                    let (fresh, stale) = match self.refs_status_of(
-
                        message.rid,
-
                        message.refs.clone().into(),
-
                        &repo_entry.scope,
-
                    ) {
-
                        Ok(RefsStatus { fresh, stale }) => (fresh, stale),
-
                        Err(e) => {
-
                            error!(target: "service", "Failed to check refs status: {e}");
-
                            return Ok(relay);
-
                        }
-
                    };
-

-
                    // Refs can be relayed by peers who don't have the data in storage,
-
                    // therefore we only check whether we are connected to the *announcer*,
-
                    // which is required by the protocol to only announce refs it has.
-
                    if let Some(remote) = self.sessions.get(announcer).cloned() {
-
                        // If the relayer is also the origin of the message, we inform it
-
                        // about any refs that are already in sync (stale).
-
                        if relayer == announcer {
-
                            // If the stale refs contain refs announced by the peer, let it know
-
                            // that we're already in sync.
-
                            if let Some(at) = stale
-
                                .iter()
-
                                .find(|refs| refs.remote == remote.id)
-
                                .copied()
-
                                .map(|RefsAt { at, .. }| at)
-
                            {
-
                                debug!(
-
                                    target: "service", "Refs of {} already synced for {} at {at}",
-
                                    remote.id,
-
                                    message.rid,
-
                                );
-
                                self.outbox.write(
-
                                    &remote,
-
                                    Info::RefsAlreadySynced {
-
                                        rid: message.rid,
-
                                        at,
-
                                    }
-
                                    .into(),
-
                                );
-
                            }
-
                        }
-
                        // Finally, if there's anything to fetch, we fetch it from the
-
                        // remote.
-
                        if let Some(fresh) = NonEmpty::from_vec(fresh) {
-
                            self.fetch_refs_at(message.rid, remote.id, fresh, FETCH_TIMEOUT, None);
-
                        } else {
-
                            debug!(target: "service", "Skipping fetch, all refs of {} are already in local storage", message.rid);
-
                        }
-
                    } else {
-
                        trace!(
-
                            target: "service",
-
                            "Skipping fetch of {}, no sessions connected to {announcer}",
-
                            message.rid
-
                        );
-
                    }
-
                    return Ok(relay);
-
                } else {
+
                if repo_entry.policy != Policy::Allow {
                    debug!(
                        target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded",
                        message.rid
                    );
+
                    return Ok(false);
                }
+
                // Refs can be relayed by peers who don't have the data in storage,
+
                // therefore we only check whether we are connected to the *announcer*,
+
                // which is required by the protocol to only announce refs it has.
+
                let Some(remote) = self.sessions.get(announcer).cloned() else {
+
                    trace!(
+
                        target: "service",
+
                        "Skipping fetch of {}, no sessions connected to {announcer}",
+
                        message.rid
+
                    );
+
                    return Ok(relay);
+
                };
+
                // Finally, if there's anything to fetch, we fetch it from the remote.
+
                self.fetch_refs_at(message.rid, remote.id, refs, FETCH_TIMEOUT, None);
+

+
                return Ok(relay);
            }
            AnnouncementMessage::Node(
                ann @ NodeAnnouncement {
@@ -1549,6 +1523,7 @@ where

    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
        match info {
+
            // Nb. We don't currently send this message.
            Info::RefsAlreadySynced { rid, at } => {
                debug!(target: "service", "Refs already synced for {rid} by {remote}");
                self.emitter.emit(Event::RefsSynced {
@@ -1562,39 +1537,6 @@ where
        Ok(())
    }

-
    /// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
-
    fn refs_status_of(
-
        &self,
-
        rid: RepoId,
-
        refs: Vec<RefsAt>,
-
        scope: &policy::Scope,
-
    ) -> Result<RefsStatus, Error> {
-
        let mut refs = RefsStatus::new(rid, refs, &self.storage)?;
-

-
        // First, check the freshness.
-
        if refs.fresh.is_empty() {
-
            return Ok(refs);
-
        }
-

-
        // Second, check the scope.
-
        match scope {
-
            policy::Scope::All => Ok(refs),
-
            policy::Scope::Followed => {
-
                match self.policies.namespaces_for(&self.storage, &rid) {
-
                    Ok(Namespaces::All) => Ok(refs),
-
                    Ok(Namespaces::Followed(mut followed)) => {
-
                        // Get the set of followed nodes except self.
-
                        followed.remove(self.nid());
-
                        refs.fresh.retain(|r| followed.contains(&r.remote));
-

-
                        Ok(refs)
-
                    }
-
                    Err(e) => Err(e.into()),
-
                }
-
            }
-
        }
-
    }
-

    pub fn handle_message(
        &mut self,
        remote: &NodeId,
@@ -1720,6 +1662,37 @@ where
        Ok(())
    }

+
    /// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
+
    fn refs_status_of(
+
        &self,
+
        rid: RepoId,
+
        refs: Vec<RefsAt>,
+
        scope: &policy::Scope,
+
    ) -> Result<RefsStatus, Error> {
+
        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
+
        // First, check the freshness.
+
        if refs.fresh.is_empty() {
+
            return Ok(refs);
+
        }
+
        // Second, check the scope.
+
        match scope {
+
            policy::Scope::All => Ok(refs),
+
            policy::Scope::Followed => {
+
                match self.policies.namespaces_for(&self.storage, &rid) {
+
                    Ok(Namespaces::All) => Ok(refs),
+
                    Ok(Namespaces::Followed(mut followed)) => {
+
                        // Get the set of followed nodes except self.
+
                        followed.remove(self.nid());
+
                        refs.fresh.retain(|r| followed.contains(&r.remote));
+

+
                        Ok(refs)
+
                    }
+
                    Err(e) => Err(e.into()),
+
                }
+
            }
+
        }
+
    }
+

    /// Set of initial messages to send to a peer.
    fn initial(&self, _link: Link) -> Vec<Message> {
        let filter = self.filter();
@@ -1858,7 +1831,32 @@ where
        Ok((msg.signed(&self.signer), refs.into()))
    }

-
    /// Announce local refs for given id.
+
    /// Announce our own refs for the given repo.
+
    fn announce_own_refs(&mut self, rid: RepoId) -> Result<Vec<RefsAt>, Error> {
+
        let refs = self.announce_refs(rid, [self.node_id()])?;
+

+
        // Update refs database with our signed refs branches.
+
        // This isn't strictly necessary for now, as we only use the database for fetches, and
+
        // we don't fetch our own refs that are announced, but it's for good measure.
+
        if let &[r] = refs.as_slice() {
+
            let now = self.local_time();
+

+
            if let Err(e) =
+
                self.database_mut()
+
                    .refs_mut()
+
                    .set(&rid, &r.remote, &SIGREFS_BRANCH, r.at, now)
+
            {
+
                error!(
+
                    target: "service",
+
                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
+
                    r.remote
+
                );
+
            }
+
        }
+
        Ok(refs)
+
    }
+

+
    /// Announce local refs for given repo.
    fn announce_refs(
        &mut self,
        rid: RepoId,
@@ -1882,6 +1880,7 @@ where
                error!(target: "service", "Error updating sync status for local node: {e}");
            }
        }
+

        self.outbox.announce(
            ann,
            peers.filter(|p| {
@@ -1890,7 +1889,6 @@ where
            }),
            self.db.gossip_mut(),
        );
-

        Ok(refs)
    }

modified radicle-node/src/service/io.rs
@@ -2,7 +2,7 @@ use std::collections::VecDeque;
use std::time;

use log::*;
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::RefsAt;

use crate::prelude::*;
use crate::service::session::Session;
@@ -30,7 +30,7 @@ pub enum Io {
        /// Namespaces being fetched.
        namespaces: Namespaces,
        /// If the node is fetching specific `rad/sigrefs`.
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
        timeout: time::Duration,
    },
@@ -126,7 +126,7 @@ impl Outbox {
        remote: &mut Session,
        rid: RepoId,
        namespaces: Namespaces,
-
        refs_at: Vec<SignedRefsUpdate>,
+
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) {
        remote.fetching(rid);
modified radicle-node/src/service/message.rs
@@ -1,8 +1,7 @@
use std::{fmt, io, mem};

use radicle::git;
-
use radicle::storage::refs::{RefsAt, SignedRefsUpdate};
-
use radicle::storage::ReadRepository;
+
use radicle::storage::refs::RefsAt;

use crate::crypto;
use crate::identity::RepoId;
@@ -12,7 +11,6 @@ use crate::prelude::BoundedVec;
use crate::service::filter::Filter;
use crate::service::{Link, NodeId, Timestamp};
use crate::storage;
-
use crate::storage::ReadStorage;
use crate::wire;

/// Maximum number of addresses which can be announced to other nodes.
@@ -170,7 +168,7 @@ pub struct RefsAnnouncement {
pub struct RefsStatus {
    /// The `rad/sigrefs` was missing or it's ahead of the local
    /// `rad/sigrefs`.
-
    pub fresh: Vec<SignedRefsUpdate>,
+
    pub fresh: Vec<RefsAt>,
    /// The `rad/sigrefs` has been seen before.
    pub stale: Vec<RefsAt>,
}
@@ -178,86 +176,49 @@ pub struct RefsStatus {
impl RefsStatus {
    /// Get the set of `fresh` and `stale` `RefsAt`'s for the given
    /// announcement.
-
    pub fn new<S: ReadStorage>(
+
    ///
+
    /// Nb. We use the refs database as a cache for quick lookups. This does *not* check
+
    /// for ancestry matches, since we don't cache the whole history (only the tips).
+
    /// This, however, is not a problem because the signed refs branch is fast-forward only,
+
    /// and old refs announcements will be discarded due to their lower timestamps.
+
    pub fn new<D: node::refs::Store>(
        rid: RepoId,
        refs: Vec<RefsAt>,
-
        storage: S,
+
        db: &D,
    ) -> Result<RefsStatus, storage::Error> {
-
        let repo = match storage.repository(rid) {
-
            // If the repo doesn't exist, we consider this
-
            // announcement "fresh", since we obviously don't
-
            // have the refs.
-
            Err(e) if e.is_not_found() => {
-
                let fresh = refs
-
                    .into_iter()
-
                    .map(|RefsAt { remote, at }| SignedRefsUpdate {
-
                        remote,
-
                        old: None,
-
                        new: at,
-
                    })
-
                    .collect();
-
                return Ok(RefsStatus {
-
                    fresh,
-
                    stale: Vec::new(),
-
                });
-
            }
-
            Err(e) => return Err(e),
-
            Ok(r) => r,
-
        };
-

        let mut status = RefsStatus::default();
        for theirs in refs.iter() {
-
            status.insert(*theirs, &repo)?;
+
            status.insert(&rid, *theirs, db)?;
        }
        Ok(status)
    }

-
    fn insert<S: ReadRepository>(
+
    fn insert<D: node::refs::Store>(
        &mut self,
+
        repo: &RepoId,
        theirs: RefsAt,
-
        repo: &S,
+
        db: &D,
    ) -> Result<(), storage::Error> {
-
        match RefsAt::new(repo, theirs.remote) {
-
            Ok(ours) => {
-
                if Self::is_fresh(repo, theirs.at, ours.at)? {
-
                    self.fresh.push(SignedRefsUpdate {
-
                        remote: theirs.remote,
-
                        old: Some(ours.at),
-
                        new: theirs.at,
-
                    });
+
        match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
+
            Ok(Some((ours, _))) => {
+
                if theirs.at != ours {
+
                    self.fresh.push(theirs);
                } else {
                    self.stale.push(theirs);
                }
            }
-
            Err(e) if git::is_not_found_err(&e) => self.fresh.push(SignedRefsUpdate {
-
                remote: theirs.remote,
-
                old: None,
-
                new: theirs.at,
-
            }),
+
            Ok(None) => {
+
                self.fresh.push(theirs);
+
            }
            Err(e) => {
                log::warn!(
                    target: "service",
-
                    "failed to load 'refs/namespaces/{}/rad/sigrefs': {e}", theirs.remote
-
                )
+
                    "Error getting cached ref of {repo} for refs status: {e}"
+
                );
            }
        }
        Ok(())
    }
-

-
    /// If `theirs` is not the same as `ours` and we have not seen
-
    /// `theirs` before, i.e. it's not a previous `rad/sigrefs`, then
-
    /// we can consider `theirs` a fresh update.
-
    fn is_fresh<S: ReadRepository>(
-
        repo: &S,
-
        theirs: git::Oid,
-
        ours: git::Oid,
-
    ) -> Result<bool, git::ext::Error> {
-
        if repo.contains(theirs)? {
-
            Ok(theirs != ours && !repo.is_ancestor_of(theirs, ours)?)
-
        } else {
-
            Ok(true)
-
        }
-
    }
}

/// Node announcing its inventory to the network.
modified radicle-node/src/tests.rs
@@ -10,6 +10,7 @@ use crossbeam_channel as chan;
use netservices::Direction as Link;
use radicle::identity::Visibility;
use radicle::node::address::Store;
+
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
use radicle::storage::refs::RefsAt;
@@ -29,6 +30,7 @@ use crate::service::ServiceState as _;
use crate::service::*;
use crate::storage::git::transport::{local, remote};
use crate::storage::git::Storage;
+
use crate::storage::refs::SIGREFS_BRANCH;
use crate::storage::ReadStorage;
use crate::test::arbitrary;
use crate::test::assert_matches;
@@ -1431,10 +1433,16 @@ fn test_queued_fetch_from_ann_same_rid() {
        .join(git::refname!("refs/sigrefs"));

    // Finish the 1st fetch.
+
    // Ensure the ref is in the storage and cache.
    alice.storage_mut().repo_mut(&rid).remotes.insert(
        carol.id(),
        carol.signed_refs_at(arbitrary::gen::<Refs>(1), oid),
    );
+
    alice
+
        .database_mut()
+
        .refs_mut()
+
        .set(&rid, &carol.id, &SIGREFS_BRANCH, oid, LocalTime::now())
+
        .unwrap();
    alice.fetched(
        rid,
        bob.id,
modified radicle-node/src/worker.rs
@@ -13,7 +13,7 @@ use crossbeam_channel as chan;
use radicle::identity::RepoId;
use radicle::node::notifications;
use radicle::prelude::NodeId;
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadRepository, ReadStorage};
use radicle::{cob, crypto, Storage};
use radicle_fetch::FetchLimit;
@@ -106,7 +106,7 @@ pub enum FetchRequest {
        /// Remote peer we are interacting with.
        remote: NodeId,
        /// If this fetch is for a particular set of `rad/sigrefs`.
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
        timeout: time::Duration,
    },
@@ -180,6 +180,7 @@ struct Worker {
    policies: policy::Config<policy::store::Read>,
    notifications: notifications::StoreWriter,
    cache: cob::cache::StoreWriter,
+
    db: radicle::node::Database,
}

impl Worker {
@@ -200,13 +201,7 @@ impl Worker {
        } = task;
        let remote = fetch.remote();
        let channels = channels::ChannelsFlush::new(self.handle.clone(), channels, remote, stream);
-
        let result = self._process(
-
            fetch,
-
            stream,
-
            channels,
-
            self.notifications.clone(),
-
            self.cache.clone(),
-
        );
+
        let result = self._process(fetch, stream, channels, self.notifications.clone());

        log::trace!(target: "worker", "Sending response back to service..");

@@ -229,7 +224,6 @@ impl Worker {
        stream: StreamId,
        mut channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
-
        cache: cob::cache::StoreWriter,
    ) -> FetchResult {
        match fetch {
            FetchRequest::Initiator {
@@ -240,7 +234,7 @@ impl Worker {
                timeout: _timeout,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
-
                let result = self.fetch(rid, remote, refs_at, channels, notifs, cache);
+
                let result = self.fetch(rid, remote, refs_at, channels, notifs);
                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote } => {
@@ -294,10 +288,9 @@ impl Worker {
        &mut self,
        rid: RepoId,
        remote: NodeId,
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
        channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
-
        mut cache: cob::cache::StoreWriter,
    ) -> Result<fetch::FetchResult, FetchError> {
        let FetchConfig {
            limit,
@@ -309,6 +302,7 @@ impl Worker {
        let allowed = radicle_fetch::Allowed::from_config(rid, &self.policies)?;
        let blocked = radicle_fetch::BlockList::from_config(&self.policies)?;

+
        let mut cache = self.cache.clone();
        let handle = fetch::Handle::new(
            rid,
            *local,
@@ -318,7 +312,15 @@ impl Worker {
            channels,
            notifs,
        )?;
-
        let result = handle.fetch(rid, &self.storage, &mut cache, *limit, remote, refs_at)?;
+
        let result = handle.fetch(
+
            rid,
+
            &self.storage,
+
            &mut cache,
+
            &mut self.db,
+
            *limit,
+
            remote,
+
            refs_at,
+
        )?;

        if let Err(e) = garbage::collect(&self.storage, rid, *expiry) {
            // N.b. ensure that `git gc` works in debug mode.
@@ -343,6 +345,7 @@ impl Pool {
        handle: Handle,
        notifications: notifications::StoreWriter,
        cache: cob::cache::StoreWriter,
+
        db: radicle::node::Database,
        config: Config,
    ) -> Result<Self, policy::Error> {
        let mut pool = Vec::with_capacity(config.capacity);
@@ -361,6 +364,7 @@ impl Pool {
                policies,
                notifications: notifications.clone(),
                cache: cache.clone(),
+
                db: db.clone(),
            };
            let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());

modified radicle-node/src/worker/fetch.rs
@@ -7,7 +7,7 @@ use localtime::LocalTime;

use radicle::crypto::PublicKey;
use radicle::prelude::RepoId;
-
use radicle::storage::refs::SignedRefsUpdate;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::{
    ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, WriteRepository as _,
};
@@ -62,14 +62,15 @@ impl Handle {
        }
    }

-
    pub fn fetch(
+
    pub fn fetch<D: node::refs::Store>(
        self,
        rid: RepoId,
        storage: &Storage,
        cache: &mut cob::cache::StoreWriter,
+
        refsdb: &mut D,
        limit: FetchLimit,
        remote: PublicKey,
-
        refs_at: Option<Vec<SignedRefsUpdate>>,
+
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Fetch> {
        let (result, clone, notifs) = match self {
            Self::Clone { mut handle, tmp } => {
@@ -123,6 +124,7 @@ impl Handle {
                }

                cache_cobs(&rid, &applied.updated, &repo, cache)?;
+
                cache_refs(&rid, &applied.updated, refsdb)?;

                Ok(FetchResult {
                    updated: applied.updated,
@@ -203,6 +205,45 @@ fn notify(
    Ok(())
}

+
/// Cache certain ref updates in our database.
+
fn cache_refs<D>(repo: &RepoId, refs: &[RefUpdate], db: &mut D) -> Result<(), node::refs::Error>
+
where
+
    D: node::refs::Store,
+
{
+
    let time = LocalTime::now();
+

+
    for r in refs {
+
        let name = r.name();
+
        let (namespace, qualified) = match radicle::git::parse_ref_namespaced(name) {
+
            Err(e) => {
+
                log::error!(target: "worker", "Git reference is invalid: {name:?}: {e}");
+
                log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
+
                break;
+
            }
+
            Ok((n, q)) => (n, q),
+
        };
+
        if qualified != *git::refs::storage::SIGREFS_BRANCH {
+
            // Only cache `rad/sigrefs`.
+
            continue;
+
        }
+
        log::trace!(target: "node", "Updating cache for {name} in {repo}");
+

+
        let result = match r {
+
            RefUpdate::Updated { new, .. } => db.set(repo, &namespace, &qualified, *new, time),
+
            RefUpdate::Created { oid, .. } => db.set(repo, &namespace, &qualified, *oid, time),
+
            RefUpdate::Deleted { .. } => db.delete(repo, &namespace, &qualified),
+
            RefUpdate::Skipped { .. } => continue,
+
        };
+

+
        if let Err(e) = result {
+
            log::error!(target: "worker", "Error updating git refs cache for {name:?}: {e}");
+
            log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
+
            break;
+
        }
+
    }
+
    Ok(())
+
}
+

/// Write new `RefUpdate`s that are related a `Patch` or an `Issue`
/// COB to the COB cache.
fn cache_cobs<S, C>(
modified radicle-node/src/worker/fetch/error.rs
@@ -17,6 +17,8 @@ pub enum Fetch {
    StorageCopy(#[from] io::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
+
    #[error(transparent)]
+
    RefsDb(#[from] radicle::node::refs::Error),
    #[error("validation of storage repository failed")]
    Validation,
    #[error(transparent)]
modified radicle/src/node.rs
@@ -8,6 +8,7 @@ pub mod db;
pub mod events;
pub mod notifications;
pub mod policy;
+
pub mod refs;
pub mod routing;
pub mod seed;

modified radicle/src/node/db.rs
@@ -8,6 +8,7 @@
//! The database schema is contained within the first migration. See [`version`], [`bump`] and
//! [`migrate`] for how this works.
use std::path::Path;
+
use std::sync::Arc;
use std::{fmt, time};

use sqlite as sql;
@@ -38,8 +39,9 @@ pub enum Error {
}

/// A file-backed database storing information about the network.
+
#[derive(Clone)]
pub struct Database {
-
    pub db: sql::ConnectionThreadSafe,
+
    pub db: Arc<sql::ConnectionThreadSafe>,
}

impl fmt::Debug for Database {
@@ -50,7 +52,7 @@ impl fmt::Debug for Database {

impl From<sql::ConnectionThreadSafe> for Database {
    fn from(db: sql::ConnectionThreadSafe) -> Self {
-
        Self { db }
+
        Self { db: Arc::new(db) }
    }
}

@@ -65,7 +67,7 @@ impl Database {
        db.execute(Self::PRAGMA)?;
        migrate(&db)?;

-
        Ok(Self { db })
+
        Ok(Self { db: Arc::new(db) })
    }

    /// Same as [`Self::open`], but in read-only mode. This is useful to have multiple
@@ -78,7 +80,7 @@ impl Database {
        db.set_busy_timeout(DB_READ_TIMEOUT.as_millis() as usize)?;
        db.execute(Self::PRAGMA)?;

-
        Ok(Self { db })
+
        Ok(Self { db: Arc::new(db) })
    }

    /// Create a new in-memory database.
@@ -87,7 +89,7 @@ impl Database {
        db.execute(Self::PRAGMA)?;
        migrate(&db)?;

-
        Ok(Self { db })
+
        Ok(Self { db: Arc::new(db) })
    }

    /// Get the database version. This is updated on schema changes.
modified radicle/src/node/db/schema.sql
@@ -96,3 +96,24 @@ create table if not exists "repo-sync-status" (
  unique ("repo", "node")
  --
) strict;
+

+
-- Git refs cache.
+
create table if not exists "refs" (
+
  -- Repository ID.
+
  "repo"                 text      not null,
+
  -- Ref namespace (NID).
+
  --
+
  -- Nb. We don't use a foreign key constraint because we can't guarantee
+
  -- that we'll have received a node announcement from this node.
+
  "namespace"            text      not null,
+
  -- Ref name (qualified).
+
  "ref"                  text      not null,
+
  -- Ref OID.
+
  "oid"                  text      not null,
+
  -- When this entry was created or updated.
+
  "timestamp"            integer   not null,
+
  --
+
  unique ("repo", "namespace", "ref")
+
  --
+
) strict;
+

added radicle/src/node/refs.rs
@@ -0,0 +1,2 @@
+
pub mod store;
+
pub use store::{Error, Store};
added radicle/src/node/refs/store.rs
@@ -0,0 +1,182 @@
+
#![allow(clippy::type_complexity)]
+
use std::num::TryFromIntError;
+
use std::str::FromStr;
+

+
use localtime::LocalTime;
+
use sqlite as sql;
+
use thiserror::Error;
+

+
use crate::git::{Oid, Qualified};
+
use crate::node::Database;
+
use crate::node::NodeId;
+
use crate::prelude::RepoId;
+

+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// An Internal error.
+
    #[error("internal error: {0}")]
+
    Internal(#[from] sql::Error),
+
    /// Timestamp error.
+
    #[error("invalid timestamp: {0}")]
+
    Timestamp(#[from] TryFromIntError),
+
}
+

+
/// Refs store.
+
///
+
/// Used to cache git references.
+
pub trait Store {
+
    fn set(
+
        &mut self,
+
        repo: &RepoId,
+
        namespace: &NodeId,
+
        refname: &Qualified,
+
        oid: Oid,
+
        timestamp: LocalTime,
+
    ) -> Result<bool, Error>;
+

+
    fn get(
+
        &self,
+
        repo: &RepoId,
+
        namespace: &NodeId,
+
        refname: &Qualified,
+
    ) -> Result<Option<(Oid, LocalTime)>, Error>;
+

+
    fn delete(&self, repo: &RepoId, namespace: &NodeId, refname: &Qualified)
+
        -> Result<bool, Error>;
+
}
+

+
impl Store for Database {
+
    fn set(
+
        &mut self,
+
        repo: &RepoId,
+
        namespace: &NodeId,
+
        refname: &Qualified,
+
        oid: Oid,
+
        timestamp: LocalTime,
+
    ) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO `refs` (repo, namespace, ref, oid, timestamp)
+
             VALUES (?1, ?2, ?3, ?4, ?5)
+
             ON CONFLICT DO UPDATE
+
             SET oid = ?4, timestamp = ?5
+
             WHERE timestamp < ?5 AND oid <> ?4",
+
        )?;
+
        stmt.bind((1, repo))?;
+
        stmt.bind((2, namespace))?;
+
        stmt.bind((3, refname.to_string().as_str()))?;
+
        stmt.bind((4, oid.to_string().as_str()))?;
+
        stmt.bind((5, i64::try_from(timestamp.as_millis())?))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    fn get(
+
        &self,
+
        repo: &RepoId,
+
        namespace: &NodeId,
+
        refname: &Qualified,
+
    ) -> Result<Option<(Oid, LocalTime)>, Error> {
+
        let mut stmt = self.db.prepare(
+
            "SELECT oid, timestamp FROM refs WHERE repo = ?1 AND namespace = ?2 AND ref = ?3",
+
        )?;
+

+
        stmt.bind((1, repo))?;
+
        stmt.bind((2, namespace))?;
+
        stmt.bind((3, refname.to_string().as_str()))?;
+

+
        if let Some(Ok(row)) = stmt.into_iter().next() {
+
            let oid = row.try_read::<&str, _>("oid")?;
+
            let oid = Oid::from_str(oid).map_err(|e| {
+
                Error::Internal(sql::Error {
+
                    code: None,
+
                    message: Some(format!("sql: invalid oid '{oid}': {e}")),
+
                })
+
            })?;
+
            let timestamp = row.try_read::<i64, _>("timestamp")?;
+
            let timestamp = LocalTime::from_millis(timestamp as u128);
+

+
            Ok(Some((oid, timestamp)))
+
        } else {
+
            Ok(None)
+
        }
+
    }
+

+
    fn delete(
+
        &self,
+
        repo: &RepoId,
+
        namespace: &NodeId,
+
        refname: &Qualified,
+
    ) -> Result<bool, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("DELETE FROM refs WHERE repo = ?1 AND namespace = ?2 AND ref = ?3")?;
+

+
        stmt.bind((1, repo))?;
+
        stmt.bind((2, namespace))?;
+
        stmt.bind((3, refname.to_string().as_str()))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+
    use crate::git::qualified;
+
    use crate::test::arbitrary;
+
    use localtime::{LocalDuration, LocalTime};
+

+
    #[test]
+
    fn test_set_and_delete() {
+
        let mut db = Database::memory().unwrap();
+
        let oid = arbitrary::oid();
+

+
        let repo = arbitrary::gen::<RepoId>(1);
+
        let namespace = arbitrary::gen::<NodeId>(1);
+
        let refname = qualified!("refs/heads/master");
+
        let timestamp = LocalTime::now();
+

+
        assert!(db.set(&repo, &namespace, &refname, oid, timestamp).unwrap());
+
        assert!(db.get(&repo, &namespace, &refname).unwrap().is_some());
+
        assert!(db.delete(&repo, &namespace, &refname).unwrap());
+
        assert!(db.get(&repo, &namespace, &refname).unwrap().is_none());
+
        assert!(!db.delete(&repo, &namespace, &refname).unwrap());
+
    }
+

+
    #[test]
+
    fn test_set_and_get() {
+
        let mut db = Database::memory().unwrap();
+
        let oid1 = arbitrary::oid();
+
        let oid2 = arbitrary::oid();
+

+
        assert_ne!(oid1, oid2);
+

+
        let repo = arbitrary::gen::<RepoId>(1);
+
        let namespace = arbitrary::gen::<NodeId>(1);
+
        let refname = qualified!("refs/heads/master");
+
        let mut timestamp = LocalTime::now();
+

+
        assert_eq!(db.get(&repo, &namespace, &refname).unwrap(), None);
+
        assert!(db
+
            .set(&repo, &namespace, &refname, oid1, timestamp)
+
            .unwrap());
+
        assert_eq!(
+
            db.get(&repo, &namespace, &refname).unwrap(),
+
            Some((oid1, timestamp))
+
        );
+
        assert!(!db
+
            .set(&repo, &namespace, &refname, oid1, timestamp)
+
            .unwrap());
+
        timestamp.elapse(LocalDuration::from_millis(1));
+

+
        assert!(db
+
            .set(&repo, &namespace, &refname, oid2, timestamp)
+
            .unwrap());
+
        assert_eq!(
+
            db.get(&repo, &namespace, &refname).unwrap(),
+
            Some((oid2, timestamp))
+
        );
+
    }
+
}
modified radicle/src/storage/refs.rs
@@ -437,156 +437,6 @@ impl Deref for SignedRefsAt {
    }
}

-
/// A snapshot of a `rad/sigrefs` update.
-
///
-
/// If `old` is `None` then this is a newly seen `rad/sigrefs`,
-
/// otherwise it is an update from `old` to `new`.
-
///
-
/// The [`SignedRefsUpdate::difference`] method will compute the difference
-
/// between the two `Oid`s, returning [`DiffedRefs`].
-
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
-
pub struct SignedRefsUpdate {
-
    /// The remote namespace of the `rad/sigrefs`.
-
    pub remote: RemoteId,
-
    /// The commit SHA of the previously seen `rad/sigrefs`.
-
    pub old: Option<Oid>,
-
    /// The commit SHA of the newly seen `rad/sigrefs`.
-
    pub new: Oid,
-
}
-

-
/// A difference update between two references in a `rad/sigrefs`
-
/// payload.
-
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
-
pub enum Update {
-
    /// The reference was added and points to `oid`.
-
    Added { oid: Oid },
-
    /// The reference was changed to point to `oid`.
-
    Changed { oid: Oid },
-
    /// The reference was deleted and used to point to `oid`.
-
    Deleted { oid: Oid },
-
    /// The reference was not changed, and still points to `oid`.
-
    Same { oid: Oid },
-
}
-

-
impl Update {
-
    /// Return the `oid` of the update if it was `Added` or `Changed`.
-
    pub fn modified(&self) -> Option<&Oid> {
-
        match self {
-
            Update::Added { oid } => Some(oid),
-
            Update::Changed { oid } => Some(oid),
-
            Update::Deleted { .. } | Update::Same { .. } => None,
-
        }
-
    }
-

-
    /// Return the `oid` of the update.
-
    fn oid(&self) -> &Oid {
-
        match self {
-
            Update::Added { oid } => oid,
-
            Update::Changed { oid } => oid,
-
            Update::Deleted { oid } => oid,
-
            Update::Same { oid } => oid,
-
        }
-
    }
-
}
-

-
/// The set of [`Update`]s for a given [`git::RefString`].
-
///
-
/// To construct a `DiffedRefs` use [`SignedRefsUpdate::difference`].
-
#[derive(Clone, Debug, Default)]
-
pub struct DiffedRefs(BTreeMap<git::RefString, Update>);
-

-
impl Deref for DiffedRefs {
-
    type Target = BTreeMap<git::RefString, Update>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
-
    }
-
}
-

-
impl DerefMut for DiffedRefs {
-
    fn deref_mut(&mut self) -> &mut Self::Target {
-
        &mut self.0
-
    }
-
}
-

-
impl SignedRefsUpdate {
-
    /// Get the difference between [`SignedRefsUpdate::old`], if it exists,
-
    /// and [`SignedRefsUpdate::new`].
-
    ///
-
    /// If `old` is `None`, then all `Updates`s are `Added`.
-
    ///
-
    /// Otherwise, they are computed to be:
-
    ///    - `Added` if the reference only exists in `new`.
-
    ///    - `Changed` if the reference exists in `old` and `new`,
-
    ///       and the `oid`s are different.
-
    ///    - `Deleted` if the reference only exists in `old`.
-
    ///    - `Same` if the reference exists in `old` and `new`, and
-
    ///      the `oid`s are the same.
-
    pub fn difference<S>(&self, repo: &S) -> Result<DiffedRefs, Error>
-
    where
-
        S: ReadRepository,
-
    {
-
        let new = self.load_new(repo)?;
-
        let mut refs = DiffedRefs(
-
            new.iter()
-
                .map(|(refname, oid)| (refname.clone(), Update::Added { oid: *oid }))
-
                .collect::<BTreeMap<_, _>>(),
-
        );
-

-
        match self.load_old(repo)? {
-
            None => Ok(refs),
-
            Some(old) => {
-
                for (refname, old) in old.sigrefs.iter() {
-
                    refs.entry(refname.clone())
-
                        .and_modify(|update| {
-
                            // N.b. we rely on the fact that is always the
-
                            // newly added Oid.
-
                            let new = update.oid();
-
                            if new == old {
-
                                *update = Update::Same { oid: *new };
-
                            } else {
-
                                *update = Update::Changed { oid: *update.oid() };
-
                            }
-
                        })
-
                        .or_insert(Update::Deleted { oid: *old });
-
                }
-
                Ok(refs)
-
            }
-
        }
-
    }
-

-
    fn load_old<S>(&self, repo: &S) -> Result<Option<SignedRefsAt>, Error>
-
    where
-
        S: ReadRepository,
-
    {
-
        self.old
-
            .map(|at| {
-
                RefsAt {
-
                    remote: self.remote,
-
                    at,
-
                }
-
                .load(repo)
-
            })
-
            .transpose()
-
    }
-

-
    fn load_new<S>(&self, repo: &S) -> Result<SignedRefsAt, Error>
-
    where
-
        S: ReadRepository,
-
    {
-
        RefsAt::from(self).load(repo)
-
    }
-
}
-

-
impl From<&SignedRefsUpdate> for RefsAt {
-
    fn from(up: &SignedRefsUpdate) -> Self {
-
        Self {
-
            remote: up.remote,
-
            at: up.new,
-
        }
-
    }
-
}
-

pub mod canonical {
    use super::*;