Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Only fetch from tracked nodes
Han Xu committed 3 years ago
commit b030e9ea80763b8120a264288da32af7cdbdabd9
parent a5746b4f08bac6eac67dc2ca4259abe33b4cc3d2
4 files changed +206 -17
modified radicle-node/src/service.rs
@@ -50,6 +50,7 @@ pub use crate::service::session::Session;
use self::gossip::Gossip;
use self::message::InventoryAnnouncement;
use self::reactor::Reactor;
+
use self::tracking::NamespacesError;

/// Target number of peers to maintain connections to.
pub const TARGET_OUTBOUND_PEERS: usize = 8;
@@ -102,6 +103,8 @@ pub enum Error {
    Routing(#[from] routing::Error),
    #[error(transparent)]
    Tracking(#[from] tracking::Error),
+
    #[error("namespaces error: {0}")]
+
    Namespaces(#[from] NamespacesError),
}

/// Function used to query internal service state.
@@ -871,27 +874,29 @@ where
                }

                // TODO: Buffer/throttle fetches.
-
                if self
-
                    .tracking
-
                    .is_repo_tracked(&message.rid)
-
                    .expect("Service::handle_announcement: error accessing tracking configuration")
-
                {
+
                let repo_entry = self.tracking.repo_policy(&message.rid).expect(
+
                    "Service::handle_announcement: error accessing repo tracking configuration",
+
                );
+

+
                if repo_entry.policy == tracking::Policy::Track {
                    // Refs can be relayed by peers who don't have the data in storage,
                    // therefore we only check whether we are connected to the *announcer*,
                    // which is required by the protocol to only announce refs it has.
                    if self.sessions.is_connected(announcer) {
-
                        match message.is_fresh(&self.storage) {
-
                            Ok(is_fresh) => {
-
                                if is_fresh {
-
                                    // TODO: Only fetch if the refs announced are for peers we're tracking.
-
                                    self.fetch(message.rid, announcer);
-
                                }
+
                        match self.should_fetch_refs_announcement(message, &repo_entry.scope) {
+
                            Ok(true) => self.fetch(message.rid, announcer),
+
                            Ok(false) => {
+
                                debug!(target: "service", "Skip fetch the refs from {announcer}")
                            }
                            Err(e) => {
-
                                error!(target: "service", "Failed to check ref announcement freshness: {e}");
+
                                error!(target: "service", "Failed to check refs announcement: {e}");
+
                                return Err(session::Error::Misbehavior);
                            }
                        }
+
                    } else {
+
                        debug!(target: "service", "No sessions connected to {announcer}");
                    }
+

                    return Ok(relay);
                } else {
                    debug!(
@@ -963,6 +968,53 @@ where
        Ok(false)
    }

+
    /// A convenient method to check if we should fetch from a `RefsAnnouncement`
+
    /// with `scope`.
+
    fn should_fetch_refs_announcement(
+
        &self,
+
        message: &RefsAnnouncement,
+
        scope: &tracking::Scope,
+
    ) -> Result<bool, Error> {
+
        // First, check the freshness.
+
        if !message.is_fresh(&self.storage)? {
+
            debug!(target: "service", "All refs of {} are already in the local node", &message.rid);
+
            return Ok(false);
+
        }
+

+
        // Second, check the scope.
+
        match scope {
+
            tracking::Scope::All => Ok(true),
+
            tracking::Scope::Trusted => {
+
                match self.tracking.namespaces_for(&self.storage, &message.rid) {
+
                    Ok(Namespaces::All) => Ok(true),
+
                    Ok(Namespaces::Many(nodes)) => {
+
                        // Get the set of trusted nodes except self.
+
                        let my_id = self.node_id();
+
                        let node_set: HashSet<_> =
+
                            nodes.iter().filter(|key| *key != &my_id).collect();
+

+
                        // Check if there is at least one trusted ref.
+
                        Ok(message
+
                            .refs
+
                            .iter()
+
                            .any(|(pub_key, _refs)| node_set.contains(pub_key)))
+
                    }
+
                    Ok(Namespaces::One(key)) => {
+
                        Ok(message.refs.iter().any(|(pub_key, _refs)| pub_key == &key))
+
                    }
+
                    Err(NamespacesError::NoTrusted { rid }) => {
+
                        debug!(target: "service", "No trusted nodes to fetch {}", &rid);
+
                        Ok(false)
+
                    }
+
                    Err(e) => {
+
                        error!(target: "service", "Failed to obtain namespaces: {e}");
+
                        Err(e.into())
+
                    }
+
                }
+
            }
+
        }
+
    }
+

    pub fn handle_message(
        &mut self,
        remote: &NodeId,
modified radicle-node/src/test/peer.rs
@@ -5,6 +5,7 @@ use std::ops::{Deref, DerefMut};

use crossbeam_channel as chan;
use log::*;
+
use radicle::storage::ReadRepository;

use crate::address;
use crate::address::Store;
@@ -250,7 +251,20 @@ where
    }

    pub fn refs_announcement(&self, rid: Id) -> Message {
-
        let refs = BoundedVec::new();
+
        let mut refs = BoundedVec::new();
+
        if let Ok(repo) = self.storage().repository(rid) {
+
            if let Ok(false) = repo.is_empty() {
+
                if let Ok(remotes) = repo.remotes() {
+
                    for (remote_id, remote) in remotes.into_iter() {
+
                        if let Err(e) = refs.push((remote_id, remote.refs.unverified())) {
+
                            debug!(target: "test", "Failed to push {remote_id} to refs: {e}");
+
                            break;
+
                        }
+
                    }
+
                }
+
            }
+
        }
+

        let ann = AnnouncementMessage::from(RefsAnnouncement {
            rid,
            refs,
modified radicle-node/src/tests.rs
@@ -669,6 +669,98 @@ fn test_refs_announcement_relay() {
    );
}

+
/// Even if Alice is not tracking Bob, Alice will fetch Bob's refs for a repo she doesn't have.
+
#[test]
+
fn test_refs_announcement_fetch_trusted_no_inventory() {
+
    logger::init(log::Level::Debug);
+

+
    let tmp = tempfile::tempdir().unwrap();
+
    let mut alice = Peer::config(
+
        "alice",
+
        [7, 7, 7, 7],
+
        Storage::open(tmp.path().join("alice")).unwrap(),
+
        peer::Config::default(),
+
    );
+
    let bob = {
+
        let mut rng = fastrand::Rng::new();
+
        let signer = MockSigner::new(&mut rng);
+
        let storage = fixtures::storage(tmp.path().join("bob"), &signer).unwrap();
+

+
        Peer::config(
+
            "bob",
+
            [9, 9, 9, 9],
+
            storage,
+
            peer::Config {
+
                signer,
+
                rng,
+
                ..peer::Config::default()
+
            },
+
        )
+
    };
+
    let bob_inv = bob.storage().inventory().unwrap();
+
    let rid = bob_inv[0];
+

+
    alice.track_repo(&rid, tracking::Scope::Trusted).unwrap();
+
    alice.connect_to(&bob);
+

+
    // Alice receives Bob's refs.
+
    alice.receive(bob.id(), bob.refs_announcement(rid));
+

+
    // Alice fetches Bob's refs as this is a new repo.
+
    assert_eq!(
+
        alice.messages(bob.id()).next(),
+
        Some(Message::Fetch { rid })
+
    );
+
}
+

+
/// Alice and Bob both have the same repo.
+
///
+
/// First, Alice will not fetch from Bob's `RefsAnnouncement` as Alice does not
+
/// track Bob as `Trusted`.
+
///
+
/// Later Alice tracks Bob, and will be able to fetch Bob's refs.
+
#[test]
+
fn test_refs_announcement_trusted() {
+
    logger::init(log::Level::Debug);
+

+
    // Create MockStorage for Alice and Bob. Both will have repo with `rid`.
+
    let storage_alice = arbitrary::nonempty_storage(1);
+
    let rid = *storage_alice.inventory.keys().next().unwrap();
+
    let storage_bob = storage_alice.clone();
+
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage_alice);
+
    let mut bob = Peer::with_storage("bob", [8, 8, 8, 8], storage_bob);
+

+
    // Generate some refs for Bob under their own node_id.
+
    let refs = arbitrary::gen::<Refs>(8);
+
    let signed_refs = refs.signed(bob.signer()).unwrap();
+
    let node_id = bob.id;
+
    bob.storage_mut().insert_remote(rid, node_id, signed_refs);
+

+
    // Alice uses Scope::Trusted, and did not track Bob yet.
+
    alice.connect_to(&bob);
+
    alice.track_repo(&rid, tracking::Scope::Trusted).unwrap();
+

+
    // Alice receives Bob's refs
+
    alice.receive(bob.id(), bob.refs_announcement(rid));
+

+
    // Alice does not fetch as Alice is not tracking Bob.
+
    assert!(
+
        alice.messages(bob.id()).next().is_none(),
+
        "Alice is not tracking bob yet."
+
    );
+

+
    // Alice starts to track Bob.
+
    let (sender, receiver) = chan::bounded(1);
+
    alice.command(Command::TrackNode(bob.id, Some("bob".to_string()), sender));
+
    let policy_change = receiver.recv().map_err(runtime::HandleError::from).unwrap();
+
    assert!(policy_change);
+

+
    // Bob announces refs again.
+
    bob.elapse(LocalDuration::from_mins(1)); // Make sure our announcement is fresh.
+
    alice.receive(bob.id(), bob.refs_announcement(rid));
+
    assert_matches!(alice.messages(bob.id()).next(), Some(Message::Fetch { .. }));
+
}
+

#[test]
fn test_refs_announcement_no_subscribe() {
    let storage = arbitrary::nonempty_storage(1);
modified radicle/src/test/storage.rs
@@ -8,6 +8,7 @@ use radicle_git_ext as git_ext;
use crate::crypto::{Signer, Verified};
use crate::identity::doc::{Doc, Id};
use crate::identity::IdentityError;
+
use crate::node::NodeId;

pub use crate::storage::*;

@@ -15,6 +16,10 @@ pub use crate::storage::*;
pub struct MockStorage {
    pub path: PathBuf,
    pub inventory: HashMap<Id, Doc<Verified>>,
+

+
    /// All refs keyed by RID.
+
    /// Each value is a map of refs keyed by node Id (public key).
+
    pub remotes: HashMap<Id, HashMap<NodeId, refs::SignedRefs<Verified>>>,
}

impl MockStorage {
@@ -22,6 +27,7 @@ impl MockStorage {
        Self {
            path: PathBuf::default(),
            inventory: inventory.into_iter().collect(),
+
            remotes: HashMap::new(),
        }
    }

@@ -29,8 +35,22 @@ impl MockStorage {
        Self {
            path: PathBuf::default(),
            inventory: HashMap::new(),
+
            remotes: HashMap::new(),
        }
    }
+

+
    /// Add a remote `node` with `signed_refs` for the repo `rid`.
+
    pub fn insert_remote(
+
        &mut self,
+
        rid: Id,
+
        node: NodeId,
+
        signed_refs: refs::SignedRefs<Verified>,
+
    ) {
+
        self.remotes
+
            .entry(rid)
+
            .or_insert(HashMap::new())
+
            .insert(node, signed_refs);
+
    }
}

impl ReadStorage for MockStorage {
@@ -64,6 +84,7 @@ impl ReadStorage for MockStorage {
        Ok(MockRepository {
            id: rid,
            doc: doc.clone(),
+
            remotes: self.remotes.get(&rid).cloned().unwrap_or_default(),
        })
    }
}
@@ -76,6 +97,7 @@ impl WriteStorage for MockStorage {
        Ok(MockRepository {
            id: rid,
            doc: doc.clone(),
+
            remotes: self.remotes.get(&rid).cloned().unwrap_or_default(),
        })
    }

@@ -84,9 +106,11 @@ impl WriteStorage for MockStorage {
    }
}

+
#[derive(Clone, Debug)]
pub struct MockRepository {
    id: Id,
    doc: Doc<Verified>,
+
    remotes: HashMap<NodeId, refs::SignedRefs<Verified>>,
}

impl ReadRepository for MockRepository {
@@ -95,7 +119,7 @@ impl ReadRepository for MockRepository {
    }

    fn is_empty(&self) -> Result<bool, git2::Error> {
-
        Ok(true)
+
        Ok(self.remotes.is_empty())
    }

    fn head(&self) -> Result<(fmt::Qualified, Oid), IdentityError> {
@@ -114,12 +138,19 @@ impl ReadRepository for MockRepository {
        todo!()
    }

-
    fn remote(&self, _remote: &RemoteId) -> Result<Remote<Verified>, refs::Error> {
-
        todo!()
+
    fn remote(&self, remote: &RemoteId) -> Result<Remote<Verified>, refs::Error> {
+
        self.remotes
+
            .get(remote)
+
            .map(|refs| Remote::new(*remote, refs.clone()))
+
            .ok_or(refs::Error::InvalidRef)
    }

    fn remotes(&self) -> Result<Remotes<Verified>, refs::Error> {
-
        todo!()
+
        Ok(self
+
            .remotes
+
            .iter()
+
            .map(|(id, refs)| (*id, Remote::new(*id, refs.clone())))
+
            .collect())
    }

    fn commit(&self, _oid: Oid) -> Result<git2::Commit, git_ext::Error> {