Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Consolidate `node::Handle` traits into one
Alexis Sellier committed 3 years ago
commit 7b8bb08b0868b91cdcc77338e9138426c62aefb9
parent a100f1c683c05349866c301b4e243d80bf4d578b
11 files changed +130 -99
modified radicle-cli/src/commands/clone.rs
@@ -80,12 +80,12 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {

pub fn clone(id: Id, _interactive: Interactive, ctx: impl term::Context) -> anyhow::Result<()> {
    let profile = ctx.profile()?;
-
    let node = radicle::node::connect(profile.node())?;
+
    let mut node = radicle::node::connect(profile.node())?;
    let signer = term::signer(&profile)?;

    // Track & fetch project.
-
    node.track_repo(&id).context("track")?;
-
    node.fetch(&id).context("fetch")?;
+
    node.track_repo(id).context("track")?;
+
    node.fetch(id).context("fetch")?;

    // Create a local fork of the project, under our own id.
    rad::fork(id, &signer, &profile.storage).context("fork")?;
modified radicle-cli/src/commands/track.rs
@@ -95,7 +95,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
    let storage = &profile.storage;
    let (_, rid) = radicle::rad::cwd().context("this command must be run within a project")?;
    let Doc { payload, .. } = storage.repository(rid)?.project_of(profile.id())?;
-
    let node = radicle::node::connect(&profile.node())?;
+
    let mut node = radicle::node::connect(&profile.node())?;

    term::info!(
        "Establishing 🌱 tracking relationship for {}",
@@ -103,7 +103,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
    );
    term::blank();

-
    let tracked = node.track_node(&peer, options.alias.as_deref())?;
+
    let tracked = node.track_node(peer, options.alias.clone())?;
    let outcome = if tracked { "established" } else { "exists" };

    if let Some(alias) = options.alias {
@@ -118,7 +118,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
    }

    if options.fetch {
-
        node.fetch(&rid)?;
+
        node.fetch(rid)?;
    }

    Ok(())
modified radicle-cli/src/commands/untrack.rs
@@ -88,6 +88,6 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
}

pub fn untrack(id: Id, profile: &Profile) -> anyhow::Result<bool> {
-
    let node = radicle::node::connect(profile.node())?;
-
    node.untrack_repo(&id).map_err(|e| anyhow!(e))
+
    let mut node = radicle::node::connect(profile.node())?;
+
    node.untrack_repo(id).map_err(|e| anyhow!(e))
}
modified radicle-node/src/client/handle.rs
@@ -55,12 +55,25 @@ pub struct Handle<W: Waker> {
    pub(crate) waker: W,
}

-
impl<W: Waker> traits::Handle for Handle<W> {
+
impl<W: Waker> Handle<W> {
+
    fn command(&self, cmd: service::Command) -> Result<(), Error> {
+
        self.commands.send(cmd)?;
+
        self.waker.wake()?;
+

+
        Ok(())
+
    }
+
}
+

+
impl<W: Waker> radicle::node::Handle for Handle<W> {
+
    type Session = Session;
+
    type FetchLookup = FetchLookup;
+
    type Error = Error;
+

    fn listening(&self) -> Result<net::SocketAddr, Error> {
        self.listening.recv().map_err(Error::from)
    }

-
    fn fetch(&mut self, id: Id) -> Result<FetchLookup, Error> {
+
    fn fetch(&mut self, id: Id) -> Result<Self::FetchLookup, Error> {
        let (sender, receiver) = chan::bounded(1);
        self.commands.send(service::Command::Fetch(id, sender))?;
        receiver.recv().map_err(Error::from)
@@ -98,13 +111,6 @@ impl<W: Waker> traits::Handle for Handle<W> {
        self.command(service::Command::AnnounceRefs(id))
    }

-
    fn command(&self, cmd: service::Command) -> Result<(), Error> {
-
        self.commands.send(cmd)?;
-
        self.waker.wake()?;
-

-
        Ok(())
-
    }
-

    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Error> {
        let (sender, receiver) = chan::unbounded();
        let query: Arc<QueryState> = Arc::new(move |state| {
@@ -151,35 +157,3 @@ impl<W: Waker> traits::Handle for Handle<W> {
        Ok(())
    }
}
-

-
pub mod traits {
-
    use super::*;
-

-
    pub trait Handle {
-
        /// Wait for the node's listening socket to be bound.
-
        fn listening(&self) -> Result<net::SocketAddr, Error>;
-
        /// Retrieve or update the project from network.
-
        fn fetch(&mut self, id: Id) -> Result<FetchLookup, Error>;
-
        /// Start tracking the given project. Doesn't do anything if the project is already
-
        /// tracked.
-
        fn track_repo(&mut self, id: Id) -> Result<bool, Error>;
-
        /// Start tracking the given node.
-
        fn track_node(&mut self, id: NodeId, alias: Option<String>) -> Result<bool, Error>;
-
        /// Untrack the given project and delete it from storage.
-
        fn untrack_repo(&mut self, id: Id) -> Result<bool, Error>;
-
        /// Untrack the given node.
-
        fn untrack_node(&mut self, id: NodeId) -> Result<bool, Error>;
-
        /// Notify the client that a project has been updated.
-
        fn announce_refs(&mut self, id: Id) -> Result<(), Error>;
-
        /// Send a command to the command channel, and wake up the event loop.
-
        fn command(&self, cmd: service::Command) -> Result<(), Error>;
-
        /// Ask the client to shutdown.
-
        fn shutdown(self) -> Result<(), Error>;
-
        /// Query the routing table entries.
-
        fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Error>;
-
        /// Query the peer session state.
-
        fn sessions(&self) -> Result<chan::Receiver<(NodeId, Session)>, Error>;
-
        /// Query the inventory.
-
        fn inventory(&self) -> Result<chan::Receiver<Id>, Error>;
-
    }
-
}
modified radicle-node/src/control.rs
@@ -7,8 +7,9 @@ use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::{fs, io, net};

+
use radicle::node::Handle;
+

use crate::client;
-
use crate::client::handle::traits::Handle;
use crate::identity::Id;
use crate::node;
use crate::service::FetchLookup;
@@ -23,7 +24,13 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<P: AsRef<Path>, H: Handle>(path: P, mut handle: H) -> Result<(), Error> {
+
pub fn listen<
+
    P: AsRef<Path>,
+
    H: Handle<Error = client::handle::Error, FetchLookup = FetchLookup>,
+
>(
+
    path: P,
+
    mut handle: H,
+
) -> Result<(), Error> {
    // Remove the socket file on startup before rebinding.
    fs::remove_file(&path).ok();
    fs::create_dir_all(
@@ -69,7 +76,10 @@ enum DrainError {
    Io(#[from] io::Error),
}

-
fn drain<H: Handle>(stream: &UnixStream, handle: &mut H) -> Result<(), DrainError> {
+
fn drain<H: Handle<Error = client::handle::Error, FetchLookup = FetchLookup>>(
+
    stream: &UnixStream,
+
    handle: &mut H,
+
) -> Result<(), DrainError> {
    let mut reader = BufReader::new(stream);
    let mut writer = LineWriter::new(stream);

@@ -198,7 +208,11 @@ fn drain<H: Handle>(stream: &UnixStream, handle: &mut H) -> Result<(), DrainErro
    Ok(())
}

-
fn fetch<W: Write, H: Handle>(id: Id, mut writer: W, handle: &mut H) -> Result<(), DrainError> {
+
fn fetch<W: Write, H: Handle<Error = client::handle::Error, FetchLookup = FetchLookup>>(
+
    id: Id,
+
    mut writer: W,
+
    handle: &mut H,
+
) -> Result<(), DrainError> {
    match handle.fetch(id) {
        Err(e) => {
            return Err(DrainError::Client(e));
@@ -305,20 +319,24 @@ mod tests {
            move || crate::control::listen(socket, handle)
        });

-
        let handle = loop {
+
        let mut handle = loop {
            if let Ok(conn) = Node::connect(&socket) {
                break conn;
            }
        };

-
        assert!(handle.track_repo(&proj).unwrap());
-
        assert!(!handle.track_repo(&proj).unwrap());
-
        assert!(handle.untrack_repo(&proj).unwrap());
-
        assert!(!handle.untrack_repo(&proj).unwrap());
+
        assert!(handle.track_repo(proj).unwrap());
+
        assert!(!handle.track_repo(proj).unwrap());
+
        assert!(handle.untrack_repo(proj).unwrap());
+
        assert!(!handle.untrack_repo(proj).unwrap());

-
        assert!(handle.track_node(&peer, Some("alice")).unwrap());
-
        assert!(!handle.track_node(&peer, Some("alice")).unwrap());
-
        assert!(handle.untrack_node(&peer).unwrap());
-
        assert!(!handle.untrack_node(&peer).unwrap());
+
        assert!(handle
+
            .track_node(peer, Some(String::from("alice")))
+
            .unwrap());
+
        assert!(!handle
+
            .track_node(peer, Some(String::from("alice")))
+
            .unwrap());
+
        assert!(handle.untrack_node(peer).unwrap());
+
        assert!(!handle.untrack_node(peer).unwrap());
    }
}
modified radicle-node/src/test/handle.rs
@@ -3,7 +3,6 @@ use std::sync::{Arc, Mutex};

use crossbeam_channel as chan;

-
use crate::client::handle::traits;
use crate::client::handle::Error;
use crate::identity::Id;
use crate::service;
@@ -17,7 +16,11 @@ pub struct Handle {
    pub tracking_nodes: HashSet<NodeId>,
}

-
impl traits::Handle for Handle {
+
impl radicle::node::Handle for Handle {
+
    type Error = Error;
+
    type Session = service::Session;
+
    type FetchLookup = FetchLookup;
+

    fn listening(&self) -> Result<std::net::SocketAddr, Error> {
        unimplemented!()
    }
@@ -48,10 +51,6 @@ impl traits::Handle for Handle {
        Ok(())
    }

-
    fn command(&self, _cmd: service::Command) -> Result<(), Error> {
-
        Ok(())
-
    }
-

    fn routing(&self) -> Result<chan::Receiver<(Id, service::NodeId)>, Error> {
        unimplemented!();
    }
modified radicle-remote-helper/src/lib.rs
@@ -108,8 +108,8 @@ pub fn run(profile: radicle::Profile) -> Result<(), Box<dyn std::error::Error +
                        // Connect to local node and announce refs to the network.
                        // If our node is not running, we simply skip this step, as the
                        // refs will be announced eventually, when the node restarts.
-
                        if let Ok(conn) = radicle::node::connect(&profile.node()) {
-
                            conn.announce_refs(&url.repo)?;
+
                        if let Ok(mut conn) = radicle::node::connect(&profile.node()) {
+
                            conn.announce_refs(url.repo)?;
                        }
                    }
                }
modified radicle-tools/src/rad-clone.rs
@@ -11,8 +11,8 @@ fn main() -> anyhow::Result<()> {

    if let Some(id) = env::args().nth(1) {
        let id = Id::from_str(&id)?;
-
        let node = radicle::node::connect(profile.node())?;
-
        let repo = radicle::rad::clone(id, &cwd, &signer, &profile.storage, &node)?;
+
        let mut node = radicle::node::connect(profile.node())?;
+
        let repo = radicle::rad::clone(id, &cwd, &signer, &profile.storage, &mut node)?;

        println!(
            "ok: project {id} cloned into `{}`",
modified radicle-tools/src/rad-push.rs
@@ -16,7 +16,7 @@ fn main() -> anyhow::Result<()> {
    let sigrefs = project.sign_refs(&signer)?;
    let head = project.set_head()?;

-
    radicle::node::connect(&profile.node())?.announce_refs(&id)?;
+
    radicle::node::connect(&profile.node())?.announce_refs(id)?;

    println!("head: {}", head);
    println!("ok: {}", sigrefs.signature);
modified radicle/src/node.rs
@@ -1,12 +1,13 @@
mod features;

-
use std::io;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;
+
use std::{io, net};

use crate::crypto::PublicKey;
use crate::identity::Id;
+
use crossbeam_channel as chan;

pub use features::Features;

@@ -27,22 +28,38 @@ pub enum Error {
    EmptyResponse { cmd: &'static str },
}

+
/// A handle to send commands to the node or request information.
pub trait Handle {
-
    /// Fetch a project from the network. Fails if the project isn't tracked.
-
    fn fetch(&self, id: &Id) -> Result<(), Error>;
-
    /// Start tracking the given node. If the node is already tracked,
-
    /// updates the alias if necessary.
-
    fn track_node(&self, id: &NodeId, alias: Option<&str>) -> Result<bool, Error>;
-
    /// Start tracking the given repository.
-
    fn track_repo(&self, id: &Id) -> Result<bool, Error>;
+
    /// The result of a fetch request.
+
    type FetchLookup;
+
    /// The peer session type.
+
    type Session;
+
    /// The error returned by all methods.
+
    type Error: std::error::Error;
+

+
    /// Wait for the node's listening socket to be bound.
+
    fn listening(&self) -> Result<net::SocketAddr, Self::Error>;
+
    /// Retrieve or update the project from network.
+
    fn fetch(&mut self, id: Id) -> Result<Self::FetchLookup, Self::Error>;
+
    /// Start tracking the given project. Doesn't do anything if the project is already
+
    /// tracked.
+
    fn track_repo(&mut self, id: Id) -> Result<bool, Self::Error>;
+
    /// Start tracking the given node.
+
    fn track_node(&mut self, id: NodeId, alias: Option<String>) -> Result<bool, Self::Error>;
+
    /// Untrack the given project and delete it from storage.
+
    fn untrack_repo(&mut self, id: Id) -> Result<bool, Self::Error>;
    /// Untrack the given node.
-
    fn untrack_node(&self, id: &NodeId) -> Result<bool, Error>;
-
    /// Untrack the given repository and delete it from storage.
-
    fn untrack_repo(&self, id: &Id) -> Result<bool, Error>;
-
    /// Notify the network that we have new refs.
-
    fn announce_refs(&self, id: &Id) -> Result<(), Error>;
-
    /// Ask the node to shutdown.
-
    fn shutdown(self) -> Result<(), Error>;
+
    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Self::Error>;
+
    /// Notify the client that a project has been updated.
+
    fn announce_refs(&mut self, id: Id) -> Result<(), Self::Error>;
+
    /// Ask the client to shutdown.
+
    fn shutdown(self) -> Result<(), Self::Error>;
+
    /// Query the routing table entries.
+
    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Self::Error>;
+
    /// Query the peer session state.
+
    fn sessions(&self) -> Result<chan::Receiver<(NodeId, Self::Session)>, Self::Error>;
+
    /// Query the inventory.
+
    fn inventory(&self) -> Result<chan::Receiver<Id>, Self::Error>;
}

/// Public node & device identifier.
@@ -80,7 +97,11 @@ impl Node {
}

impl Handle for Node {
-
    fn fetch(&self, id: &Id) -> Result<(), Error> {
+
    type Session = ();
+
    type FetchLookup = ();
+
    type Error = Error;
+

+
    fn fetch(&mut self, id: Id) -> Result<(), Error> {
        for line in self.call("fetch", &[id])? {
            let line = line?;
            log::debug!("node: {}", line);
@@ -88,9 +109,9 @@ impl Handle for Node {
        Ok(())
    }

-
    fn track_node(&self, id: &NodeId, alias: Option<&str>) -> Result<bool, Error> {
+
    fn track_node(&mut self, id: NodeId, alias: Option<String>) -> Result<bool, Error> {
        let id = id.to_human();
-
        let mut line = if let Some(alias) = alias {
+
        let mut line = if let Some(alias) = alias.as_deref() {
            self.call("track-node", &[id.as_str(), alias])
        } else {
            self.call("track-node", &[id.as_str()])
@@ -111,7 +132,7 @@ impl Handle for Node {
        }
    }

-
    fn track_repo(&self, id: &Id) -> Result<bool, Error> {
+
    fn track_repo(&mut self, id: Id) -> Result<bool, Error> {
        let mut line = self.call("track-repo", &[id])?;
        let line = line
            .next()
@@ -129,7 +150,7 @@ impl Handle for Node {
        }
    }

-
    fn untrack_node(&self, id: &NodeId) -> Result<bool, Error> {
+
    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Error> {
        let mut line = self.call("untrack-node", &[id])?;
        let line = line.next().ok_or(Error::EmptyResponse {
            cmd: "untrack-node",
@@ -147,7 +168,7 @@ impl Handle for Node {
        }
    }

-
    fn untrack_repo(&self, id: &Id) -> Result<bool, Error> {
+
    fn untrack_repo(&mut self, id: Id) -> Result<bool, Error> {
        let mut line = self.call("untrack-repo", &[id])?;
        let line = line.next().ok_or(Error::EmptyResponse {
            cmd: "untrack-repo",
@@ -165,7 +186,7 @@ impl Handle for Node {
        }
    }

-
    fn announce_refs(&self, id: &Id) -> Result<(), Error> {
+
    fn announce_refs(&mut self, id: Id) -> Result<(), Error> {
        for line in self.call("announce-refs", &[id])? {
            let line = line?;
            log::debug!("node: {}", line);
@@ -173,6 +194,22 @@ impl Handle for Node {
        Ok(())
    }

+
    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Error> {
+
        todo!();
+
    }
+

+
    fn sessions(&self) -> Result<chan::Receiver<(NodeId, Self::Session)>, Error> {
+
        todo!();
+
    }
+

+
    fn listening(&self) -> Result<net::SocketAddr, Error> {
+
        todo!();
+
    }
+

+
    fn inventory(&self) -> Result<chan::Receiver<Id>, Error> {
+
        todo!();
+
    }
+

    fn shutdown(self) -> Result<(), Error> {
        todo!();
    }
modified radicle/src/rad.rs
@@ -197,10 +197,13 @@ pub fn clone<P: AsRef<Path>, G: Signer, S: storage::WriteStorage, H: node::Handl
    path: P,
    signer: &G,
    storage: &S,
-
    handle: &H,
-
) -> Result<git2::Repository, CloneError> {
-
    let _ = handle.track_repo(&proj)?;
-
    let _ = handle.fetch(&proj)?;
+
    handle: &mut H,
+
) -> Result<git2::Repository, CloneError>
+
where
+
    CloneError: From<H::Error>,
+
{
+
    let _ = handle.track_repo(proj)?;
+
    let _ = handle.fetch(proj)?;
    let _ = fork(proj, signer, storage)?;
    let working = checkout(proj, signer.public_key(), path, storage)?;