Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Report sync status for given namespaces
✗ CI failure Lorenz Leutgeb committed 7 months ago
commit 7effa7c64c63637ae53c4a169bcaeb06c2205c4b
parent 9689de9af08d61ec1134269d3d03a72465767416
2 failed (2 total) View logs
12 files changed +95 -26
modified crates/radicle-cli/src/commands/sync.rs
@@ -345,7 +345,7 @@ fn sync_status(
    const SYMBOL_STATE_UNKNOWN: &str = "•";

    let mut table = Table::<5, term::Label>::new(TableOptions::bordered());
-
    let mut seeds: Vec<_> = node.seeds(rid)?.into();
+
    let mut seeds: Vec<_> = node.seeds_for(rid, [*profile.did()])?.into();
    let local_nid = node.nid()?;
    let aliases = profile.aliases();

@@ -522,7 +522,7 @@ pub fn fetch(
        None => {
            // We push nodes that are in our seed list in attempt to fulfill the
            // replicas, if needed.
-
            let seeds = node.seeds(rid)?;
+
            let seeds = node.seeds_for(rid, [*profile.did()])?;
            let (connected, disconnected) = seeds.partition();
            let candidates = connected
                .into_iter()
modified crates/radicle-cli/src/node.rs
@@ -149,7 +149,7 @@ where

    let config = match sync::PrivateNetwork::private_repo(&doc) {
        None => {
-
            let (synced, unsynced) = node.seeds(rid)?.iter().fold(
+
            let (synced, unsynced) = node.seeds_for(rid, [*me])?.iter().fold(
                (BTreeSet::new(), BTreeSet::new()),
                |(mut synced, mut unsynced), seed| {
                    if seed.is_synced() {
modified crates/radicle-cli/tests/commands.rs
@@ -1704,7 +1704,7 @@ fn test_clone_without_seeds() {
    let working = environment.tempdir().join("working");
    let rid = alice.project("heartwood", "Radicle Heartwood Protocol & Stack");
    let mut alice = alice.spawn();
-
    let seeds = alice.handle.seeds(rid).unwrap();
+
    let seeds = alice.handle.seeds_for(rid, [alice.id]).unwrap();
    let connected = seeds.connected().collect::<Vec<_>>();

    assert!(connected.is_empty());
modified crates/radicle-node/src/control.rs
@@ -172,11 +172,17 @@ where

            CommandResult::Okay(addrs).to_writer(writer)?;
        }
+
        #[allow(deprecated)]
        Command::Seeds { rid } => {
            let seeds = handle.seeds(rid)?;

            CommandResult::Okay(seeds).to_writer(writer)?;
        }
+
        Command::SeedsFor { rid, namespaces } => {
+
            let seeds = handle.seeds_for(rid, namespaces)?;
+

+
            CommandResult::Okay(seeds).to_writer(writer)?;
+
        }
        Command::Sessions => {
            let sessions = handle.sessions()?;

modified crates/radicle-node/src/runtime/handle.rs
@@ -202,9 +202,17 @@ impl radicle::node::Handle for Handle {
            .map_err(Error::from)
    }

-
    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
+
    fn seeds_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<Seeds, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Seeds(id, sender))?;
+
        self.command(service::Command::Seeds(
+
            id,
+
            HashSet::from_iter(namespaces),
+
            sender,
+
        ))?;
        receiver.recv().map_err(Error::from)
    }

modified crates/radicle-node/src/test/handle.rs
@@ -55,7 +55,11 @@ impl radicle::node::Handle for Handle {
        unimplemented!();
    }

-
    fn seeds(&mut self, _id: RepoId) -> Result<Seeds, Self::Error> {
+
    fn seeds_for(
+
        &mut self,
+
        _id: RepoId,
+
        _namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<Seeds, Self::Error> {
        unimplemented!();
    }

modified crates/radicle-node/src/test/node.rs
@@ -200,7 +200,7 @@ impl<G: Signer<Signature> + cyphernet::Ecdh> NodeHandle<G> {
        log::debug!(target: "test", "Waiting for {} to be in sync with {nid} for {rid}", self.id);

        loop {
-
            let seeds = self.handle.seeds(*rid).unwrap();
+
            let seeds = self.handle.seeds_for(*rid, [self.id]).unwrap();
            if seeds.iter().any(|s| s.nid == *nid && s.is_synced()) {
                break;
            }
modified crates/radicle-node/src/tests/e2e.rs
@@ -183,7 +183,7 @@ fn test_replication() {
    let updated = alice.handle.seed(acme, Scope::All).unwrap();
    assert!(updated);

-
    let seeds = alice.handle.seeds(acme).unwrap();
+
    let seeds = alice.handle.seeds_for(acme, None).unwrap();
    assert!(seeds.is_connected(&bob.id));

    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
@@ -549,7 +549,7 @@ fn test_clone() {
    transport::local::register(alice.storage.clone());

    let _ = alice.handle.seed(acme, Scope::All).unwrap();
-
    let seeds = alice.handle.seeds(acme).unwrap();
+
    let seeds = alice.handle.seeds_for(acme, None).unwrap();
    assert!(seeds.is_connected(&bob.id));

    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
modified crates/radicle-protocol/src/service.rs
@@ -252,8 +252,9 @@ pub enum Command {
    Config(chan::Sender<Config>),
    /// Get the node's listen addresses.
    ListenAddrs(chan::Sender<Vec<std::net::SocketAddr>>),
-
    /// Lookup seeds for the given repository in the routing table.
-
    Seeds(RepoId, chan::Sender<Seeds>),
+
    /// Lookup seeds for the given repository in the routing table, and report
+
    /// sync status for given namespaces.
+
    Seeds(RepoId, HashSet<PublicKey>, chan::Sender<Seeds>),
    /// Fetch the given repository from the network.
    Fetch(RepoId, NodeId, time::Duration, chan::Sender<FetchResult>),
    /// Seed the given repository.
@@ -278,7 +279,7 @@ impl fmt::Debug for Command {
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
-
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
+
            Self::Seeds(id, _, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
@@ -880,7 +881,7 @@ where
            Command::ListenAddrs(resp) => {
                resp.send(self.listening.clone()).ok();
            }
-
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
+
            Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
                    debug!(
@@ -2280,14 +2281,19 @@ where
        Ok(())
    }

-
    fn seeds(&self, rid: &RepoId) -> Result<Seeds, Error> {
+
    fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
        let mut seeds = Seeds::new(self.rng.clone());

-
        // First build a list from peers that have synced our own refs, if any.
-
        // This step is skipped if we don't have the repository yet, or don't have
-
        // our own refs.
+
        // First, build a list of peers that have synced refs for `namespaces`, if any.
+
        // This step is skipped:
+
        //  1. For the repository (and thus all `namespaces`), if it not exist in storage.
+
        //  2. For each `namespace` in `namespaces`, which does not exist in storage.
        if let Ok(repo) = self.storage.repository(*rid) {
-
            if let Ok(local) = RefsAt::new(&repo, self.node_id()) {
+
            for namespace in namespaces.iter() {
+
                let Ok(local) = RefsAt::new(&repo, *namespace) else {
+
                    continue;
+
                };
+

                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
@@ -2310,7 +2316,7 @@ where
        // These peers have announced that they seed the repository via an inventory
        // announcement, but we haven't received any ref announcements from them.
        for nid in self.db.routing().get(rid)? {
-
            if nid == self.node_id() {
+
            if namespaces.contains(&nid) {
                continue;
            }
            if seeds.contains(&nid) {
@@ -2528,7 +2534,7 @@ where
            if self.storage.contains(&rid)? {
                continue;
            }
-
            match self.seeds(&rid) {
+
            match self.seeds(&rid, [self.node_id()].into()) {
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
modified crates/radicle/CHANGELOG.md
@@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `radicle::node::Handle::announce_refs_for` now allows specifying for which
  namespaces changes should be announced. A corresponding enum variant
  `radicle::node::Command::AnnounceRefsFor` is added.
+
- `radicle::node::Handle::seeds_for` now allows specifying for which
+
  namespaces sync status should be reported. A corresponding enum variant
+
  `radicle::node::Command::SeedsFor` is added.

### Changed

@@ -19,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `radicle::node::Handle::announce_refs` is deprecated in favor of
  `radicle::node::Handle::announce_refs_for`.
+
- `radicle::node::Handle::seeds` is deprecated in favor of
+
  `radicle::node::Handle::seeds_for`.

### Removed

modified crates/radicle/src/node.rs
@@ -883,8 +883,21 @@ pub trait Handle: Clone + Sync + Send {
    ) -> Result<ConnectResult, Self::Error>;
    /// Disconnect from a peer.
    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error>;
-
    /// Lookup the seeds of a given repository in the routing table.
-
    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error>;
+

+
    /// Look up the seeds of a given repository in the routing table.
+
    #[deprecated(note = "use `seeds_for` instead")]
+
    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
+
        self.seeds_for(id, [self.nid()?])
+
    }
+

+
    /// Look up the seeds of a given repository in the routing table
+
    /// and report sync status for `namespaces`.
+
    fn seeds_for(
+
        &mut self,
+
        id: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<Seeds, Self::Error>;
+

    /// Fetch a repository from the network.
    fn fetch(
        &mut self,
@@ -1137,9 +1150,19 @@ impl Handle for Node {
        Ok(())
    }

-
    fn seeds(&mut self, rid: RepoId) -> Result<Seeds, Error> {
+
    fn seeds_for(
+
        &mut self,
+
        rid: RepoId,
+
        namespaces: impl IntoIterator<Item = PublicKey>,
+
    ) -> Result<Seeds, Error> {
        let seeds = self
-
            .call::<Seeds>(Command::Seeds { rid }, DEFAULT_TIMEOUT)?
+
            .call::<Seeds>(
+
                Command::SeedsFor {
+
                    rid,
+
                    namespaces: HashSet::from_iter(namespaces),
+
                },
+
                DEFAULT_TIMEOUT,
+
            )?
            .next()
            .ok_or(Error::EmptyResponse)??;

modified crates/radicle/src/node/command.rs
@@ -77,10 +77,27 @@ pub enum Command {
        nid: NodeId,
    },

-
    /// Lookup seeds for the given repository in the routing table.
+
    /// Look up seeds for the given repository in the routing table.
    #[serde(rename_all = "camelCase")]
+
    #[deprecated(note = "use `SeedsFor` instead")]
    Seeds { rid: RepoId },

+
    /// Look up seeds for the given repository in the routing table and
+
    /// report sync status for the given namespaces.
+
    #[serde(rename_all = "camelCase")]
+
    SeedsFor {
+
        /// The ID of the repository for which seeds should be looked up
+
        /// in the routing table.
+
        rid: RepoId,
+

+
        /// The namespaces for which references should be announced.
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "HashSet<crate::schemars_ext::crypto::PublicKey>")
+
        )]
+
        namespaces: HashSet<PublicKey>,
+
    },
+

    /// Get the current peer sessions.
    Sessions,