Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Get `rad clone` working correctly
Alexis Sellier committed 3 years ago
commit 963ad3c3d77806611e98bcbd7a778aca5d0adddb
parent c888f40660b6ad1d3a37bfd4e1f7a855d96605f7
18 files changed +269 -192
modified radicle-cli/src/commands/checkout.rs
@@ -5,6 +5,8 @@ use anyhow::anyhow;
use anyhow::Context as _;

use radicle::prelude::*;
+
use radicle::storage::git::transport;
+
use radicle::storage::RemoteId;
use radicle::storage::WriteStorage;

use crate::project;
@@ -22,6 +24,7 @@ Usage

Options

+
    --remote <id>   Remote namespace to checkout
    --no-confirm    Don't ask for confirmation during checkout
    --help          Print help
"#,
@@ -29,6 +32,7 @@ Options

pub struct Options {
    pub id: Id,
+
    pub remote: Option<RemoteId>,
}

impl Args for Options {
@@ -38,6 +42,7 @@ impl Args for Options {

        let mut parser = lexopt::Parser::from_args(args);
        let mut id = None;
+
        let mut remote = None;

        while let Some(arg) = parser.next()? {
            match arg {
@@ -45,6 +50,16 @@ impl Args for Options {
                    // Ignored for now.
                }
                Long("help") => return Err(Error::Help.into()),
+
                Long("remote") => {
+
                    let val = parser.value().unwrap();
+
                    let val = val.to_string_lossy();
+

+
                    if let Ok(val) = NodeId::from_str(&val) {
+
                        remote = Some(val);
+
                    } else {
+
                        return Err(anyhow!("invalid Node ID '{}'", val));
+
                    }
+
                }
                Value(val) if id.is_none() => {
                    let val = val.to_string_lossy();
                    let val = Id::from_str(&val).context(format!("invalid id '{}'", val))?;
@@ -58,6 +73,7 @@ impl Args for Options {
        Ok((
            Options {
                id: id.ok_or_else(|| anyhow!("a project id to checkout must be provided"))?,
+
                remote,
            },
            vec![],
        ))
@@ -79,13 +95,16 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
pub fn execute(options: Options, profile: &Profile) -> anyhow::Result<PathBuf> {
    let id = options.id;
    let storage = &profile.storage;
+
    let remote = options.remote.unwrap_or(*profile.id());
    let doc = storage
        .repository(id)?
-
        .identity_of(profile.id())
+
        .identity_of(&remote)
        .context("project could not be found in local storage")?;
    let payload = doc.project()?;
    let path = PathBuf::from(payload.name().clone());

+
    transport::local::register(storage.clone());
+

    if path.exists() {
        anyhow::bail!("the local path {:?} already exists", path.as_path());
    }
@@ -97,7 +116,7 @@ pub fn execute(options: Options, profile: &Profile) -> anyhow::Result<PathBuf> {
    ));

    let spinner = term::spinner("Performing checkout...");
-
    let repo = match radicle::rad::checkout(options.id, profile.id(), path.clone(), &storage) {
+
    let repo = match radicle::rad::checkout(options.id, &remote, path.clone(), &storage) {
        Ok(repo) => repo,
        Err(err) => {
            spinner.failed();
modified radicle-cli/src/commands/clone.rs
@@ -80,15 +80,15 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {

pub fn clone(id: Id, _interactive: Interactive, ctx: impl term::Context) -> anyhow::Result<()> {
    let profile = ctx.profile()?;
-
    let mut node = radicle::node::connect(profile.socket())?;
    let signer = term::signer(&profile)?;
+
    let mut node = radicle::Node::new(profile.socket());

    // Track & fetch project.
    node.track_repo(id).context("track")?;
-
    node.fetch(id).context("fetch")?;
+
    node.fetch(id).context("fetch")?; // FIXME: Handle output

    // Create a local fork of the project, under our own id.
-
    rad::fork(id, &signer, &profile.storage).context("fork")?;
+
    rad::fork(id, &signer, &profile.storage).context("fork error")?;

    let doc = profile
        .storage
modified radicle-cli/src/commands/track.rs
@@ -94,7 +94,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
    let storage = &profile.storage;
    let (_, rid) = radicle::rad::cwd().context("this command must be run within a project")?;
    let project = storage.repository(rid)?.project_of(profile.id())?;
-
    let mut node = radicle::node::connect(profile.socket())?;
+
    let mut node = radicle::Node::new(profile.socket());

    term::info!(
        "Establishing 🌱 tracking relationship for {}",
modified radicle-cli/src/commands/untrack.rs
@@ -88,6 +88,6 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
}

pub fn untrack(id: Id, profile: &Profile) -> anyhow::Result<bool> {
-
    let mut node = radicle::node::connect(profile.socket())?;
+
    let mut node = radicle::Node::new(profile.socket());
    node.untrack_repo(id).map_err(|e| anyhow!(e))
}
modified radicle-node/src/client.rs
@@ -1,3 +1,4 @@
+
use std::os::unix::net::UnixListener;
use std::{io, net, thread, time};

use crossbeam_channel as chan;
@@ -127,9 +128,13 @@ impl<G: Signer + EcSign> Runtime<G> {
        }
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
        let handle = Handle::new(home, reactor.controller());
+

+
        log::info!("Binding control socket {}..", node_sock.display());
+

+
        let listener = UnixListener::bind(&node_sock)?;
        let control = thread::spawn({
            let handle = handle.clone();
-
            move || control::listen(node_sock, handle)
+
            move || control::listen(listener, handle)
        });

        let pool = WorkerPool::with(
modified radicle-node/src/client/handle.rs
@@ -108,6 +108,10 @@ impl<G: Signer + EcSign + 'static> radicle::node::Handle for Handle<G> {
    type Sessions = Sessions;
    type Error = Error;

+
    fn is_running(&self) -> bool {
+
        true
+
    }
+

    fn connect(&mut self, node: NodeId, addr: radicle::node::Address) -> Result<(), Error> {
        self.command(service::Command::Connect(node, addr))?;

modified radicle-node/src/control.rs
@@ -4,8 +4,8 @@ use std::io::BufReader;
use std::io::LineWriter;
use std::os::unix::net::UnixListener;
use std::os::unix::net::UnixStream;
-
use std::path::{Path, PathBuf};
-
use std::{fs, io, net};
+
use std::path::PathBuf;
+
use std::{io, net};

use radicle::node::Handle;

@@ -23,21 +23,10 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<P: AsRef<Path>, H: Handle<Error = client::handle::Error>>(
-
    path: P,
+
pub fn listen<H: Handle<Error = client::handle::Error>>(
+
    listener: UnixListener,
    mut handle: H,
) -> Result<(), Error> {
-
    fs::create_dir_all(
-
        path.as_ref()
-
            .parent()
-
            .ok_or_else(|| Error::InvalidPath(path.as_ref().to_path_buf()))?,
-
    )
-
    .ok();
-

-
    log::info!("Binding control socket {}..", path.as_ref().display());
-

-
    // TODO: Move socket binding to main thread.
-
    let listener = UnixListener::bind(&path).map_err(Error::Bind)?;
    for incoming in listener.incoming() {
        match incoming {
            Ok(mut stream) => {
@@ -55,23 +44,20 @@ pub fn listen<P: AsRef<Path>, H: Handle<Error = client::handle::Error>>(

                    stream.flush().ok();
                    stream.shutdown(net::Shutdown::Both).ok();
-
                } else {
-
                    writeln!(stream, "ok").ok();
                }
            }
            Err(e) => log::error!("Failed to accept incoming connection: {}", e),
        }
    }
    log::debug!("Exiting control loop..");
-
    fs::remove_file(&path).ok();

    Ok(())
}

#[derive(thiserror::Error, Debug)]
enum DrainError {
-
    #[error("invalid command argument `{0}`")]
-
    InvalidCommandArg(String),
+
    #[error("invalid command argument `{0}`, {1}")]
+
    InvalidCommandArg(String, Box<dyn std::error::Error>),
    #[error("unknown command `{0}`")]
    UnknownCommand(String),
    #[error("client error: {0}")]
@@ -88,132 +74,142 @@ fn drain<H: Handle<Error = client::handle::Error>>(
) -> Result<(), DrainError> {
    let mut reader = BufReader::new(stream);
    let mut writer = LineWriter::new(stream);
+
    let mut line = String::new();
+

+
    reader.read_line(&mut line)?;
+

+
    let cmd = line.trim_end();

    // TODO: refactor to include helper
-
    for line in reader.by_ref().lines().flatten() {
-
        match line.split_once(' ') {
-
            Some(("fetch", arg)) => {
-
                if let Ok(id) = arg.parse() {
-
                    fetch(id, LineWriter::new(stream), handle)?;
-
                } else {
-
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
-
                }
+
    match cmd.split_once(' ') {
+
        Some(("fetch", arg)) => match arg.parse() {
+
            Ok(id) => {
+
                fetch(id, LineWriter::new(stream), handle)?;
            }
-
            Some(("track-repo", arg)) => {
-
                if let Ok(id) = arg.parse() {
-
                    match handle.track_repo(id) {
-
                        Ok(updated) => {
-
                            if updated {
-
                                writeln!(writer, "{}", node::RESPONSE_OK)?;
-
                            } else {
-
                                writeln!(writer, "{}", node::RESPONSE_NOOP)?;
-
                            }
-
                        }
-
                        Err(e) => {
-
                            return Err(DrainError::Client(e));
-
                        }
+
            Err(err) => {
+
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
+
            }
+
        },
+
        Some(("track-repo", arg)) => match arg.parse() {
+
            Ok(id) => match handle.track_repo(id) {
+
                Ok(updated) => {
+
                    if updated {
+
                        writeln!(writer, "{}", node::RESPONSE_OK)?;
+
                    } else {
+
                        writeln!(writer, "{}", node::RESPONSE_NOOP)?;
                    }
-
                } else {
-
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
                }
+
                Err(e) => {
+
                    return Err(DrainError::Client(e));
+
                }
+
            },
+
            Err(err) => {
+
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
            }
-
            Some(("untrack-repo", arg)) => {
-
                if let Ok(id) = arg.parse() {
-
                    match handle.untrack_repo(id) {
-
                        Ok(updated) => {
-
                            if updated {
-
                                writeln!(writer, "{}", node::RESPONSE_OK)?;
-
                            } else {
-
                                writeln!(writer, "{}", node::RESPONSE_NOOP)?;
-
                            }
-
                        }
-
                        Err(e) => {
-
                            return Err(DrainError::Client(e));
-
                        }
+
        },
+
        Some(("untrack-repo", arg)) => match arg.parse() {
+
            Ok(id) => match handle.untrack_repo(id) {
+
                Ok(updated) => {
+
                    if updated {
+
                        writeln!(writer, "{}", node::RESPONSE_OK)?;
+
                    } else {
+
                        writeln!(writer, "{}", node::RESPONSE_NOOP)?;
                    }
-
                } else {
-
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
                }
+
                Err(e) => {
+
                    return Err(DrainError::Client(e));
+
                }
+
            },
+
            Err(err) => {
+
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
            }
-
            Some(("track-node", args)) => {
-
                let (peer, alias) = if let Some((peer, alias)) = args.split_once(' ') {
-
                    (peer, Some(alias.to_owned()))
-
                } else {
-
                    (args, None)
-
                };
-
                if let Ok(id) = peer.parse() {
-
                    match handle.track_node(id, alias) {
-
                        Ok(updated) => {
-
                            if updated {
-
                                writeln!(writer, "{}", node::RESPONSE_OK)?;
-
                            } else {
-
                                writeln!(writer, "{}", node::RESPONSE_NOOP)?;
-
                            }
-
                        }
-
                        Err(e) => {
-
                            return Err(DrainError::Client(e));
+
        },
+
        Some(("track-node", args)) => {
+
            let (peer, alias) = if let Some((peer, alias)) = args.split_once(' ') {
+
                (peer, Some(alias.to_owned()))
+
            } else {
+
                (args, None)
+
            };
+
            match peer.parse() {
+
                Ok(id) => match handle.track_node(id, alias) {
+
                    Ok(updated) => {
+
                        if updated {
+
                            writeln!(writer, "{}", node::RESPONSE_OK)?;
+
                        } else {
+
                            writeln!(writer, "{}", node::RESPONSE_NOOP)?;
                        }
                    }
-
                } else {
-
                    return Err(DrainError::InvalidCommandArg(args.to_owned()));
+
                    Err(e) => {
+
                        return Err(DrainError::Client(e));
+
                    }
+
                },
+
                Err(err) => {
+
                    return Err(DrainError::InvalidCommandArg(
+
                        args.to_owned(),
+
                        Box::new(err),
+
                    ));
                }
            }
-
            Some(("untrack-node", arg)) => {
-
                if let Ok(id) = arg.parse() {
-
                    match handle.untrack_node(id) {
-
                        Ok(updated) => {
-
                            if updated {
-
                                writeln!(writer, "{}", node::RESPONSE_OK)?;
-
                            } else {
-
                                writeln!(writer, "{}", node::RESPONSE_NOOP)?;
-
                            }
-
                        }
-
                        Err(e) => {
-
                            return Err(DrainError::Client(e));
-
                        }
+
        }
+
        Some(("untrack-node", arg)) => match arg.parse() {
+
            Ok(id) => match handle.untrack_node(id) {
+
                Ok(updated) => {
+
                    if updated {
+
                        writeln!(writer, "{}", node::RESPONSE_OK)?;
+
                    } else {
+
                        writeln!(writer, "{}", node::RESPONSE_NOOP)?;
                    }
-
                } else {
-
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
                }
+
                Err(e) => {
+
                    return Err(DrainError::Client(e));
+
                }
+
            },
+
            Err(err) => {
+
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
            }
-
            Some(("announce-refs", arg)) => {
-
                if let Ok(id) = arg.parse() {
-
                    if let Err(e) = handle.announce_refs(id) {
-
                        return Err(DrainError::Client(e));
-
                    }
-
                    writeln!(writer, "{}", node::RESPONSE_OK)?;
-
                } else {
-
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
+
        },
+
        Some(("announce-refs", arg)) => match arg.parse() {
+
            Ok(id) => {
+
                if let Err(e) = handle.announce_refs(id) {
+
                    return Err(DrainError::Client(e));
                }
+
                writeln!(writer, "{}", node::RESPONSE_OK)?;
+
            }
+
            Err(err) => {
+
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
            }
-
            Some((cmd, _)) => return Err(DrainError::UnknownCommand(cmd.to_owned())),
+
        },
+
        Some((cmd, _)) => return Err(DrainError::UnknownCommand(cmd.to_owned())),

-
            // Commands with no arguments.
-
            None => match line.as_str() {
-
                "routing" => match handle.routing() {
-
                    Ok(c) => {
-
                        for (id, seed) in c.iter() {
-
                            writeln!(writer, "{id} {seed}",)?;
-
                        }
-
                    }
-
                    Err(e) => return Err(DrainError::Client(e)),
-
                },
-
                "inventory" => match handle.inventory() {
-
                    Ok(c) => {
-
                        for id in c.iter() {
-
                            writeln!(writer, "{id}")?;
-
                        }
+
        // Commands with no arguments.
+
        None => match cmd {
+
            "status" => {
+
                println!("RECEIVED 'status'");
+
                writeln!(writer, "{}", node::RESPONSE_OK).ok();
+
            }
+
            "routing" => match handle.routing() {
+
                Ok(c) => {
+
                    for (id, seed) in c.iter() {
+
                        writeln!(writer, "{id} {seed}",)?;
                    }
-
                    Err(e) => return Err(DrainError::Client(e)),
-
                },
-
                "shutdown" => {
-
                    return Err(DrainError::Shutdown);
                }
-
                _ => {
-
                    return Err(DrainError::UnknownCommand(line));
+
                Err(e) => return Err(DrainError::Client(e)),
+
            },
+
            "inventory" => match handle.inventory() {
+
                Ok(c) => {
+
                    for id in c.iter() {
+
                        writeln!(writer, "{id}")?;
+
                    }
                }
+
                Err(e) => return Err(DrainError::Client(e)),
            },
-
        }
+
            "shutdown" => {
+
                return Err(DrainError::Shutdown);
+
            }
+
            _ => {
+
                return Err(DrainError::UnknownCommand(line));
+
            }
+
        },
    }
    Ok(())
}
@@ -232,13 +228,16 @@ fn fetch<W: Write, H: Handle<Error = client::handle::Error>>(

            writeln!(
                writer,
-
                "ok: found {} seeds for {} ({:?})",
+
                "ok: found {} seeds for {} ({:?})", // TODO: Better output
                seeds.len(),
                &id,
                &seeds,
            )?;

-
            for result in results.iter() {
+
            for result in results
+
                .iter()
+
                .take(results.capacity().unwrap_or(seeds.len()))
+
            {
                match result.result {
                    Ok(updated) => {
                        writeln!(writer, "ok: {} fetched from {}", &id, result.remote)?;
@@ -274,7 +273,7 @@ fn fetch<W: Write, H: Handle<Error = client::handle::Error>>(
mod tests {
    use std::io::prelude::*;
    use std::os::unix::net::UnixStream;
-
    use std::{net, thread};
+
    use std::thread;

    use super::*;
    use crate::identity::Id;
@@ -288,28 +287,26 @@ mod tests {
        let handle = test::handle::Handle::default();
        let socket = tmp.path().join("alice.sock");
        let projs = test::arbitrary::set::<Id>(1..3);
+
        let listener = UnixListener::bind(&socket).unwrap();

        thread::spawn({
-
            let socket = socket.clone();
            let handle = handle.clone();

-
            move || listen(socket, handle)
+
            move || listen(listener, handle)
        });

-
        let mut stream = loop {
-
            if let Ok(stream) = UnixStream::connect(&socket) {
-
                break stream;
-
            }
-
        };
        for proj in &projs {
-
            writeln!(&stream, "announce-refs {}", proj).unwrap();
+
            let mut buf = [0; 2];
+
            let mut stream = loop {
+
                if let Ok(stream) = UnixStream::connect(&socket) {
+
                    break stream;
+
                }
+
            };
+
            writeln!(&stream, "announce-refs {proj}").unwrap();
+
            stream.read_exact(&mut buf).unwrap();
+
            assert_eq!(&buf, &[b'o', b'k']);
        }

-
        let mut buf = [0; 2];
-
        stream.shutdown(net::Shutdown::Write).unwrap();
-
        stream.read_exact(&mut buf).unwrap();
-

-
        assert_eq!(&buf, &[b'o', b'k']);
        for proj in &projs {
            assert!(handle.updates.lock().unwrap().contains(proj));
        }
@@ -321,19 +318,17 @@ mod tests {
        let socket = tmp.path().join("node.sock");
        let proj = test::arbitrary::gen::<Id>(1);
        let peer = test::arbitrary::gen::<NodeId>(1);
+
        let listener = UnixListener::bind(&socket).unwrap();
+
        let mut handle = Node::new(&socket);

        thread::spawn({
-
            let socket = socket.clone();
            let handle = crate::test::handle::Handle::default();

-
            move || crate::control::listen(socket, handle)
+
            move || crate::control::listen(listener, handle)
        });

-
        let mut handle = loop {
-
            if let Ok(conn) = Node::connect(&socket) {
-
                break conn;
-
            }
-
        };
+
        // Wait for node to be online.
+
        while !handle.is_running() {}

        assert!(handle.track_repo(proj).unwrap());
        assert!(!handle.track_repo(proj).unwrap());
modified radicle-node/src/service.rs
@@ -405,7 +405,10 @@ where
                }

                let seeds = match self.routing.get(&id) {
-
                    Ok(seeds) => seeds,
+
                    Ok(seeds) => seeds
+
                        .into_iter()
+
                        .filter(|node| *node != self.node_id())
+
                        .collect(),
                    Err(err) => {
                        log::error!("Error reading routing table for {id}: {err}");
                        resp.send(FetchLookup::NotFound).ok();
@@ -413,13 +416,14 @@ where
                        return;
                    }
                };
-
                let Some(seeds) = NonEmpty::from_vec(seeds.into_iter().collect()) else {
-
                    log::warn!("No seeds found for {}", id);
+

+
                let Some(seeds) = NonEmpty::from_vec(seeds) else {
+
                    log::warn!("No seeds found to fetch from, for {}", id);
                    resp.send(FetchLookup::NotFound).ok();

                    return;
                };
-
                log::debug!("Found {} seed(s) for {}", seeds.len(), id);
+
                log::debug!("Found {} seed(s) to fetch from, for {}", seeds.len(), id);

                let (results_send, results) = chan::bounded(seeds.len());
                resp.send(FetchLookup::Found {
@@ -432,7 +436,11 @@ where

                // TODO: Limit the number of seeds we fetch from? Randomize?
                for seed in seeds {
-
                    self.fetch(id, &seed);
+
                    if let Some(session) = self.sessions.get_mut(&seed) {
+
                        Self::fetch(id, session, &mut self.reactor);
+
                    } else {
+
                        // TODO: Establish connection?
+
                    }
                }
            }
            Command::TrackRepo(id, resp) => {
@@ -472,15 +480,13 @@ where
        }
    }

-
    pub fn fetch(&mut self, rid: Id, seed: &NodeId) {
-
        let Some(session) = self.sessions.get_mut(seed) else {
-
            panic!("Service::fetch: attempted to fetch from unknown peer {seed}");
-
        };
+
    pub fn fetch(rid: Id, session: &mut Session, reactor: &mut Reactor) {
+
        let seed = session.id;

        if let Some(fetch) = session.fetch(rid) {
            debug!("Fetch initiated for {rid} with {seed}..");

-
            self.reactor.write(session.id, fetch);
+
            reactor.write(session.id, fetch);
        } else {
            // TODO: If we can't fetch, it's because we're already fetching from
            // this peer. So we need to queue the request, or find another peer.
@@ -709,7 +715,10 @@ where
                    // Refs are only supposed to be relayed by peers who are tracking
                    // the resource. Therefore, it's safe to fetch from the remote
                    // peer, even though it isn't the announcer.
-
                    self.fetch(message.id, relayer);
+
                    let Some(session) = self.sessions.get_mut(relayer) else {
+
                        panic!(); // TODO
+
                    };
+
                    Self::fetch(message.id, session, &mut self.reactor);

                    return Ok(true);
                } else {
@@ -1140,7 +1149,7 @@ pub enum DisconnectReason {
    /// Error with an underlying established connection. Sometimes, reconnecting
    /// after such an error is possible.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
-
    // Session error.
+
    /// Session error.
    Session(session::Error),
}

modified radicle-node/src/test/handle.rs
@@ -20,6 +20,10 @@ impl radicle::node::Handle for Handle {
    type Error = Error;
    type Sessions = service::Sessions;

+
    fn is_running(&self) -> bool {
+
        true
+
    }
+

    fn connect(&mut self, _node: NodeId, _addr: radicle::node::Address) -> Result<(), Error> {
        unimplemented!();
    }
modified radicle-node/src/wire/protocol.rs
@@ -522,7 +522,19 @@ where
                log::error!(target: "wire", "Received error: peer {} disconnected: {}", id, err);
                self.actions.push_back(Action::UnregisterTransport(*id));
            }
+
            // TODO: Why is the error an `i16`?
            reactor::Error::TransportDisconnect(id, _, err) => {
+
                if let Some(remote) = self.peers.get(id) {
+
                    if let Some(id) = remote.id() {
+
                        self.service.disconnected(
+
                            *id,
+
                            &DisconnectReason::Connection(Arc::new(io::Error::from(
+
                                io::ErrorKind::ConnectionReset,
+
                            ))),
+
                        );
+
                    }
+
                }
+
                // TODO: Notify service.
                log::error!(target: "wire", "Received error: peer {} disconnected: {}", id, err);
            }
            reactor::Error::WriteFailure(id, err) => {
modified radicle-node/src/worker.rs
@@ -90,6 +90,9 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
            let result = self.fetch(fetch, &mut tunnel);
            let session = tunnel.into_session();

+
            if let Err(err) = &result {
+
                log::error!(target: "worker", "Fetch error: {err}");
+
            }
            (session, result)
        } else {
            log::debug!(target: "worker", "Worker processing incoming fetch for {}", fetch.repo);
@@ -106,6 +109,9 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
            let result = self.upload_pack(fetch, drain, &mut stream_r, &mut stream_w);
            let session = WireSession::from_split_io(stream_r, stream_w);

+
            if let Err(err) = &result {
+
                log::error!(target: "worker", "Upload-pack error: {err}");
+
            }
            (session, result)
        }
    }
@@ -141,8 +147,6 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        let status = child.wait()?;

        // TODO: Parse fetch output to return updates.
-
        log::debug!(target: "worker", "Fetch for {} exited with status {:?}", fetch.repo, status.code());
-

        if let Some(status) = status.code() {
            log::debug!(target: "worker", "Fetch for {} exited with status {:?}", fetch.repo, status);
        } else {
@@ -210,6 +214,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        thread::scope(|scope| {
            // Data coming from the remote peer is written to the standard input of the
            // `upload-pack` process.
+
            // FIXME: This sometimes returns a `WouldBlock`.
            let t = scope.spawn(move || io::copy(&mut reader, &mut stdin));
            // Output of `upload-pack` is sent back to the remote peer.
            io::copy(&mut stdout, stream_w)?;
modified radicle-remote-helper/src/lib.rs
@@ -11,6 +11,8 @@ use radicle::storage::{ReadRepository, WriteRepository, WriteStorage};

/// The service invoked by git on the remote repository, during a push.
const GIT_RECEIVE_PACK: &str = "git-receive-pack";
+
/// The service invoked by git on the remote repository, during a fetch.
+
const GIT_UPLOAD_PACK: &str = "git-upload-pack";

#[derive(Debug, Error)]
pub enum Error {
@@ -82,7 +84,7 @@ pub fn run(profile: radicle::Profile) -> Result<(), Box<dyn std::error::Error +
                //    won't match the remote we're pushing to.
                let signer = if *service == GIT_RECEIVE_PACK {
                    if profile.public_key != namespace {
-
                        return Err(Error::KeyMismatch(namespace).into());
+
                        return Err(Error::KeyMismatch(profile.public_key).into());
                    }
                    let signer = profile.signer()?;

@@ -90,6 +92,10 @@ pub fn run(profile: radicle::Profile) -> Result<(), Box<dyn std::error::Error +
                } else {
                    None
                };
+

+
                if *service == GIT_UPLOAD_PACK {
+
                    // TODO: Fetch from network.
+
                }
                println!(); // Empty line signifies connection is established.

                let mut child = process::Command::new(service)
@@ -101,15 +107,16 @@ pub fn run(profile: radicle::Profile) -> Result<(), Box<dyn std::error::Error +
                    .stdin(process::Stdio::inherit())
                    .spawn()?;

-
                if child.wait()?.success() {
+
                if child.wait()?.success() && *service == GIT_RECEIVE_PACK {
                    if let Some(signer) = signer {
                        proj.sign_refs(&signer)?;
                        proj.set_head()?;
                        // Connect to local node and announce refs to the network.
                        // If our node is not running, we simply skip this step, as the
                        // refs will be announced eventually, when the node restarts.
-
                        if let Ok(mut conn) = radicle::node::connect(profile.socket()) {
-
                            conn.announce_refs(url.repo)?;
+
                        let mut node = radicle::Node::new(profile.socket());
+
                        if node.is_running() {
+
                            node.announce_refs(url.repo)?;
                        }
                    }
                }
modified radicle-tools/src/rad-clone.rs
@@ -10,7 +10,7 @@ fn main() -> anyhow::Result<()> {

    if let Some(id) = env::args().nth(1) {
        let id = Id::from_urn(&id)?;
-
        let mut node = radicle::node::connect(profile.socket())?;
+
        let mut node = radicle::Node::new(profile.socket());
        let repo = radicle::rad::clone(id, &cwd, &signer, &profile.storage, &mut node)?;

        println!(
modified radicle-tools/src/rad-push.rs
@@ -16,7 +16,7 @@ fn main() -> anyhow::Result<()> {
    let sigrefs = project.sign_refs(&signer)?;
    let head = project.set_head()?;

-
    radicle::node::connect(profile.socket())?.announce_refs(id)?;
+
    radicle::Node::new(profile.socket()).announce_refs(id)?;

    println!("head: {}", head);
    println!("ok: {}", sigrefs.signature);
modified radicle/src/git.rs
@@ -300,7 +300,7 @@ pub fn configure_remote<'r>(
    url: &Url,
) -> Result<git2::Remote<'r>, git2::Error> {
    let fetch = format!("+refs/heads/*:refs/remotes/{name}/*");
-
    let remote = repo.remote_with_fetch(name, dbg!(url.to_string().as_str()), &fetch)?;
+
    let remote = repo.remote_with_fetch(name, url.to_string().as_str(), &fetch)?;

    Ok(remote)
}
modified radicle/src/lib.rs
@@ -22,6 +22,7 @@ pub mod storage;
#[cfg(any(test, feature = "test"))]
pub mod test;

+
pub use node::Node;
pub use profile::Profile;
pub use storage::git::Storage;

modified radicle/src/node.rs
@@ -4,7 +4,7 @@ use amplify::WrapperMut;
use std::io::{BufRead, BufReader, Write};
use std::ops::Deref;
use std::os::unix::net::UnixStream;
-
use std::path::Path;
+
use std::path::{Path, PathBuf};
use std::{io, net};

use crossbeam_channel as chan;
@@ -122,6 +122,8 @@ pub trait Handle {
    /// The error returned by all methods.
    type Error: std::error::Error;

+
    /// Check if the node is running. to a peer.
+
    fn is_running(&self) -> bool;
    /// Connect to a peer.
    fn connect(&mut self, node: NodeId, addr: Address) -> Result<(), Self::Error>;
    /// Retrieve or update the project from network.
@@ -153,15 +155,15 @@ pub type NodeId = PublicKey;
/// Node controller.
#[derive(Debug)]
pub struct Node {
-
    stream: UnixStream,
+
    socket: PathBuf,
}

impl Node {
    /// Connect to the node, via the socket at the given path.
-
    pub fn connect<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
-
        let stream = UnixStream::connect(path).map_err(Error::Connect)?;
-

-
        Ok(Self { stream })
+
    pub fn new<P: AsRef<Path>>(path: P) -> Self {
+
        Self {
+
            socket: path.as_ref().to_path_buf(),
+
        }
    }

    /// Call a command on the node.
@@ -169,15 +171,20 @@ impl Node {
        &self,
        cmd: &str,
        args: &[A],
-
    ) -> Result<impl Iterator<Item = Result<String, io::Error>> + '_, io::Error> {
+
    ) -> Result<impl Iterator<Item = Result<String, io::Error>>, io::Error> {
+
        let stream = UnixStream::connect(&self.socket)?;
        let args = args
            .iter()
            .map(ToString::to_string)
            .collect::<Vec<_>>()
            .join(" ");
-
        writeln!(&self.stream, "{cmd} {args}")?;

-
        Ok(BufReader::new(&self.stream).lines())
+
        if args.is_empty() {
+
            writeln!(&stream, "{cmd}")?;
+
        } else {
+
            writeln!(&stream, "{cmd} {args}")?;
+
        }
+
        Ok(BufReader::new(stream).lines())
    }
}

@@ -185,6 +192,16 @@ impl Handle for Node {
    type Sessions = ();
    type Error = Error;

+
    fn is_running(&self) -> bool {
+
        let Ok(mut lines) = self.call::<&str>("status", &[]) else {
+
            return false;
+
        };
+
        let Some(Ok(line)) = lines.next() else {
+
            return false;
+
        };
+
        line == RESPONSE_OK
+
    }
+

    fn connect(&mut self, _node: NodeId, _addr: Address) -> Result<(), Error> {
        todo!()
    }
@@ -194,7 +211,8 @@ impl Handle for Node {
            let line = line?;
            log::debug!("node: {}", line);
        }
-
        todo!()
+
        // TODO: Return parsed lookup results.
+
        Ok(FetchLookup::NotFound)
    }

    fn track_node(&mut self, id: NodeId, alias: Option<String>) -> Result<bool, Error> {
@@ -298,8 +316,3 @@ impl Handle for Node {
        todo!();
    }
}
-

-
/// Connect to the local node.
-
pub fn connect<P: AsRef<Path>>(path: P) -> Result<Node, Error> {
-
    Node::connect(path)
-
}
modified radicle/src/rad.rs
@@ -178,13 +178,13 @@ pub fn fork<G: Signer, S: storage::WriteStorage>(
    raw.reference(
        &canonical_branch.with_namespace(me.into()),
        *canonical_head,
-
        false,
+
        true,
        &format!("creating default branch for {me}"),
    )?;
    raw.reference(
        &git::refs::storage::id(me),
        canonical_id.into(),
-
        false,
+
        true,
        &format!("creating identity branch for {me}"),
    )?;
    repository.sign_refs(signer)?;
@@ -242,7 +242,10 @@ pub fn clone<P: AsRef<Path>, G: Signer, H: node::Handle>(
        }
    }

+
    log::debug!("Creating fork in local storage..");
    let _ = fork(proj, signer, storage)?;
+

+
    log::debug!("Creating checkout at {}..", path.as_ref().display());
    let working = checkout(proj, signer.public_key(), path, storage)?;

    Ok(working)