Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Improve `rad node connect` feedback
cloudhead committed 2 years ago
commit fae518bca280d0141846821683a58adc0aa8c21a
parent fb4a4e0079261e5671a75ccf4397cfa6ef9ac3d3
14 files changed +148 -63
modified radicle-cli/examples/rad-node.md
@@ -18,15 +18,6 @@ $ rad node status
✓ Node is running.
```

-
The node also allows us to connect with other nodes in the
-
network. For example, let's connect to a known peer using their Node
-
ID and their network address:
-

-
```
-
$ rad node connect z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk@0.0.0.0:3679
-
✓ Connecting to z6Mkt67…v4N1tRk@0.0.0.0:3679...
-
```
-

The node also allows us to query data that it has access too such as
the tracking relationships and the routing table. Before we explore
those commands we'll first track a peer so that we have something to
modified radicle-cli/src/commands/node.rs
@@ -69,6 +69,7 @@ pub struct Options {
pub enum Operation {
    Connect {
        addr: PeerAddr<NodeId, Address>,
+
        timeout: time::Duration,
    },
    Events {
        timeout: time::Duration,
@@ -100,7 +101,7 @@ pub enum TrackingMode {
    Nodes,
}

-
#[derive(Default)]
+
#[derive(Default, PartialEq, Eq)]
pub enum OperationName {
    Connect,
    Events,
@@ -159,7 +160,9 @@ impl Args for Options {
                    nid = term::args::nid(&val).ok();
                }
                Long("json") if matches!(op, Some(OperationName::Routing)) => json = true,
-
                Long("timeout") if matches!(op, Some(OperationName::Events)) => {
+
                Long("timeout")
+
                    if op == Some(OperationName::Events) || op == Some(OperationName::Connect) =>
+
                {
                    let val = parser.value()?;
                    timeout = term::args::seconds(&val)?;
                }
@@ -191,6 +194,7 @@ impl Args for Options {
                addr: addr.ok_or_else(|| {
                    anyhow!("an address of the form `<nid>@<host>:<port>` must be provided")
                })?,
+
                timeout,
            },
            OperationName::Events => Operation::Events { timeout, count },
            OperationName::Routing => Operation::Routing { rid, nid, json },
@@ -214,7 +218,9 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
    let mut node = Node::new(profile.socket());

    match options.op {
-
        Operation::Connect { addr } => control::connect(&mut node, addr.id, addr.addr)?,
+
        Operation::Connect { addr, timeout } => {
+
            control::connect(&mut node, addr.id, addr.addr, timeout)?
+
        }
        Operation::Events { timeout, count } => {
            events::run(node, count, timeout)?;
        }
modified radicle-cli/src/commands/node/control.rs
@@ -7,7 +7,7 @@ use anyhow::Context as _;
use localtime::LocalTime;

use radicle::node;
-
use radicle::node::{Address, Handle as _, NodeId};
+
use radicle::node::{Address, ConnectResult, Handle as _, NodeId};
use radicle::Node;
use radicle::{profile, Profile};

@@ -122,20 +122,27 @@ pub fn logs(lines: usize, follow: Option<time::Duration>, profile: &Profile) ->
    Ok(())
}

-
pub fn connect(node: &mut Node, nid: NodeId, addr: Address) -> anyhow::Result<()> {
+
pub fn connect(
+
    node: &mut Node,
+
    nid: NodeId,
+
    addr: Address,
+
    timeout: time::Duration,
+
) -> anyhow::Result<()> {
    let spinner = term::spinner(format!(
        "Connecting to {}@{addr}...",
        term::format::node(&nid)
    ));
-
    if let Err(err) = node.connect(nid, addr.clone(), node::ConnectOptions { persistent: true }) {
-
        spinner.error(format!(
-
            "Failed to connect to {}@{}: {}",
-
            term::format::node(&nid),
-
            term::format::secondary(addr),
-
            err,
-
        ))
-
    } else {
-
        spinner.finish()
+
    match node.connect(
+
        nid,
+
        addr,
+
        node::ConnectOptions {
+
            persistent: true,
+
            timeout,
+
        },
+
    ) {
+
        Ok(ConnectResult::Connected) => spinner.finish(),
+
        Ok(ConnectResult::Disconnected { reason }) => spinner.error(reason),
+
        Err(err) => spinner.error(err.to_string()),
    }
    Ok(())
}
modified radicle-cli/tests/commands.rs
@@ -239,16 +239,40 @@ fn rad_id_rebase() {
}

#[test]
-
fn rad_node() {
+
fn rad_node_connect() {
    logger::init(log::Level::Debug);

    let mut environment = Environment::new();
    let alice = environment.node(Config::test(Alias::new("alice")));
    let bob = environment.node(Config::test(Alias::new("bob")));
    let working = tempfile::tempdir().unwrap();
+
    let alice = alice.spawn();
+
    let bob = bob.spawn();
+

+
    alice
+
        .rad(
+
            "node",
+
            &["connect", format!("{}@{}", bob.id, bob.addr).as_str()],
+
            working.path(),
+
        )
+
        .unwrap();
+

+
    let sessions = alice.handle.sessions().unwrap();
+
    let session = sessions.first().unwrap();

+
    assert_eq!(session.nid, bob.id);
+
    assert_eq!(session.addr, bob.addr.into());
+
    assert!(session.state.is_connected());
+
}
+

+
#[test]
+
fn rad_node() {
+
    logger::init(log::Level::Debug);
+

+
    let mut environment = Environment::new();
+
    let alice = environment.node(Config::test(Alias::new("alice")));
+
    let working = tempfile::tempdir().unwrap();
    let alice = alice.spawn();
-
    let _bob = bob.spawn();

    fixtures::repository(working.path().join("alice"));

@@ -681,7 +705,7 @@ fn test_cob_replication() {
    // Wait for Alice to fetch the clone refs.
    events
        .wait(
-
            |e| matches!(e, Event::RefsFetched { .. }),
+
            |e| matches!(e, Event::RefsFetched { .. }).then_some(()),
            time::Duration::from_secs(6),
        )
        .unwrap();
modified radicle-node/src/control.rs
@@ -94,10 +94,12 @@ where
    match cmd {
        Command::Connect { addr, opts } => {
            let (nid, addr) = addr.into();
-
            if let Err(e) = handle.connect(nid, addr, opts) {
-
                return Err(CommandError::Runtime(e));
-
            } else {
-
                CommandResult::Okay { updated: true }.to_writer(writer)?;
+
            match handle.connect(nid, addr, opts) {
+
                Err(e) => return Err(CommandError::Runtime(e)),
+
                Ok(result) => {
+
                    json::to_writer(&mut writer, &result)?;
+
                    writer.write_all(b"\n")?;
+
                }
            }
        }
        Command::Fetch { rid, nid } => {
modified radicle-node/src/runtime/handle.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use std::{fmt, io, time};

use crossbeam_channel as chan;
-
use radicle::node::{ConnectOptions, Seeds};
+
use radicle::node::{ConnectOptions, ConnectResult, Seeds};
use reactor::poller::popol::PopolWaker;
use thiserror::Error;

@@ -148,10 +148,33 @@ impl radicle::node::Handle for Handle {
        node: NodeId,
        addr: radicle::node::Address,
        opts: ConnectOptions,
-
    ) -> Result<(), Error> {
+
    ) -> Result<ConnectResult, Error> {
+
        let events = self.events();
+
        let timeout = opts.timeout;
+
        let sessions = self.sessions()?;
+
        let session = sessions.iter().find(|s| s.nid == node);
+

+
        if let Some(s) = session {
+
            if s.state.is_connected() {
+
                return Ok(ConnectResult::Connected);
+
            }
+
        }
        self.command(service::Command::Connect(node, addr, opts))?;

-
        Ok(())
+
        events
+
            .wait(
+
                |e| match e {
+
                    Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
+
                    Event::PeerDisconnected { nid, reason } if nid == &node => {
+
                        Some(ConnectResult::Disconnected {
+
                            reason: reason.clone(),
+
                        })
+
                    }
+
                    _ => None,
+
                },
+
                timeout,
+
            )
+
            .map_err(Error::from)
    }

    fn seeds(&mut self, id: Id) -> Result<Seeds, Self::Error> {
modified radicle-node/src/service.rs
@@ -505,7 +505,9 @@ where
                if opts.persistent {
                    self.config.connect.insert((nid, addr.clone()).into());
                }
-
                self.connect(nid, addr);
+
                if !self.connect(nid, addr) {
+
                    // TODO: Return error to command.
+
                }
            }
            Command::Disconnect(nid) => {
                self.outbox.disconnect(nid, DisconnectReason::Command);
@@ -784,6 +786,10 @@ where
        let since = self.local_time();

        debug!(target: "service", "Disconnected from {} ({})", remote, reason);
+
        self.emitter.emit(Event::PeerDisconnected {
+
            nid: remote,
+
            reason: reason.to_string(),
+
        });

        let Some(session) = self.sessions.get_mut(&remote) else {
            if cfg!(debug_assertions) {
modified radicle-node/src/service/session.rs
@@ -151,7 +151,7 @@ impl Session {
    }

    pub fn is_connected(&self) -> bool {
-
        matches!(self.state, State::Connected { .. })
+
        self.state.is_connected()
    }

    pub fn is_disconnected(&self) -> bool {
modified radicle-node/src/test/environment.rs
@@ -166,7 +166,7 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {

        self.handle
            .connect(remote.id, remote.addr.into(), ConnectOptions::default())
-
            .unwrap();
+
            .ok();

        local_events
            .iter()
@@ -226,7 +226,7 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
            }
            events
                .wait(
-
                    |e| matches!(e, Event::SeedDiscovered { .. }),
+
                    |e| matches!(e, Event::SeedDiscovered { .. }).then_some(()),
                    time::Duration::from_secs(6),
                )
                .unwrap();
@@ -247,7 +247,7 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
            }
            events
                .wait(
-
                    |e| matches!(e, Event::RefsFetched { .. }),
+
                    |e| matches!(e, Event::RefsFetched { .. }).then_some(()),
                    time::Duration::from_secs(6),
                )
                .unwrap();
modified radicle-node/src/test/handle.rs
@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
use std::{io, time};

use crate::identity::Id;
-
use crate::node::{Alias, ConnectOptions, Event, FetchResult, Seeds};
+
use crate::node::{Alias, ConnectOptions, ConnectResult, Event, FetchResult, Seeds};
use crate::runtime::HandleError;
use crate::service::tracking;
use crate::service::NodeId;
@@ -33,7 +33,7 @@ impl radicle::node::Handle for Handle {
        _node: NodeId,
        _addr: radicle::node::Address,
        _opts: ConnectOptions,
-
    ) -> Result<(), Self::Error> {
+
    ) -> Result<ConnectResult, Self::Error> {
        unimplemented!();
    }

modified radicle-node/src/tests.rs
@@ -1230,9 +1230,9 @@ fn test_refs_synced_event() {
                    assert_eq!(remote, &bob.id);
                    assert_eq!(rid, &acme);

-
                    true
+
                    Some(())
                } else {
-
                    false
+
                    None
                }
            },
            time::Duration::from_secs(3),
modified radicle-node/src/tests/e2e.rs
@@ -603,7 +603,10 @@ fn test_large_fetch() {

    bob_events
        .wait(
-
            |e| matches!(e, service::Event::RefsFetched { updated, .. } if !updated.is_empty()),
+
            |e| {
+
                matches!(e, service::Event::RefsFetched { updated, .. } if !updated.is_empty())
+
                    .then_some(())
+
            },
            time::Duration::from_secs(9 * scale as u64),
        )
        .unwrap();
modified radicle/src/node.rs
@@ -92,6 +92,13 @@ pub enum State {
    },
}

+
impl State {
+
    /// Check if this is a connected state.
+
    pub fn is_connected(&self) -> bool {
+
        matches!(self, Self::Connected { .. })
+
    }
+
}
+

impl fmt::Display for State {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
@@ -200,6 +207,8 @@ impl FromStr for Alias {
pub struct ConnectOptions {
    /// Establish a persistent connection.
    pub persistent: bool,
+
    /// How long to wait for the connection to be established.
+
    pub timeout: time::Duration,
}

/// Result of a command, on the node control socket.
@@ -597,6 +606,13 @@ pub enum CallError {
    },
}

+
#[derive(Debug, Serialize, Deserialize)]
+
#[serde(rename_all = "camelCase", tag = "status")]
+
pub enum ConnectResult {
+
    Connected,
+
    Disconnected { reason: String },
+
}
+

/// A handle to send commands to the node or request information.
pub trait Handle: Clone + Sync + Send {
    /// The peer sessions type.
@@ -614,7 +630,7 @@ pub trait Handle: Clone + Sync + Send {
        node: NodeId,
        addr: Address,
        opts: ConnectOptions,
-
    ) -> Result<(), Self::Error>;
+
    ) -> Result<ConnectResult, Self::Error>;
    /// Lookup the seeds of a given repository in the routing table.
    fn seeds(&mut self, id: Id) -> Result<Seeds, Self::Error>;
    /// Fetch a repository from the network.
@@ -760,18 +776,25 @@ impl Handle for Node {
        matches!(result, CommandResult::Okay { .. })
    }

-
    fn connect(&mut self, nid: NodeId, addr: Address, opts: ConnectOptions) -> Result<(), Error> {
-
        self.call::<CommandResult>(
-
            Command::Connect {
-
                addr: (nid, addr).into(),
-
                opts,
-
            },
-
            DEFAULT_TIMEOUT,
-
        )?
-
        .next()
-
        .ok_or(Error::EmptyResponse)??;
+
    fn connect(
+
        &mut self,
+
        nid: NodeId,
+
        addr: Address,
+
        opts: ConnectOptions,
+
    ) -> Result<ConnectResult, Error> {
+
        let timeout = opts.timeout;
+
        let result = self
+
            .call::<ConnectResult>(
+
                Command::Connect {
+
                    addr: (nid, addr).into(),
+
                    opts,
+
                },
+
                timeout,
+
            )?
+
            .next()
+
            .ok_or(Error::EmptyResponse)??;

-
        Ok(())
+
        Ok(result)
    }

    fn seeds(&mut self, rid: Id) -> Result<Seeds, Error> {
modified radicle/src/node/events.rs
@@ -30,6 +30,10 @@ pub enum Event {
    PeerConnected {
        nid: NodeId,
    },
+
    PeerDisconnected {
+
        nid: NodeId,
+
        reason: String,
+
    },
}

/// Events feed.
@@ -61,13 +65,9 @@ impl Deref for Events {
impl Events {
    /// Listen for events, and wait for the given predicate to return something,
    /// or timeout if the specified amount of time has elapsed.
-
    pub fn wait<F>(
-
        &self,
-
        mut f: F,
-
        timeout: time::Duration,
-
    ) -> Result<Event, chan::RecvTimeoutError>
+
    pub fn wait<F, T>(&self, mut f: F, timeout: time::Duration) -> Result<T, chan::RecvTimeoutError>
    where
-
        F: FnMut(&Event) -> bool,
+
        F: FnMut(&Event) -> Option<T>,
    {
        let start = time::Instant::now();

@@ -75,8 +75,8 @@ impl Events {
            if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
                match self.recv_timeout(timeout) {
                    Ok(event) => {
-
                        if f(&event) {
-
                            return Ok(event);
+
                        if let Some(output) = f(&event) {
+
                            return Ok(output);
                        }
                    }
                    Err(err @ chan::RecvTimeoutError::Disconnected) => {