Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement `rad sync` command
Alexis Sellier committed 3 years ago
commit 671c1692442170429cb7bdd2ddcc6480ee4e71ad
parent 86feebe4d813a420c55ba7d9e57c2371637b406e
14 files changed +457 -125
added radicle-cli/examples/rad-sync.md
@@ -0,0 +1,27 @@
+
The `rad sync` command announces changes to the network and waits for other
+
nodes to be synchronized with those changes.
+

+
For instance let's create an issue and sync it with the network:
+

+
```
+
$ rad issue open --title "Test `rad sync`" --description "Check that the command works" -q --no-announce
+
```
+

+
Now let's run `rad sync`. This will announce the issue refs to the network and
+
wait for nodes to announce that they have fetched those refs.
+

+
```
+
$ rad sync
+
✓ Synced with 2 node(s)
+
```
+

+
If we try to sync again after the nodes have synced, we will get a timeout
+
after one second, since the nodes will not emit any message:
+

+
```
+
$ rad sync --timeout 1
+
✗ Syncing with 2 node(s)..
+
! Seed z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk timed out..
+
! Seed z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z timed out..
+
✗ Sync failed: all seeds timed out
+
```
modified radicle-cli/src/commands.rs
@@ -46,6 +46,8 @@ pub mod rad_review;
pub mod rad_rm;
#[path = "commands/self.rs"]
pub mod rad_self;
+
#[path = "commands/sync.rs"]
+
pub mod rad_sync;
#[path = "commands/tag.rs"]
pub mod rad_tag;
#[path = "commands/track.rs"]
modified radicle-cli/src/commands/help.rs
@@ -40,6 +40,7 @@ const COMMANDS: &[Help] = &[
    rad_untag::HELP,
    rad_untrack::HELP,
    rad_remote::HELP,
+
    rad_sync::HELP,
];

#[derive(Default)]
added radicle-cli/src/commands/sync.rs
@@ -0,0 +1,146 @@
+
use std::collections::BTreeSet;
+
use std::ffi::OsString;
+
use std::path::Path;
+
use std::{io, time};
+

+
use anyhow::{anyhow, Context as _};
+

+
use radicle::node::Event;
+
use radicle::node::Handle as _;
+
use radicle::prelude::{Id, NodeId};
+

+
use crate::terminal as term;
+
use crate::terminal::args::{Args, Error, Help};
+

+
pub const HELP: Help = Help {
+
    name: "sync",
+
    description: "Sync repositories to the network",
+
    version: env!("CARGO_PKG_VERSION"),
+
    usage: r#"
+
Usage
+

+
    rad sync [<rid>] [<option>...]
+

+
    By default, the current repository is synced.
+

+
Options
+

+
    --timeout <secs>    How many seconds to wait while syncing
+
    --verbose, -v       Verbose output
+
    --help              Print help
+

+
"#,
+
};
+

+
#[derive(Default, Debug)]
+
pub struct Options {
+
    pub rid: Option<Id>,
+
    pub verbose: bool,
+
    pub timeout: time::Duration,
+
}
+

+
impl Args for Options {
+
    fn from_args(args: Vec<OsString>) -> anyhow::Result<(Self, Vec<OsString>)> {
+
        use lexopt::prelude::*;
+

+
        let mut parser = lexopt::Parser::from_args(args);
+
        let mut verbose = false;
+
        let mut timeout = time::Duration::from_secs(9);
+
        let mut rid = None;
+

+
        while let Some(arg) = parser.next()? {
+
            match arg {
+
                Long("verbose") | Short('v') => {
+
                    verbose = true;
+
                }
+
                Long("timeout") | Short('t') => {
+
                    let value = parser.value()?;
+
                    let secs = term::args::parse_value("timeout", value)?;
+

+
                    timeout = time::Duration::from_secs(secs);
+
                }
+
                Long("help") => {
+
                    return Err(Error::Help.into());
+
                }
+
                Value(val) if rid.is_none() => {
+
                    rid = Some(term::args::rid(&val)?);
+
                }
+
                arg => {
+
                    return Err(anyhow!(arg.unexpected()));
+
                }
+
            }
+
        }
+

+
        Ok((
+
            Options {
+
                rid,
+
                verbose,
+
                timeout,
+
            },
+
            vec![],
+
        ))
+
    }
+
}
+

