Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Communicate fetch results through control socket
Alexis Sellier committed 3 years ago
commit 45e31a51920452c65ad5368a74425d0fe37e51f1
parent af3ea6a11a99543046a5a4396d48743cf8c33cac
14 files changed +285 -304
modified radicle-cli/src/commands/clone.rs
@@ -10,7 +10,7 @@ use radicle::git::raw;
use radicle::identity::doc;
use radicle::identity::doc::{DocError, Id};
use radicle::node;
-
use radicle::node::{FetchLookup, Handle};
+
use radicle::node::{FetchResult, Handle as _, Node};
use radicle::prelude::*;
use radicle::rad;
use radicle::storage;
@@ -116,11 +116,9 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
}

#[derive(Error, Debug)]
-
pub enum CloneError<H: node::Handle> {
+
pub enum CloneError {
    #[error("node: {0}")]
    Node(#[from] node::Error),
-
    #[error("fetch: {0}")]
-
    Fetch(#[from] node::FetchError),
    #[error("fork: {0}")]
    Fork(#[from] rad::ForkError),
    #[error("storage: {0}")]
@@ -133,50 +131,45 @@ pub enum CloneError<H: node::Handle> {
    Payload(#[from] doc::PayloadError),
    #[error("project error: {0}")]
    Project(#[from] ProjectError),
-
    #[error("handle error: {0}")]
-
    Handle(H::Error),
}

-
pub fn clone<G: Signer, H: Handle>(
+
pub fn clone<G: Signer>(
    id: Id,
    signer: &G,
    storage: &Storage,
-
    node: &mut H,
-
) -> Result<(raw::Repository, Doc<Verified>, Project), CloneError<H>> {
+
    node: &mut Node,
+
) -> Result<(raw::Repository, Doc<Verified>, Project), CloneError> {
    let me = *signer.public_key();

-
    // Track & fetch project.
-
    if node.track_repo(id).map_err(CloneError::Handle)? {
+
    // Track.
+
    if node.track_repo(id)? {
        term::success!(
            "Tracking relationship restablished for {}",
            term::format::tertiary(id)
        );
    }

-
    let spinner = term::spinner(format!("Fetching {}..", term::format::tertiary(id)));
-
    match node.fetch(id).map_err(CloneError::Handle)? {
-
        FetchLookup::Found { seeds, results } => {
-
            // TODO: If none of them succeeds, output an error. Otherwise tell the caller
-
            // how many succeeded.
-
            for result in results.iter().take(seeds.len()) {
-
                match &*result {
-
                    Ok(_updates) => {}
-
                    Err(_err) => {}
-
                }
+
    // Get seeds.
+
    let seeds = node.seeds(id)?;
+
    // Fetch from all seeds.
+
    for seed in seeds {
+
        let spinner = term::spinner(format!(
+
            "Fetching {} from {}..",
+
            term::format::tertiary(id),
+
            term::format::tertiary(term::format::node(&seed))
+
        ));
+

+
        // TODO: If none of them succeeds, output an error. Otherwise tell the caller
+
        // how many succeeded.
+
        match node.fetch(id, seed)? {
+
            FetchResult::Success { .. } => {
+
                spinner.finish();
+
            }
+
            FetchResult::Failed { reason } => {
+
                spinner.error(reason);
            }
-
        }
-
        FetchLookup::NotFound => {
-
            // TODO: Return error.
-
        }
-
        FetchLookup::NotTracking => {
-
            // SAFETY: Since we track it above, this shouldn't trigger unless there's a bug.
-
            panic!("clone: Repository is not tracked");
-
        }
-
        FetchLookup::Error(err) => {
-
            return Err(err.into());
        }
    }
-
    spinner.finish();

    // Create a local fork of the project, under our own id.
    {
modified radicle-cli/src/commands/track.rs
@@ -117,7 +117,8 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
    }

    if options.fetch {
-
        node.fetch(rid)?;
+
        // TODO: Run a proper fetch here.
+
        term::warning("fetch after track is not yet supported");
    }

    Ok(())
modified radicle-cli/src/terminal/spinner.rs
@@ -32,19 +32,19 @@ impl Spinner {
        self.set_failed();
    }

-
    pub fn error(self, err: anyhow::Error) -> anyhow::Error {
-
        self.progress.finish_and_clear();
-
        term::eprintln(style("!!").red().reverse(), style(&err).red());
+
    pub fn error(mut self, msg: impl ToString) {
+
        let msg = msg.to_string();

-
        err
+
        self.message = format!("{} ({})", self.message, msg);
+
        self.set_failed();
    }

    pub fn clear(self) {
        self.progress.finish_and_clear();
    }

-
    pub fn message(&mut self, msg: impl Into<String>) {
-
        let msg = msg.into();
+
    pub fn message(&mut self, msg: impl ToString) {
+
        let msg = msg.to_string();

        self.progress.set_message(msg.clone());
        self.message = msg;
modified radicle-node/src/control.rs
@@ -8,10 +8,12 @@ use std::path::PathBuf;
use std::{io, net};

use radicle::node::Handle;
+
use serde_json as json;

use crate::identity::Id;
use crate::node;
-
use crate::node::FetchLookup;
+
use crate::node::FetchResult;
+
use crate::node::NodeId;
use crate::runtime;

#[derive(thiserror::Error, Debug)]
@@ -23,7 +25,7 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<H: Handle<Error = runtime::HandleError>>(
+
pub fn listen<H: Handle<Error = runtime::HandleError, FetchResult = FetchResult>>(
    listener: UnixListener,
    mut handle: H,
) -> Result<(), Error> {
@@ -34,8 +36,8 @@ pub fn listen<H: Handle<Error = runtime::HandleError>>(
            Ok(mut stream) => {
                log::debug!(target: "control", "Accepted new client on control socket..");

-
                if let Err(e) = drain(&stream, &mut handle) {
-
                    if let DrainError::Shutdown = e {
+
                if let Err(e) = command(&stream, &mut handle) {
+
                    if let CommandError::Shutdown = e {
                        log::debug!(target: "control", "Shutdown requested..");
                        // Channel might already be disconnected if shutdown
                        // came from somewhere else. Ignore errors.
@@ -57,11 +59,13 @@ pub fn listen<H: Handle<Error = runtime::HandleError>>(
}

#[derive(thiserror::Error, Debug)]
-
enum DrainError {
+
enum CommandError {
    #[error("invalid command argument `{0}`, {1}")]
    InvalidCommandArg(String, Box<dyn std::error::Error>),
    #[error("unknown command `{0}`")]
    UnknownCommand(String),
+
    #[error("serialization failed: {0}")]
+
    Serialization(#[from] json::Error),
    #[error("runtime error: {0}")]
    Runtime(#[from] runtime::HandleError),
    #[error("i/o error: {0}")]
@@ -70,10 +74,10 @@ enum DrainError {
    Shutdown,
}

-
fn drain<H: Handle<Error = runtime::HandleError>>(
+
fn command<H: Handle<Error = runtime::HandleError, FetchResult = FetchResult>>(
    stream: &UnixStream,
    handle: &mut H,
-
) -> Result<(), DrainError> {
+
) -> Result<(), CommandError> {
    let mut reader = BufReader::new(stream);
    let mut writer = LineWriter::new(stream);
    let mut line = String::new();
@@ -86,14 +90,18 @@ fn drain<H: Handle<Error = runtime::HandleError>>(

    // TODO: refactor to include helper
    match cmd.split_once(' ') {
-
        Some(("fetch", arg)) => match arg.parse() {
-
            Ok(id) => {
-
                fetch(id, LineWriter::new(stream), handle)?;
-
            }
-
            Err(err) => {
-
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
+
        Some(("fetch", args)) => {
+
            if let Some((rid, node)) = args.split_once(' ') {
+
                let rid: Id = rid
+
                    .parse()
+
                    .map_err(|e| CommandError::InvalidCommandArg(rid.to_owned(), Box::new(e)))?;
+
                let node: NodeId = node
+
                    .parse()
+
                    .map_err(|e| CommandError::InvalidCommandArg(node.to_owned(), Box::new(e)))?;
+

+
                fetch(rid, node, LineWriter::new(stream), handle)?;
            }
-
        },
+
        }
        Some(("track-repo", arg)) => match arg.parse() {
            Ok(id) => match handle.track_repo(id) {
                Ok(updated) => {
@@ -104,11 +112,14 @@ fn drain<H: Handle<Error = runtime::HandleError>>(
                    }
                }
                Err(e) => {
-
                    return Err(DrainError::Runtime(e));
+
                    return Err(CommandError::Runtime(e));
                }
            },
            Err(err) => {
-
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
+
                return Err(CommandError::InvalidCommandArg(
+
                    arg.to_owned(),
+
                    Box::new(err),
+
                ));
            }
        },
        Some(("untrack-repo", arg)) => match arg.parse() {
@@ -121,11 +132,14 @@ fn drain<H: Handle<Error = runtime::HandleError>>(
                    }
                }
                Err(e) => {
-
                    return Err(DrainError::Runtime(e));
+
                    return Err(CommandError::Runtime(e));
                }
            },
            Err(err) => {
-
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
+
                return Err(CommandError::InvalidCommandArg(
+
                    arg.to_owned(),
+
                    Box::new(err),
+
                ));
            }
        },
        Some(("track-node", args)) => {
@@ -144,11 +158,11 @@ fn drain<H: Handle<Error = runtime::HandleError>>(
                        }
                    }
                    Err(e) => {
-
                        return Err(DrainError::Runtime(e));
+
                        return Err(CommandError::Runtime(e));
                    }
                },
                Err(err) => {
-
                    return Err(DrainError::InvalidCommandArg(
+
                    return Err(CommandError::InvalidCommandArg(
                        args.to_owned(),
                        Box::new(err),
                    ));
@@ -165,30 +179,35 @@ fn drain<H: Handle<Error = runtime::HandleError>>(
                    }
                }
                Err(e) => {
-
                    return Err(DrainError::Runtime(e));
+
                    return Err(CommandError::Runtime(e));
                }
            },
            Err(err) => {
-
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
+
                return Err(CommandError::InvalidCommandArg(
+
                    arg.to_owned(),
+
                    Box::new(err),
+
                ));
            }
        },
        Some(("announce-refs", arg)) => match arg.parse() {
            Ok(id) => {
                if let Err(e) = handle.announce_refs(id) {
-
                    return Err(DrainError::Runtime(e));
+
                    return Err(CommandError::Runtime(e));
                }
                writeln!(writer, "{}", node::RESPONSE_OK)?;
            }
            Err(err) => {
-
                return Err(DrainError::InvalidCommandArg(arg.to_owned(), Box::new(err)));
+
                return Err(CommandError::InvalidCommandArg(
+
                    arg.to_owned(),
+
                    Box::new(err),
+
                ));
            }
        },
-
        Some((cmd, _)) => return Err(DrainError::UnknownCommand(cmd.to_owned())),
+
        Some((cmd, _)) => return Err(CommandError::UnknownCommand(cmd.to_owned())),

        // Commands with no arguments.
        None => match cmd {
            "status" => {
-
                println!("RECEIVED 'status'");
                writeln!(writer, "{}", node::RESPONSE_OK).ok();
            }
            "routing" => match handle.routing() {
@@ -197,7 +216,7 @@ fn drain<H: Handle<Error = runtime::HandleError>>(
                        writeln!(writer, "{id} {seed}",)?;
                    }
                }
-
                Err(e) => return Err(DrainError::Runtime(e)),
+
                Err(e) => return Err(CommandError::Runtime(e)),
            },
            "inventory" => match handle.inventory() {
                Ok(c) => {
@@ -205,68 +224,31 @@ fn drain<H: Handle<Error = runtime::HandleError>>(
                        writeln!(writer, "{id}")?;
                    }
                }
-
                Err(e) => return Err(DrainError::Runtime(e)),
+
                Err(e) => return Err(CommandError::Runtime(e)),
            },
            "shutdown" => {
-
                return Err(DrainError::Shutdown);
+
                return Err(CommandError::Shutdown);
            }
            _ => {
-
                return Err(DrainError::UnknownCommand(line));
+
                return Err(CommandError::UnknownCommand(line));
            }
        },
    }
    Ok(())
}

-
fn fetch<W: Write, H: Handle<Error = runtime::HandleError>>(
+
fn fetch<W: Write, H: Handle<Error = runtime::HandleError, FetchResult = FetchResult>>(
    id: Id,
+
    node: NodeId,
    mut writer: W,
    handle: &mut H,
-
) -> Result<(), DrainError> {
-
    match handle.fetch(id) {
-
        Err(e) => {
-
            return Err(DrainError::Runtime(e));
+
) -> Result<(), CommandError> {
+
    match handle.fetch(id, node) {
+
        Ok(result) => {
+
            json::to_writer(&mut writer, &result)?;
        }
-
        Ok(FetchLookup::Found { seeds, results }) => {
-
            let seeds = Vec::from(seeds);
-

-
            writeln!(
-
                writer,
-
                "ok: found {} seeds for {} ({:?})", // TODO: Better output
-
                seeds.len(),
-
                &id,
-
                &seeds,
-
            )?;
-

-
            for result in results
-
                .iter()
-
                .take(results.capacity().unwrap_or(seeds.len()))
-
            {
-
                match result.result {
-
                    Ok(updated) => {
-
                        writeln!(writer, "ok: {id} fetched from {}", result.remote)?;
-
                        for update in updated {
-
                            writeln!(writer, "{update}")?;
-
                        }
-
                    }
-
                    Err(err) => {
-
                        writeln!(
-
                            writer,
-
                            "error: {id} failed to fetch from {}: {err}",
-
                            result.remote
-
                        )?;
-
                    }
-
                }
-
            }
-
        }
-
        Ok(FetchLookup::NotFound) => {
-
            writeln!(writer, "error: {id} was not found")?;
-
        }
-
        Ok(FetchLookup::NotTracking) => {
-
            writeln!(writer, "error: {id} is not tracked")?;
-
        }
-
        Ok(FetchLookup::Error(err)) => {
-
            writeln!(writer, "error: {err}")?;
+
        Err(e) => {
+
            return Err(CommandError::Runtime(e));
        }
    }
    Ok(())
modified radicle-node/src/runtime/handle.rs
@@ -10,7 +10,7 @@ use thiserror::Error;

use crate::crypto::Signer;
use crate::identity::Id;
-
use crate::node::FetchLookup;
+
use crate::node::FetchResult;
use crate::profile::Home;
use crate::service;
use crate::service::{CommandError, QueryState};
@@ -107,6 +107,7 @@ impl<G: Signer + EcSign + 'static> Handle<G> {
impl<G: Signer + EcSign + 'static> radicle::node::Handle for Handle<G> {
    type Sessions = Sessions;
    type Error = Error;
+
    type FetchResult = FetchResult;

    fn is_running(&self) -> bool {
        true
@@ -118,9 +119,15 @@ impl<G: Signer + EcSign + 'static> radicle::node::Handle for Handle<G> {
        Ok(())
    }

-
    fn fetch(&mut self, id: Id) -> Result<FetchLookup, Error> {
+
    fn seeds(&mut self, id: Id) -> Result<Vec<NodeId>, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Fetch(id, sender))?;
+
        self.command(service::Command::Seeds(id, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

+
    fn fetch(&mut self, id: Id, from: NodeId) -> Result<FetchResult, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::Fetch(id, from, sender))?;
        receiver.recv().map_err(Error::from)
    }

modified radicle-node/src/service.rs
@@ -18,7 +18,6 @@ use crossbeam_channel as chan;
use fastrand::Rng;
use localtime::{LocalDuration, LocalTime};
use log::*;
-
use nonempty::NonEmpty;

use crate::address;
use crate::address::AddressBook;
@@ -27,7 +26,7 @@ use crate::crypto;
use crate::crypto::{Signer, Verified};
use crate::identity::{Doc, Id};
use crate::node;
-
use crate::node::{Address, Features, FetchError, FetchLookup, FetchResult};
+
use crate::node::{Address, Features};
use crate::prelude::*;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
@@ -35,6 +34,8 @@ use crate::service::session::Protocol;
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteStorage};
use crate::storage::{Namespaces, ReadStorage};
+
use crate::worker;
+
use crate::worker::FetchError;
use crate::Link;

pub use crate::node::NodeId;
@@ -105,8 +106,10 @@ pub enum Command {
    AnnounceRefs(Id),
    /// Connect to node with the given address.
    Connect(NodeId, Address),
+
    /// Lookup seeds for the given repository in the routing table.
+
    Seeds(Id, chan::Sender<Vec<NodeId>>),
    /// Fetch the given repository from the network.
-
    Fetch(Id, chan::Sender<FetchLookup>),
+
    Fetch(Id, NodeId, chan::Sender<node::FetchResult>),
    /// Track the given repository.
    TrackRepo(Id, chan::Sender<bool>),
    /// Untrack the given repository.
@@ -124,7 +127,8 @@ impl fmt::Debug for Command {
        match self {
            Self::AnnounceRefs(id) => write!(f, "AnnounceRefs({id})"),
            Self::Connect(id, addr) => write!(f, "Connect({id}, {addr})"),
-
            Self::Fetch(id, _) => write!(f, "Fetch({id})"),
+
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
+
            Self::Fetch(id, node, _) => write!(f, "Fetch({id}, {node})"),
            Self::TrackRepo(id, _) => write!(f, "TrackRepo({id})"),
            Self::UntrackRepo(id, _) => write!(f, "UntrackRepo({id})"),
            Self::TrackNode(id, _, _) => write!(f, "TrackNode({id})"),
@@ -172,7 +176,7 @@ pub struct Service<R, A, S, G> {
    /// Whether our local inventory no long represents what we have announced to the network.
    out_of_sync: bool,
    /// Fetch requests initiated by user, which are waiting for results.
-
    fetch_reqs: HashMap<Id, chan::Sender<FetchResult>>,
+
    fetch_reqs: HashMap<Id, chan::Sender<node::FetchResult>>,
    /// Current tracked repository bloom filter.
    filter: Filter,
    /// Last time the service was idle.
@@ -394,16 +398,7 @@ where

        match cmd {
            Command::Connect(id, addr) => self.reactor.connect(id, addr),
-
            Command::Fetch(rid, resp) => {
-
                if !self
-
                    .tracking
-
                    .is_repo_tracked(&rid)
-
                    .expect("Service::command: error accessing tracking configuration")
-
                {
-
                    resp.send(FetchLookup::NotTracking).ok();
-
                    return;
-
                }
-

+
            Command::Seeds(rid, resp) => {
                let (connected, unconnected) = match self.routing.get(&rid) {
                    Ok(seeds) => seeds
                        .into_iter()
@@ -411,47 +406,30 @@ where
                        .partition::<Vec<_>, _>(|node| self.sessions.is_negotiated(node)),
                    Err(err) => {
                        error!(target: "service", "Error reading routing table for {rid}: {err}");
-
                        resp.send(FetchLookup::NotFound).ok();
+
                        drop(resp);

                        return;
                    }
                };
-

                debug!(
                    target: "service",
                    "Found {} connected seed(s) and {} unconnected seed(s) for {}",
                    connected.len(), unconnected.len(), rid
                );
-

-
                let Some(seeds) = NonEmpty::from_vec(connected) else {
-
                    warn!(target: "service", "No connected seeds found for {}", rid);
-
                    resp.send(FetchLookup::NotFound).ok();
-

-
                    // TODO: Establish connections to unconnected seeds, and retry.
-
                    // TODO: Fetch requests should be queued and re-checked to see if they can
-
                    //       be fulfilled everytime a new node connects.
-
                    return;
-
                };
-

-
                let (results_send, results) = chan::bounded(seeds.len());
-
                resp.send(FetchLookup::Found {
-
                    seeds: seeds.clone(),
-
                    results,
-
                })
-
                .ok();
-

-
                self.fetch_reqs.insert(rid, results_send);
-

-
                // TODO: Limit the number of seeds we fetch from? Randomize?
-
                for seed in seeds {
-
                    self.fetch(rid, &seed);
-
                }
+
                resp.send(connected).ok();
+
            }
+
            Command::Fetch(rid, seed, resp) => {
+
                // TODO: Establish connections to unconnected seeds, and retry.
+
                // TODO: Fetch requests should be queued and re-checked to see if they can
+
                //       be fulfilled everytime a new node connects.
+
                self.fetch_reqs.insert(rid, resp);
+
                self.fetch(rid, &seed);
            }
            Command::TrackRepo(id, resp) => {
                let tracked = self
                    .track_repo(&id, tracking::Scope::All)
                    .expect("Service::command: error tracking repository");
-
                // TODO: Try to fetch project if we weren't tracking it.
+
                // TODO: Try to fetch project if we weren't tracking it before.
                resp.send(tracked).ok();
            }
            Command::UntrackRepo(id, resp) => {
@@ -508,11 +486,10 @@ where
        }
    }

-
    pub fn fetched(&mut self, result: FetchResult) {
-
        let remote = result.remote;
-
        let rid = result.rid;
-
        let namespaces = result.namespaces;
-
        let initiated = result.initiated;
+
    pub fn fetched(&mut self, result: worker::FetchResult) {
+
        let remote = result.fetch.remote;
+
        let rid = result.fetch.rid;
+
        let initiated = result.fetch.initiated;

        if initiated {
            log::debug!(
@@ -533,7 +510,7 @@ where

                    if let FetchError::Io(_) = err {
                        self.reactor
-
                            .disconnect(result.remote, DisconnectReason::Fetch(err));
+
                            .disconnect(remote, DisconnectReason::Fetch(err));
                        return;
                    } else {
                        Err(err)
@@ -544,16 +521,7 @@ where
            if let Some(results) = self.fetch_reqs.get(&rid) {
                log::debug!(target: "service", "Found existing fetch request, sending result..");

-
                if results
-
                    .send(FetchResult {
-
                        rid,
-
                        initiated,
-
                        remote,
-
                        namespaces,
-
                        result,
-
                    })
-
                    .is_err()
-
                {
+
                if results.send(node::FetchResult::from(result)).is_err() {
                    log::error!(target: "service", "Error sending fetch result for {rid}..");
                    self.fetch_reqs.remove(&rid);
                } else {
modified radicle-node/src/service/reactor.rs
@@ -29,7 +29,7 @@ pub enum Io {
#[derive(Debug, Clone)]
pub struct Fetch {
    /// Repo to fetch.
-
    pub repo: Id,
+
    pub rid: Id,
    /// Namespaces to fetch.
    pub namespaces: Namespaces,
    /// Remote peer we are interacting with.
@@ -87,9 +87,9 @@ impl Reactor {
        self.io.push_back(Io::Wakeup(after));
    }

-
    pub fn fetch(&mut self, remote: NodeId, repo: Id, namespaces: Namespaces, initiated: bool) {
+
    pub fn fetch(&mut self, remote: NodeId, rid: Id, namespaces: Namespaces, initiated: bool) {
        self.io.push_back(Io::Fetch(Fetch {
-
            repo,
+
            rid,
            namespaces,
            remote,
            initiated,
modified radicle-node/src/test/handle.rs
@@ -4,10 +4,11 @@ use std::sync::{Arc, Mutex};
use crossbeam_channel as chan;

use crate::identity::Id;
-
use crate::node::FetchLookup;
+
use crate::node::FetchResult;
use crate::runtime::HandleError;
use crate::service;
use crate::service::NodeId;
+
use crate::storage::RefUpdate;

#[derive(Default, Clone)]
pub struct Handle {
@@ -19,6 +20,7 @@ pub struct Handle {
impl radicle::node::Handle for Handle {
    type Error = HandleError;
    type Sessions = service::Sessions;
+
    type FetchResult = FetchResult;

    fn is_running(&self) -> bool {
        true
@@ -28,8 +30,12 @@ impl radicle::node::Handle for Handle {
        unimplemented!();
    }

-
    fn fetch(&mut self, _id: Id) -> Result<FetchLookup, Self::Error> {
-
        Ok(FetchLookup::NotFound)
+
    fn seeds(&mut self, _id: Id) -> Result<Vec<NodeId>, Self::Error> {
+
        unimplemented!();
+
    }
+

+
    fn fetch(&mut self, _id: Id, _from: NodeId) -> Result<FetchResult, Self::Error> {
+
        Ok(FetchResult::from(Ok::<Vec<RefUpdate>, Self::Error>(vec![])))
    }

    fn track_repo(&mut self, id: Id) -> Result<bool, Self::Error> {
modified radicle-node/src/test/simulator.rs
@@ -14,13 +14,13 @@ use log::*;

use crate::crypto::Signer;
use crate::git::raw as git;
-
use crate::node::{FetchError, FetchResult};
use crate::prelude::Address;
use crate::service::reactor::Io;
use crate::service::{DisconnectReason, Event, Message, NodeId};
use crate::storage::{Namespaces, RefUpdate};
use crate::storage::{WriteRepository, WriteStorage};
use crate::test::peer::Service;
+
use crate::worker::{FetchError, FetchResult};
use crate::Link;

/// Minimum latency between peers.
@@ -112,7 +112,7 @@ impl fmt::Display for Scheduled {
                write!(
                    f,
                    "{} <~ {} ({}): FetchCompleted",
-
                    self.node, result.remote, result.rid
+
                    self.node, result.fetch.remote, result.fetch.rid
                )
            }
        }
@@ -413,9 +413,14 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    }
                    Input::Fetched(result) => {
                        let result = Arc::try_unwrap(result).unwrap();
-
                        let mut repo = p.storage().repository(result.rid).unwrap();
-

-
                        fetch(&mut repo, &result.remote, result.namespaces.clone()).unwrap();
+
                        let mut repo = p.storage().repository(result.fetch.rid).unwrap();
+

+
                        fetch(
+
                            &mut repo,
+
                            &result.fetch.remote,
+
                            result.fetch.namespaces.clone(),
+
                        )
+
                        .unwrap();
                        p.fetched(result);
                    }
                }
@@ -617,10 +622,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            node,
                            remote: fetch.remote,
                            input: Input::Fetched(Arc::new(FetchResult {
-
                                rid: fetch.repo,
-
                                initiated: fetch.initiated,
-
                                remote: fetch.remote,
-
                                namespaces: fetch.namespaces,
+
                                fetch,
                                result: Err(FetchError::Io(io::ErrorKind::Other.into())),
                            })),
                        },
@@ -632,10 +634,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            node,
                            remote: fetch.remote,
                            input: Input::Fetched(Arc::new(FetchResult {
-
                                rid: fetch.repo,
-
                                initiated: fetch.initiated,
-
                                remote: fetch.remote,
-
                                namespaces: fetch.namespaces,
+
                                fetch,
                                result: Ok(vec![]),
                            })),
                        },
modified radicle-node/src/tests/e2e.rs
@@ -10,8 +10,7 @@ use radicle::crypto::test::signer::MockSigner;
use radicle::crypto::Signer;
use radicle::git::refname;
use radicle::identity::Id;
-
use radicle::node::FetchLookup;
-
use radicle::node::Handle as _;
+
use radicle::node::{FetchResult, Handle as _};
use radicle::profile::Home;
use radicle::storage::{ReadRepository, ReadStorage, WriteStorage};
use radicle::test::fixtures;
@@ -348,23 +347,21 @@ fn test_replication() {
    let tracked = alice.handle.track_repo(acme).unwrap();
    assert!(tracked);

-
    let (seeds, results) = match alice.handle.fetch(acme).unwrap() {
-
        FetchLookup::Found { seeds, results } => (seeds, results),
-
        other => panic!("Fetch lookup failed, got {other:?}"),
-
    };
-
    assert_eq!(seeds, nonempty::NonEmpty::new(bob.id));
+
    let seeds = alice.handle.seeds(acme).unwrap();
+
    assert!(seeds.contains(&bob.id));
+

+
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    assert!(result.is_success());

-
    let result = results.recv_timeout(Duration::from_secs(6)).unwrap();
-
    let updated = match result.result {
-
        Ok(updated) => updated,
-
        Err(err) => {
-
            panic!("Fetch failed from {}: {err}", result.remote);
+
    let updated = match result {
+
        FetchResult::Success { updated } => updated,
+
        FetchResult::Failed { reason } => {
+
            panic!("Fetch failed from {}: {reason}", bob.id);
        }
    };
-
    assert_eq!(result.remote, bob.id);
-
    assert_eq!(updated, vec![]);
+
    assert_eq!(*updated, vec![]);

-
    log::debug!(target: "test", "Fetch complete with {}", result.remote);
+
    log::debug!(target: "test", "Fetch complete with {}", bob.id);

    let inventory = alice.handle.inventory().unwrap();
    let alice_repo = alice.storage.repository(acme).unwrap();
@@ -404,13 +401,12 @@ fn test_clone() {
    transport::local::register(alice.storage.clone());

    let _ = alice.handle.track_repo(acme).unwrap();
-
    let lookup = alice.handle.fetch(acme).unwrap();
+
    let seeds = alice.handle.seeds(acme).unwrap();
+
    assert!(seeds.contains(&bob.id));
+

+
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    assert!(result.is_success());

-
    match lookup {
-
        // Drain the channel.
-
        FetchLookup::Found { seeds, results } => for _ in results.iter().take(seeds.len()) {},
-
        other => panic!("Unexpected fetch lookup: {other:?}"),
-
    }
    rad::fork(acme, &alice.signer, &alice.storage).unwrap();

    let working = rad::checkout(
@@ -455,15 +451,10 @@ fn test_fetch_up_to_date() {
    transport::local::register(alice.storage.clone());

    let _ = alice.handle.track_repo(acme).unwrap();
-

-
    match alice.handle.fetch(acme).unwrap() {
-
        FetchLookup::Found { seeds, results } => for _ in results.iter().take(seeds.len()) {},
-
        other => panic!("Unexpected fetch lookup: {other:?}"),
-
    }
+
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    assert!(result.is_success());

    // Fetch again! This time, everything's up to date.
-
    match alice.handle.fetch(acme).unwrap() {
-
        FetchLookup::Found { seeds, results } => for _ in results.iter().take(seeds.len()) {},
-
        other => panic!("Unexpected fetch lookup: {other:?}"),
-
    }
+
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    assert_eq!(result.success(), Some(vec![]));
}
modified radicle-node/src/wire/protocol.rs
@@ -156,7 +156,7 @@ impl Peer {
    fn upgraded(&mut self) -> Fetch {
        if let Self::Upgrading { fetch, id, link } = self {
            let fetch = fetch.clone();
-
            log::debug!(target: "wire", "Peer {id} upgraded for fetch {}", fetch.repo);
+
            log::debug!(target: "wire", "Peer {id} upgraded for fetch {}", fetch.rid);

            *self = Self::Upgraded {
                id: *id,
modified radicle-node/src/worker.rs
@@ -1,4 +1,5 @@
use std::io::{prelude::*, BufReader};
+
use std::ops::Deref;
use std::thread::JoinHandle;
use std::{env, io, net, process, thread, time};

@@ -13,9 +14,9 @@ use radicle::storage::{Namespaces, ReadRepository, RefUpdate, WriteRepository, W
use radicle::{git, Storage};
use reactor::poller::popol;

-
use crate::node::{FetchError, FetchResult};
use crate::runtime::Handle;
use crate::service::reactor::Fetch;
+
use crate::storage;
use crate::wire::{WireReader, WireSession, WireWriter};

/// Worker pool configuration.
@@ -34,6 +35,36 @@ pub struct Config {
    pub storage: Storage,
}

+
/// Result of a fetch request from a specific seed.
+
#[derive(Debug)]
+
pub struct FetchResult {
+
    pub fetch: Fetch,
+
    pub result: Result<Vec<RefUpdate>, FetchError>,
+
}
+

+
impl Deref for FetchResult {
+
    type Target = Result<Vec<RefUpdate>, FetchError>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.result
+
    }
+
}
+

+
/// Error returned by fetch.
+
#[derive(thiserror::Error, Debug)]
+
pub enum FetchError {
+
    #[error(transparent)]
+
    Git(#[from] git::raw::Error),
+
    #[error(transparent)]
+
    Storage(#[from] storage::Error),
+
    #[error(transparent)]
+
    Fetch(#[from] storage::FetchError),
+
    #[error(transparent)]
+
    Io(#[from] io::Error),
+
    #[error(transparent)]
+
    Project(#[from] storage::ProjectError),
+
}
+

/// Task to be accomplished on a worker thread.
/// This is either going to be an outgoing or incoming fetch.
pub struct Task<G: Signer + EcSign> {
@@ -77,13 +108,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        } = task;

        let (session, result) = self._process(&fetch, drain, session);
-
        let result = FetchResult {
-
            rid: fetch.repo,
-
            remote: fetch.remote,
-
            namespaces: fetch.namespaces,
-
            initiated: fetch.initiated,
-
            result,
-
        };
+
        let result = FetchResult { fetch, result };
        log::debug!(target: "worker", "Sending response back to service..");

        if self
@@ -102,7 +127,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        mut session: WireSession<G>,
    ) -> (WireSession<G>, Result<Vec<RefUpdate>, FetchError>) {
        if fetch.initiated {
-
            log::debug!(target: "worker", "Worker processing outgoing fetch for {}", fetch.repo);
+
            log::debug!(target: "worker", "Worker processing outgoing fetch for {}", fetch.rid);

            let mut tunnel = match Tunnel::with(session, net::SocketAddr::from(([0, 0, 0, 0], 0))) {
                Ok(tunnel) => tunnel,
@@ -122,7 +147,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
            }
            (session, result)
        } else {
-
            log::debug!(target: "worker", "Worker processing incoming fetch for {}", fetch.repo);
+
            log::debug!(target: "worker", "Worker processing incoming fetch for {}", fetch.rid);

            if let Err(err) = session.as_connection_mut().set_nonblocking(false) {
                return (session, Err(err.into()));
@@ -148,7 +173,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        fetch: &Fetch,
        tunnel: &mut Tunnel<WireSession<G>>,
    ) -> Result<Vec<RefUpdate>, FetchError> {
-
        let repo = self.storage.repository(fetch.repo)?;
+
        let repo = self.storage.repository(fetch.rid)?;
        let tunnel_addr = tunnel.local_addr()?;
        let mut cmd = process::Command::new("git");
        cmd.current_dir(repo.path())
@@ -197,13 +222,13 @@ impl<G: Signer + EcSign + 'static> Worker<G> {

        // TODO: Parse fetch output to return updates.
        if child.wait()?.success() {
-
            log::debug!(target: "worker", "Fetch for {} exited successfully", fetch.repo);
+
            log::debug!(target: "worker", "Fetch for {} exited successfully", fetch.rid);
        } else {
-
            log::error!(target: "worker", "Fetch for {} failed", fetch.repo);
+
            log::error!(target: "worker", "Fetch for {} failed", fetch.rid);
        }
        let head = repo.set_head()?;

-
        log::debug!(target: "worker", "Head for {} set to {head}", fetch.repo);
+
        log::debug!(target: "worker", "Head for {} set to {head}", fetch.rid);

        Ok(vec![])
    }
@@ -228,9 +253,9 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
            Ok((req, pktline)) => {
                log::debug!(
                    target: "worker",
-
                    "Parsed git command packet-line for {}: {:?}", fetch.repo, req
+
                    "Parsed git command packet-line for {}: {:?}", fetch.rid, req
                );
-
                if req.repo != fetch.repo {
+
                if req.repo != fetch.rid {
                    return Err(FetchError::Git(git::raw::Error::from_str(
                        "git pkt-line command does not match fetch request",
                    )));
@@ -254,17 +279,17 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
                if e.kind() == io::ErrorKind::UnexpectedEof {
                    break;
                }
-
                log::debug!(target: "worker", "Upload of {} to {} returned error: {e}", fetch.repo, fetch.remote);
+
                log::debug!(target: "worker", "Upload of {} to {} returned error: {e}", fetch.rid, fetch.remote);

                return Err(e.into());
            }
            if let Err(e) = stream_r.read_pktlines(&mut daemon_w, &mut buffer) {
-
                log::error!(target: "worker", "Remote returned error for {}: {e}", fetch.repo);
+
                log::error!(target: "worker", "Remote returned error for {}: {e}", fetch.rid);

                return Err(e.into());
            }
        }
-
        log::debug!(target: "worker", "Upload of {} to {} exited successfully", fetch.repo, fetch.remote);
+
        log::debug!(target: "worker", "Upload of {} to {} exited successfully", fetch.rid, fetch.remote);

        // When we aren't the one fetching, no refs are updated.
        Ok(vec![])
modified radicle/src/node.rs
@@ -1,21 +1,20 @@
mod features;

-
use amplify::WrapperMut;
use std::io::{BufRead, BufReader, Write};
-
use std::ops::Deref;
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
+
use std::str::FromStr;
use std::{io, net};

+
use amplify::WrapperMut;
use crossbeam_channel as chan;
use cyphernet::addr::{HostName, NetAddr};
-
use nonempty::NonEmpty;
+
use serde::{Deserialize, Serialize};
+
use serde_json as json;

use crate::crypto::PublicKey;
-
use crate::git;
use crate::identity::Id;
-
use crate::storage;
-
use crate::storage::{Namespaces, RefUpdate};
+
use crate::storage::RefUpdate;

pub use features::Features;

@@ -55,55 +54,35 @@ impl From<net::SocketAddr> for Address {
    }
}

-
/// Result of a fetch request from a specific seed.
-
#[derive(Debug)]
-
#[allow(clippy::large_enum_variant)]
-
pub struct FetchResult {
-
    pub rid: Id,
-
    pub remote: NodeId,
-
    pub initiated: bool,
-
    pub namespaces: Namespaces,
-
    pub result: Result<Vec<RefUpdate>, FetchError>,
+
#[derive(Debug, Serialize, Deserialize)]
+
#[serde(tag = "status", rename_all = "kebab-case")]
+
pub enum FetchResult {
+
    Success { updated: Vec<RefUpdate> },
+
    Failed { reason: String },
}

-
impl Deref for FetchResult {
-
    type Target = Result<Vec<RefUpdate>, FetchError>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.result
+
impl FetchResult {
+
    pub fn is_success(&self) -> bool {
+
        matches!(self, FetchResult::Success { .. })
    }
-
}

-
/// Error returned by fetch.
-
#[derive(thiserror::Error, Debug)]
-
pub enum FetchError {
-
    #[error(transparent)]
-
    Git(#[from] git::raw::Error),
-
    #[error(transparent)]
-
    Storage(#[from] storage::Error),
-
    #[error(transparent)]
-
    Fetch(#[from] storage::FetchError),
-
    #[error(transparent)]
-
    Io(#[from] io::Error),
-
    #[error(transparent)]
-
    Project(#[from] storage::ProjectError),
+
    pub fn success(self) -> Option<Vec<RefUpdate>> {
+
        match self {
+
            Self::Success { updated } => Some(updated),
+
            _ => None,
+
        }
+
    }
}

-
/// Result of looking up seeds in our routing table.
-
/// This object is sent back to the caller who initiated the fetch.
-
#[derive(Debug)]
-
pub enum FetchLookup {
-
    /// Found seeds for the given project.
-
    Found {
-
        seeds: NonEmpty<NodeId>,
-
        results: chan::Receiver<FetchResult>,
-
    },
-
    /// Can't fetch because no seeds were found for this project.
-
    NotFound,
-
    /// Can't fetch because the project isn't tracked.
-
    NotTracking,
-
    /// Error trying to find seeds.
-
    Error(FetchError),
+
impl<S: ToString> From<Result<Vec<RefUpdate>, S>> for FetchResult {
+
    fn from(value: Result<Vec<RefUpdate>, S>) -> Self {
+
        match value {
+
            Ok(updated) => Self::Success { updated },
+
            Err(err) => Self::Failed {
+
                reason: err.to_string(),
+
            },
+
        }
+
    }
}

#[derive(thiserror::Error, Debug)]
@@ -112,6 +91,12 @@ pub enum Error {
    Connect(#[from] io::Error),
    #[error("received invalid response for `{cmd}` command: '{response}'")]
    InvalidResponse { cmd: &'static str, response: String },
+
    #[error("received invalid json in response for `{cmd}` command: '{response}': {error}")]
+
    InvalidJson {
+
        cmd: &'static str,
+
        response: String,
+
        error: json::Error,
+
    },
    #[error("received empty response for `{cmd}` command")]
    EmptyResponse { cmd: &'static str },
}
@@ -122,13 +107,17 @@ pub trait Handle {
    type Sessions;
    /// The error returned by all methods.
    type Error: std::error::Error + Send + Sync + 'static;
+
    /// Result of a fetch.
+
    type FetchResult;

    /// Check if the node is running. to a peer.
    fn is_running(&self) -> bool;
    /// Connect to a peer.
    fn connect(&mut self, node: NodeId, addr: Address) -> Result<(), Self::Error>;
-
    /// Retrieve or update the project from network.
-
    fn fetch(&mut self, id: Id) -> Result<FetchLookup, Self::Error>;
+
    /// Lookup the seeds of a given repository in the routing table.
+
    fn seeds(&mut self, id: Id) -> Result<Vec<NodeId>, Self::Error>;
+
    /// Fetch a repository from the network.
+
    fn fetch(&mut self, id: Id, from: NodeId) -> Result<Self::FetchResult, Self::Error>;
    /// Start tracking the given project. Doesn't do anything if the project is already
    /// tracked.
    fn track_repo(&mut self, id: Id) -> Result<bool, Self::Error>;
@@ -192,6 +181,7 @@ impl Node {
impl Handle for Node {
    type Sessions = ();
    type Error = Error;
+
    type FetchResult = FetchResult;

    fn is_running(&self) -> bool {
        let Ok(mut lines) = self.call::<&str>("status", &[]) else {
@@ -207,13 +197,31 @@ impl Handle for Node {
        todo!()
    }

-
    fn fetch(&mut self, id: Id) -> Result<FetchLookup, Error> {
-
        for line in self.call("fetch", &[id.urn()])? {
-
            let line = line?;
-
            log::debug!("node: {}", line);
-
        }
-
        // TODO: Return parsed lookup results.
-
        Ok(FetchLookup::NotFound)
+
    fn seeds(&mut self, id: Id) -> Result<Vec<NodeId>, Error> {
+
        self.call("seeds", &[id.urn()])?
+
            .map(|line| {
+
                let line = line?;
+
                let node = NodeId::from_str(&line).map_err(|_| Error::InvalidResponse {
+
                    cmd: "seeds",
+
                    response: line,
+
                })?;
+
                Ok(node)
+
            })
+
            .collect()
+
    }
+

+
    fn fetch(&mut self, id: Id, from: NodeId) -> Result<Self::FetchResult, Error> {
+
        let result = self
+
            .call("fetch", &[id.urn(), from.to_human()])?
+
            .next()
+
            .ok_or(Error::EmptyResponse { cmd: "fetch" })??;
+
        let lookup = json::from_str(&result).map_err(|e| Error::InvalidJson {
+
            cmd: "fetch",
+
            response: result,
+
            error: e,
+
        })?;
+

+
        Ok(lookup)
    }

    fn track_node(&mut self, id: NodeId, alias: Option<String>) -> Result<bool, Error> {
modified radicle/src/storage.rs
@@ -6,7 +6,7 @@ use std::ops::Deref;
use std::path::Path;
use std::{fmt, io};

-
use serde::Serialize;
+
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crypto::{PublicKey, Signer, Unverified, Verified};
@@ -92,7 +92,8 @@ pub enum FetchError {
pub type RemoteId = PublicKey;

/// An update to a reference.
-
#[derive(Debug, Clone, PartialEq, Eq)]
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
#[serde(rename_all = "kebab-case")]
pub enum RefUpdate {
    Updated { name: RefString, old: Oid, new: Oid },
    Created { name: RefString, oid: Oid },