Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Return only remotes that were fetched from
Alexis Sellier committed 3 years ago
commit 94bef619447cc8ba20afa2b4ed8a849c20f49a3f
parent bb07e255717893c588c5faaca6652d06450f7d11
10 files changed +123 -117
modified radicle-cli/src/commands/fetch.rs
@@ -110,7 +110,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {

pub fn fetch(rid: Id, node: &mut Node) -> Result<FetchResults, node::Error> {
    // Get seeds. This consults the local routing table only.
-
    let mut seeds = node.seeds(rid)?;
+
    let seeds = node.seeds(rid)?;
    let mut results = FetchResults::default();

    if seeds.has_connections() {
modified radicle-node/src/service.rs
@@ -512,7 +512,7 @@ where
                resp.send(untracked).ok();
            }
            Command::AnnounceRefs(id) => {
-
                if let Err(err) = self.announce_refs(id, &Namespaces::from_iter([self.node_id()])) {
+
                if let Err(err) = self.announce_refs(id, [self.node_id()]) {
                    error!("Error announcing refs: {}", err);
                }
            }
@@ -582,12 +582,11 @@ where
    pub fn fetched(
        &mut self,
        rid: Id,
-
        namespaces: Namespaces,
        remote: NodeId,
-
        result: Result<Vec<RefUpdate>, FetchError>,
+
        result: Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError>,
    ) {
        let result = match result {
-
            Ok(updated) => {
+
            Ok((updated, namespaces)) => {
                log::debug!(target: "service", "Fetched {rid} from {remote} successfully");

                for update in &updated {
@@ -599,7 +598,10 @@ where
                    updated: updated.clone(),
                });

-
                FetchResult::Success { updated }
+
                FetchResult::Success {
+
                    updated,
+
                    namespaces,
+
                }
            }
            Err(err) => {
                let reason = err.to_string();
@@ -630,8 +632,11 @@ where
            // because the user might want to announce his fork, once he has created one,
            // or may choose to not announce anything.
            match result {
-
                FetchResult::Success { updated } if !updated.is_empty() => {
-
                    if let Err(e) = self.announce_refs(rid, &namespaces) {
+
                FetchResult::Success {
+
                    updated,
+
                    namespaces,
+
                } if !updated.is_empty() => {
+
                    if let Err(e) = self.announce_refs(rid, namespaces) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
@@ -1190,39 +1195,27 @@ where
    }

    /// Announce local refs for given id.
-
    fn announce_refs(&mut self, rid: Id, namespaces: &Namespaces) -> Result<(), storage::Error> {
+
    fn announce_refs(
+
        &mut self,
+
        rid: Id,
+
        remotes: impl IntoIterator<Item = NodeId>,
+
    ) -> Result<(), storage::Error> {
        let repo = self.storage.repository(rid)?;
        let peers = self.sessions.connected().map(|(_, p)| p);
        let timestamp = self.time();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

-
        match namespaces {
-
            Namespaces::All => {
-
                for (remote_id, remote) in repo.remotes()?.into_iter() {
-
                    if refs.push((remote_id, remote.refs.unverified())).is_err() {
-
                        warn!(
-
                            target: "service",
-
                            "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
-
                            REF_REMOTE_LIMIT,
-
                        );
-
                        break;
-
                    }
-
                }
-
            }
-
            Namespaces::Trusted(trusted) => {
-
                for remote_id in trusted.iter() {
-
                    if refs
-
                        .push((*remote_id, repo.remote(remote_id)?.refs.unverified()))
-
                        .is_err()
-
                    {
-
                        warn!(
-
                            target: "service",
-
                            "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
-
                            REF_REMOTE_LIMIT,
-
                        );
-
                        break;
-
                    }
-
                }
+
        for remote_id in remotes.into_iter() {
+
            if refs
+
                .push((remote_id, repo.remote(&remote_id)?.refs.unverified()))
+
                .is_err()
+
            {
+
                warn!(
+
                    target: "service",
+
                    "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
+
                    REF_REMOTE_LIMIT,
+
                );
+
                break;
            }
        }

@@ -1428,7 +1421,7 @@ where

        for rid in missing {
            match self.seeds(&rid) {
-
                Ok(mut seeds) => {
+
                Ok(seeds) => {
                    if seeds.has_connections() {
                        for seed in seeds.connected() {
                            self.fetch(rid, seed);
modified radicle-node/src/test/handle.rs
@@ -6,7 +6,6 @@ use crate::node::{FetchResult, Seeds};
use crate::runtime::HandleError;
use crate::service::NodeId;
use crate::service::{self, tracking};
-
use crate::storage::RefUpdate;

#[derive(Default, Clone)]
pub struct Handle {
@@ -32,7 +31,10 @@ impl radicle::node::Handle for Handle {
    }

    fn fetch(&mut self, _id: Id, _from: NodeId) -> Result<FetchResult, Self::Error> {
-
        Ok(FetchResult::from(Ok::<Vec<RefUpdate>, Self::Error>(vec![])))
+
        Ok(FetchResult::Success {
+
            updated: vec![],
+
            namespaces: HashSet::new(),
+
        })
    }

    fn track_repo(&mut self, id: Id, _scope: tracking::Scope) -> Result<bool, Self::Error> {
modified radicle-node/src/test/simulator.rs
@@ -1,8 +1,9 @@
//! A simple P2P network simulator. Acts as the _reactor_, but without doing any I/O.
#![allow(clippy::collapsible_if)]
#![allow(dead_code)]
+
#![allow(clippy::type_complexity)]

-
use std::collections::{BTreeMap, BTreeSet, VecDeque};
+
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut, Range};
use std::rc::Rc;
@@ -65,9 +66,8 @@ pub enum Input {
    /// Fetch completed for a node.
    Fetched(
        Id,
-
        Namespaces,
        NodeId,
-
        Rc<Result<Vec<RefUpdate>, FetchError>>,
+
        Rc<Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError>>,
    ),
    /// Used to advance the state machine after some wall time has passed.
    Wake,
@@ -110,7 +110,7 @@ impl fmt::Display for Scheduled {
            Input::Wake => {
                write!(f, "{}: Tock", self.node)
            }
-
            Input::Fetched(rid, _, nid, _) => {
+
            Input::Fetched(rid, nid, _) => {
                write!(f, "{} <<~ {} ({}): Fetched", self.node, nid, rid)
            }
        }
@@ -409,15 +409,21 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            p.received_message(id, msg);
                        }
                    }
-
                    Input::Fetched(rid, ns, nid, result) => {
+
                    Input::Fetched(rid, nid, result) => {
                        let result = Rc::try_unwrap(result).unwrap();
                        let mut repo = match p.storage().repository_mut(rid) {
                            Ok(repo) => repo,
                            Err(e) if e.is_not_found() => p.storage().create(rid).unwrap(),
                            Err(e) => panic!("Failed to open repository: {e}"),
                        };
-
                        fetch(&mut repo, &nid, ns.clone()).unwrap();
-
                        p.fetched(rid, ns, nid, result);
+
                        match &result {
+
                            Ok((_, remotes)) => {
+
                                fetch(&mut repo, &nid, Namespaces::Trusted(remotes.clone()))
+
                                    .unwrap();
+
                            }
+
                            Err(err) => panic!("Error fetching: {err}"),
+
                        }
+
                        p.fetched(rid, nid, result);
                    }
                }
                while let Some(o) = p.next() {
@@ -621,7 +627,6 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            remote,
                            input: Input::Fetched(
                                rid,
-
                                namespaces,
                                remote,
                                Rc::new(Err(FetchError::Io(io::ErrorKind::Other.into()))),
                            ),
@@ -633,7 +638,17 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        Scheduled {
                            node,
                            remote,
-
                            input: Input::Fetched(rid, namespaces, remote, Rc::new(Ok(vec![]))),
+
                            input: Input::Fetched(
+
                                rid,
+
                                remote,
+
                                Rc::new(Ok((
+
                                    vec![],
+
                                    match namespaces {
+
                                        Namespaces::Trusted(hs) => hs,
+
                                        Namespaces::All => HashSet::new(),
+
                                    },
+
                                ))),
+
                            ),
                        },
                    );
                }
modified radicle-node/src/tests.rs
@@ -24,7 +24,6 @@ use crate::service::ServiceState as _;
use crate::service::*;
use crate::storage::git::transport::{local, remote};
use crate::storage::git::Storage;
-
use crate::storage::Namespaces;
use crate::storage::ReadStorage;
use crate::test::arbitrary;
use crate::test::assert_matches;
@@ -1121,14 +1120,14 @@ fn test_queued_fetch() {
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
-
    alice.fetched(rid1, Namespaces::All, bob.id, Ok(vec![]));
+
    alice.fetched(rid1, bob.id, Ok((vec![], Default::default())));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid2);
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
-
    alice.fetched(rid2, Namespaces::All, bob.id, Ok(vec![]));
+
    alice.fetched(rid2, bob.id, Ok((vec![], Default::default())));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid3);
}
modified radicle-node/src/tests/e2e.rs
@@ -169,7 +169,7 @@ fn test_replication() {
    assert!(result.is_success());

    let updated = match result {
-
        FetchResult::Success { updated } => updated,
+
        FetchResult::Success { updated, .. } => updated,
        FetchResult::Failed { reason } => {
            panic!("Fetch failed from {}: {reason}", bob.id);
        }
@@ -446,7 +446,7 @@ fn test_fetch_preserve_owned_refs() {

    // Fetch shouldn't prune any of our own refs.
    let result = alice.handle.fetch(acme, bob.id).unwrap();
-
    let updated = result.success().unwrap();
+
    let (updated, _) = result.success().unwrap();
    assert_eq!(updated, vec![]);

    let after = alice
@@ -540,7 +540,10 @@ fn test_fetch_up_to_date() {

    // Fetch again! This time, everything's up to date.
    let result = alice.handle.fetch(acme, bob.id).unwrap();
-
    assert_eq!(result.success(), Some(vec![]));
+
    assert_eq!(
+
        result.success(),
+
        Some((vec![], HashSet::from_iter([bob.id])))
+
    );
}

#[test]
modified radicle-node/src/wire/protocol.rs
@@ -365,12 +365,8 @@ where

        // Only call into the service if we initiated this fetch.
        match task.result {
-
            FetchResult::Initiator {
-
                rid,
-
                namespaces,
-
                result,
-
            } => {
-
                self.service.fetched(rid, namespaces, remote, result);
+
            FetchResult::Initiator { rid, result } => {
+
                self.service.fetched(rid, remote, result);
            }
            FetchResult::Responder { .. } => {
                // We don't do anything with upload results for now.
modified radicle-node/src/worker.rs
@@ -2,6 +2,7 @@ mod channels;
mod fetch;
mod tunnel;

+
use std::collections::HashSet;
use std::io::{prelude::*, BufReader};
use std::ops::ControlFlow;
use std::thread::JoinHandle;
@@ -112,10 +113,8 @@ pub enum FetchResult {
    Initiator {
        /// Repo fetched.
        rid: Id,
-
        /// Namespaces fetched.
-
        namespaces: Namespaces,
-
        /// Fetch result.
-
        result: Result<Vec<RefUpdate>, FetchError>,
+
        /// Fetch result, including remotes fetched.
+
        result: Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError>,
    },
    Responder {
        /// Upload result.
@@ -200,11 +199,7 @@ impl Worker {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", rid);
                let result = self.fetch(rid, remote, stream, &namespaces, channels);

-
                FetchResult::Initiator {
-
                    rid,
-
                    namespaces,
-
                    result,
-
                }
+
                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote } => {
                log::debug!(target: "worker", "Worker processing incoming fetch..");
@@ -233,7 +228,7 @@ impl Worker {
        stream: StreamId,
        namespaces: &Namespaces,
        mut channels: Channels,
-
    ) -> Result<Vec<RefUpdate>, FetchError> {
+
    ) -> Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError> {
        let staging = fetch::StagingPhaseInitial::new(&self.storage, rid, namespaces.clone())?;
        match self._fetch(
            &staging.repo,
modified radicle-node/src/worker/fetch.rs
@@ -8,7 +8,7 @@ use std::ops::Deref;

use radicle::crypto::{PublicKey, Unverified, Verified};
use radicle::git::url;
-
use radicle::prelude::{Doc, Id};
+
use radicle::prelude::{Doc, Id, NodeId};
use radicle::storage::git::Repository;
use radicle::storage::refs::{SignedRefs, IDENTITY_BRANCH};
use radicle::storage::{Namespaces, RefUpdate, Remote, RemoteId};
@@ -242,7 +242,7 @@ impl<'a> StagingPhaseFinal<'a> {
    ///
    /// All references that were updated are returned as a
    /// [`RefUpdate`].
-
    pub fn transfer(self) -> Result<Vec<RefUpdate>, error::Transfer> {
+
    pub fn transfer(self) -> Result<(Vec<RefUpdate>, HashSet<NodeId>), error::Transfer> {
        let verifications = self.verify();
        let production = match &self.repo {
            StagedRepository::Cloning(repo) => self.production.create(repo.id)?,
@@ -253,7 +253,7 @@ impl<'a> StagingPhaseFinal<'a> {
        let mut updates = Vec::new();

        let callbacks = ref_updates(&mut updates);
-
        {
+
        let remotes = {
            let specs = verifications
                .into_iter()
                .flat_map(|(remote, verified)| match verified {
@@ -339,14 +339,16 @@ impl<'a> StagingPhaseFinal<'a> {
            opts.prune(git::raw::FetchPrune::Off);

            remote.fetch(&specs, Some(&mut opts), None)?;
-
        }
+

+
            fetching
+
        };
        let head = production.set_head()?;
        log::debug!(target: "worker", "Head for {} set to {head}", production.id);

        let head = production.set_identity_head()?;
        log::debug!(target: "worker", "'refs/rad/id' for {} set to {head}", production.id);

-
        Ok(updates)
+
        Ok((updates, remotes))
    }

    fn remotes(&self) -> impl Iterator<Item = Remote> + '_ {
@@ -364,23 +366,21 @@ impl<'a> StagingPhaseFinal<'a> {
    fn verify(&self) -> BTreeMap<RemoteId, VerifiedRemote> {
        self.trusted
            .iter()
+
            .filter_map(|remote| self.repo.remote(remote).ok())
            .map(|remote| {
-
                let verification =
-
                    match (self.repo.identity_doc_of(remote), self.repo.remote(remote)) {
-
                        (Ok(doc), Ok(remote)) => match self.repo.validate_remote(&remote) {
-
                            Ok(()) => VerifiedRemote::Success { _doc: doc, remote },
-
                            Err(e) => VerifiedRemote::Failed {
-
                                reason: e.to_string(),
-
                            },
-
                        },
-
                        (Err(e), _) => VerifiedRemote::Failed {
+
                let remote_id = remote.id;
+
                let verification = match self.repo.identity_doc_of(&remote_id) {
+
                    Ok(doc) => match self.repo.validate_remote(&remote) {
+
                        Ok(()) => VerifiedRemote::Success { _doc: doc, remote },
+
                        Err(e) => VerifiedRemote::Failed {
                            reason: e.to_string(),
                        },
-
                        (_, Err(e)) => VerifiedRemote::Failed {
-
                            reason: e.to_string(),
-
                        },
-
                    };
-
                (*remote, verification)
+
                    },
+
                    Err(e) => VerifiedRemote::Failed {
+
                        reason: e.to_string(),
+
                    },
+
                };
+
                (remote_id, verification)
            })
            .collect()
    }
modified radicle/src/node.rs
@@ -2,7 +2,7 @@ mod features;
pub mod routing;
pub mod tracking;

-
use std::collections::BTreeSet;
+
use std::collections::{BTreeSet, HashSet};
use std::io::{BufRead, BufReader};
use std::ops::Deref;
use std::os::unix::net::UnixStream;
@@ -195,7 +195,6 @@ impl Command {
#[serde(tag = "state", content = "id")]
pub enum Seed {
    Disconnected(NodeId),
-
    Fetching(NodeId),
    Connected(NodeId),
}

@@ -207,31 +206,24 @@ impl Seeds {
        self.0.insert(seed);
    }

-
    pub fn connected(&mut self) -> impl Iterator<Item = &NodeId> {
+
    pub fn connected(&self) -> impl Iterator<Item = &NodeId> {
        self.0.iter().filter_map(|s| match s {
            Seed::Connected(node) => Some(node),
-
            Seed::Fetching(_) | Seed::Disconnected(_) => None,
+
            Seed::Disconnected(_) => None,
        })
    }

-
    pub fn disconnected(&mut self) -> impl Iterator<Item = &NodeId> {
+
    pub fn disconnected(&self) -> impl Iterator<Item = &NodeId> {
        self.0.iter().filter_map(|s| match s {
            Seed::Disconnected(node) => Some(node),
-
            Seed::Fetching(_) | Seed::Connected(_) => None,
-
        })
-
    }
-

-
    pub fn fetching(&mut self) -> impl Iterator<Item = &NodeId> {
-
        self.0.iter().filter_map(|s| match s {
-
            Seed::Fetching(node) => Some(node),
-
            Seed::Connected(_) | Seed::Disconnected(_) => None,
+
            Seed::Connected(_) => None,
        })
    }

    pub fn has_connections(&self) -> bool {
        self.0.iter().any(|s| match s {
            Seed::Connected(_) => true,
-
            Seed::Disconnected(_) | Seed::Fetching(_) => false,
+
            Seed::Disconnected(_) => false,
        })
    }

@@ -242,18 +234,19 @@ impl Seeds {
    pub fn is_disconnected(&self, node: &NodeId) -> bool {
        self.0.contains(&Seed::Disconnected(*node))
    }
-

-
    pub fn is_fetching(&self, node: &NodeId) -> bool {
-
        self.0.contains(&Seed::Fetching(*node))
-
    }
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "kebab-case")]
pub enum FetchResult {
-
    Success { updated: Vec<RefUpdate> },
+
    Success {
+
        updated: Vec<RefUpdate>,
+
        namespaces: HashSet<NodeId>,
+
    },
    // TODO: Create enum for reason.
-
    Failed { reason: String },
+
    Failed {
+
        reason: String,
+
    },
}

impl FetchResult {
@@ -261,18 +254,24 @@ impl FetchResult {
        matches!(self, FetchResult::Success { .. })
    }

-
    pub fn success(self) -> Option<Vec<RefUpdate>> {
+
    pub fn success(self) -> Option<(Vec<RefUpdate>, HashSet<NodeId>)> {
        match self {
-
            Self::Success { updated } => Some(updated),
+
            Self::Success {
+
                updated,
+
                namespaces,
+
            } => Some((updated, namespaces)),
            _ => None,
        }
    }
}

-
impl<S: ToString> From<Result<Vec<RefUpdate>, S>> for FetchResult {
-
    fn from(value: Result<Vec<RefUpdate>, S>) -> Self {
+
impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>), S>> for FetchResult {
+
    fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>), S>) -> Self {
        match value {
-
            Ok(updated) => Self::Success { updated },
+
            Ok((updated, namespaces)) => Self::Success {
+
                updated,
+
                namespaces,
+
            },
            Err(err) => Self::Failed {
                reason: err.to_string(),
            },
@@ -296,10 +295,14 @@ impl FetchResults {
    }

    /// Iterate over successful fetches.
-
    pub fn success(&self) -> impl Iterator<Item = (&NodeId, &[RefUpdate])> {
+
    pub fn success(&self) -> impl Iterator<Item = (&NodeId, &[RefUpdate], HashSet<NodeId>)> {
        self.0.iter().filter_map(|(nid, r)| {
-
            if let FetchResult::Success { updated } = r {
-
                Some((nid, updated.as_slice()))
+
            if let FetchResult::Success {
+
                updated,
+
                namespaces,
+
            } = r
+
            {
+
                Some((nid, updated.as_slice(), namespaces.clone()))
            } else {
                None
            }