+
pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
+
    let profile = ctx.profile()?;
+
    let rid = match options.rid {
+
        Some(rid) => rid,
+
        None => {
+
            let (_, rid) = radicle::rad::repo(Path::new("."))
+
                .context("Current directory is not a radicle project")?;
+

+
            rid
+
        }
+
    };
+

+
    let mut node = radicle::Node::new(profile.socket());
+
    let events = node.subscribe(options.timeout)?;
+
    let seeds = node.seeds(rid)?;
+
    let mut seeds = seeds.connected().collect::<BTreeSet<_>>();
+

+
    if seeds.is_empty() {
+
        term::info!("Not connected to any seeds");
+
        return Ok(());
+
    }
+
    node.announce_refs(rid)?;
+

+
    let mut spinner = term::spinner(format!("Syncing with {} node(s)..", seeds.len()));
+
    let mut synced = Vec::new();
+
    let mut timeout: Vec<NodeId> = Vec::new();
+

+
    for e in events {
+
        match e {
+
            Ok(Event::RefsSynced { remote, rid: rid_ }) if rid == rid_ => {
+
                seeds.remove(&remote);
+
                synced.push(remote);
+
                spinner.message(format!("Synced with {remote}.."));
+
            }
+
            Ok(_) => {}
+
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+
                timeout.extend(seeds.into_iter());
+
                break;
+
            }
+
            Err(e) => return Err(e.into()),
+
        }
+
        if seeds.is_empty() {
+
            break;
+
        }
+
    }
+

+
    if synced.is_empty() {
+
        spinner.failed();
+
    } else {
+
        spinner.message(format!("Synced with {} node(s)", synced.len()));
+
        spinner.finish();
+
    }
+

+
    for seed in timeout {
+
        term::notice!("Seed {seed} timed out..");
+
    }
+

+
    if synced.is_empty() {
+
        anyhow::bail!("all seeds timed out");
+
    }
+
    Ok(())
+
}
modified radicle-cli/src/main.rs
@@ -291,6 +291,14 @@ fn run_other(exe: &str, args: &[OsString]) -> Result<(), Option<anyhow::Error>>
                args.to_vec(),
            );
        }
