Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Announce offline ref updates
cloudhead committed 2 years ago
commit 358324883c0aed8acc40e7b565663f89b236692d
parent b140d8580fb00e22e08fb5b9a755fcf13fe3cf74
11 files changed +319 -57
modified radicle-cli/examples/rad-sync.md
@@ -8,15 +8,18 @@ $ rad issue open --title "Test `rad sync`" --description "Check that the command
```

If we check the sync status, we see that our peers are out of sync:
+
Our own node is also out of sync, since we used `--no-announce`.
+
It isn't aware of the updates to the repo.

```
$ rad sync status
-
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
-
│ ●   NID                                                Address                Status        At        Timestamp │
-
├─────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
-
│ ●   z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z   eve.radicle.xyz:8776   out-of-sync   f209c9f   [  ...  ] │
-
│ ●   z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk   bob.radicle.xyz:8776   out-of-sync   f209c9f   [  ...  ] │
-
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+
╭───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
+
│ ●   NID                                                Address                  Status        At        Timestamp │
+
├───────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
+
│ ●   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi   alice.radicle.xyz:8776   out-of-sync   f209c9f   [  ...  ] │
+
│ ●   z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk   bob.radicle.xyz:8776     out-of-sync   f209c9f   [  ...  ] │
+
│ ●   z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z   eve.radicle.xyz:8776     out-of-sync   f209c9f   [  ...  ] │
+
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
```

Now let's run `rad sync`. This will announce the issue refs to the network and
@@ -39,8 +42,8 @@ We can also use the `--fetch` option to only fetch objects:

```
$ rad sync --fetch
-
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkux1…nVhib7Z..
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkt67…v4N1tRk..
+
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkux1…nVhib7Z..
✓ Fetched repository from 2 seed(s)
```

@@ -48,8 +51,8 @@ Specifying both `--fetch` and `--announce` is equivalent to specifying none:

```
$ rad sync --fetch --announce
-
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkux1…nVhib7Z..
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkt67…v4N1tRk..
+
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkux1…nVhib7Z..
✓ Fetched repository from 2 seed(s)
✓ Nothing to announce, already in sync with network (see `rad sync status`)
```
@@ -66,7 +69,7 @@ And the `--replicas` flag to sync with a number of nodes:

```
$ rad sync --fetch --replicas 1
-
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkux1…nVhib7Z..
+
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkt67…v4N1tRk..
✓ Fetched repository from 1 seed(s)
```

@@ -74,10 +77,11 @@ We can check the sync status again to make sure everything's in sync:

```
$ rad sync status
-
╭────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
-
│ ●   NID                                                Address                Status   At        Timestamp │
-
├────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
-
│ ●   z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z   eve.radicle.xyz:8776   synced   9f615f9   [  ...  ] │
-
│ ●   z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk   bob.radicle.xyz:8776   synced   9f615f9   [  ...  ] │
-
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+
╭──────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
+
│ ●   NID                                                Address                  Status   At        Timestamp │
+
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
+
│ ●   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi   alice.radicle.xyz:8776   synced   9f615f9   [  ...  ] │
+
│ ●   z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk   bob.radicle.xyz:8776     synced   9f615f9   [  ...  ] │
+
│ ●   z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z   eve.radicle.xyz:8776     synced   9f615f9   [  ...  ] │
+
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
```
modified radicle-cli/src/commands/sync.rs
@@ -1,3 +1,4 @@
+
use std::cmp::Ordering;
use std::ffi::OsString;
use std::time;

@@ -272,7 +273,8 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {

fn sync_status(rid: Id, node: &mut Node) -> anyhow::Result<()> {
    let mut table = Table::<6, term::Label>::new(TableOptions::bordered());
-
    let seeds: Vec<_> = node.seeds(rid)?.into();
+
    let mut seeds: Vec<_> = node.seeds(rid)?.into();
+
    let local = node.nid()?;

    table.push([
        term::format::dim(String::from("●")).into(),
@@ -284,6 +286,15 @@ fn sync_status(rid: Id, node: &mut Node) -> anyhow::Result<()> {
    ]);
    table.divider();

+
    // Always show our local node first.
+
    seeds.sort_by(|a, _| {
+
        if a.nid == local {
+
            Ordering::Less
+
        } else {
+
            Ordering::Equal
+
        }
+
    });
+

    for seed in seeds {
        let (icon, status, refs, time) = match seed.sync {
            Some(SyncStatus::Synced { at }) => (
@@ -350,15 +361,17 @@ fn announce_refs(
                }
            }
            RepoSync::Replicas(replicas) => {
-
                let count = synced.count();
-
                if count >= seeds.len() {
+
                let synced = synced.collect::<Vec<_>>();
+
                if synced.len() >= seeds.len() {
                    term::success!(
                        "Nothing to announce, already in sync with network (see `rad sync status`)"
                    );
                    return Ok(());
                }
-
                if count >= replicas {
-
                    term::success!("Nothing to announce, already in sync with {count} seed(s) (see `rad sync status`)");
+
                // Replicas not counting our local replica.
+
                let remotes = synced.iter().filter(|s| &s.nid != profile.id()).count();
+
                if remotes >= replicas {
+
                    term::success!("Nothing to announce, already in sync with {remotes} seed(s) (see `rad sync status`)");
                    return Ok(());
                }
            }
@@ -444,6 +457,7 @@ fn fetch_all(
    let seeds = node.seeds(rid)?;
    let mut results = FetchResults::default();
    let (connected, mut disconnected) = seeds.partition();
+
    let local = node.nid()?;

    // Fetch from connected seeds.
    for seed in connected.iter().take(count) {
@@ -456,6 +470,10 @@ fn fetch_all(
        let Some(seed) = disconnected.pop() else {
            break;
        };
+
        if seed.nid == local {
+
            // Skip our own node.
+
            continue;
+
        }
        // Try all seed addresses until one succeeds.
        for ka in seed.addrs {
            let spinner = term::spinner(format!(
modified radicle-node/src/service.rs
@@ -122,6 +122,8 @@ pub enum Error {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
+
    Gossip(#[from] gossip::Error),
+
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
    #[error(transparent)]
    Routing(#[from] routing::Error),
@@ -440,6 +442,21 @@ where

        self.start_time = time;

+
        // Ensure that our local node is in our address database.
+
        self.addresses
+
            .insert(
+
                &self.node_id(),
+
                self.node.features,
+
                self.node.alias.clone(),
+
                self.node.work(),
+
                self.node.timestamp,
+
                self.node
+
                    .addresses
+
                    .iter()
+
                    .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
+
            )
+
            .expect("Service::initialize: error adding local node to address database");
+

        // Connect to configured peers.
        let addrs = self.config.connect.clone();
        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
@@ -452,25 +469,45 @@ where
        self.routing
            .insert(&rids, self.node_id(), time.as_millis())?;

+
        let nid = self.node_id();
+
        let announced = self
+
            .addresses
+
            .seeded_by(&nid)?
+
            .collect::<Result<HashMap<_, _>, _>>()?;
        for rid in rids {
+
            let repo = self.storage.repository(rid)?;
+

            if !self.is_tracking(&rid)? {
                warn!(target: "service", "Local repository {rid} is not tracked");
            }
+
            // If we have no owned refs for this repo, then there's nothing to announce.
+
            let Ok(updated_at) = SyncedAt::load(&repo, nid) else {
+
                continue;
+
            };
+

+
            // Skip this repo if the sync status matches what we have in storage.
+
            if let Some(announced) = announced.get(&rid) {
+
                if updated_at.oid == announced.oid {
+
                    continue;
+
                }
+
            } else {
+
                debug!(target: "service", "Saving local sync status for {rid}..");
+
                // If we don't have a sync status for this repo, create one.
+
                self.addresses.synced(
+
                    &rid,
+
                    &nid,
+
                    updated_at.oid,
+
                    updated_at.timestamp.as_millis(),
+
                )?;
+
            }
+
            // If we got here, it likely means a repo was updated while the node was stopped.
+
            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
+
            // the historical gossip messages when a node connects and subscribes to this repo.
+
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
+
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
+
                self.gossip.announced(&nid, &ann)?;
+
            }
        }
-
        // Ensure that our local node is in our address database.
-
        self.addresses
-
            .insert(
-
                &self.node_id(),
-
                self.node.features,
-
                self.node.alias.clone(),
-
                self.node.work(),
-
                self.node.timestamp,
-
                self.node
-
                    .addresses
-
                    .iter()
-
                    .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
-
            )
-
            .expect("Service::initialize: error adding local node to address database");

        // Setup subscription filter for tracked repos.
        self.filter = Filter::new(
@@ -1049,7 +1086,7 @@ where
            }
        }

-
        // Discard announcement messages we've already seen, otherwise update out last seen time.
+
        // Discard announcement messages we've already seen, otherwise update our last seen time.
        match self.gossip.announced(announcer, announcement) {
            Ok(fresh) => {
                if !fresh {
@@ -1543,15 +1580,13 @@ where
        Ok(synced)
    }

-
    /// Announce local refs for given id.
-
    fn announce_refs(
-
        &mut self,
+
    /// Return a refs announcement including the given remotes.
+
    fn refs_announcement_for(
+
        &self,
        rid: Id,
        remotes: impl IntoIterator<Item = NodeId>,
-
    ) -> Result<Vec<RefsAt>, Error> {
+
    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
        let repo = self.storage.repository(rid)?;
-
        let doc = repo.identity_doc()?;
-
        let peers = self.sessions.connected().map(|(_, p)| p);
        let timestamp = self.time();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

@@ -1573,17 +1608,40 @@ where
            refs: refs.clone(),
            timestamp,
        });
-
        let ann = msg.signed(&self.signer);
+
        Ok((msg.signed(&self.signer), refs.into()))
+
    }

-
        self.outbox.broadcast(
+
    /// Announce local refs for given id.
+
    fn announce_refs(
+
        &mut self,
+
        rid: Id,
+
        remotes: impl IntoIterator<Item = NodeId>,
+
    ) -> Result<Vec<RefsAt>, Error> {
+
        let repo = self.storage.repository(rid)?;
+
        let doc = repo.identity_doc()?;
+
        let peers = self.sessions.connected().map(|(_, p)| p);
+
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
+

+
        // Update our local sync status. This is useful for determining if refs were updated while
+
        // the node was stopped.
+
        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
+
            if let Err(e) = self
+
                .addresses
+
                .synced(&rid, &ann.node, refs.at, ann.timestamp())
+
            {
+
                error!(target: "service", "Error updating sync status for local node: {e}");
+
            }
+
        }
+
        self.outbox.announce(
            ann,
            peers.filter(|p| {
                // Only announce to peers who are allowed to view this repo.
                doc.is_visible_to(&p.id)
            }),
+
            &mut self.gossip,
        );

-
        Ok(refs.into())
+
        Ok(refs)
    }

    fn sync_and_announce(&mut self) {
@@ -1714,10 +1772,13 @@ where
    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self, inventory: Vec<Id>) -> Result<(), storage::Error> {
        let time = self.time();
-
        let inv = Message::inventory(gossip::inventory(time, inventory), &self.signer);
-
        for (_, sess) in self.sessions.connected() {
-
            self.outbox.write(sess, inv.clone());
-
        }
+
        let msg = AnnouncementMessage::from(gossip::inventory(time, inventory));
+

+
        self.outbox.announce(
+
            msg.signed(&self.signer),
+
            self.sessions.connected().map(|(_, p)| p),
+
            &mut self.gossip,
+
        );
        Ok(())
    }

modified radicle-node/src/service/gossip/store.rs
@@ -57,7 +57,7 @@ impl GossipStore {
        Ok(self.db.change_count())
    }

-
    /// Get the last announcement in the store, by timestamp.
+
    /// Get the timestamp of the last announcement in the store.
    pub fn last(&self) -> Result<Option<Timestamp>, Error> {
        let stmt = self
            .db
@@ -247,6 +247,7 @@ impl From<wire::Error> for sql::Error {
    }
}

+
/// Type of gossip message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GossipType {
    Refs,
modified radicle-node/src/service/io.rs
@@ -9,6 +9,7 @@ use crate::service::session::Session;
use crate::service::Link;
use crate::storage::Namespaces;

+
use super::gossip;
use super::message::{Announcement, AnnouncementMessage};

/// I/O operation to execute at the network/wire level.
@@ -62,6 +63,32 @@ impl Outbox {
        self.io.push_back(Io::Write(remote.id, vec![msg]));
    }

+
    /// Announce something to a peer. This is meant for our own announcement messages.
+
    pub fn announce<'a>(
+
        &mut self,
+
        ann: Announcement,
+
        peers: impl Iterator<Item = &'a Session>,
+
        gossip: &mut gossip::Store,
+
    ) {
+
        // Store our announcement so that it can be retrieved from us later, just like
+
        // announcements we receive from peers.
+
        if let Err(e) = gossip.announced(&ann.node, &ann) {
+
            error!(target: "service", "Error updating our gossip store with announced message: {e}");
+
        }
+

+
        for peer in peers {
+
            if let AnnouncementMessage::Refs(refs) = &ann.message {
+
                if let Some(subscribe) = &peer.subscribe {
+
                    if subscribe.filter.contains(&refs.rid) {
+
                        self.write(peer, ann.clone().into());
+
                    }
+
                }
+
            } else {
+
                self.write(peer, ann.clone().into());
+
            }
+
        }
+
    }
+

    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
        let msgs = msgs.into_iter().collect::<Vec<_>>();

@@ -123,7 +150,7 @@ impl Outbox {
                if let Some(subscribe) = &p.subscribe {
                    subscribe.filter.contains(&id)
                } else {
-
                    // If the peer did not send us a `subscribe` message, we don'the
+
                    // If the peer did not send us a `subscribe` message, we don't
                    // relay any messages to them.
                    false
                }
modified radicle-node/src/test/peer.rs
@@ -57,7 +57,7 @@ where
    G: Signer + 'static,
{
    fn init(&mut self) {
-
        self.initialize()
+
        self.initialize();
    }

    fn addr(&self) -> Address {
@@ -204,16 +204,29 @@ where
        }
    }

-
    pub fn initialize(&mut self) {
+
    pub fn initialize(&mut self) -> bool {
        if !self.initialized {
            info!(
+
                target: "test",
                "{}: Initializing: id = {}, address = {}",
                self.name, self.id, self.ip
            );

            self.initialized = true;
            self.service.initialize(LocalTime::now()).unwrap();
+
            return true;
        }
+
        false
+
    }
+

+
    pub fn restart(&mut self) {
+
        assert!(self.initialized);
+
        info!(
+
            target: "test",
+
            "{}: Restarting: id = {}, address = {}",
+
            self.name, self.id, self.ip
+
        );
+
        self.service.initialize(LocalTime::now()).unwrap();
    }

    pub fn address(&self) -> Address {
modified radicle-node/src/tests.rs
@@ -843,6 +843,99 @@ fn test_refs_announcement_no_subscribe() {
}

#[test]
+
fn test_refs_announcement_offline() {
+
    logger::init(log::Level::Debug);
+
    let tmp = tempfile::tempdir().unwrap();
+
    let mut alice = {
+
        let signer = MockSigner::default();
+
        let storage = fixtures::storage(tmp.path().join("alice"), &signer).unwrap();
+

+
        Peer::config(
+
            "alice",
+
            [7, 7, 7, 7],
+
            storage,
+
            peer::Config {
+
                signer,
+
                ..peer::Config::default()
+
            },
+
        )
+
    };
+
    let inv = alice.inventory();
+
    let rid = inv.first().unwrap();
+
    let mut bob = Peer::new("bob", [8, 8, 8, 8]);
+
    bob.track_repo(rid, tracking::Scope::All).unwrap();
+

+
    // Make sure alice's service wasn't initialized before.
+
    assert!(alice.initialize());
+

+
    alice.connect_to(&bob);
+
    alice.receive(bob.id, Message::Subscribe(Subscribe::all()));
+

+
    // Alice announces the refs of all projects since she hasn't announced refs for these projects
+
    // yet.
+
    let mut messages = alice.messages(bob.id());
+
    for i in &inv {
+
        let msg = messages.next();
+
        assert_matches!(
+
            msg,
+
            Some(Message::Announcement(Announcement {
+
                node,
+
                message: AnnouncementMessage::Refs(RefsAnnouncement {
+
                    rid,
+
                    ..
+
                }),
+
                ..
+
            }))
+
            if node == alice.id && rid == *i
+
        );
+
    }
+

+
    // Create an issue without telling the node.
+
    let repo = alice.storage().repository(*rid).unwrap();
+
    let old_refs = RefsAt::new(&repo, alice.id).unwrap();
+
    let mut issues = radicle::issue::Issues::open(&repo).unwrap();
+
    issues
+
        .create("Issue while offline!", "", &[], &[], [], alice.signer())
+
        .unwrap();
+
    let new_refs = RefsAt::new(&repo, alice.id).unwrap();
+
    assert_ne!(old_refs, new_refs);
+

+
    // Now we restart Alice's node. It should pick up that something's changed in storage.
+
    alice.elapse(LocalDuration::from_secs(60));
+
    alice.disconnected(bob.id, &DisconnectReason::Command);
+
    alice.outbox().for_each(drop);
+
    alice.restart();
+
    alice.connect_to(&bob);
+
    alice.receive(
+
        bob.id,
+
        Message::Subscribe(Subscribe {
+
            filter: Filter::default(),
+
            since: alice.timestamp(),
+
            until: Timestamp::MAX,
+
        }),
+
    );
+

+
    let anns = alice
+
        .messages(bob.id())
+
        .filter_map(|m| {
+
            if let Message::Announcement(Announcement {
+
                message: AnnouncementMessage::Refs(ann),
+
                ..
+
            }) = m
+
            {
+
                Some(ann)
+
            } else {
+
                None
+
            }
+
        })
+
        .collect::<Vec<_>>();
+

+
    assert_eq!(anns.len(), 1);
+
    assert_eq!(anns.first().unwrap().rid, *rid);
+
    assert_eq!(anns.first().unwrap().refs.first().unwrap().at, new_refs.at);
+
}
+

+
#[test]
fn test_inventory_relay() {
    // Topology is eve <-> alice <-> bob
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
modified radicle/src/node/address/schema.sql
@@ -46,6 +46,7 @@ create table if not exists "announcements" (
  -- Node ID.
  "node"               text      not null references "nodes" ("id") on delete cascade,
  -- Repo ID, if any, for example in ref announcements.
+
  -- For other announcement types, this should be an empty string.
  "repo"               text      not null,
  -- Announcement type.
  --
modified radicle/src/node/address/store.rs
@@ -1,3 +1,4 @@
+
#![allow(clippy::type_complexity)]
use std::path::Path;
use std::str::FromStr;
use std::{fmt, io};
@@ -14,7 +15,7 @@ use crate::prelude::{Id, Timestamp};
use crate::sql::transaction;

use super::types;
-
use super::AddressType;
+
use super::{AddressType, SyncedAt};

#[derive(Error, Debug)]
pub enum Error {
@@ -261,11 +262,39 @@ impl Store for Book {
            Ok(types::Seed {
                nid,
                addresses,
-
                synced_at: types::SyncedAt { oid, timestamp },
+
                synced_at: SyncedAt { oid, timestamp },
            })
        })))
    }

+
    fn seeded_by(
+
        &self,
+
        nid: &NodeId,
+
    ) -> Result<Box<dyn Iterator<Item = Result<(Id, SyncedAt), Error>> + '_>, Error> {
+
        let mut stmt = self.db.prepare(
+
            "SELECT repo, head, timestamp
+
             FROM `repo-sync-status`
+
             WHERE node = ?",
+
        )?;
+
        stmt.bind((1, nid))?;
+

+
        Ok(Box::new(stmt.into_iter().map(|row| {
+
            let row = row?;
+
            let rid = row.try_read::<Id, _>("repo")?;
+
            let oid = row.try_read::<&str, _>("head")?;
+
            let oid = Oid::from_str(oid).map_err(|e| {
+
                Error::Internal(sql::Error {
+
                    code: None,
+
                    message: Some(format!("sql: invalid oid '{oid}': {e}")),
+
                })
+
            })?;
+
            let timestamp = row.try_read::<i64, _>("timestamp")?;
+
            let timestamp = LocalTime::from_millis(timestamp as u128);
+

+
            Ok((rid, SyncedAt { oid, timestamp }))
+
        })))
+
    }
+

    fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error> {
        let mut stmt = self
            .db
@@ -397,6 +426,11 @@ pub trait Store {
        &self,
        rid: &Id,
    ) -> Result<Box<dyn Iterator<Item = Result<types::Seed, Error>> + '_>, Error>;
+
    /// Get the repos seeded by the given node.
+
    fn seeded_by(
+
        &self,
+
        nid: &NodeId,
+
    ) -> Result<Box<dyn Iterator<Item = Result<(Id, SyncedAt), Error>> + '_>, Error>;
    /// Returns the number of addresses.
    fn len(&self) -> Result<usize, Error>;
    /// Returns true if there are no addresses.
modified radicle/src/node/address/types.rs
@@ -9,7 +9,7 @@ use crate::collections::RandomMap;
use crate::git;
use crate::node::{Address, Alias};
use crate::prelude::{NodeId, Timestamp};
-
use crate::storage::ReadRepository;
+
use crate::storage::{refs::RefsAt, ReadRepository, RemoteId};
use crate::{node, profile};

/// A map with the ability to randomly select values.
@@ -196,6 +196,14 @@ pub struct SyncedAt {
}

impl SyncedAt {
+
    /// Load a new [`SyncedAt`] for the given remote.
+
    pub fn load<S: ReadRepository>(repo: &S, remote: RemoteId) -> Result<Self, git::ext::Error> {
+
        let refs = RefsAt::new(repo, remote)?;
+
        let oid = refs.at;
+

+
        Self::new(oid, repo)
+
    }
+

    /// Create a new [`SyncedAt`] given an OID, by looking up the timestamp in the repo.
    pub fn new<S: ReadRepository>(oid: git::ext::Oid, repo: &S) -> Result<Self, git::ext::Error> {
        let timestamp = repo.commit(oid)?.time();
modified radicle/src/test/storage.rs
@@ -177,8 +177,10 @@ impl ReadRepository for MockRepository {
        todo!()
    }

-
    fn commit(&self, _oid: Oid) -> Result<git2::Commit, git_ext::Error> {
-
        todo!()
+
    fn commit(&self, oid: Oid) -> Result<git2::Commit, git_ext::Error> {
+
        Err(git_ext::Error::NotFound(git_ext::NotFound::NoSuchObject(
+
            *oid,
+
        )))
    }

    fn revwalk(&self, _head: Oid) -> Result<git2::Revwalk, git2::Error> {