Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Allow announcing refs for given public keys
Lorenz Leutgeb committed 7 months ago
commit 9689de9af08d61ec1134269d3d03a72465767416
parent 4dbb022d229fc4b0170de9b497837943d91af6e4
11 files changed +144 -53
modified crates/radicle-cli/src/node.rs
@@ -199,18 +199,24 @@ where
        reporting.completion.clone(),
    );

-
    match node.announce(rid, settings.timeout, announcer, |node, progress| {
-
        spinner.message(format!(
-
            "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
-
            term::format::node_id_human_compact(node),
-
            term::format::secondary(progress.preferred()),
-
            term::format::secondary(n_preferred_seeds),
-
            term::format::secondary(progress.synced()),
-
            // N.b. the number of replicas could exceed the target if we're
-
            // waiting for preferred seeds
-
            term::format::secondary(min_replicas.max(progress.synced())),
-
        ));
-
    }) {
+
    match node.announce(
+
        rid,
+
        [profile.did().into()],
+
        settings.timeout,
+
        announcer,
+
        |node, progress| {
+
            spinner.message(format!(
+
                "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
+
                term::format::node_id_human_compact(node),
+
                term::format::secondary(progress.preferred()),
+
                term::format::secondary(n_preferred_seeds),
+
                term::format::secondary(progress.synced()),
+
                // N.b. the number of replicas could exceed the target if we're
+
                // waiting for preferred seeds
+
                term::format::secondary(min_replicas.max(progress.synced())),
+
            ));
+
        },
+
    ) {
        Ok(result) => {
            spinner.message(format!(
                "Synced with {} seed(s)",
modified crates/radicle-cli/tests/commands.rs
@@ -1772,7 +1772,7 @@ fn test_cob_replication() {
    // announcement, otherwise Alice will consider it stale.
    thread::sleep(time::Duration::from_millis(3));

-
    bob.handle.announce_refs(rid).unwrap();
+
    bob.handle.announce_refs_for(rid, [bob.id]).unwrap();

    // Wait for Alice to fetch the issue refs.
    events
modified crates/radicle-node/src/control.rs
@@ -219,11 +219,17 @@ where
                return Err(CommandError::Runtime(e));
            }
        },
+
        #[allow(deprecated)]
        Command::AnnounceRefs { rid } => {
            let refs = handle.announce_refs(rid)?;

            CommandResult::Okay(refs).to_writer(writer)?;
        }
+
        Command::AnnounceRefsFor { rid, namespaces } => {
+
            let refs = handle.announce_refs_for(rid, namespaces)?;
+

+
            CommandResult::Okay(refs).to_writer(writer)?;
+
        }
        Command::AnnounceInventory => {
            if let Err(e) = handle.announce_inventory() {
                return Err(CommandError::Runtime(e));
@@ -308,6 +314,7 @@ mod tests {
        let socket = tmp.path().join("alice.sock");
        let rids = test::arbitrary::set::<RepoId>(1..3);
        let listener = Listener::bind(&socket).unwrap();
+
        let nid = handle.nid().unwrap();

        thread::spawn({
            let handle = handle.clone();
@@ -324,8 +331,9 @@ mod tests {
            writeln!(
                &mut stream,
                "{}",
-
                json::to_string(&Command::AnnounceRefs {
-
                    rid: rid.to_owned()
+
                json::to_string(&Command::AnnounceRefsFor {
+
                    rid: rid.to_owned(),
+
                    namespaces: [nid].into(),
                })
                .unwrap()
            )
@@ -345,7 +353,7 @@ mod tests {
        }

        for rid in &rids {
-
            assert!(handle.updates.lock().unwrap().contains(rid));
+
            assert!(handle.updates.lock().unwrap().contains(&(*rid, nid)));
        }
    }

modified crates/radicle-node/src/runtime/handle.rs
@@ -1,3 +1,4 @@
+
use std::collections::HashSet;
use std::net;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -9,6 +10,7 @@ use std::os::unix::net::UnixStream as Stream;
use winpipe::WinStream as Stream;

use crossbeam_channel as chan;
+
use radicle::crypto::PublicKey;
use radicle::node::events::{Event, Events};
use radicle::node::policy;
use radicle::node::{Config, NodeId};
@@ -253,9 +255,17 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

-
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Error> {
+
    fn announce_refs_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::AnnounceRefs(id, sender))?;
+
        self.command(service::Command::AnnounceRefs(
+
            id,
+
            HashSet::from_iter(namespaces),
+
            sender,
+
        ))?;
        receiver.recv().map_err(Error::from)
    }

modified crates/radicle-node/src/test/handle.rs
@@ -3,6 +3,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time;

+
use radicle::crypto::PublicKey;
use radicle::git;
use radicle::storage::refs::RefsAt;

@@ -14,7 +15,7 @@ use radicle::node::NodeId;

#[derive(Default, Clone)]
pub struct Handle {
-
    pub updates: Arc<Mutex<Vec<RepoId>>>,
+
    pub updates: Arc<Mutex<Vec<(RepoId, PublicKey)>>>,
    pub seeding: Arc<Mutex<HashSet<RepoId>>>,
    pub following: Arc<Mutex<HashSet<NodeId>>>,
}
@@ -91,8 +92,15 @@ impl radicle::node::Handle for Handle {
        Ok(self.following.lock().unwrap().remove(&id))
    }

-
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error> {
-
        self.updates.lock().unwrap().push(id);
+
    fn announce_refs_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Self::Error> {
+
        self.updates
+
            .lock()
+
            .unwrap()
+
            .extend(namespaces.into_iter().map(|ns| (id, ns)));

        Ok(RefsAt {
            remote: self.nid()?,
modified crates/radicle-node/src/tests/e2e.rs
@@ -1378,7 +1378,7 @@ fn test_background_foreground_fetch() {
        Title::new("Concurrent fetches").unwrap(),
        "Concurrent fetches are harshing my vibes",
    );
-
    bob.handle.announce_refs(rid).unwrap();
+
    bob.handle.announce_refs_for(rid, [bob.id]).unwrap();
    alice_events
        .wait(
            |e| matches!(e, Event::RefsAnnounced { .. }).then_some(()),
@@ -1427,7 +1427,7 @@ fn test_catchup_on_refs_announcements() {

    log::debug!(target: "test", "Bob creating his issue..");
    bob.issue(acme, Title::new("Bob's issue").unwrap(), "[..]");
-
    bob.handle.announce_refs(acme).unwrap();
+
    bob.handle.announce_refs_for(acme, [bob.id]).unwrap();

    log::debug!(target: "test", "Waiting for seed to fetch Bob's refs from Bob..");
    seed.has_remote_of(&acme, &bob.id); // Seed fetches Bob's refs.
modified crates/radicle-protocol/src/service.rs
@@ -238,8 +238,8 @@ pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Se

/// Commands sent to the service by the operator.
pub enum Command {
-
    /// Announce repository references for given repository to peers.
-
    AnnounceRefs(RepoId, chan::Sender<RefsAt>),
+
    /// Announce repository references for given repository and namespaces to peers.
+
    AnnounceRefs(RepoId, HashSet<PublicKey>, chan::Sender<RefsAt>),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Add repository to local inventory.
@@ -271,7 +271,7 @@ pub enum Command {
impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
-
            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
+
            Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
@@ -930,7 +930,7 @@ where
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
-
            Command::AnnounceRefs(id, resp) => {
+
            Command::AnnounceRefs(id, namespaces, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
@@ -943,14 +943,12 @@ where
                    }
                };

-
                match self.announce_own_refs(id, doc) {
-
                    Ok(refs) => match refs.as_slice() {
-
                        &[refs] => {
-
                            resp.send(refs).ok();
+
                match self.announce_own_refs(id, doc, namespaces) {
+
                    Ok((refs, _timestamp)) => {
+
                        for r in refs {
+
                            resp.send(r).ok();
                        }
-
                        // SAFETY: Since we passed in one NID, we should get exactly one item back.
-
                        [..] => panic!("Service::command: unexpected refs returned"),
-
                    },
+
                    }
                    Err(err) => {
                        error!(target: "service", "Error announcing refs: {err}");
                    }
@@ -1226,7 +1224,7 @@ where
                } else {
                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
                    // beyond just knowing that we have added an item to our inventory.
-
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces) {
+
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
@@ -2167,16 +2165,21 @@ where
    }

    /// Announce our own refs for the given repo.
-
    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc) -> Result<Vec<RefsAt>, Error> {
-
        let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;
+
    fn announce_own_refs(
+
        &mut self,
+
        rid: RepoId,
+
        doc: Doc,
+
        namespaces: impl IntoIterator<Item = NodeId>,
+
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
+
        let (refs, timestamp) = self.announce_refs(rid, doc, namespaces, true)?;

        // Update refs database with our signed refs branches.
        // This isn't strictly necessary for now, as we only use the database for fetches, and
        // we don't fetch our own refs that are announced, but it's for good measure.
-
        if let &[r] = refs.as_slice() {
+
        for r in refs.iter() {
            self.emitter.emit(Event::LocalRefsAnnounced {
                rid,
-
                refs: r,
+
                refs: *r,
                timestamp,
            });
            if let Err(e) = self.database_mut().refs_mut().set(
@@ -2193,7 +2196,7 @@ where
                );
            }
        }
-
        Ok(refs)
+
        Ok((refs, timestamp))
    }

    /// Announce local refs for given repo.
@@ -2202,6 +2205,7 @@ where
        rid: RepoId,
        doc: Doc,
        remotes: impl IntoIterator<Item = NodeId>,
+
        own: bool,
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
        let timestamp = ann.timestamp();
@@ -2209,18 +2213,13 @@ where

        // Update our sync status for our own refs. This is useful for determining if refs were
        // updated while the node was stopped.
-
        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
+
        for r in refs.iter().filter(|r| own || r.remote == ann.node) {
            info!(
                target: "service",
-
                "Announcing own refs for {rid} to peers ({}) (t={timestamp})..",
-
                refs.at
+
                "Announcing refs {rid}/{r} to peers (t={timestamp})..",
            );
            // Update our local node's sync status to mark the refs as announced.
-
            if let Err(e) = self
-
                .db
-
                .seeds_mut()
-
                .synced(&rid, &ann.node, refs.at, timestamp)
-
            {
+
            if let Err(e) = self.db.seeds_mut().synced(&rid, &ann.node, r.at, timestamp) {
                error!(target: "service", "Error updating sync status for local node: {e}");
            } else {
                debug!(target: "service", "Saved local sync status for {rid}..");
modified crates/radicle/CHANGELOG.md
@@ -9,11 +9,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

+
- `radicle::node::Handle::announce_refs_for` now allows specifying for which
+
  namespaces changes should be announced. A corresponding enum variant
+
  `radicle::node::Command::AnnounceRefsFor` is added.

### Changed

### Deprecated

+
- `radicle::node::Handle::announce_refs` is deprecated in favor of
+
  `radicle::node::Handle::announce_refs_for`.
+

### Removed

### Security
modified crates/radicle/src/node.rs
@@ -901,8 +901,22 @@ pub trait Handle: Clone + Sync + Send {
    fn unseed(&mut self, id: RepoId) -> Result<bool, Self::Error>;
    /// Unfollow the given peer.
    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error>;
-
    /// Notify the service that a project has been updated, and announce local refs.
-
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error>;
+

+
    /// Notify the service that a repository has been updated, and references
+
    /// should be announced over the network.
+
    #[deprecated(note = "use `announce_refs_for` instead")]
+
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error> {
+
        self.announce_refs_for(id, [self.nid()?])
+
    }
+

+
    /// Notify the service that a repository has been updated, and references
+
    /// for the given `namespaces` should be announced over the network.
+
    fn announce_refs_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Self::Error>;
+

    /// Announce local inventory.
    fn announce_inventory(&mut self) -> Result<(), Self::Error>;
    /// Notify the service that our inventory was updated with the given repository.
@@ -1004,12 +1018,13 @@ impl Node {
    pub fn announce(
        &mut self,
        rid: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
        timeout: time::Duration,
        mut announcer: sync::Announcer,
        mut report: impl FnMut(&NodeId, sync::announce::Progress),
    ) -> Result<sync::AnnouncerResult, Error> {
        let mut events = self.subscribe(timeout)?;
-
        let refs = self.announce_refs(rid)?;
+
        let refs = self.announce_refs_for(rid, namespaces)?;

        let started = time::Instant::now();

@@ -1180,9 +1195,19 @@ impl Handle for Node {
        Ok(response.updated)
    }

-
    fn announce_refs(&mut self, rid: RepoId) -> Result<RefsAt, Error> {
+
    fn announce_refs_for(
+
        &mut self,
+
        rid: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Error> {
        let refs: RefsAt = self
-
            .call(Command::AnnounceRefs { rid }, DEFAULT_TIMEOUT)?
+
            .call(
+
                Command::AnnounceRefsFor {
+
                    rid,
+
                    namespaces: HashSet::from_iter(namespaces),
+
                },
+
                DEFAULT_TIMEOUT,
+
            )?
            .next()
            .ok_or(Error::EmptyResponse)??;

modified crates/radicle/src/node/command.rs
@@ -1,12 +1,19 @@
//! Commands sent to the node via the control socket, and auxiliary types, as
//! well as their results (responses on the socket).

+
// There are derives on an enum with a deprecated variant
+
// in this module, see [`Command::AnnounceRefs`] and also
+
// <https://github.com/rust-lang/rust/issues/92313>.
+
#![allow(deprecated)]
+

+
use std::collections::HashSet;
use std::io;
use std::time;

use serde::{Deserialize, Serialize};
use serde_json as json;

+
use crate::crypto::PublicKey;
use crate::identity::RepoId;

use super::events::Event;
@@ -22,8 +29,24 @@ pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(30);
pub enum Command {
    /// Announce repository references for given repository to peers.
    #[serde(rename_all = "camelCase")]
+
    #[deprecated(note = "use `AnnounceRefsFor` instead")]
    AnnounceRefs { rid: RepoId },

+
    /// Announce repository references for given repository
+
    /// and namespaces to peers.
+
    #[serde(rename_all = "camelCase")]
+
    AnnounceRefsFor {
+
        /// The ID of the repository for which references should be announced.
+
        rid: RepoId,
+

+
        /// The namespaces for which references should be announced.
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "HashSet<crate::schemars_ext::crypto::PublicKey>")
+
        )]
+
        namespaces: HashSet<PublicKey>,
+
    },
+

    /// Announce local repositories to peers.
    #[serde(rename_all = "camelCase")]
    AnnounceInventory,
modified crates/radicle/src/storage/refs.rs
@@ -405,6 +405,12 @@ impl RefsAt {
    }
}

+
impl std::fmt::Display for RefsAt {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        write!(f, "{} @ {}", self.remote, self.at)
+
    }
+
}
+

/// Verified [`SignedRefs`] that keeps track of their content address
/// [`Oid`].
#[derive(Debug, Clone, PartialEq, Eq)]