Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: `info` announcements
Fintan Halpenny committed 2 years ago
commit 5cb4ad4622a401fd686cd1c801175686ec4a5805
parent 85d452b2221e125923ea87d3041dff46384d681b
10 files changed +318 -129
modified radicle-cli/examples/rad-sync.md
@@ -15,15 +15,12 @@ $ rad sync --announce
✓ Synced with 2 node(s)
```

-
If we try to sync again after the nodes have synced, we will get a timeout
-
after one second, since the nodes will not emit any message:
+
If we try to sync again after the nodes have synced, we will already
+
be up to date and so we will see the same message.

-
``` (fail)
-
$ rad sync --announce --timeout 1
-
✗ Syncing with 2 node(s)..
-
! Seed z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk timed out..
-
! Seed z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z timed out..
-
✗ Error: all seeds timed out
+
```
+
$ rad sync --announce
+
✓ Synced with 2 node(s)
```

We can also use the `--fetch` option to only fetch objects:
@@ -37,15 +34,12 @@ $ rad sync --fetch

Specifying both `--fetch` and `--announce` is equivalent to specifying none:

-
``` (fail)
-
$ rad sync --fetch --announce --timeout 1
+
```
+
$ rad sync --fetch --announce
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkux1…nVhib7Z..
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6Mkt67…v4N1tRk..
✓ Fetched repository from 2 seed(s)
-
✗ Syncing with 2 node(s)..
-
! Seed z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk timed out..
-
! Seed z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z timed out..
-
✗ Error: all seeds timed out
+
✓ Synced with 2 node(s)
```

It's also possible to use the `--seed` flag to only sync with a specific node:
modified radicle-node/src/control.rs
@@ -155,7 +155,7 @@ where
        Command::AnnounceRefs { rid } => {
            let refs = handle.announce_refs(rid)?;

-
            json::to_writer(writer, &refs)?;
+
            CommandResult::Okay(refs).to_writer(writer)?;
        }
        Command::AnnounceInventory => {
            if let Err(e) = handle.announce_inventory() {
@@ -268,7 +268,7 @@ mod tests {
                line,
                json::json!({
                    "remote": handle.nid().unwrap(),
-
                    "at":"0000000000000000000000000000000000000000"
+
                    "at": "0000000000000000000000000000000000000000"
                })
                .to_string()
            );
modified radicle-node/src/service.rs
@@ -35,7 +35,7 @@ use crate::node::routing::InsertResult;
use crate::node::{Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus};
use crate::prelude::*;
use crate::runtime::Emitter;
-
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
+
use crate::service::message::{Announcement, AnnouncementMessage, Info, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::tracking::{store::Write, Scope};
use crate::storage;
@@ -55,7 +55,7 @@ pub use radicle::node::tracking::config as tracking;

use self::io::Outbox;
use self::limitter::RateLimiter;
-
use self::message::InventoryAnnouncement;
+
use self::message::{InventoryAnnouncement, RefsStatus};
use self::tracking::NamespacesError;

/// How often to run the "idle" task.
@@ -612,14 +612,15 @@ where
                resp.send(untracked).ok();
            }
            Command::AnnounceRefs(id, resp) => match self.announce_refs(id, [self.node_id()]) {
-
                Ok(refs) => {
-
                    #[allow(clippy::unwrap_used)]
-
                    // SAFETY: we announced only our refs
-
                    let refs = refs.first().unwrap();
-
                    resp.send(*refs).ok();
-
                }
+
                Ok(refs) => match refs.as_slice() {
+
                    &[refs] => {
+
                        resp.send(refs).ok();
+
                    }
+
                    // SAFETY: Since we passed in one NID, we should get exactly one item back.
+
                    [..] => panic!("Service::command: unexpected refs returned"),
+
                },
                Err(err) => {
-
                    error!("Error announcing refs: {err}");
+
                    error!(target: "service", "Error announcing refs: {err}");
                }
            },
            Command::AnnounceInventory => {
@@ -628,7 +629,7 @@ where
                    .inventory()
                    .and_then(|i| self.announce_inventory(i))
                {
-
                    error!("Error announcing inventory: {}", err);
+
                    error!(target: "service", "Error announcing inventory: {err}");
                }
            }
            Command::SyncInventory(resp) => {
@@ -650,10 +651,10 @@ where
        &mut self,
        rid: Id,
        from: &NodeId,
-
        refs: Vec<RefsAt>,
+
        refs: NonEmpty<RefsAt>,
        timeout: time::Duration,
    ) {
-
        self._fetch(rid, from, refs, timeout, None)
+
        self._fetch(rid, from, refs.into(), timeout, None)
    }

    /// Initiate an outgoing fetch for some repository.
@@ -1136,43 +1137,70 @@ where
                    }
                }

-
                // Check if the announcer is in sync with our own refs, and if so emit an event.
-
                // This event is used for showing sync progress to users.
-
                match message.is_synced(&self.node_id(), &self.storage) {
-
                    Ok(synced) => {
-
                        if let Some(at) = synced {
-
                            self.emitter.emit(Event::RefsSynced {
-
                                rid: message.rid,
-
                                remote: *announcer,
-
                                at,
-
                            });
-
                        }
-
                    }
-
                    Err(e) => {
-
                        error!(target: "service", "Error checking refs announcement sync status: {e}");
-
                    }
-
                }
-

                // TODO: Buffer/throttle fetches.
                let repo_entry = self.tracking.repo_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo tracking configuration",
                );

                if repo_entry.policy == tracking::Policy::Track {
+
                    let (fresh, stale) = match self.refs_status_of(message, &repo_entry.scope) {
+
                        Ok(RefsStatus { fresh, stale }) => (fresh, stale),
+
                        Err(e) => {
+
                            error!(target: "service", "Failed to check refs status: {e}");
+
                            return Ok(relay);
+
                        }
+
                    };
+
                    // If the ref announcement indicates that the announcer already has
+
                    // our *owned* refs, then we emit an event, which can be used to
+
                    // show sync status to the user.
+
                    if let Some(at) = stale
+
                        .iter()
+
                        .find(|refs| &refs.remote == self.nid())
+
                        .copied()
+
                        .map(|RefsAt { at, .. }| at)
+
                    {
+
                        self.emitter.emit(Event::RefsSynced {
+
                            rid: message.rid,
+
                            remote: *announcer,
+
                            at,
+
                        });
+
                    }
+

                    // Refs can be relayed by peers who don't have the data in storage,
                    // therefore we only check whether we are connected to the *announcer*,
                    // which is required by the protocol to only announce refs it has.
-
                    if self.sessions.is_connected(announcer) {
-
                        match self.should_fetch_refs_announcement(message, &repo_entry.scope) {
-
                            Ok(Some(refs)) => {
-
                                self.fetch_refs_at(message.rid, announcer, refs, FETCH_TIMEOUT)
-
                            }
-
                            Ok(None) => {}
-
                            Err(e) => {
-
                                error!(target: "service", "Failed to check refs announcement: {e}");
-
                                return Err(session::Error::Misbehavior);
+
                    if let Some(remote) = self.sessions.get(announcer).cloned() {
+
                        // If the relayer is also the origin of the message, we inform it
+
                        // about any refs that are already in sync (stale).
+
                        if relayer == announcer {
+
                            // If the stale refs contain refs announced by the peer, let it know
+
                            // that we're already in sync.
+
                            if let Some(at) = stale
+
                                .iter()
+
                                .find(|refs| refs.remote == remote.id)
+
                                .copied()
+
                                .map(|RefsAt { at, .. }| at)
+
                            {
+
                                debug!(
+
                                    target: "service", "Refs of {} already synced for {} at {at}",
+
                                    remote.id,
+
                                    message.rid,
+
                                );
+
                                self.outbox.write(
+
                                    &remote,
+
                                    Info::RefsAlreadySynced {
+
                                        rid: message.rid,
+
                                        at,
+
                                    }
+
                                    .into(),
+
                                );
                            }
                        }
+
                        // Finally, if there's anything to fetch, we fetch it from the
+
                        // remote.
+
                        if let Some(fresh) = NonEmpty::from_vec(fresh) {
+
                            self.fetch_refs_at(message.rid, &remote.id, fresh, FETCH_TIMEOUT);
+
                        }
                    } else {
                        trace!(
                            target: "service",
@@ -1240,47 +1268,49 @@ where
        Ok(false)
    }

-
    /// A convenient method to check if we should fetch from a `RefsAnnouncement`
-
    /// with `scope`.
-
    fn should_fetch_refs_announcement(
+
    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
+
        match info {
+
            Info::RefsAlreadySynced { rid, at } => {
+
                debug!(target: "service", "Refs already synced for {rid} by {remote}");
+
                self.emitter.emit(Event::RefsSynced {
+
                    rid: *rid,
+
                    remote,
+
                    at: *at,
+
                });
+
            }
+
        }
+

+
        Ok(())
+
    }
+

+
    /// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
+
    fn refs_status_of(
        &self,
        message: &RefsAnnouncement,
        scope: &tracking::Scope,
-
    ) -> Result<Option<Vec<RefsAt>>, Error> {
+
    ) -> Result<RefsStatus, Error> {
+
        let mut refs = message.refs_status(&self.storage)?;
+

        // First, check the freshness.
-
        if !message.is_fresh(&self.storage)? {
+
        if refs.fresh.is_empty() {
            debug!(target: "service", "All refs of {} are already in local storage", &message.rid);
-
            return Ok(None);
+
            return Ok(refs);
        }

        // Second, check the scope.
        match scope {
-
            tracking::Scope::All => Ok(Some(message.refs.clone().into())),
+
            tracking::Scope::All => Ok(refs),
            tracking::Scope::Trusted => {
                match self.tracking.namespaces_for(&self.storage, &message.rid) {
-
                    Ok(Namespaces::All) => Ok(Some(message.refs.clone().into())),
+
                    Ok(Namespaces::All) => Ok(refs),
                    Ok(Namespaces::Trusted(mut trusted)) => {
                        // Get the set of trusted nodes except self.
-
                        trusted.remove(&self.node_id());
+
                        trusted.remove(self.nid());
+
                        refs.fresh.retain(|r| trusted.contains(&r.remote));

-
                        // Check if there is at least one trusted ref.
-
                        Ok(Some(
-
                            message
-
                                .refs
-
                                .iter()
-
                                .filter(|refs_at| trusted.contains(&refs_at.remote))
-
                                .cloned()
-
                                .collect(),
-
                        ))
-
                    }
-
                    Err(NamespacesError::NoTrusted { rid }) => {
-
                        debug!(target: "service", "No trusted nodes to fetch {}", &rid);
-
                        Ok(None)
-
                    }
-
                    Err(e) => {
-
                        error!(target: "service", "Failed to obtain namespaces: {e}");
-
                        Err(e.into())
+
                        Ok(refs)
                    }
+
                    Err(e) => Err(e.into()),
                }
            }
        }
@@ -1363,6 +1393,10 @@ where
                }
                peer.subscribe = Some(subscribe);
            }
+
            (session::State::Connected { .. }, Message::Info(info)) => {
+
                let remote = peer.id;
+
                self.handle_info(remote, &info)?;
+
            }
            (session::State::Connected { .. }, Message::Ping(Ping { ponglen, .. })) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
modified radicle-node/src/service/message.rs
@@ -1,7 +1,8 @@
use std::{fmt, io, mem};

-
use radicle::git::Oid;
+
use radicle::git;
use radicle::storage::refs::RefsAt;
+
use radicle::storage::ReadRepository;

use crate::crypto;
use crate::identity::Id;
@@ -164,47 +165,80 @@ pub struct RefsAnnouncement {
    pub timestamp: Timestamp,
}

-
impl RefsAnnouncement {
-
    /// Check if this announcement is "fresh", meaning if it contains refs we do not have.
-
    pub fn is_fresh<S: ReadStorage>(&self, storage: S) -> Result<bool, storage::Error> {
-
        let repo = match storage.repository(self.rid) {
-
            // If the repo doesn't exist, we consider this announcement "fresh", since we
-
            // obviously don't have the refs.
-
            Err(e) if e.is_not_found() => return Ok(true),
-
            Err(e) => return Err(e),
-
            Ok(r) => r,
-
        };
+
/// Track the status of `RefsAt` within a given repository.
+
#[derive(Default)]
+
pub struct RefsStatus {
+
    /// The `rad/sigrefs` was missing or it's ahead of the local
+
    /// `rad/sigrefs`.
+
    pub fresh: Vec<RefsAt>,
+
    /// The `rad/sigrefs` has been seen before.
+
    pub stale: Vec<RefsAt>,
+
}

-
        for theirs in self.refs.iter() {
-
            if let Ok(ours) = RefsAt::new(&repo, theirs.remote) {
-
                if ours.at != theirs.at {
-
                    return Ok(true);
+
impl RefsStatus {
+
    fn insert<S: ReadRepository>(
+
        &mut self,
+
        theirs: RefsAt,
+
        repo: &S,
+
    ) -> Result<(), storage::Error> {
+
        match RefsAt::new(repo, theirs.remote) {
+
            Ok(ours) => {
+
                if Self::is_fresh(repo, theirs.at, ours.at)? {
+
                    self.fresh.push(theirs);
+
                } else {
+
                    self.stale.push(theirs);
                }
-
            } else {
-
                return Ok(true);
+
            }
+
            Err(e) if git::is_not_found_err(&e) => self.fresh.push(theirs),
+
            Err(e) => {
+
                log::warn!(
+
                    target: "service",
+
                    "failed to load 'refs/namespaces/{}/rad/sigrefs': {e}", theirs.remote
+
                )
            }
        }
-
        Ok(false)
+
        Ok(())
+
    }
+

+
    /// If `theirs` is not the same as `ours` and we have not seen
+
    /// `theirs` before, i.e. it's not a previous `rad/sigrefs`, then
+
    /// we can consider `theirs` a fresh update.
+
    fn is_fresh<S: ReadRepository>(
+
        repo: &S,
+
        theirs: git::Oid,
+
        ours: git::Oid,
+
    ) -> Result<bool, git::ext::Error> {
+
        if repo.contains(theirs)? {
+
            Ok(theirs != ours && !repo.is_ancestor_of(theirs, ours)?)
+
        } else {
+
            Ok(true)
+
        }
    }
+
}

-
    /// Check if an announcement tells us that a node is in sync with a local remote.
-
    pub fn is_synced<S: ReadStorage>(
-
        &self,
-
        remote: &NodeId,
-
        storage: S,
-
    ) -> Result<Option<Oid>, storage::Error> {
+
impl RefsAnnouncement {
+
    /// Get the set of `fresh` and `stale` `RefsAt`'s for the given
+
    /// announcement.
+
    pub fn refs_status<S: ReadStorage>(&self, storage: S) -> Result<RefsStatus, storage::Error> {
        let repo = match storage.repository(self.rid) {
-
            // If the repo doesn't exist, we're not in sync.
-
            Err(e) if e.is_not_found() => return Ok(None),
+
            // If the repo doesn't exist, we consider this
+
            // announcement "fresh", since we obviously don't
+
            // have the refs.
+
            Err(e) if e.is_not_found() => {
+
                return Ok(RefsStatus {
+
                    fresh: self.refs.clone().into(),
+
                    stale: Vec::new(),
+
                })
+
            }
            Err(e) => return Err(e),
            Ok(r) => r,
        };

-
        if let Some(theirs) = self.refs.iter().find(|refs_at| refs_at.remote == *remote) {
-
            let ours = RefsAt::new(&repo, *remote)?;
-
            return Ok((ours.at == theirs.at).then_some(theirs.at));
+
        let mut status = RefsStatus::default();
+
        for theirs in self.refs.iter() {
+
            status.insert(*theirs, &repo)?;
        }
-
        Ok(None)
+
        Ok(status)
    }
}

@@ -218,6 +252,17 @@ pub struct InventoryAnnouncement {
    pub timestamp: Timestamp,
}

+
/// Node announcing information to a connected peer.
+
///
+
/// This should not be relayed and should be used to send an
+
/// informational message a peer.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub enum Info {
+
    /// Tell a node that sent a refs announcement that it was already synced at the given `Oid`,
+
    /// for this particular `rid`.
+
    RefsAlreadySynced { rid: Id, at: git::Oid },
+
}
+

/// Announcement messages are messages that are relayed between peers.
#[derive(Clone, PartialEq, Eq)]
pub enum AnnouncementMessage {
@@ -362,6 +407,11 @@ pub enum Message {
    /// using [`Message::Subscribe`].
    Announcement(Announcement),

+
    /// Informational message. These messages are sent between peers for information
+
    /// and do not need to be acted upon. They can be safely ignored, though handling
+
    /// them can be useful for the user.
+
    Info(Info),
+

    /// Ask a connected peer for a Pong.
    ///
    /// Used to check if the remote peer is responsive, or a side-effect free way to keep a
@@ -446,6 +496,11 @@ impl Message {
                    )
                }
            },
+
            Self::Info(Info::RefsAlreadySynced { rid,  .. }) => {
+
                format!(
+
                    "{verb} `refs-already-synced` info {prep} {remote} for {rid}"
+
                )
+
            },
            Self::Ping { .. } => format!("{verb} ping {prep} {remote}"),
            Self::Pong { .. } => format!("{verb} pong {prep} {remote}"),
            Self::Subscribe(Subscribe { .. }) => {
@@ -492,6 +547,12 @@ impl From<Announcement> for Message {
    }
}

+
impl From<Info> for Message {
+
    fn from(info: Info) -> Self {
+
        Self::Info(info)
+
    }
+
}
+

impl fmt::Debug for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
@@ -501,6 +562,9 @@ impl fmt::Debug for Message {
            Self::Announcement(Announcement { node, message, .. }) => {
                write!(f, "Announcement({node}, {message:?})")
            }
+
            Self::Info(info) => {
+
                write!(f, "Info({info:?})")
+
            }
            Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {zeroes:?})"),
            Self::Pong { zeroes } => write!(f, "Pong({zeroes:?})"),
        }
modified radicle-node/src/test/arbitrary.rs
@@ -6,7 +6,7 @@ use crate::node::Alias;
use crate::prelude::{BoundedVec, Id, NodeId, Timestamp};
use crate::service::filter::{Filter, FILTER_SIZE_L, FILTER_SIZE_M, FILTER_SIZE_S};
use crate::service::message::{
-
    Announcement, InventoryAnnouncement, Message, NodeAnnouncement, Ping, RefsAnnouncement,
+
    Announcement, Info, InventoryAnnouncement, Message, NodeAnnouncement, Ping, RefsAnnouncement,
    Subscribe, ZeroBytes,
};
use crate::wire::MessageType;
@@ -34,6 +34,7 @@ impl Arbitrary for Message {
                MessageType::InventoryAnnouncement,
                MessageType::NodeAnnouncement,
                MessageType::RefsAnnouncement,
+
                MessageType::Info,
                MessageType::Subscribe,
                MessageType::Ping,
                MessageType::Pong,
@@ -81,6 +82,13 @@ impl Arbitrary for Message {
                }
                .into()
            }
+
            MessageType::Info => {
+
                let message = Info::RefsAlreadySynced {
+
                    rid: Id::arbitrary(g),
+
                    at: oid(),
+
                };
+
                Self::Info(message)
+
            }
            MessageType::Subscribe => Self::Subscribe(Subscribe {
                filter: Filter::arbitrary(g),
                since: Timestamp::arbitrary(g),
modified radicle-node/src/tests.rs
@@ -1334,8 +1334,9 @@ fn test_queued_fetch_same_rid() {
fn test_refs_synced_event() {
    let temp = tempfile::tempdir().unwrap();
    let storage = Storage::open(temp.path(), fixtures::user()).unwrap();
-
    let mut alice = Peer::with_storage("alice", [8, 8, 8, 8], storage);
-
    let bob = Peer::new("eve", [9, 9, 9, 9]);
+
    let mut alice = Peer::with_storage("alice", [8, 8, 8, 8], storage.clone());
+
    let bob = Peer::new("bob", [9, 9, 9, 9]);
+
    let eve = Peer::with_storage("eve", [7, 7, 7, 7], storage);
    let acme = alice.project("acme", "");
    let events = alice.events();
    let ann = AnnouncementMessage::from(RefsAnnouncement {
@@ -1347,29 +1348,34 @@ fn test_refs_synced_event() {
    });
    let msg = ann.signed(bob.signer());

+
    alice.track_repo(&acme, tracking::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.receive(bob.id, Message::Announcement(msg));

    events
        .wait(
            |e| {
-
                if let Event::RefsSynced {
-
                    remote,
-
                    rid,
-
                    at: _at,
-
                } = e
-
                {
-
                    assert_eq!(remote, &bob.id);
-
                    assert_eq!(rid, &acme);
-

-
                    Some(())
-
                } else {
-
                    None
-
                }
+
                matches!(
+
                    e,
+
                    Event::RefsSynced { remote, rid, .. }
+
                    if rid == &acme && remote == &bob.id
+
                )
+
                .then_some(())
            },
            time::Duration::from_secs(3),
        )
        .unwrap();
+

+
    // Now a relayed announcement.
+
    alice.receive(bob.id, eve.node_announcement());
+
    alice.receive(bob.id, eve.refs_announcement(acme));
+

+
    events
+
        .wait(
+
            |e| matches!(e, Event::RefsSynced { remote, .. } if remote == &eve.id).then_some(()),
+
            time::Duration::from_secs(3),
+
        )
+
        .unwrap();
}

#[test]
modified radicle-node/src/wire.rs
@@ -60,6 +60,8 @@ pub enum Error {
    UnknownAddressType(u8),
    #[error("unknown message type `{0}`")]
    UnknownMessageType(u16),
+
    #[error("unknown info type `{0}`")]
+
    UnknownInfoType(u16),
    #[error("unexpected bytes")]
    UnexpectedBytes,
}
modified radicle-node/src/wire/message.rs
@@ -2,6 +2,7 @@ use std::{io, mem, net};

use byteorder::{NetworkEndian, ReadBytesExt};
use cyphernet::addr::{Addr, HostName, NetAddr};
+
use radicle::git::Oid;
use radicle::node::Address;

use crate::prelude::*;
@@ -19,6 +20,7 @@ pub enum MessageType {
    Subscribe = 8,
    Ping = 10,
    Pong = 12,
+
    Info = 14,
}

impl From<MessageType> for u16 {
@@ -38,6 +40,7 @@ impl TryFrom<u16> for MessageType {
            8 => Ok(MessageType::Subscribe),
            10 => Ok(MessageType::Ping),
            12 => Ok(MessageType::Pong),
+
            14 => Ok(MessageType::Info),
            _ => Err(other),
        }
    }
@@ -56,6 +59,7 @@ impl Message {
                AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
                AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
            },
+
            Self::Info(_) => MessageType::Info,
            Self::Ping { .. } => MessageType::Ping,
            Self::Pong { .. } => MessageType::Pong,
        }
@@ -180,6 +184,76 @@ impl wire::Decode for InventoryAnnouncement {
    }
}

+
/// The type tracking the different variants of [`Info`] for encoding and
+
/// decoding purposes.
+
#[repr(u8)]
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
pub enum InfoType {
+
    RefsAlreadySynced = 1,
+
}
+

+
impl From<InfoType> for u16 {
+
    fn from(other: InfoType) -> Self {
+
        other as u16
+
    }
+
}
+

+
impl TryFrom<u16> for InfoType {
+
    type Error = u16;
+

+
    fn try_from(other: u16) -> Result<Self, Self::Error> {
+
        match other {
+
            1 => Ok(Self::RefsAlreadySynced),
+
            n => Err(n),
+
        }
+
    }
+
}
+

+
impl From<Info> for InfoType {
+
    fn from(info: Info) -> Self {
+
        (&info).into()
+
    }
+
}
+

+
impl From<&Info> for InfoType {
+
    fn from(info: &Info) -> Self {
+
        match info {
+
            Info::RefsAlreadySynced { .. } => Self::RefsAlreadySynced,
+
        }
+
    }
+
}
+

+
impl wire::Encode for Info {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+
        n += u16::from(InfoType::from(self)).encode(writer)?;
+
        match self {
+
            Info::RefsAlreadySynced { rid, at } => {
+
                n += rid.encode(writer)?;
+
                n += at.encode(writer)?;
+
            }
+
        }
+

+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for Info {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let info_type = reader.read_u16::<NetworkEndian>()?;
+

+
        match InfoType::try_from(info_type) {
+
            Ok(InfoType::RefsAlreadySynced) => {
+
                let rid = Id::decode(reader)?;
+
                let at = Oid::decode(reader)?;
+

+
                Ok(Self::RefsAlreadySynced { rid, at })
+
            }
+
            Err(other) => Err(wire::Error::UnknownInfoType(other)),
+
        }
+
    }
+
}
+

impl wire::Encode for Message {
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
        let mut n = self.type_id().encode(writer)?;
@@ -203,6 +277,9 @@ impl wire::Encode for Message {
                n += message.encode(writer)?;
                n += signature.encode(writer)?;
            }
+
            Self::Info(info) => {
+
                n += info.encode(writer)?;
+
            }
            Self::Ping(Ping { ponglen, zeroes }) => {
                n += ponglen.encode(writer)?;
                n += zeroes.encode(writer)?;
@@ -274,6 +351,10 @@ impl wire::Decode for Message {
                }
                .into())
            }
+
            Ok(MessageType::Info) => {
+
                let info = Info::decode(reader)?;
+
                Ok(Self::Info(info))
+
            }
            Ok(MessageType::Ping) => {
                let ponglen = u16::decode(reader)?;
                let zeroes = ZeroBytes::decode(reader)?;
modified radicle/src/node.rs
@@ -823,7 +823,6 @@ impl Node {
    ) -> Result<AnnounceResult, Error> {
        let events = self.subscribe(timeout)?;
        let mut seeds = seeds.into_iter().collect::<BTreeSet<_>>();
-

        let refs = self.announce_refs(rid)?;

        callback(AnnounceEvent::Announced);
@@ -840,9 +839,8 @@ impl Node {
                }) if rid == rid_ => {
                    if seeds.remove(&remote) && refs.at == at {
                        synced.push(remote);
+
                        callback(AnnounceEvent::RefsSynced { remote });
                    }
-

-
                    callback(AnnounceEvent::RefsSynced { remote });
                }
                Ok(_) => {}

modified radicle/src/storage.rs
@@ -103,6 +103,8 @@ pub enum Error {
    Refs(#[from] refs::Error),
    #[error("git: {0}")]
    Git(#[from] git2::Error),
+
    #[error("git: {0}")]
+
    Ext(#[from] git::ext::Error),
    #[error("invalid repository identifier {0:?}")]
    InvalidId(std::ffi::OsString),
    #[error("i/o: {0}")]