Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Generalize commands announce/seeds to take public key
Merged lorenz opened 7 months ago

The commands “announce references” and “seeds” only take a repository ID, and carry the implicit assumption that the namespace with the same public key as the Node ID of the receiving radicle-node process is relevant, but no other namespace.

In the spirit of separating user and node identity, relax this, so that these command also carry the public key for which the announcement should be made or sync status should be reported.

This feature is not exposed via CLI, but used to pass through the user identity of the active profile.

For sync status, this patch still is not fully satisfactory, because the node does not store sync updates for all namespaces, but only for its own. Storing sync updates for namespaces that were announced from the node could be implemented in the future.

13 files changed +550 -293 7b8da0e7 1a55c8e2
modified crates/radicle-cli/src/commands/sync.rs
@@ -345,7 +345,7 @@ fn sync_status(
    const SYMBOL_STATE_UNKNOWN: &str = "•";

    let mut table = Table::<5, term::Label>::new(TableOptions::bordered());
-
    let mut seeds: Vec<_> = node.seeds(rid)?.into();
+
    let mut seeds: Vec<_> = node.seeds_for(rid, [*profile.did()])?.into();
    let local_nid = node.nid()?;
    let aliases = profile.aliases();

@@ -522,7 +522,7 @@ pub fn fetch(
        None => {
            // We push nodes that are in our seed list in attempt to fulfill the
            // replicas, if needed.
-
            let seeds = node.seeds(rid)?;
+
            let seeds = node.seeds_for(rid, [*profile.did()])?;
            let (connected, disconnected) = seeds.partition();
            let candidates = connected
                .into_iter()
modified crates/radicle-cli/src/node.rs
@@ -149,7 +149,7 @@ where

    let config = match sync::PrivateNetwork::private_repo(&doc) {
        None => {
-
            let (synced, unsynced) = node.seeds(rid)?.iter().fold(
+
            let (synced, unsynced) = node.seeds_for(rid, [*me])?.iter().fold(
                (BTreeSet::new(), BTreeSet::new()),
                |(mut synced, mut unsynced), seed| {
                    if seed.is_synced() {
@@ -199,18 +199,24 @@ where
        reporting.completion.clone(),
    );

-
    match node.announce(rid, settings.timeout, announcer, |node, progress| {
-
        spinner.message(format!(
-
            "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
-
            term::format::node_id_human_compact(node),
-
            term::format::secondary(progress.preferred()),
-
            term::format::secondary(n_preferred_seeds),
-
            term::format::secondary(progress.synced()),
-
            // N.b. the number of replicas could exceed the target if we're
-
            // waiting for preferred seeds
-
            term::format::secondary(min_replicas.max(progress.synced())),
-
        ));
-
    }) {
+
    match node.announce(
+
        rid,
+
        [profile.did().into()],
+
        settings.timeout,
+
        announcer,
+
        |node, progress| {
+
            spinner.message(format!(
+
                "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
+
                term::format::node_id_human_compact(node),
+
                term::format::secondary(progress.preferred()),
+
                term::format::secondary(n_preferred_seeds),
+
                term::format::secondary(progress.synced()),
+
                // N.b. the number of replicas could exceed the target if we're
+
                // waiting for preferred seeds
+
                term::format::secondary(min_replicas.max(progress.synced())),
+
            ));
+
        },
+
    ) {
        Ok(result) => {
            spinner.message(format!(
                "Synced with {} seed(s)",
modified crates/radicle-cli/tests/commands.rs
@@ -1704,7 +1704,7 @@ fn test_clone_without_seeds() {
    let working = environment.tempdir().join("working");
    let rid = alice.project("heartwood", "Radicle Heartwood Protocol & Stack");
    let mut alice = alice.spawn();
-
    let seeds = alice.handle.seeds(rid).unwrap();
+
    let seeds = alice.handle.seeds_for(rid, [alice.id]).unwrap();
    let connected = seeds.connected().collect::<Vec<_>>();

    assert!(connected.is_empty());
@@ -1772,7 +1772,7 @@ fn test_cob_replication() {
    // announcement, otherwise Alice will consider it stale.
    thread::sleep(time::Duration::from_millis(3));

-
    bob.handle.announce_refs(rid).unwrap();
+
    bob.handle.announce_refs_for(rid, [bob.id]).unwrap();

    // Wait for Alice to fetch the issue refs.
    events
modified crates/radicle-node/src/control.rs
@@ -172,11 +172,17 @@ where

            CommandResult::Okay(addrs).to_writer(writer)?;
        }
+
        #[allow(deprecated)]
        Command::Seeds { rid } => {
            let seeds = handle.seeds(rid)?;

            CommandResult::Okay(seeds).to_writer(writer)?;
        }
+
        Command::SeedsFor { rid, namespaces } => {
+
            let seeds = handle.seeds_for(rid, namespaces)?;
+

+
            CommandResult::Okay(seeds).to_writer(writer)?;
+
        }
        Command::Sessions => {
            let sessions = handle.sessions()?;

@@ -219,11 +225,17 @@ where
                return Err(CommandError::Runtime(e));
            }
        },
+
        #[allow(deprecated)]
        Command::AnnounceRefs { rid } => {
            let refs = handle.announce_refs(rid)?;

            CommandResult::Okay(refs).to_writer(writer)?;
        }
+
        Command::AnnounceRefsFor { rid, namespaces } => {
+
            let refs = handle.announce_refs_for(rid, namespaces)?;
+

+
            CommandResult::Okay(refs).to_writer(writer)?;
+
        }
        Command::AnnounceInventory => {
            if let Err(e) = handle.announce_inventory() {
                return Err(CommandError::Runtime(e));
@@ -308,6 +320,7 @@ mod tests {
        let socket = tmp.path().join("alice.sock");
        let rids = test::arbitrary::set::<RepoId>(1..3);
        let listener = Listener::bind(&socket).unwrap();
+
        let nid = handle.nid().unwrap();

        thread::spawn({
            let handle = handle.clone();
@@ -324,8 +337,9 @@ mod tests {
            writeln!(
                &mut stream,
                "{}",
-
                json::to_string(&Command::AnnounceRefs {
-
                    rid: rid.to_owned()
+
                json::to_string(&Command::AnnounceRefsFor {
+
                    rid: rid.to_owned(),
+
                    namespaces: [nid].into(),
                })
                .unwrap()
            )
@@ -345,7 +359,7 @@ mod tests {
        }

        for rid in &rids {
-
            assert!(handle.updates.lock().unwrap().contains(rid));
+
            assert!(handle.updates.lock().unwrap().contains(&(*rid, nid)));
        }
    }

modified crates/radicle-node/src/runtime/handle.rs
@@ -1,3 +1,4 @@
+
use std::collections::HashSet;
use std::net;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -9,6 +10,7 @@ use std::os::unix::net::UnixStream as Stream;
use winpipe::WinStream as Stream;

use crossbeam_channel as chan;
+
use radicle::crypto::PublicKey;
use radicle::node::events::{Event, Events};
use radicle::node::policy;
use radicle::node::{Config, NodeId};
@@ -200,9 +202,17 @@ impl radicle::node::Handle for Handle {
            .map_err(Error::from)
    }

-
    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
+
    fn seeds_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<Seeds, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Seeds(id, sender))?;
+
        self.command(service::Command::Seeds(
+
            id,
+
            HashSet::from_iter(namespaces),
+
            sender,
+
        ))?;
        receiver.recv().map_err(Error::from)
    }

@@ -253,9 +263,17 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

-
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Error> {
+
    fn announce_refs_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::AnnounceRefs(id, sender))?;
+
        self.command(service::Command::AnnounceRefs(
+
            id,
+
            HashSet::from_iter(namespaces),
+
            sender,
+
        ))?;
        receiver.recv().map_err(Error::from)
    }

modified crates/radicle-node/src/test/handle.rs
@@ -3,6 +3,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time;

+
use radicle::crypto::PublicKey;
use radicle::git;
use radicle::storage::refs::RefsAt;

@@ -14,7 +15,7 @@ use radicle::node::NodeId;

#[derive(Default, Clone)]
pub struct Handle {
-
    pub updates: Arc<Mutex<Vec<RepoId>>>,
+
    pub updates: Arc<Mutex<Vec<(RepoId, PublicKey)>>>,
    pub seeding: Arc<Mutex<HashSet<RepoId>>>,
    pub following: Arc<Mutex<HashSet<NodeId>>>,
}
@@ -54,7 +55,11 @@ impl radicle::node::Handle for Handle {
        unimplemented!();
    }

-
    fn seeds(&mut self, _id: RepoId) -> Result<Seeds, Self::Error> {
+
    fn seeds_for(
+
        &mut self,
+
        _id: RepoId,
+
        _namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<Seeds, Self::Error> {
        unimplemented!();
    }

@@ -91,8 +96,15 @@ impl radicle::node::Handle for Handle {
        Ok(self.following.lock().unwrap().remove(&id))
    }

-
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error> {
-
        self.updates.lock().unwrap().push(id);
+
    fn announce_refs_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Self::Error> {
+
        self.updates
+
            .lock()
+
            .unwrap()
+
            .extend(namespaces.into_iter().map(|ns| (id, ns)));

        Ok(RefsAt {
            remote: self.nid()?,
modified crates/radicle-node/src/test/node.rs
@@ -200,7 +200,7 @@ impl<G: Signer<Signature> + cyphernet::Ecdh> NodeHandle<G> {
        log::debug!(target: "test", "Waiting for {} to be in sync with {nid} for {rid}", self.id);

        loop {
-
            let seeds = self.handle.seeds(*rid).unwrap();
+
            let seeds = self.handle.seeds_for(*rid, [self.id]).unwrap();
            if seeds.iter().any(|s| s.nid == *nid && s.is_synced()) {
                break;
            }
modified crates/radicle-node/src/tests/e2e.rs
@@ -183,7 +183,7 @@ fn test_replication() {
    let updated = alice.handle.seed(acme, Scope::All).unwrap();
    assert!(updated);

-
    let seeds = alice.handle.seeds(acme).unwrap();
+
    let seeds = alice.handle.seeds_for(acme, None).unwrap();
    assert!(seeds.is_connected(&bob.id));

    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
@@ -549,7 +549,7 @@ fn test_clone() {
    transport::local::register(alice.storage.clone());

    let _ = alice.handle.seed(acme, Scope::All).unwrap();
-
    let seeds = alice.handle.seeds(acme).unwrap();
+
    let seeds = alice.handle.seeds_for(acme, None).unwrap();
    assert!(seeds.is_connected(&bob.id));

    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
@@ -1378,7 +1378,7 @@ fn test_background_foreground_fetch() {
        Title::new("Concurrent fetches").unwrap(),
        "Concurrent fetches are harshing my vibes",
    );
-
    bob.handle.announce_refs(rid).unwrap();
+
    bob.handle.announce_refs_for(rid, [bob.id]).unwrap();
    alice_events
        .wait(
            |e| matches!(e, Event::RefsAnnounced { .. }).then_some(()),
@@ -1427,7 +1427,7 @@ fn test_catchup_on_refs_announcements() {

    log::debug!(target: "test", "Bob creating his issue..");
    bob.issue(acme, Title::new("Bob's issue").unwrap(), "[..]");
-
    bob.handle.announce_refs(acme).unwrap();
+
    bob.handle.announce_refs_for(acme, [bob.id]).unwrap();

    log::debug!(target: "test", "Waiting for seed to fetch Bob's refs from Bob..");
    seed.has_remote_of(&acme, &bob.id); // Seed fetches Bob's refs.
modified crates/radicle-protocol/src/service.rs
@@ -238,8 +238,8 @@ pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Se

/// Commands sent to the service by the operator.
pub enum Command {
-
    /// Announce repository references for given repository to peers.
-
    AnnounceRefs(RepoId, chan::Sender<RefsAt>),
+
    /// Announce repository references for given repository and namespaces to peers.
+
    AnnounceRefs(RepoId, HashSet<PublicKey>, chan::Sender<RefsAt>),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Add repository to local inventory.
@@ -252,8 +252,9 @@ pub enum Command {
    Config(chan::Sender<Config>),
    /// Get the node's listen addresses.
    ListenAddrs(chan::Sender<Vec<std::net::SocketAddr>>),
-
    /// Lookup seeds for the given repository in the routing table.
-
    Seeds(RepoId, chan::Sender<Seeds>),
+
    /// Lookup seeds for the given repository in the routing table, and report
+
    /// sync status for given namespaces.
+
    Seeds(RepoId, HashSet<PublicKey>, chan::Sender<Seeds>),
    /// Fetch the given repository from the network.
    Fetch(RepoId, NodeId, time::Duration, chan::Sender<FetchResult>),
    /// Seed the given repository.
@@ -271,14 +272,14 @@ pub enum Command {
impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
-
            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
+
            Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
-
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
+
            Self::Seeds(id, _, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
@@ -880,7 +881,7 @@ where
            Command::ListenAddrs(resp) => {
                resp.send(self.listening.clone()).ok();
            }
-
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
+
            Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
@@ -930,7 +931,7 @@ where
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
-
            Command::AnnounceRefs(id, resp) => {
+
            Command::AnnounceRefs(id, namespaces, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
@@ -943,14 +944,12 @@ where
                    }
                };

-
                match self.announce_own_refs(id, doc) {
-
                    Ok(refs) => match refs.as_slice() {
-
                        &[refs] => {
-
                            resp.send(refs).ok();
+
                match self.announce_own_refs(id, doc, namespaces) {
+
                    Ok((refs, _timestamp)) => {
+
                        for r in refs {
+
                            resp.send(r).ok();
                        }
-
                        // SAFETY: Since we passed in one NID, we should get exactly one item back.
-
                        [..] => panic!("Service::command: unexpected refs returned"),
-
                    },
+
                    }
                    Err(err) => {
                        error!(target: "service", "Error announcing refs: {err}");
                    }
@@ -1226,7 +1225,7 @@ where
                } else {
                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
                    // beyond just knowing that we have added an item to our inventory.
-
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces) {
+
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
@@ -2167,16 +2166,21 @@ where
    }

    /// Announce our own refs for the given repo.
-
    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc) -> Result<Vec<RefsAt>, Error> {
-
        let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;
+
    fn announce_own_refs(
+
        &mut self,
+
        rid: RepoId,
+
        doc: Doc,
+
        namespaces: impl IntoIterator<Item = NodeId>,
+
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
+
        let (refs, timestamp) = self.announce_refs(rid, doc, namespaces, true)?;

        // Update refs database with our signed refs branches.
        // This isn't strictly necessary for now, as we only use the database for fetches, and
        // we don't fetch our own refs that are announced, but it's for good measure.
-
        if let &[r] = refs.as_slice() {
+
        for r in refs.iter() {
            self.emitter.emit(Event::LocalRefsAnnounced {
                rid,
-
                refs: r,
+
                refs: *r,
                timestamp,
            });
            if let Err(e) = self.database_mut().refs_mut().set(
@@ -2193,7 +2197,7 @@ where
                );
            }
        }
-
        Ok(refs)
+
        Ok((refs, timestamp))
    }

    /// Announce local refs for given repo.
@@ -2202,6 +2206,7 @@ where
        rid: RepoId,
        doc: Doc,
        remotes: impl IntoIterator<Item = NodeId>,
+
        own: bool,
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
        let timestamp = ann.timestamp();
@@ -2209,18 +2214,13 @@ where

        // Update our sync status for our own refs. This is useful for determining if refs were
        // updated while the node was stopped.
-
        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
+
        for r in refs.iter().filter(|r| own || r.remote == ann.node) {
            info!(
                target: "service",
-
                "Announcing own refs for {rid} to peers ({}) (t={timestamp})..",
-
                refs.at
+
                "Announcing refs {rid}/{r} to peers (t={timestamp})..",
            );
            // Update our local node's sync status to mark the refs as announced.
-
            if let Err(e) = self
-
                .db
-
                .seeds_mut()
-
                .synced(&rid, &ann.node, refs.at, timestamp)
-
            {
+
            if let Err(e) = self.db.seeds_mut().synced(&rid, &ann.node, r.at, timestamp) {
                error!(target: "service", "Error updating sync status for local node: {e}");
            } else {
                debug!(target: "service", "Saved local sync status for {rid}..");
@@ -2281,14 +2281,19 @@ where
        Ok(())
    }

-
    fn seeds(&self, rid: &RepoId) -> Result<Seeds, Error> {
+
    fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
        let mut seeds = Seeds::new(self.rng.clone());

-
        // First build a list from peers that have synced our own refs, if any.
-
        // This step is skipped if we don't have the repository yet, or don't have
-
        // our own refs.
+
        // First, build a list of peers that have synced refs for `namespaces`, if any.
+
        // This step is skipped:
+
        //  1. For the repository (and thus all `namespaces`), if it not exist in storage.
+
        //  2. For each `namespace` in `namespaces`, which does not exist in storage.
        if let Ok(repo) = self.storage.repository(*rid) {
-
            if let Ok(local) = RefsAt::new(&repo, self.node_id()) {
+
            for namespace in namespaces.iter() {
+
                let Ok(local) = RefsAt::new(&repo, *namespace) else {
+
                    continue;
+
                };
+

                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
@@ -2311,7 +2316,7 @@ where
        // These peers have announced that they seed the repository via an inventory
        // announcement, but we haven't received any ref announcements from them.
        for nid in self.db.routing().get(rid)? {
-
            if nid == self.node_id() {
+
            if namespaces.contains(&nid) {
                continue;
            }
            if seeds.contains(&nid) {
@@ -2529,7 +2534,7 @@ where
            if self.storage.contains(&rid)? {
                continue;
            }
-
            match self.seeds(&rid) {
+
            match self.seeds(&rid, [self.node_id()].into()) {
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
modified crates/radicle/CHANGELOG.md
@@ -9,6 +9,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

+
- `radicle::node::Handle::announce_refs_for` now allows specifying for which
+
  namespaces changes should be announced. A corresponding enum variant
+
  `radicle::node::Command::AnnounceRefsFor` is added.
+
- `radicle::node::Handle::seeds_for` now allows specifying for which
+
  namespaces sync status should be reported. A corresponding enum variant
+
  `radicle::node::Command::SeedsFor` is added.
+

+
### Changed
+

+
### Deprecated
+

+
- `radicle::node::Handle::announce_refs` is deprecated in favor of
+
  `radicle::node::Handle::announce_refs_for`.
+
- `radicle::node::Handle::seeds` is deprecated in favor of
+
  `radicle::node::Handle::seeds_for`.
+

+
### Removed
+

+
### Security
+

+
## 0.20.0
+

+
### Added
+

- Introduce a node event for canonical reference updates, `Event::CanonicalRefUpdated`.
  Whenever the node fetches new updates, it checks if canonical references can
  be updated. The node has learned how to return these results and emit them as
@@ -22,8 +46,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `radicle::profile::Home::socket` defaults to the path `\\.\pipe\radicle-node`
  on Windows. The behavior on Unix-like systems has *not* changed.

-
### Deprecated
-

### Removed

- `radicle::node::DEFAULT_SOCKET_NAME`, use `radicle::profile::Home::socket`
modified crates/radicle/src/node.rs
@@ -3,6 +3,7 @@
mod features;

pub mod address;
+
pub mod command;
pub mod config;
pub mod db;
pub mod device;
@@ -43,6 +44,7 @@ use crate::storage::refs::RefsAt;
use crate::storage::RefUpdate;

pub use address::KnownAddress;
+
pub use command::{Command, CommandResult, ConnectOptions, Success, DEFAULT_TIMEOUT};
pub use config::Config;
pub use cyphernet::addr::{HostName, PeerAddr};
pub use db::Database;
@@ -55,8 +57,6 @@ pub use timestamp::Timestamp;
pub const PROTOCOL_VERSION: u8 = 1;
/// Default radicle protocol port.
pub const DEFAULT_PORT: u16 = 8776;
-
/// Default timeout when waiting for the node to respond with data.
-
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(30);
/// Default timeout when waiting for an event to be received on the
/// [`Handle::subscribe`] channel.
pub const DEFAULT_SUBSCRIBE_TIMEOUT: time::Duration = time::Duration::from_secs(5);
@@ -436,97 +436,6 @@ impl TryFrom<&sqlite::Value> for Alias {
    }
}

-
/// Options passed to the "connect" node command.
-
#[derive(Debug, Clone, Serialize, Deserialize)]
-
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
-
pub struct ConnectOptions {
-
    /// Establish a persistent connection.
-
    pub persistent: bool,
-
    /// How long to wait for the connection to be established.
-
    pub timeout: time::Duration,
-
}
-

-
impl Default for ConnectOptions {
-
    fn default() -> Self {
-
        Self {
-
            persistent: false,
-
            timeout: DEFAULT_TIMEOUT,
-
        }
-
    }
-
}
-

-
/// Result of a command, on the node control socket.
-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-
#[serde(untagged)]
-
pub enum CommandResult<T> {
-
    /// Response on node socket indicating that a command was carried out successfully.
-
    Okay(T),
-
    /// Response on node socket indicating that an error occured.
-
    Error {
-
        /// The reason for the error.
-
        #[serde(rename = "error")]
-
        reason: String,
-
    },
-
}
-

-
impl<T, E> From<Result<T, E>> for CommandResult<T>
-
where
-
    E: std::error::Error,
-
{
-
    fn from(result: Result<T, E>) -> Self {
-
        match result {
-
            Ok(t) => Self::Okay(t),
-
            Err(e) => Self::Error {
-
                reason: e.to_string(),
-
            },
-
        }
-
    }
-
}
-

-
impl From<Event> for CommandResult<Event> {
-
    fn from(event: Event) -> Self {
-
        Self::Okay(event)
-
    }
-
}
-

-
/// A success response.
-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
-
pub struct Success {
-
    /// Whether something was updated.
-
    #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")]
-
    updated: bool,
-
}
-

-
impl CommandResult<Success> {
-
    /// Create an "updated" response.
-
    pub fn updated(updated: bool) -> Self {
-
        Self::Okay(Success { updated })
-
    }
-

-
    /// Create an "ok" response.
-
    pub fn ok() -> Self {
-
        Self::Okay(Success { updated: false })
-
    }
-
}
-

-
impl CommandResult<()> {
-
    /// Create an error result.
-
    pub fn error(err: impl std::error::Error) -> Self {
-
        Self::Error {
-
            reason: err.to_string(),
-
        }
-
    }
-
}
-

-
impl<T: Serialize> CommandResult<T> {
-
    /// Write this command result to a stream, including a terminating LF character.
-
    pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
-
        json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
-
        w.write_all(b"\n")
-
    }
-
}
-

/// Peer public protocol address.
#[derive(Clone, Eq, PartialEq, Debug, Hash, From, Wrapper, WrapperMut, Serialize, Deserialize)]
#[wrapper(Deref, Display, FromStr)]
@@ -608,126 +517,6 @@ impl From<Address> for HostName {
    }
}

-
/// Command name.
-
#[derive(Debug, Clone, Serialize, Deserialize)]
-
#[serde(rename_all = "camelCase", tag = "command")]
-
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
-
pub enum Command {
-
    /// Announce repository references for given repository to peers.
-
    #[serde(rename_all = "camelCase")]
-
    AnnounceRefs { rid: RepoId },
-

-
    /// Announce local repositories to peers.
-
    #[serde(rename_all = "camelCase")]
-
    AnnounceInventory,
-

-
    /// Update node's inventory.
-
    AddInventory { rid: RepoId },
-

-
    /// Get the current node condiguration.
-
    Config,
-

-
    /// Get the node's listen addresses.
-
    ListenAddrs,
-

-
    /// Connect to node with the given address.
-
    #[serde(rename_all = "camelCase")]
-
    Connect {
-
        addr: config::ConnectAddress,
-
        opts: ConnectOptions,
-
    },
-

-
    /// Disconnect from a node.
-
    #[serde(rename_all = "camelCase")]
-
    Disconnect {
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
    },
-

-
    /// Lookup seeds for the given repository in the routing table.
-
    #[serde(rename_all = "camelCase")]
-
    Seeds { rid: RepoId },
-

-
    /// Get the current peer sessions.
-
    Sessions,
-

-
    /// Get a specific peer session.
-
    Session {
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
    },
-

-
    /// Fetch the given repository from the network.
-
    #[serde(rename_all = "camelCase")]
-
    Fetch {
-
        rid: RepoId,
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
        timeout: time::Duration,
-
    },
-

-
    /// Seed the given repository.
-
    #[serde(rename_all = "camelCase")]
-
    Seed { rid: RepoId, scope: policy::Scope },
-

-
    /// Unseed the given repository.
-
    #[serde(rename_all = "camelCase")]
-
    Unseed { rid: RepoId },
-

-
    /// Follow the given node.
-
    #[serde(rename_all = "camelCase")]
-
    Follow {
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
        alias: Option<Alias>,
-
    },
-

-
    /// Unfollow the given node.
-
    #[serde(rename_all = "camelCase")]
-
    Unfollow {
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
    },
-

-
    /// Get the node's status.
-
    Status,
-

-
    /// Get node debug information.
-
    Debug,
-

-
    /// Get the node's NID.
-
    NodeId,
-

-
    /// Shutdown the node.
-
    Shutdown,
-

-
    /// Subscribe to events.
-
    Subscribe,
-
}
-

-
impl Command {
-
    /// Write this command to a stream, including a terminating LF character.
-
    pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
-
        json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
-
        w.write_all(b"\n")
-
    }
-
}
-

/// Connection link direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -1094,8 +883,21 @@ pub trait Handle: Clone + Sync + Send {
    ) -> Result<ConnectResult, Self::Error>;
    /// Disconnect from a peer.
    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error>;
-
    /// Lookup the seeds of a given repository in the routing table.
-
    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error>;
+

+
    /// Look up the seeds of a given repository in the routing table.
+
    #[deprecated(note = "use `seeds_for` instead")]
+
    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
+
        self.seeds_for(id, [self.nid()?])
+
    }
+

+
    /// Look up the seeds of a given repository in the routing table
+
    /// and report sync status for `namespaces`.
+
    fn seeds_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<Seeds, Self::Error>;
+

    /// Fetch a repository from the network.
    fn fetch(
        &mut self,
@@ -1112,8 +914,22 @@ pub trait Handle: Clone + Sync + Send {
    fn unseed(&mut self, id: RepoId) -> Result<bool, Self::Error>;
    /// Unfollow the given peer.
    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error>;
-
    /// Notify the service that a project has been updated, and announce local refs.
-
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error>;
+

+
    /// Notify the service that a repository has been updated, and references
+
    /// should be announced over the network.
+
    #[deprecated(note = "use `announce_refs_for` instead")]
+
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error> {
+
        self.announce_refs_for(id, [self.nid()?])
+
    }
+

+
    /// Notify the service that a repository has been updated, and references
+
    /// for the given `namespaces` should be announced over the network.
+
    fn announce_refs_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Self::Error>;
+

    /// Announce local inventory.
    fn announce_inventory(&mut self) -> Result<(), Self::Error>;
    /// Notify the service that our inventory was updated with the given repository.
@@ -1215,12 +1031,13 @@ impl Node {
    pub fn announce(
        &mut self,
        rid: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
        timeout: time::Duration,
        mut announcer: sync::Announcer,
        mut report: impl FnMut(&NodeId, sync::announce::Progress),
    ) -> Result<sync::AnnouncerResult, Error> {
        let mut events = self.subscribe(timeout)?;
-
        let refs = self.announce_refs(rid)?;
+
        let refs = self.announce_refs_for(rid, namespaces)?;

        let started = time::Instant::now();

@@ -1333,9 +1150,19 @@ impl Handle for Node {
        Ok(())
    }

-
    fn seeds(&mut self, rid: RepoId) -> Result<Seeds, Error> {
+
    fn seeds_for(
+
        &mut self,
+
        rid: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<Seeds, Error> {
        let seeds = self
-
            .call::<Seeds>(Command::Seeds { rid }, DEFAULT_TIMEOUT)?
+
            .call::<Seeds>(
+
                Command::SeedsFor {
+
                    rid,
+
                    namespaces: HashSet::from_iter(namespaces),
+
                },
+
                DEFAULT_TIMEOUT,
+
            )?
            .next()
            .ok_or(Error::EmptyResponse)??;

@@ -1391,9 +1218,19 @@ impl Handle for Node {
        Ok(response.updated)
    }

-
    fn announce_refs(&mut self, rid: RepoId) -> Result<RefsAt, Error> {
+
    fn announce_refs_for(
+
        &mut self,
+
        rid: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Error> {
        let refs: RefsAt = self
-
            .call(Command::AnnounceRefs { rid }, DEFAULT_TIMEOUT)?
+
            .call(
+
                Command::AnnounceRefsFor {
+
                    rid,
+
                    namespaces: HashSet::from_iter(namespaces),
+
                },
+
                DEFAULT_TIMEOUT,
+
            )?
            .next()
            .ok_or(Error::EmptyResponse)??;

added crates/radicle/src/node/command.rs
@@ -0,0 +1,337 @@
+
//! Commands sent to the node via the control socket, and auxiliary types, as
+
//! well as their results (responses on the socket).
+

+
// There are derives on an enum with a deprecated variant
+
// in this module, see [`Command::AnnounceRefs`] and also
+
// <https://github.com/rust-lang/rust/issues/92313>.
+
#![allow(deprecated)]
+

+
use std::collections::HashSet;
+
use std::io;
+
use std::time;
+

+
use serde::{Deserialize, Serialize};
+
use serde_json as json;
+

+
use crate::crypto::PublicKey;
+
use crate::identity::RepoId;
+

+
use super::events::Event;
+
use super::NodeId;
+

+
/// Default timeout when waiting for the node to respond with data.
+
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(30);
+

+
/// Command name.
+
#[derive(Debug, Clone, Serialize, Deserialize)]
+
#[serde(rename_all = "camelCase", tag = "command")]
+
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
+
pub enum Command {
+
    /// Announce repository references for given repository to peers.
+
    #[serde(rename_all = "camelCase")]
+
    #[deprecated(note = "use `AnnounceRefsFor` instead")]
+
    AnnounceRefs { rid: RepoId },
+

+
    /// Announce repository references for given repository
+
    /// and namespaces to peers.
+
    #[serde(rename_all = "camelCase")]
+
    AnnounceRefsFor {
+
        /// The ID of the repository for which references should be announced.
+
        rid: RepoId,
+

+
        /// The namespaces for which references should be announced.
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "HashSet<crate::schemars_ext::crypto::PublicKey>")
+
        )]
+
        namespaces: HashSet<PublicKey>,
+
    },
+

+
    /// Announce local repositories to peers.
+
    #[serde(rename_all = "camelCase")]
+
    AnnounceInventory,
+

+
    /// Update node's inventory.
+
    AddInventory { rid: RepoId },
+

+
    /// Get the current node condiguration.
+
    Config,
+

+
    /// Get the node's listen addresses.
+
    ListenAddrs,
+

+
    /// Connect to node with the given address.
+
    #[serde(rename_all = "camelCase")]
+
    Connect {
+
        addr: super::config::ConnectAddress,
+
        opts: ConnectOptions,
+
    },
+

+
    /// Disconnect from a node.
+
    #[serde(rename_all = "camelCase")]
+
    Disconnect {
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
    },
+

+
    /// Look up seeds for the given repository in the routing table.
+
    #[serde(rename_all = "camelCase")]
+
    #[deprecated(note = "use `SeedsFor` instead")]
+
    Seeds { rid: RepoId },
+

+
    /// Look up seeds for the given repository in the routing table and
+
    /// report sync status for the given namespaces.
+
    #[serde(rename_all = "camelCase")]
+
    SeedsFor {
+
        /// The ID of the repository for which seeds should be looked up
+
        /// in the routing table.
+
        rid: RepoId,
+

+
        /// The namespaces for which references should be announced.
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "HashSet<crate::schemars_ext::crypto::PublicKey>")
+
        )]
+
        namespaces: HashSet<PublicKey>,
+
    },
+

+
    /// Get the current peer sessions.
+
    Sessions,
+

+
    /// Get a specific peer session.
+
    Session {
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
    },
+

+
    /// Fetch the given repository from the network.
+
    #[serde(rename_all = "camelCase")]
+
    Fetch {
+
        rid: RepoId,
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
        timeout: time::Duration,
+
    },
+

+
    /// Seed the given repository.
+
    #[serde(rename_all = "camelCase")]
+
    Seed {
+
        rid: RepoId,
+
        scope: super::policy::Scope,
+
    },
+

+
    /// Unseed the given repository.
+
    #[serde(rename_all = "camelCase")]
+
    Unseed { rid: RepoId },
+

+
    /// Follow the given node.
+
    #[serde(rename_all = "camelCase")]
+
    Follow {
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
        alias: Option<super::Alias>,
+
    },
+

+
    /// Unfollow the given node.
+
    #[serde(rename_all = "camelCase")]
+
    Unfollow {
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
    },
+

+
    /// Get the node's status.
+
    Status,
+

+
    /// Get node debug information.
+
    Debug,
+

+
    /// Get the node's NID.
+
    NodeId,
+

+
    /// Shutdown the node.
+
    Shutdown,
+

+
    /// Subscribe to events.
+
    Subscribe,
+
}
+

+
impl Command {
+
    /// Write this command to a stream, including a terminating LF character.
+
    pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
+
        json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
+
        w.write_all(b"\n")
+
    }
+
}
+

+
/// Options passed to the "connect" node command.
+
#[derive(Debug, Clone, Serialize, Deserialize)]
+
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
+
pub struct ConnectOptions {
+
    /// Establish a persistent connection.
+
    pub persistent: bool,
+
    /// How long to wait for the connection to be established.
+
    pub timeout: time::Duration,
+
}
+

+
impl Default for ConnectOptions {
+
    fn default() -> Self {
+
        Self {
+
            persistent: false,
+
            timeout: DEFAULT_TIMEOUT,
+
        }
+
    }
+
}
+

+
/// Result of a command, on the node control socket.
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
#[serde(untagged)]
+
pub enum CommandResult<T> {
+
    /// Response on node socket indicating that a command was carried out successfully.
+
    Okay(T),
+
    /// Response on node socket indicating that an error occured.
+
    Error {
+
        /// The reason for the error.
+
        #[serde(rename = "error")]
+
        reason: String,
+
    },
+
}
+

+
impl<T, E> From<Result<T, E>> for CommandResult<T>
+
where
+
    E: std::error::Error,
+
{
+
    fn from(result: Result<T, E>) -> Self {
+
        match result {
+
            Ok(t) => Self::Okay(t),
+
            Err(e) => Self::Error {
+
                reason: e.to_string(),
+
            },
+
        }
+
    }
+
}
+

+
impl From<Event> for CommandResult<Event> {
+
    fn from(event: Event) -> Self {
+
        Self::Okay(event)
+
    }
+
}
+

+
/// A success response.
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
+
pub struct Success {
+
    /// Whether something was updated.
+
    #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")]
+
    pub(super) updated: bool,
+
}
+

+
impl CommandResult<Success> {
+
    /// Create an "updated" response.
+
    pub fn updated(updated: bool) -> Self {
+
        Self::Okay(Success { updated })
+
    }
+

+
    /// Create an "ok" response.
+
    pub fn ok() -> Self {
+
        Self::Okay(Success { updated: false })
+
    }
+
}
+

+
impl CommandResult<()> {
+
    /// Create an error result.
+
    pub fn error(err: impl std::error::Error) -> Self {
+
        Self::Error {
+
            reason: err.to_string(),
+
        }
+
    }
+
}
+

+
impl<T: Serialize> CommandResult<T> {
+
    /// Write this command result to a stream, including a terminating LF character.
+
    pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
+
        json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
+
        w.write_all(b"\n")
+
    }
+
}
+

+
#[cfg(test)]
+
#[allow(clippy::unwrap_used)]
+
mod test {
+
    use super::*;
+
    use std::collections::VecDeque;
+

+
    use localtime::LocalTime;
+

+
    use crate::assert_matches;
+
    use crate::node::{Seeds, State};
+

+
    #[test]
+
    fn command_result() {
+
        #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+
        struct Test {
+
            value: u32,
+
        }
+

+
        assert_eq!(json::to_string(&CommandResult::Okay(true)).unwrap(), "true");
+
        assert_eq!(
+
            json::to_string(&CommandResult::Okay(Test { value: 42 })).unwrap(),
+
            "{\"value\":42}"
+
        );
+
        assert_eq!(
+
            json::from_str::<CommandResult<Test>>("{\"value\":42}").unwrap(),
+
            CommandResult::Okay(Test { value: 42 })
+
        );
+
        assert_eq!(json::to_string(&CommandResult::ok()).unwrap(), "{}");
+
        assert_eq!(
+
            json::to_string(&CommandResult::updated(true)).unwrap(),
+
            "{\"updated\":true}"
+
        );
+
        assert_eq!(
+
            json::to_string(&CommandResult::error(io::Error::from(
+
                io::ErrorKind::NotFound
+
            )))
+
            .unwrap(),
+
            "{\"error\":\"entity not found\"}"
+
        );
+

+
        json::from_str::<CommandResult<State>>(
+
            &serde_json::to_string(&CommandResult::Okay(State::Connected {
+
                since: LocalTime::now(),
+
                ping: Default::default(),
+
                fetching: Default::default(),
+
                latencies: VecDeque::default(),
+
                stable: false,
+
            }))
+
            .unwrap(),
+
        )
+
        .unwrap();
+

+
        assert_matches!(
+
            json::from_str::<CommandResult<State>>(
+
                r#"{"connected":{"since":1699636852107,"fetching":[]}}"#
+
            ),
+
            Ok(CommandResult::Okay(_))
+
        );
+
        assert_matches!(
+
            json::from_str::<CommandResult<Seeds>>(
+
                r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994,"fetching":[]}}}]"#
+
            ),
+
            Ok(CommandResult::Okay(_))
+
        );
+
    }
+
}
modified crates/radicle/src/storage/refs.rs
@@ -405,6 +405,12 @@ impl RefsAt {
    }
}

+
impl std::fmt::Display for RefsAt {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        write!(f, "{} @ {}", self.remote, self.at)
+
    }
+
}
+

/// Verified [`SignedRefs`] that keeps track of their content address
/// [`Oid`].
#[derive(Debug, Clone, PartialEq, Eq)]