Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Allow announcing refs for given public keys
Lorenz Leutgeb committed 7 months ago
commit 3f47d9f4b2d8312d858476face45f3ca4d563ada
parent 2e77d5ef4df526a8453d38d6d2d735fe9ce7e423
8 files changed +98 -28
modified crates/radicle-cli/src/node.rs
@@ -199,7 +199,12 @@ where
        reporting.completion.clone(),
    );

-
    match node.announce(rid, settings.timeout, announcer, |node, progress| {
+
    // Note that technically we could command the node to announce refs
+
    // for arbitrary namespaces. Here, for backwards compatibility, we
+
    // only announce for our own namespace.
+
    let ns = [profile.did().into()];
+

+
    match node.announce(rid, ns, settings.timeout, announcer, |node, progress| {
        spinner.message(format!(
            "Synced with {}, {} of {} preferred seeds, and {} of at least {} replica(s).",
            term::format::node_id_human_compact(node),
modified crates/radicle-cli/tests/commands.rs
@@ -1772,7 +1772,7 @@ fn test_cob_replication() {
    // announcement, otherwise Alice will consider it stale.
    thread::sleep(time::Duration::from_millis(3));

-
    bob.handle.announce_refs(rid).unwrap();
+
    bob.handle.announce_refs(rid, None).unwrap();

    // Wait for Alice to fetch the issue refs.
    events
modified crates/radicle-node/src/control.rs
@@ -219,8 +219,8 @@ where
                return Err(CommandError::Runtime(e));
            }
        },
-
        Command::AnnounceRefs { rid } => {
-
            let refs = handle.announce_refs(rid)?;
+
        Command::AnnounceRefs { rid, ns } => {
+
            let refs = handle.announce_refs(rid, ns)?;

            CommandResult::Okay(refs).to_writer(writer)?;
        }
@@ -291,6 +291,7 @@ fn fetch<W: Write, H: Handle<Error = runtime::HandleError>>(

#[cfg(test)]
mod tests {
+
    use std::collections::HashSet;
    use std::io::prelude::*;
    use std::thread;

@@ -308,6 +309,7 @@ mod tests {
        let socket = tmp.path().join("alice.sock");
        let rids = test::arbitrary::set::<RepoId>(1..3);
        let listener = Listener::bind(&socket).unwrap();
+
        let nid = handle.nid().unwrap();

        thread::spawn({
            let handle = handle.clone();
@@ -325,7 +327,8 @@ mod tests {
                &mut stream,
                "{}",
                json::to_string(&Command::AnnounceRefs {
-
                    rid: rid.to_owned()
+
                    rid: rid.to_owned(),
+
                    ns: HashSet::new(),
                })
                .unwrap()
            )
@@ -345,7 +348,7 @@ mod tests {
        }

        for rid in &rids {
-
            assert!(handle.updates.lock().unwrap().contains(rid));
+
            assert!(handle.updates.lock().unwrap().contains(&(*rid, nid)));
        }
    }

modified crates/radicle-node/src/runtime/handle.rs
@@ -1,3 +1,4 @@
+
use std::collections::HashSet;
use std::net;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -9,6 +10,7 @@ use std::os::unix::net::UnixStream as Stream;
use winpipe::WinStream as Stream;

use crossbeam_channel as chan;
+
use radicle::crypto::PublicKey;
use radicle::node::events::{Event, Events};
use radicle::node::policy;
use radicle::node::{Config, NodeId};
@@ -253,9 +255,17 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

-
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Error> {
+
    fn announce_refs(
+
        &mut self,
+
        id: RepoId,
+
        ns: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::AnnounceRefs(id, sender))?;
+
        self.command(service::Command::AnnounceRefs(
+
            id,
+
            HashSet::from_iter(ns),
+
            sender,
+
        ))?;
        receiver.recv().map_err(Error::from)
    }

modified crates/radicle-node/src/test/handle.rs
@@ -3,6 +3,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time;

+
use radicle::crypto::PublicKey;
use radicle::git;
use radicle::storage::refs::RefsAt;

@@ -14,7 +15,7 @@ use radicle::node::NodeId;

#[derive(Default, Clone)]
pub struct Handle {
-
    pub updates: Arc<Mutex<Vec<RepoId>>>,
+
    pub updates: Arc<Mutex<Vec<(RepoId, PublicKey)>>>,
    pub seeding: Arc<Mutex<HashSet<RepoId>>>,
    pub following: Arc<Mutex<HashSet<NodeId>>>,
}
@@ -91,8 +92,15 @@ impl radicle::node::Handle for Handle {
        Ok(self.following.lock().unwrap().remove(&id))
    }

-
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error> {
-
        self.updates.lock().unwrap().push(id);
+
    fn announce_refs(
+
        &mut self,
+
        id: RepoId,
+
        ns: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Self::Error> {
+
        self.updates
+
            .lock()
+
            .unwrap()
+
            .extend(ns.into_iter().map(|ns| (id, ns)));

        Ok(RefsAt {
            remote: self.nid()?,
modified crates/radicle-node/src/tests/e2e.rs
@@ -1378,7 +1378,7 @@ fn test_background_foreground_fetch() {
        Title::new("Concurrent fetches").unwrap(),
        "Concurrent fetches are harshing my vibes",
    );
-
    bob.handle.announce_refs(rid).unwrap();
+
    bob.handle.announce_refs(rid, None).unwrap();
    alice_events
        .wait(
            |e| matches!(e, Event::RefsAnnounced { .. }).then_some(()),
@@ -1427,7 +1427,7 @@ fn test_catchup_on_refs_announcements() {

    log::debug!(target: "test", "Bob creating his issue..");
    bob.issue(acme, Title::new("Bob's issue").unwrap(), "[..]");
-
    bob.handle.announce_refs(acme).unwrap();
+
    bob.handle.announce_refs(acme, None).unwrap();

    log::debug!(target: "test", "Waiting for seed to fetch Bob's refs from Bob..");
    seed.has_remote_of(&acme, &bob.id); // Seed fetches Bob's refs.
modified crates/radicle-protocol/src/service.rs
@@ -238,8 +238,8 @@ pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Se

/// Commands sent to the service by the operator.
pub enum Command {
-
    /// Announce repository references for given repository to peers.
-
    AnnounceRefs(RepoId, chan::Sender<RefsAt>),
+
    /// Announce repository references for given repository and namespaces to peers.
+
    AnnounceRefs(RepoId, HashSet<PublicKey>, chan::Sender<RefsAt>),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Add repository to local inventory.
@@ -271,7 +271,7 @@ pub enum Command {
impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
-
            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
+
            Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
@@ -930,7 +930,7 @@ where
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
-
            Command::AnnounceRefs(id, resp) => {
+
            Command::AnnounceRefs(id, ns, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
@@ -943,8 +943,14 @@ where
                    }
                };

-
                match self.announce_own_refs(id, doc) {
-
                    Ok(refs) => match refs.as_slice() {
+
                let result = if ns.is_empty() {
+
                    self.announce_own_refs(id, doc)
+
                } else {
+
                    self.announce_refs(id, doc, ns)
+
                };
+

+
                match result {
+
                    Ok((refs, _timestamp)) => match refs.as_slice() {
                        &[refs] => {
                            resp.send(refs).ok();
                        }
@@ -2167,7 +2173,11 @@ where
    }

    /// Announce our own refs for the given repo.
-
    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc) -> Result<Vec<RefsAt>, Error> {
+
    fn announce_own_refs(
+
        &mut self,
+
        rid: RepoId,
+
        doc: Doc,
+
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;

        // Update refs database with our signed refs branches.
@@ -2193,7 +2203,7 @@ where
                );
            }
        }
-
        Ok(refs)
+
        Ok((refs, timestamp))
    }

    /// Announce local refs for given repo.
modified crates/radicle/src/node.rs
@@ -613,9 +613,28 @@ impl From<Address> for HostName {
#[serde(rename_all = "camelCase", tag = "command")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum Command {
-
    /// Announce repository references for given repository to peers.
+
    /// Announce repository references for given repository
+
    /// and namespaces to peers.
    #[serde(rename_all = "camelCase")]
-
    AnnounceRefs { rid: RepoId },
+
    AnnounceRefs {
+
        rid: RepoId,
+

+
        /// The namespaces for which references should be announced.
+
        ///
+
        /// For backwards compatibility, this is optional and defaults to the
+
        /// empty set. However, the empty set is interpreted by the node just
+
        /// like the singleton set containing the Node ID of the node
+
        /// itself, i.e., the node that receives this command.
+
        /// Thus, the node would announce "its own" references.
+
        /// This makes perfect sense when the node and the user have the same
+
        /// cryptographic identity but not when these identities are different.
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "HashSet<crate::schemars_ext::crypto::PublicKey>")
+
        )]
+
        #[serde(default, skip_serializing_if = "HashSet::is_empty")]
+
        ns: HashSet<PublicKey>,
+
    },

    /// Announce local repositories to peers.
    #[serde(rename_all = "camelCase")]
@@ -624,7 +643,7 @@ pub enum Command {
    /// Update node's inventory.
    AddInventory { rid: RepoId },

-
    /// Get the current node condiguration.
+
    /// Get the current node configuration.
    Config,

    /// Get the node's listen addresses.
@@ -1113,7 +1132,11 @@ pub trait Handle: Clone + Sync + Send {
    /// Unfollow the given peer.
    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error>;
    /// Notify the service that a project has been updated, and announce local refs.
-
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error>;
+
    fn announce_refs(
+
        &mut self,
+
        id: RepoId,
+
        ns: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Self::Error>;
    /// Announce local inventory.
    fn announce_inventory(&mut self) -> Result<(), Self::Error>;
    /// Notify the service that our inventory was updated with the given repository.
@@ -1215,12 +1238,13 @@ impl Node {
    pub fn announce(
        &mut self,
        rid: RepoId,
+
        ns: impl IntoIterator<Item = PublicKey>,
        timeout: time::Duration,
        mut announcer: sync::Announcer,
        mut report: impl FnMut(&NodeId, sync::announce::Progress),
    ) -> Result<sync::AnnouncerResult, Error> {
        let mut events = self.subscribe(timeout)?;
-
        let refs = self.announce_refs(rid)?;
+
        let refs = self.announce_refs(rid, ns)?;

        let started = time::Instant::now();

@@ -1391,9 +1415,19 @@ impl Handle for Node {
        Ok(response.updated)
    }

-
    fn announce_refs(&mut self, rid: RepoId) -> Result<RefsAt, Error> {
+
    fn announce_refs(
+
        &mut self,
+
        rid: RepoId,
+
        ns: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<RefsAt, Error> {
        let refs: RefsAt = self
-
            .call(Command::AnnounceRefs { rid }, DEFAULT_TIMEOUT)?
+
            .call(
+
                Command::AnnounceRefs {
+
                    rid,
+
                    ns: HashSet::from_iter(ns),
+
                },
+
                DEFAULT_TIMEOUT,
+
            )?
            .next()
            .ok_or(Error::EmptyResponse)??;