+
        "sync" => {
+
            term::run_command_args::<rad_sync::Options, _>(
+
                rad_sync::HELP,
+
                "Sync",
+
                rad_sync::run,
+
                args.to_vec(),
+
            );
+
        }
        "tag" => {
            term::run_command_args::<rad_tag::Options, _>(
                rad_tag::HELP,
modified radicle-cli/tests/commands.rs
@@ -552,6 +552,50 @@ fn test_cob_replication() {
}

#[test]
+
fn rad_sync() {
+
    logger::init(log::Level::Debug);
+

+
    let mut environment = Environment::new();
+
    let working = environment.tmp().join("working");
+
    let alice = environment.node("alice");
+
    let bob = environment.node("bob");
+
    let eve = environment.node("eve");
+
    let acme = Id::from_str("z42hL2jL4XNk6K8oHQaSWfMgCL7ji").unwrap();
+

+
    fixtures::repository(working.join("acme"));
+

+
    test(
+
        "examples/rad-init.md",
+
        working.join("acme"),
+
        Some(&alice.home),
+
        [],
+
    )
+
    .unwrap();
+

+
    let mut alice = alice.spawn(Config::default());
+
    let mut bob = bob.spawn(Config::default());
+
    let mut eve = eve.spawn(Config::default());
+

+
    bob.handle.track_repo(acme, Scope::All).unwrap();
+
    eve.handle.track_repo(acme, Scope::All).unwrap();
+

+
    alice.connect(&bob);
+
    eve.connect(&alice);
+

+
    bob.routes_to(&[(acme, alice.id)]);
+
    eve.routes_to(&[(acme, alice.id)]);
+
    alice.routes_to(&[(acme, alice.id), (acme, eve.id), (acme, bob.id)]);
+

+
    test(
+
        "examples/rad-sync.md",
+
        working.join("acme"),
+
        Some(&alice.home),
+
        [],
+
    )
+
    .unwrap();
+
}
+

+
#[test]
//
//     alice -- seed -- bob
//
modified radicle-node/src/control.rs
@@ -6,7 +6,7 @@ use std::os::unix::net::UnixListener;
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::str::FromStr;
-
use std::{io, net};
+
use std::{io, net, thread, time};

use radicle::node::Handle;
use serde_json as json;
@@ -16,6 +16,9 @@ use crate::node::NodeId;
use crate::node::{Command, CommandName, CommandResult};
use crate::runtime;

+
/// Maximum timeout for waiting for node events.
+
const MAX_TIMEOUT: time::Duration = time::Duration::MAX;
+

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("failed to bind control socket listener: {0}")]
@@ -25,16 +28,16 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<H: Handle<Error = runtime::HandleError>>(
+
pub fn listen<H: Handle<Error = runtime::HandleError> + 'static>(
    listener: UnixListener,
-
    mut handle: H,
+
    handle: H,
) -> Result<(), Error> {
    log::debug!(target: "control", "Control thread listening on socket..");

    for incoming in listener.incoming() {
        match incoming {
            Ok(mut stream) => {
-
                if let Err(e) = command(&stream, &mut handle) {
+
                if let Err(e) = command(&stream, handle.clone()) {
                    if let CommandError::Shutdown = e {
                        log::debug!(target: "control", "Shutdown requested..");
                        // Channel might already be disconnected if shutdown
@@ -74,9 +77,9 @@ enum CommandError {
    Shutdown,
}

-
fn command<H: Handle<Error = runtime::HandleError>>(
+
fn command<H: Handle<Error = runtime::HandleError> + 'static>(
    stream: &UnixStream,
-
    handle: &mut H,
+
    mut handle: H,
) -> Result<(), CommandError> {
    let mut reader = BufReader::new(stream);
    let writer = LineWriter::new(stream);
@@ -99,7 +102,7 @@ fn command<H: Handle<Error = runtime::HandleError>>(
        }
        CommandName::Fetch => {
            let (rid, nid): (Id, NodeId) = parse::args(cmd)?;
-
            fetch(rid, nid, LineWriter::new(stream), handle)?;
+
            fetch(rid, nid, writer, &mut handle)?;
        }
        CommandName::Seeds => {
            let rid: Id = parse::arg(cmd)?;
@@ -184,6 +187,24 @@ fn command<H: Handle<Error = runtime::HandleError>>(
                return Err(CommandError::Runtime(e));
            }
        },
+
        CommandName::Subscribe => {
+
            let mut stream = stream.try_clone()?;
+

+
            thread::spawn(move || {
+
                match handle.subscribe(MAX_TIMEOUT) {
+
                    Ok(events) => {
+
                        for e in events {
+
                            let event = e?;
+
                            let event = serde_json::to_string(&event)?;
+

+
                            writeln!(stream, "{event}")?;
+
                        }
+
                    }
+
                    Err(e) => log::error!(target: "control", "Error subscribing to events: {e}"),
+
                }
+
                Ok::<_, io::Error>(())
+
            });
+
        }
        CommandName::Status => {
            CommandResult::ok().to_writer(writer).ok();
        }
modified radicle-node/src/runtime/handle.rs
@@ -1,7 +1,7 @@
use std::os::unix::net::UnixStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
-
use std::{fmt, io};
+
use std::{fmt, io, time};

use crossbeam_channel as chan;
use radicle::node::Seeds;
@@ -185,6 +185,13 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

+
    fn subscribe(
+
        &self,
+
        _timeout: time::Duration,
+
    ) -> Result<Box<dyn Iterator<Item = Result<Event, io::Error>>>, Error> {
+
        Ok(Box::new(self.events().into_iter().map(Ok)))
+
    }
+

    fn sessions(&self) -> Result<Self::Sessions, Error> {
        let (sender, receiver) = chan::unbounded();
        let query: Arc<QueryState> = Arc::new(move |state| {
modified radicle-node/src/service.rs
@@ -2,7 +2,6 @@
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
pub mod config;
-
pub mod events;
pub mod filter;
pub mod message;
pub mod reactor;
@@ -41,9 +40,9 @@ use crate::storage::{ReadRepository, RefUpdate};
use crate::worker::FetchError;
use crate::Link;

+
pub use crate::node::events::{Event, Events};
pub use crate::node::NodeId;
pub use crate::service::config::{Config, Network};
-
pub use crate::service::events::{Event, Events};
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::Session;

deleted radicle-node/src/service/events.rs
@@ -1,85 +0,0 @@
-
use std::ops::Deref;
-
use std::time;
-

-
use crossbeam_channel as chan;
-

-
use radicle::prelude::*;
-
use radicle::storage::RefUpdate;
-

-
/// A service event.
-
#[derive(Debug, Clone)]
-
pub enum Event {
-
    RefsFetched {
-
        remote: NodeId,
-
        rid: Id,
-
        updated: Vec<RefUpdate>,
-
    },
-
    RefsSynced {
-
        remote: NodeId,
-
        rid: Id,
-
    },
-
    SeedDiscovered {
-
        rid: Id,
-
        nid: NodeId,
-
    },
-
    SeedDropped {
-
        rid: Id,
-
        nid: NodeId,
-
    },
-
    PeerConnected {
-
        nid: NodeId,
-
    },
-
}
-

-
/// Events feed.
-
pub struct Events(chan::Receiver<Event>);
-

-
impl From<chan::Receiver<Event>> for Events {
-
    fn from(value: chan::Receiver<Event>) -> Self {
-
        Self(value)
-
    }
-
}
-

-
impl Deref for Events {
-
    type Target = chan::Receiver<Event>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
-
    }
-
}
-

-
impl Events {
-
    /// Listen for events, and wait for the given predicate to return something,
-
    /// or timeout if the specified amount of time has elapsed.
-
    pub fn wait<F>(
-
        &self,
-
        mut f: F,
-
        timeout: time::Duration,
-
    ) -> Result<Event, chan::RecvTimeoutError>
-
    where
-
        F: FnMut(&Event) -> bool,
-
    {
-
        let start = time::Instant::now();
-

-
        loop {
-
            if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
-
                match self.recv_timeout(timeout) {
-
                    Ok(event) => {
-
                        if f(&event) {
-
                            return Ok(event);
-
                        }
-
                    }
-
                    Err(err @ chan::RecvTimeoutError::Disconnected) => {
-
                        return Err(err);
-
                    }
-
                    Err(chan::RecvTimeoutError::Timeout) => {
-
                        // Keep trying until our timeout reaches zero.
-
                        continue;
-
                    }
-
                }
-
            } else {
-
                return Err(chan::RecvTimeoutError::Timeout);
-
            }
-
        }
-
    }
-
}
modified radicle-node/src/test/handle.rs
@@ -1,8 +1,9 @@
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
+
use std::{io, time};

use crate::identity::Id;
-
use crate::node::{FetchResult, Seeds};
+
use crate::node::{Event, FetchResult, Seeds};
use crate::runtime::HandleError;
use crate::service::NodeId;
use crate::service::{self, tracking};
@@ -10,8 +11,8 @@ use crate::service::{self, tracking};
#[derive(Default, Clone)]
pub struct Handle {
    pub updates: Arc<Mutex<Vec<Id>>>,
-
    pub tracking_repos: HashSet<Id>,
-
    pub tracking_nodes: HashSet<NodeId>,
+
    pub tracking_repos: Arc<Mutex<HashSet<Id>>>,
+
    pub tracking_nodes: Arc<Mutex<HashSet<NodeId>>>,
}

impl radicle::node::Handle for Handle {
@@ -38,19 +39,26 @@ impl radicle::node::Handle for Handle {
    }

    fn track_repo(&mut self, id: Id, _scope: tracking::Scope) -> Result<bool, Self::Error> {
-
        Ok(self.tracking_repos.insert(id))
+
        Ok(self.tracking_repos.lock().unwrap().insert(id))
    }

    fn untrack_repo(&mut self, id: Id) -> Result<bool, Self::Error> {
-
        Ok(self.tracking_repos.remove(&id))
+
        Ok(self.tracking_repos.lock().unwrap().remove(&id))
    }

    fn track_node(&mut self, id: NodeId, _alias: Option<String>) -> Result<bool, Self::Error> {
-
        Ok(self.tracking_nodes.insert(id))
+
        Ok(self.tracking_nodes.lock().unwrap().insert(id))
+
    }
+

+
    fn subscribe(
+
        &self,
+
        _timeout: time::Duration,
+
    ) -> Result<Box<dyn Iterator<Item = Result<Event, io::Error>>>, Self::Error> {
+
        Ok(Box::new(std::iter::empty()))
    }

    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Self::Error> {
-
        Ok(self.tracking_nodes.remove(&id))
+
        Ok(self.tracking_nodes.lock().unwrap().remove(&id))
    }

    fn announce_refs(&mut self, id: Id) -> Result<(), Self::Error> {
modified radicle-term/src/io.rs
@@ -56,7 +56,15 @@ macro_rules! tip {
    })
}

+
#[macro_export]
+
macro_rules! notice {
+
    ($($arg:tt)*) => ({
+
        $crate::io::notice_args(format_args!($($arg)*));
+
    })
+
}
+

pub use info;
+
pub use notice;
pub use success;
pub use tip;

@@ -68,6 +76,10 @@ pub fn tip_args(args: fmt::Arguments) {
    println!("👉 {}", style(format!("{args}")).italic());
}

+
pub fn notice_args(args: fmt::Arguments) {
+
    println!("{} {args}", Paint::new("!").dim());
+
}
+

pub fn columns() -> Option<usize> {
    termion::terminal_size().map(|(cols, _)| cols as usize).ok()
}
modified radicle/src/node.rs
@@ -1,4 +1,6 @@
mod features;
+

+
pub mod events;
pub mod routing;
pub mod tracking;

@@ -7,7 +9,7 @@ use std::io::{BufRead, BufReader};
use std::ops::Deref;
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
-
use std::{fmt, io, net};
+
use std::{fmt, io, net, time};

use amplify::WrapperMut;
use cyphernet::addr::{HostName, NetAddr};
@@ -19,12 +21,15 @@ use crate::crypto::PublicKey;
use crate::identity::Id;
use crate::storage::RefUpdate;

+
pub use events::{Event, Events};
pub use features::Features;

/// Default name for control socket file.
pub const DEFAULT_SOCKET_NAME: &str = "control.sock";
/// Default radicle protocol port.
pub const DEFAULT_PORT: u16 = 8776;
+
/// Default timeout when waiting for the node to respond with data.
+
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(9);
/// Filename of routing table database under the node directory.
pub const ROUTING_DB_FILE: &str = "routing.db";
/// Filename of address database under the node directory.
@@ -142,6 +147,8 @@ pub enum CommandName {
    Status,
    /// Shutdown the node.
    Shutdown,
+
    /// Subscribe to events.
+
    Subscribe,
}

impl fmt::Display for CommandName {
@@ -378,7 +385,7 @@ pub enum CallError {
}

/// A handle to send commands to the node or request information.
-
pub trait Handle {
+
pub trait Handle: Clone + Sync + Send {
    /// The peer sessions type.
    type Sessions;
    /// The error returned by all methods.
@@ -411,13 +418,18 @@ pub trait Handle {
    fn shutdown(self) -> Result<(), Self::Error>;
    /// Query the peer session state.
    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
+
    /// Subscribe to node events.
+
    fn subscribe(
+
        &self,
+
        timeout: time::Duration,
+
    ) -> Result<Box<dyn Iterator<Item = Result<Event, io::Error>>>, Self::Error>;
}

/// Public node & device identifier.
pub type NodeId = PublicKey;

/// Node controller.
-
#[derive(Debug)]
+
#[derive(Debug, Clone)]
pub struct Node {
    socket: PathBuf,
}
@@ -435,10 +447,13 @@ impl Node {
        &self,
        name: CommandName,
        args: impl IntoIterator<Item = A>,
+
        timeout: time::Duration,
    ) -> Result<impl Iterator<Item = Result<T, CallError>>, io::Error> {
        let stream = UnixStream::connect(&self.socket)?;
        Command::new(name, args).to_writer(&stream)?;

+
        stream.set_read_timeout(Some(timeout))?;
+

        Ok(BufReader::new(stream).lines().map(move |l| {
            let l = l?;
            let v = json::from_str(&l).map_err(|e| CallError::InvalidJson {
@@ -459,7 +474,7 @@ impl Handle for Node {
    type Error = Error;

    fn is_running(&self) -> bool {
-
        let Ok(mut lines) = self.call::<&str, CommandResult>(CommandName::Status, []) else {
+
        let Ok(mut lines) = self.call::<&str, CommandResult>(CommandName::Status, [], DEFAULT_TIMEOUT) else {
            return false;
        };
        let Some(Ok(result)) = lines.next() else {
@@ -469,28 +484,36 @@ impl Handle for Node {
    }

    fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), Error> {
-
        self.call::<_, CommandResult>(CommandName::Connect, [nid.to_human(), addr.to_string()])?
-
            .next()
-
            .ok_or(Error::EmptyResponse {
-
                cmd: CommandName::Connect,
-
            })??;
+
        self.call::<_, CommandResult>(
+
            CommandName::Connect,
+
            [nid.to_human(), addr.to_string()],
+
            DEFAULT_TIMEOUT,
+
        )?
+
        .next()
+
        .ok_or(Error::EmptyResponse {
+
            cmd: CommandName::Connect,
+
        })??;
        Ok(())
    }

    fn seeds(&mut self, id: Id) -> Result<Seeds, Error> {
-
        let seeds: Seeds =
-
            self.call(CommandName::Seeds, [id.urn()])?
-
                .next()
-
                .ok_or(Error::EmptyResponse {
-
                    cmd: CommandName::Seeds,
-
                })??;
+
        let seeds: Seeds = self
+
            .call(CommandName::Seeds, [id.urn()], DEFAULT_TIMEOUT)?
+
            .next()
+
            .ok_or(Error::EmptyResponse {
+
                cmd: CommandName::Seeds,
+
            })??;

        Ok(seeds)
    }

    fn fetch(&mut self, id: Id, from: NodeId) -> Result<FetchResult, Error> {
        let result = self
-
            .call(CommandName::Fetch, [id.urn(), from.to_human()])?
+
            .call(
+
                CommandName::Fetch,
+
                [id.urn(), from.to_human()],
+
                DEFAULT_TIMEOUT,
+
            )?
            .next()
            .ok_or(Error::EmptyResponse {
                cmd: CommandName::Fetch,
@@ -507,7 +530,7 @@ impl Handle for Node {
            vec![id.as_str()]
        };

-
        let mut line = self.call(CommandName::TrackNode, args)?;
+
        let mut line = self.call(CommandName::TrackNode, args, DEFAULT_TIMEOUT)?;
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse {
            cmd: CommandName::TrackNode,
        })??;
@@ -516,7 +539,11 @@ impl Handle for Node {
    }

    fn track_repo(&mut self, id: Id, scope: tracking::Scope) -> Result<bool, Error> {
-
        let mut line = self.call(CommandName::TrackRepo, [id.urn(), scope.to_string()])?;
+
        let mut line = self.call(
+
            CommandName::TrackRepo,
+
            [id.urn(), scope.to_string()],
+
            DEFAULT_TIMEOUT,
+
        )?;
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse {
            cmd: CommandName::TrackRepo,
        })??;
@@ -525,7 +552,7 @@ impl Handle for Node {
    }

    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Error> {
-
        let mut line = self.call(CommandName::UntrackNode, [id])?;
+
        let mut line = self.call(CommandName::UntrackNode, [id], DEFAULT_TIMEOUT)?;
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse {
            cmd: CommandName::UntrackNode,
        })??;
@@ -534,7 +561,7 @@ impl Handle for Node {
    }

    fn untrack_repo(&mut self, id: Id) -> Result<bool, Error> {
-
        let mut line = self.call(CommandName::UntrackRepo, [id.urn()])?;
+
        let mut line = self.call(CommandName::UntrackRepo, [id.urn()], DEFAULT_TIMEOUT)?;
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse {
            cmd: CommandName::UntrackRepo,
        })??;
@@ -543,21 +570,25 @@ impl Handle for Node {
    }

    fn announce_refs(&mut self, id: Id) -> Result<(), Error> {
-
        for line in self.call::<_, CommandResult>(CommandName::AnnounceRefs, [id.urn()])? {
+
        for line in
+
            self.call::<_, CommandResult>(CommandName::AnnounceRefs, [id.urn()], DEFAULT_TIMEOUT)?
+
        {
            line?;
        }
        Ok(())
    }

    fn announce_inventory(&mut self) -> Result<(), Error> {
-
        for line in self.call::<&str, CommandResult>(CommandName::AnnounceInventory, [])? {
+
        for line in
+
            self.call::<&str, CommandResult>(CommandName::AnnounceInventory, [], DEFAULT_TIMEOUT)?
+
        {
            line?;
        }
        Ok(())
    }

    fn sync_inventory(&mut self) -> Result<bool, Error> {
-
        let mut line = self.call::<&str, _>(CommandName::SyncInventory, [])?;
+
        let mut line = self.call::<&str, _>(CommandName::SyncInventory, [], DEFAULT_TIMEOUT)?;
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse {
            cmd: CommandName::SyncInventory,
        })??;
@@ -565,6 +596,22 @@ impl Handle for Node {
        response.into()
    }

+
    fn subscribe(
+
        &self,
+
        timeout: time::Duration,
+
    ) -> Result<Box<dyn Iterator<Item = Result<Event, io::Error>>>, Error> {
+
        let events = self.call::<&str, _>(CommandName::Subscribe, [], timeout)?;
+

+
        Ok(Box::new(events.map(|e| {
+
            e.map_err(|err| match err {
+
                CallError::Io(e) => e,
+
                CallError::InvalidJson { .. } => {
+
                    io::Error::new(io::ErrorKind::InvalidInput, err.to_string())
+
                }
+
            })
+
        })))
+
    }
+

    fn sessions(&self) -> Result<Self::Sessions, Error> {
        todo!();
    }
added radicle/src/node/events.rs
@@ -0,0 +1,95 @@
+
use std::ops::Deref;
+
use std::time;
+

+
use crossbeam_channel as chan;
+

+
use crate::prelude::*;
+
use crate::storage::RefUpdate;
+

+
/// A service event.
+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+
#[serde(rename_all = "kebab-case", tag = "type")]
+
pub enum Event {
+
    RefsFetched {
+
        remote: NodeId,
+
        rid: Id,
+
        updated: Vec<RefUpdate>,
+
    },
+
    RefsSynced {
+
        remote: NodeId,
+
        rid: Id,
+
    },
+
    SeedDiscovered {
+
        rid: Id,
+
        nid: NodeId,
+
    },
+
    SeedDropped {
+
        rid: Id,
+
        nid: NodeId,
+
    },
+
    PeerConnected {
+
        nid: NodeId,
+
    },
+
}
+

+
/// Events feed.
+
pub struct Events(chan::Receiver<Event>);
+

+
impl IntoIterator for Events {
+
    type Item = Event;
+
    type IntoIter = chan::IntoIter<Event>;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.0.into_iter()
+
    }
+
}
+

+
impl From<chan::Receiver<Event>> for Events {
+
    fn from(value: chan::Receiver<Event>) -> Self {
+
        Self(value)
+
    }
+
}
+

+
impl Deref for Events {
+
    type Target = chan::Receiver<Event>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl Events {
+
    /// Listen for events, and wait for the given predicate to return something,
+
    /// or timeout if the specified amount of time has elapsed.
+
    pub fn wait<F>(
+
        &self,
+
        mut f: F,
+
        timeout: time::Duration,
+
    ) -> Result<Event, chan::RecvTimeoutError>
+
    where
+
        F: FnMut(&Event) -> bool,
+
    {
+
        let start = time::Instant::now();
+

+
        loop {
+
            if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
+
                match self.recv_timeout(timeout) {
+
                    Ok(event) => {
+
                        if f(&event) {
+
                            return Ok(event);
+
                        }
+
                    }
+
                    Err(err @ chan::RecvTimeoutError::Disconnected) => {
+
                        return Err(err);
+
                    }
+
                    Err(chan::RecvTimeoutError::Timeout) => {
+
                        // Keep trying until our timeout reaches zero.
+
                        continue;
+
                    }
+
                }
+
            } else {
+
                return Err(chan::RecvTimeoutError::Timeout);
+
            }
+
        }
+
    }
+
}