Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: determine fetching namespace based on repo policy
Fintan Halpenny committed 3 years ago
commit 1e8f5b225959e4166a767c8c0be0cad75af09701
parent e937cdd79acca6178e54e365c491f2d9de2b0268
4 files changed +98 -10
modified radicle-cli/tests/commands.rs
@@ -476,6 +476,10 @@ fn test_replication_via_seed() {
        )
        .unwrap();

+
    alice
+
        .rad("track", &[&bob.id.to_human()], working.join("alice"))
+
        .unwrap();
+

    alice.routes_to(&[(rid, alice.id), (rid, seed.id)]);
    seed.routes_to(&[(rid, alice.id), (rid, seed.id)]);
    bob.routes_to(&[(rid, alice.id), (rid, seed.id)]);
modified radicle-node/src/service.rs
@@ -33,6 +33,7 @@ use crate::prelude::*;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::reactor::FetchDirection;
+
use crate::service::tracking::Scope;
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteStorage};
use crate::storage::{Namespaces, ReadStorage};
@@ -116,7 +117,7 @@ pub enum Command {
    /// Fetch the given repository from the network.
    Fetch(Id, NodeId, chan::Sender<FetchResult>),
    /// Track the given repository.
-
    TrackRepo(Id, tracking::Scope, chan::Sender<bool>),
+
    TrackRepo(Id, Scope, chan::Sender<bool>),
    /// Untrack the given repository.
    UntrackRepo(Id, chan::Sender<bool>),
    /// Track the given node.
@@ -267,7 +268,7 @@ where

    /// Track a repository.
    /// Returns whether or not the tracking policy was updated.
-
    pub fn track_repo(&mut self, id: &Id, scope: tracking::Scope) -> Result<bool, tracking::Error> {
+
    pub fn track_repo(&mut self, id: &Id, scope: Scope) -> Result<bool, tracking::Error> {
        self.out_of_sync = self.tracking.track_repo(id, scope)?;
        self.filter.insert(id);

@@ -1093,13 +1094,18 @@ where
                }
                debug!(target: "service", "Fetch accepted for {rid} from {remote}..");

-
                let namespaces = if let Ok(_repo) = self.storage.repository(rid) {
-
                    // FIXME(finto): using remotes breaks test_gossip_during_fetch, so we use default for now
-
                    Namespaces::default()
-
                } else {
-
                    Namespaces::default()
+
                let namespaces = match self.tracking.namespaces_for(&self.storage, &rid) {
+
                    Ok(ns) => ns,
+
                    Err(err) => {
+
                        if let Some(resp) = self.fetch_reqs.get(&rid) {
+
                            resp.send(FetchResult::Failed {
+
                                reason: err.to_string(),
+
                            })
+
                            .ok();
+
                        }
+
                        return Ok(());
+
                    }
                };
-

                // Instruct the transport to handover the socket to the worker.
                self.reactor
                    .fetch(peer, rid, FetchDirection::Initiator { namespaces });
@@ -1204,7 +1210,7 @@ where
            Namespaces::Many(pks) => {
                for remote_id in pks.into_iter() {
                    if refs
-
                        .push((*remote_id, repo.remote(&remote_id)?.refs.unverified()))
+
                        .push((*remote_id, repo.remote(remote_id)?.refs.unverified()))
                        .is_err()
                    {
                        warn!(
modified radicle-node/src/service/tracking.rs
@@ -2,6 +2,14 @@ mod store;

use std::ops;

+
use log::{error, warn};
+
use nonempty::NonEmpty;
+
use thiserror::Error;
+

+
use radicle::crypto::PublicKey;
+
use radicle::identity::IdentityError;
+
use radicle::storage::{Namespaces, ReadRepository as _, ReadStorage};
+

use crate::prelude::Id;
use crate::service::NodeId;

@@ -10,6 +18,32 @@ pub use crate::node::tracking::{Alias, Node, Policy, Repo, Scope};
pub use store::Config as Store;
pub use store::Error;

+
#[derive(Debug, Error)]
+
pub enum NamespacesError {
+
    #[error("Failed to find tracking policy for {rid}")]
+
    FailedPolicy {
+
        rid: Id,
+
        #[source]
+
        err: Error,
+
    },
+
    #[error("The policy for {rid} is to block fetching")]
+
    BlockedPolicy { rid: Id },
+
    #[error("Failed to get tracking nodes for {rid}")]
+
    FailedNodes {
+
        rid: Id,
+
        #[source]
+
        err: Error,
+
    },
+
    #[error("Failed to get delegate nodes for {rid}")]
+
    FailedDelegates {
+
        rid: Id,
+
        #[source]
+
        err: IdentityError,
+
    },
+
    #[error("Could not find any trusted nodes for {rid}")]
+
    NoTrusted { rid: Id },
+
}
+

/// Tracking configuration.
#[derive(Debug)]
pub struct Config {
@@ -62,6 +96,50 @@ impl Config {
            policy: self.policy,
        }))
    }
+

+
    pub fn namespaces_for<S>(&self, storage: &S, rid: &Id) -> Result<Namespaces, NamespacesError>
+
    where
+
        S: ReadStorage,
+
    {
+
        use NamespacesError::*;
+

+
        let entry = self
+
            .repo_policy(rid)
+
            .map_err(|err| FailedPolicy { rid: *rid, err })?;
+
        match entry.policy {
+
            Policy::Block => {
+
                error!(target: "service", "Attempted to fetch blocked repo {rid}");
+
                Err(NamespacesError::BlockedPolicy { rid: *rid })
+
            }
+
            Policy::Track => match self.scope {
+
                Scope::All => Ok(Namespaces::All),
+
                Scope::Trusted => {
+
                    let nodes = self
+
                        .node_entries()
+
                        .map_err(|err| FailedNodes { rid: *rid, err })?;
+
                    let mut trusted: Vec<_> = nodes
+
                        .filter_map(|node| (node.policy == Policy::Track).then_some(node.id))
+
                        .collect();
+

+
                    let ns = if let Ok(repo) = storage.repository(*rid) {
+
                        let delegates = repo
+
                            .delegates()
+
                            .map_err(|err| FailedDelegates { rid: *rid, err })?
+
                            .map(PublicKey::from);
+
                        trusted.extend(delegates);
+
                        NonEmpty::from_vec(trusted).map(Namespaces::Many)
+
                    } else {
+
                        Some(Namespaces::All)
+
                    };
+

+
                    ns.ok_or_else(|| {
+
                        warn!(target: "service", "Attempted to fetch repo {rid} with no trusted peers");
+
                        NoTrusted { rid: *rid }
+
                    })
+
                }
+
            },
+
        }
+
    }
}

impl ops::Deref for Config {
modified radicle-node/src/test/peer.rs
@@ -106,7 +106,7 @@ impl Default for Config<MockSigner> {
            config: service::Config::default(),
            addrs: address::Book::memory().unwrap(),
            local_time: LocalTime::now(),
-
            policy: Policy::Block,
+
            policy: Policy::default(),
            scope: Scope::default(),
            signer,
            rng,