Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: improve seeds information
Fintan Halpenny committed 3 years ago
commit c70dc71b18ec5ec211e87bfce6b66e07413349a3
parent c0b92d25fbc4e0fde7ea2654fca401c0c7f5519a
7 files changed +121 -25
modified radicle-cli/src/commands/clone.rs
@@ -174,21 +174,21 @@ pub fn clone<G: Signer>(
    }

    // Get seeds. This consults the local routing table only.
-
    let seeds = node.seeds(id)?;
-
    if seeds.is_empty() {
+
    let mut seeds = node.seeds(id)?;
+
    if !seeds.has_connections() {
        return Err(CloneError::NotFound(id));
    }
    // Fetch from all seeds.
-
    for seed in seeds {
+
    for seed in seeds.connected() {
        let spinner = term::spinner(format!(
            "Fetching {} from {}..",
            term::format::tertiary(id),
-
            term::format::tertiary(term::format::node(&seed))
+
            term::format::tertiary(term::format::node(seed))
        ));

        // TODO: If none of them succeeds, output an error. Otherwise tell the caller
        // how many succeeded.
-
        match node.fetch(id, seed)? {
+
        match node.fetch(id, *seed)? {
            FetchResult::Success { .. } => {
                spinner.finish();
            }
modified radicle-node/src/runtime/handle.rs
@@ -6,6 +6,7 @@ use std::sync::Arc;

use crossbeam_channel as chan;
use cyphernet::Ecdh;
+
use radicle::node::Seeds;
use thiserror::Error;

use crate::crypto::Signer;
@@ -118,7 +119,7 @@ impl<G: Signer + Ecdh + 'static> radicle::node::Handle for Handle<G> {
        Ok(())
    }

-
    fn seeds(&mut self, id: Id) -> Result<Vec<NodeId>, Self::Error> {
+
    fn seeds(&mut self, id: Id) -> Result<Seeds, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        self.command(service::Command::Seeds(id, sender))?;
        receiver.recv().map_err(Error::from)
modified radicle-node/src/service.rs
@@ -27,7 +27,7 @@ use crate::crypto;
use crate::crypto::{Signer, Verified};
use crate::identity::{Doc, Id};
use crate::node;
-
use crate::node::{Address, Features, FetchResult};
+
use crate::node::{Address, Features, FetchResult, Seed, Seeds};
use crate::prelude::*;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
@@ -108,7 +108,7 @@ pub enum Command {
    /// Connect to node with the given address.
    Connect(NodeId, Address),
    /// Lookup seeds for the given repository in the routing table.
-
    Seeds(Id, chan::Sender<Vec<NodeId>>),
+
    Seeds(Id, chan::Sender<Seeds>),
    /// Fetch the given repository from the network.
    Fetch(Id, NodeId, chan::Sender<FetchResult>),
    /// Track the given repository.
@@ -429,11 +429,33 @@ where
                self.connect(id, addr);
            }
            Command::Seeds(rid, resp) => {
-
                let (connected, unconnected) = match self.routing.get(&rid) {
-
                    Ok(seeds) => seeds
-
                        .into_iter()
-
                        .filter(|node| *node != self.node_id())
-
                        .partition::<Vec<_>, _>(|node| self.sessions.is_connected(node)),
+
                #[derive(Default)]
+
                pub struct Stats {
+
                    connected: usize,
+
                    disconnected: usize,
+
                    fetching: usize,
+
                }
+

+
                let (stats, seeds) = match self.routing.get(&rid) {
+
                    Ok(seeds) => seeds.into_iter().fold(
+
                        (Stats::default(), Seeds::default()),
+
                        |(mut stats, mut seeds), node| {
+
                            if node != self.node_id() {
+
                                if self.sessions.is_fetching(&node) {
+
                                    seeds.insert(Seed::Fetching(node));
+
                                    stats.fetching += 1;
+
                                } else if self.sessions.is_connected(&node) {
+
                                    seeds.insert(Seed::Connected(node));
+
                                    stats.connected += 1;
+
                                } else if self.sessions.is_disconnected(&node) {
+
                                    seeds.insert(Seed::Disconnected(node));
+
                                    stats.connected += 1;
+
                                }
+
                            }
+

+
                            (stats, seeds)
+
                        },
+
                    ),
                    Err(err) => {
                        error!(target: "service", "Error reading routing table for {rid}: {err}");
                        drop(resp);
@@ -443,10 +465,10 @@ where
                };
                debug!(
                    target: "service",
-
                    "Found {} connected seed(s) and {} unconnected seed(s) for {}",
-
                    connected.len(), unconnected.len(), rid
+
                    "Found {} connected seed(s), {} disconnected seed(s), and {} fetching seed(s) for {}",
+
                    stats.connected, stats.disconnected, stats.fetching, rid
                );
-
                resp.send(connected).ok();
+
                resp.send(seeds).ok();
            }
            Command::Fetch(rid, seed, resp) => {
                // TODO: Establish connections to unconnected seeds, and retry.
@@ -1138,7 +1160,7 @@ where
    }

    fn connect(&mut self, node: NodeId, addr: Address) -> bool {
-
        if self.sessions.is_unconnected(&node) {
+
        if self.sessions.is_disconnected(&node) {
            self.reactor.connect(node, addr);
            return true;
        }
@@ -1448,8 +1470,12 @@ impl Sessions {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }

+
    pub fn is_fetching(&self, id: &NodeId) -> bool {
+
        self.0.get(id).map(|s| s.is_fetching()).unwrap_or(false)
+
    }
+

    /// Return whether this node can be connected to.
-
    pub fn is_unconnected(&self, id: &NodeId) -> bool {
+
    pub fn is_disconnected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
    }
}
modified radicle-node/src/service/session.rs
@@ -186,6 +186,16 @@ impl Session {
        matches!(self.state, State::Connected { .. })
    }

+
    pub fn is_fetching(&self) -> bool {
+
        matches!(
+
            self.state,
+
            State::Connected {
+
                protocol: Protocol::Fetch { .. },
+
                ..
+
            }
+
        )
+
    }
+

    pub fn is_disconnected(&self) -> bool {
        matches!(self.state, State::Disconnected { .. })
    }
modified radicle-node/src/test/handle.rs
@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
use crossbeam_channel as chan;

use crate::identity::Id;
-
use crate::node::FetchResult;
+
use crate::node::{FetchResult, Seeds};
use crate::runtime::HandleError;
use crate::service;
use crate::service::NodeId;
@@ -29,7 +29,7 @@ impl radicle::node::Handle for Handle {
        unimplemented!();
    }

-
    fn seeds(&mut self, _id: Id) -> Result<Vec<NodeId>, Self::Error> {
+
    fn seeds(&mut self, _id: Id) -> Result<Seeds, Self::Error> {
        unimplemented!();
    }

modified radicle-node/src/tests/e2e.rs
@@ -159,7 +159,7 @@ fn test_replication() {
    assert!(tracked);

    let seeds = alice.handle.seeds(acme).unwrap();
-
    assert!(seeds.contains(&bob.id));
+
    assert!(seeds.is_connected(&bob.id));

    let result = alice.handle.fetch(acme, bob.id).unwrap();
    assert!(result.is_success());
@@ -213,7 +213,7 @@ fn test_clone() {

    let _ = alice.handle.track_repo(acme).unwrap();
    let seeds = alice.handle.seeds(acme).unwrap();
-
    assert!(seeds.contains(&bob.id));
+
    assert!(seeds.is_connected(&bob.id));

    let result = alice.handle.fetch(acme, bob.id).unwrap();
    assert!(result.is_success());
modified radicle/src/node.rs
@@ -1,5 +1,6 @@
mod features;

+
use std::collections::BTreeSet;
use std::io::{BufRead, BufReader};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
@@ -180,6 +181,64 @@ impl Command {
    }
}

+
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
+
#[serde(rename_all = "kebab-case")]
+
#[serde(tag = "state", content = "id")]
+
pub enum Seed {
+
    Disconnected(NodeId),
+
    Fetching(NodeId),
+
    Connected(NodeId),
+
}
+

+
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+
pub struct Seeds(BTreeSet<Seed>);
+

+
impl Seeds {
+
    pub fn insert(&mut self, seed: Seed) {
+
        self.0.insert(seed);
+
    }
+

+
    pub fn connected(&mut self) -> impl Iterator<Item = &NodeId> {
+
        self.0.iter().filter_map(|s| match s {
+
            Seed::Connected(node) => Some(node),
+
            Seed::Fetching(_) | Seed::Disconnected(_) => None,
+
        })
+
    }
+

+
    pub fn disconnected(&mut self) -> impl Iterator<Item = &NodeId> {
+
        self.0.iter().filter_map(|s| match s {
+
            Seed::Disconnected(node) => Some(node),
+
            Seed::Fetching(_) | Seed::Connected(_) => None,
+
        })
+
    }
+

+
    pub fn fetching(&mut self) -> impl Iterator<Item = &NodeId> {
+
        self.0.iter().filter_map(|s| match s {
+
            Seed::Fetching(node) => Some(node),
+
            Seed::Connected(_) | Seed::Disconnected(_) => None,
+
        })
+
    }
+

+
    pub fn has_connections(&self) -> bool {
+
        self.0.iter().any(|s| match s {
+
            Seed::Connected(_) => true,
+
            Seed::Disconnected(_) | Seed::Fetching(_) => false,
+
        })
+
    }
+

+
    pub fn is_connected(&self, node: &NodeId) -> bool {
+
        self.0.contains(&Seed::Connected(*node))
+
    }
+

+
    pub fn is_disconnected(&self, node: &NodeId) -> bool {
+
        self.0.contains(&Seed::Disconnected(*node))
+
    }
+

+
    pub fn is_fetching(&self, node: &NodeId) -> bool {
+
        self.0.contains(&Seed::Fetching(*node))
+
    }
+
}
+

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "kebab-case")]
pub enum FetchResult {
@@ -249,7 +308,7 @@ pub trait Handle {
    /// Connect to a peer.
    fn connect(&mut self, node: NodeId, addr: Address) -> Result<(), Self::Error>;
    /// Lookup the seeds of a given repository in the routing table.
-
    fn seeds(&mut self, id: Id) -> Result<Vec<NodeId>, Self::Error>;
+
    fn seeds(&mut self, id: Id) -> Result<Seeds, Self::Error>;
    /// Fetch a repository from the network.
    fn fetch(&mut self, id: Id, from: NodeId) -> Result<FetchResult, Self::Error>;
    /// Start tracking the given project. Doesn't do anything if the project is already
@@ -332,8 +391,8 @@ impl Handle for Node {
        todo!()
    }

-
    fn seeds(&mut self, id: Id) -> Result<Vec<NodeId>, Error> {
-
        let seeds: Vec<NodeId> =
+
    fn seeds(&mut self, id: Id) -> Result<Seeds, Error> {
+
        let seeds: Seeds =
            self.call(CommandName::Seeds, [id.urn()])?
                .next()
                .ok_or(Error::EmptyResponse {