Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
Return Errors for Service Commands
Merged fintohaps opened 2 months ago

This is a refactor of the service commands so that they learn how to respond with errors, which should allow users to receive errors in the rad CLI, rather than just timeouts.

5 files changed +350 -172 fa94638a 980ed561
modified CHANGELOG.md
@@ -27,6 +27,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
  This is fixed by using `winsplit` on Windows instead.
- On Windows, zombie `git-upload-pack` processes are now prevented by using the
  "Job" API of the operating system to group child processes and their children.
+
- Commands sent to the `Service` would never respond when it encountered errors.
+
  This would result in timeouts when commands are run from the `rad` CLI.
+
  The `Service` has now learned to return results when an error occurs which
+
  will be reported back to the user.

## Deprecations

modified crates/radicle-node/src/runtime/handle.rs
@@ -24,7 +24,7 @@ use crate::profile::Home;
use crate::reactor;
use crate::runtime::Emitter;
use crate::service;
-
use crate::service::{CommandError, QueryState};
+
use crate::service::QueryState;
use crate::storage::refs::RefsAt;
use crate::wire;
use crate::wire::StreamId;
@@ -38,7 +38,7 @@ pub enum Error {
    ChannelDisconnected,
    /// The command returned an error.
    #[error("command failed: {0}")]
-
    Command(#[from] CommandError),
+
    Command(#[from] service::command::Error),
    /// The operation timed out.
    #[error("the operation timed out")]
    Timeout,
@@ -203,25 +203,25 @@ impl radicle::node::Handle for Handle {
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<Seeds, Self::Error> {
-
        let (sender, receiver) = chan::bounded(1);
+
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::Seeds(
            id,
            HashSet::from_iter(namespaces),
-
            sender,
+
            responder,
        ))?;
-
        receiver.recv().map_err(Error::from)
+
        Ok(receiver.recv()??)
    }

    fn config(&self) -> Result<Config, Self::Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Config(sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::oneshot();
+
        self.command(service::Command::Config(responder))?;
+
        Ok(receiver.recv()??)
    }

    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::ListenAddrs(sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::oneshot();
+
        self.command(service::Command::ListenAddrs(responder))?;
+
        Ok(receiver.recv()??)
    }

    fn fetch(
@@ -230,33 +230,33 @@ impl radicle::node::Handle for Handle {
        from: NodeId,
        timeout: time::Duration,
    ) -> Result<FetchResult, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Fetch(id, from, timeout, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::oneshot();
+
        self.command(service::Command::Fetch(id, from, timeout, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Follow(id, alias, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::oneshot();
+
        self.command(service::Command::Follow(id, alias, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Unfollow(id, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::oneshot();
+
        self.command(service::Command::Unfollow(id, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Seed(id, scope, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::oneshot();
+
        self.command(service::Command::Seed(id, scope, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Unseed(id, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::oneshot();
+
        self.command(service::Command::Unseed(id, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn announce_refs_for(
@@ -264,13 +264,13 @@ impl radicle::node::Handle for Handle {
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<RefsAt, Error> {
-
        let (sender, receiver) = chan::bounded(1);
+
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::AnnounceRefs(
            id,
            HashSet::from_iter(namespaces),
-
            sender,
+
            responder,
        ))?;
-
        receiver.recv().map_err(Error::from)
+
        Ok(receiver.recv()??)
    }

    fn announce_inventory(&mut self) -> Result<(), Error> {
@@ -279,9 +279,9 @@ impl radicle::node::Handle for Handle {
    }

    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::AddInventory(rid, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::oneshot();
+
        self.command(service::Command::AddInventory(rid, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
modified crates/radicle-node/src/tests.rs
@@ -8,7 +8,8 @@ use std::sync::Arc;
use std::sync::LazyLock;
use std::time;

-
use crossbeam_channel as chan;
+
use test_log::test;
+

use radicle::cob;
use radicle::identity::Visibility;
use radicle::node::address::Store as _;
@@ -414,15 +415,23 @@ fn test_seeding() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let proj_id: identity::RepoId = test::arbitrary::gen(1);

-
    let (sender, receiver) = chan::bounded(1);
-
    alice.command(Command::Seed(proj_id, policy::Scope::default(), sender));
-
    let policy_change = receiver.recv().map_err(runtime::HandleError::from).unwrap();
+
    let (cmd, receiver) = Command::seed(proj_id, policy::Scope::default());
+
    alice.command(cmd);
+
    let policy_change = receiver
+
        .recv()
+
        .map_err(runtime::HandleError::from)
+
        .unwrap()
+
        .unwrap();
    assert!(policy_change);
    assert!(alice.policies().is_seeding(&proj_id).unwrap());

-
    let (sender, receiver) = chan::bounded(1);
-
    alice.command(Command::Unseed(proj_id, sender));
-
    let policy_change = receiver.recv().map_err(runtime::HandleError::from).unwrap();
+
    let (cmd, receiver) = Command::unseed(proj_id);
+
    alice.command(cmd);
+
    let policy_change = receiver
+
        .recv()
+
        .map_err(runtime::HandleError::from)
+
        .unwrap()
+
        .unwrap();
    assert!(policy_change);
    assert!(!alice.policies().is_seeding(&proj_id).unwrap());
}
@@ -929,13 +938,13 @@ fn test_refs_announcement_followed() {
    );

    // Alice starts to track Bob.
-
    let (sender, receiver) = chan::bounded(1);
-
    alice.command(Command::Follow(
-
        bob.id,
-
        Some(node::Alias::new("bob")),
-
        sender,
-
    ));
-
    let policy_change = receiver.recv().map_err(runtime::HandleError::from).unwrap();
+
    let (cmd, receiver) = Command::follow(bob.id, Some(node::Alias::new("bob")));
+
    alice.command(cmd);
+
    let policy_change = receiver
+
        .recv()
+
        .map_err(runtime::HandleError::from)
+
        .unwrap()
+
        .unwrap();
    assert!(policy_change);

    // Bob announces refs again.
@@ -1402,11 +1411,11 @@ fn test_seed_repo_subscribe() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let rid = arbitrary::gen::<RepoId>(1);
-
    let (send, recv) = chan::bounded(1);

    alice.connect_to(&bob);
-
    alice.command(Command::Seed(rid, policy::Scope::default(), send));
-
    assert!(recv.recv().unwrap());
+
    let (cmd, recv) = Command::seed(rid, policy::Scope::default());
+
    alice.command(cmd);
+
    assert!(recv.recv().unwrap().unwrap());

    assert_matches!(
        alice.messages(bob.id).next(),
@@ -1491,16 +1500,16 @@ fn test_queued_fetch_max_capacity() {
    alice.connect_to(&bob);

    // Send the first fetch.
-
    let (send, _recv1) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid1, bob.id, DEFAULT_TIMEOUT, send));
+
    let (cmd, _recv1) = Command::fetch(rid1, bob.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Send the 2nd fetch that will be queued.
-
    let (send2, _recv2) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid2, bob.id, DEFAULT_TIMEOUT, send2));
+
    let (cmd, _recv2) = Command::fetch(rid2, bob.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Send the 3rd fetch that will be queued.
-
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid3, bob.id, DEFAULT_TIMEOUT, send3));
+
    let (cmd, _recv3) = Command::fetch(rid3, bob.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // The first fetch is initiated.
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid1);
@@ -1514,14 +1523,14 @@ fn test_queued_fetch_max_capacity() {
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::new(doc.clone())));

    // Now the 1st fetch is done, the 2nd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid2);
+
    assert_eq!(alice.fetches().next(), Some((rid2, bob.id)));
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
    alice.fetched(rid2, bob.id, Ok(fetch::FetchResult::new(doc)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid3);
+
    assert_eq!(alice.fetches().next(), Some((rid3, bob.id)));
}

#[test]
@@ -1615,16 +1624,16 @@ fn test_queued_fetch_from_command_same_rid() {
    alice.connect_to(&carol);

    // Send the first fetch.
-
    let (send, _recv1) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid1, bob.id, DEFAULT_TIMEOUT, send));
+
    let (cmd, _recv1) = Command::fetch(rid1, bob.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Send the 2nd fetch that will be queued.
-
    let (send2, _recv2) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid1, eve.id, DEFAULT_TIMEOUT, send2));
+
    let (cmd, _recv2) = Command::fetch(rid1, eve.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Send the 3rd fetch that will be queued.
-
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid1, carol.id, DEFAULT_TIMEOUT, send3));
+
    let (cmd, _recv3) = Command::fetch(rid1, carol.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Peers Alice will fetch from.
    let mut peers = [bob.id, eve.id, carol.id]
@@ -1771,29 +1780,20 @@ fn test_init_and_seed() {
    assert!(bob.get(proj_id).unwrap().is_none());

    // Bob seeds Alice's project.
-
    let (sender, receiver) = chan::bounded(1);
-
    bob.command(service::Command::Seed(
-
        proj_id,
-
        policy::Scope::default(),
-
        sender,
-
    ));
-
    assert!(receiver.recv().unwrap());
+
    let (cmd, receiver) = service::Command::seed(proj_id, policy::Scope::default());
+
    bob.command(cmd);
+
    assert!(receiver.recv().unwrap().unwrap());

    // Eve seeds Alice's project.
-
    let (sender, receiver) = chan::bounded(1);
-
    eve.command(service::Command::Seed(
-
        proj_id,
-
        policy::Scope::default(),
-
        sender,
-
    ));
-
    assert!(receiver.recv().unwrap());
+
    let (cmd, receiver) = service::Command::seed(proj_id, policy::Scope::default());
+
    eve.command(cmd);
+
    assert!(receiver.recv().unwrap().unwrap());

-
    let (send, _) = chan::bounded(1);
-
    // Alice announces her inventory.
    // We now expect Eve to fetch Alice's project from Alice.
    // Then we expect Bob to fetch Alice's project from Eve.
    alice.elapse(LocalDuration::from_secs(1)); // Make sure our announcement is fresh.
-
    alice.command(service::Command::AddInventory(proj_id, send));
+
    let (cmd, _) = service::Command::add_inventory(proj_id);
+
    alice.command(cmd);

    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

@@ -2032,13 +2032,13 @@ fn test_announcement_message_amplification() {
            continue;
        }

-
        let (tx, _) = chan::bounded(1);
        let timestamp = (*alice.clock()).into();
        alice
            .storage_mut()
            .repos
            .insert(rid, gen::<MockRepository>(1));
-
        alice.command(Command::AddInventory(rid, tx));
+
        let (cmd, _) = Command::add_inventory(rid);
+
        alice.command(cmd);

        sim.run_while([&mut alice, &mut bob, &mut eve, &mut zod, &mut tom], |s| {
            s.elapsed() < LocalDuration::from_mins(3)
modified crates/radicle-protocol/src/service.rs
@@ -2,6 +2,9 @@
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
#![warn(clippy::unwrap_used)]
+
pub mod command;
+
pub use command::{Command, QueryState};
+

pub mod filter;
pub mod gossip;
pub mod io;
@@ -33,7 +36,7 @@ use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
-
use radicle::node::{ConnectOptions, Penalty, Severity};
+
use radicle::node::{Penalty, Severity};
use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::RepositoryError;
use radicle_fetch::policy::SeedingPolicy;
@@ -50,9 +53,7 @@ use radicle::identity::RepoId;
use radicle::node::events::Emitter;
use radicle::node::routing;
use radicle::node::routing::InsertResult;
-
use radicle::node::{
-
    Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt,
-
};
+
use radicle::node::{Address, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt};
use radicle::prelude::*;
use radicle::storage;
use radicle::storage::{refs::RefsAt, Namespaces, ReadStorage};
@@ -238,74 +239,6 @@ pub trait Store:

impl Store for radicle::node::Database {}

-
/// Function used to query internal service state.
-
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;
-

-
/// Commands sent to the service by the operator.
-
pub enum Command {
-
    /// Announce repository references for given repository and namespaces to peers.
-
    AnnounceRefs(RepoId, HashSet<PublicKey>, chan::Sender<RefsAt>),
-
    /// Announce local repositories to peers.
-
    AnnounceInventory,
-
    /// Add repository to local inventory.
-
    AddInventory(RepoId, chan::Sender<bool>),
-
    /// Connect to node with the given address.
-
    Connect(NodeId, Address, ConnectOptions),
-
    /// Disconnect from node.
-
    Disconnect(NodeId),
-
    /// Get the node configuration.
-
    Config(chan::Sender<Config>),
-
    /// Get the node's listen addresses.
-
    ListenAddrs(chan::Sender<Vec<std::net::SocketAddr>>),
-
    /// Lookup seeds for the given repository in the routing table, and report
-
    /// sync status for given namespaces.
-
    Seeds(RepoId, HashSet<PublicKey>, chan::Sender<Seeds>),
-
    /// Fetch the given repository from the network.
-
    Fetch(RepoId, NodeId, time::Duration, chan::Sender<FetchResult>),
-
    /// Seed the given repository.
-
    Seed(RepoId, Scope, chan::Sender<bool>),
-
    /// Unseed the given repository.
-
    Unseed(RepoId, chan::Sender<bool>),
-
    /// Follow the given node.
-
    Follow(NodeId, Option<Alias>, chan::Sender<bool>),
-
    /// Unfollow the given node.
-
    Unfollow(NodeId, chan::Sender<bool>),
-
    /// Query the internal service state.
-
    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
-
}
-

-
impl fmt::Debug for Command {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-
        match self {
-
            Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
-
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
-
            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
-
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
-
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
-
            Self::Config(_) => write!(f, "Config"),
-
            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
-
            Self::Seeds(id, _, _) => write!(f, "Seeds({id})"),
-
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
-
            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
-
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
-
            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
-
            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
-
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
-
        }
-
    }
-
}
-

-
/// Command-related errors.
-
#[derive(thiserror::Error, Debug)]
-
pub enum CommandError {
-
    #[error(transparent)]
-
    Storage(#[from] storage::Error),
-
    #[error(transparent)]
-
    Routing(#[from] routing::Error),
-
    #[error(transparent)]
-
    Policy(#[from] policy::Error),
-
}
-

/// Fetch state for an ongoing fetch.
#[derive(Debug)]
pub struct FetchState {
@@ -416,7 +349,7 @@ pub struct Service<D, S, G> {
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
-
    fetcher: FetcherService<chan::Sender<FetchResult>>,
+
    fetcher: FetcherService<command::Responder<FetchResult>>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
@@ -866,10 +799,10 @@ where
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
-
                resp.send(self.config.clone()).ok();
+
                resp.ok(self.config.clone()).ok();
            }
            Command::ListenAddrs(resp) => {
-
                resp.send(self.listening.clone()).ok();
+
                resp.ok(self.listening.clone()).ok();
            }
            Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
                Ok(seeds) => {
@@ -879,10 +812,11 @@ where
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(),  rid
                    );
-
                    resp.send(seeds).ok();
+
                    resp.ok(seeds).ok();
                }
                Err(e) => {
                    warn!(target: "service", "Failed to get seeds for {rid}: {e}");
+
                    resp.err(e).ok();
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
@@ -893,7 +827,7 @@ where
                let seeded = self
                    .seed(&rid, scope)
                    .expect("Service::command: error seeding repository");
-
                resp.send(seeded).ok();
+
                resp.ok(seeded).ok();

                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
@@ -905,43 +839,55 @@ where
                let updated = self
                    .unseed(&id)
                    .expect("Service::command: error unseeding repository");
-
                resp.send(updated).ok();
+
                resp.ok(updated).ok();
            }
            Command::Follow(id, alias, resp) => {
                let seeded = self
                    .policies
                    .follow(&id, alias.as_ref())
                    .expect("Service::command: error following node");
-
                resp.send(seeded).ok();
+
                resp.ok(seeded).ok();
            }
            Command::Unfollow(id, resp) => {
                let updated = self
                    .policies
                    .unfollow(&id)
                    .expect("Service::command: error unfollowing node");
-
                resp.send(updated).ok();
+
                resp.ok(updated).ok();
            }
            Command::AnnounceRefs(id, namespaces, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
                        warn!(target: "service", "Failed to announce refs: repository {id} not found");
+
                        resp.err(command::Error::custom(format!("repository {id} not found")))
+
                            .ok();
                        return;
                    }
                    Err(e) => {
                        warn!(target: "service", "Failed to announce refs: doc error: {e}");
+
                        resp.err(e).ok();
                        return;
                    }
                };

                match self.announce_own_refs(id, doc, namespaces) {
                    Ok((refs, _timestamp)) => {
-
                        for r in refs {
-
                            resp.send(r).ok();
+
                        // TODO(finto): currently the command caller only
+
                        // expects one `RefsAt`, this should be fixed in the
+
                        // trait, eventually.
+
                        if let Some(refs) = refs.first() {
+
                            resp.ok(*refs).ok();
+
                        } else {
+
                            resp.err(command::Error::custom(format!(
+
                                "no refs were announced for {id}"
+
                            )))
+
                            .ok();
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Failed to announce refs: {err}");
+
                        resp.err(err).ok();
                    }
                }
            }
@@ -950,10 +896,11 @@ where
            }
            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
                Ok(updated) => {
-
                    resp.send(updated).ok();
+
                    resp.ok(updated).ok();
                }
                Err(e) => {
                    warn!(target: "service", "Failed to add {rid} to inventory: {e}");
+
                    resp.err(e).ok();
                }
            },
            Command::QueryState(query, sender) => {
@@ -971,7 +918,7 @@ where
        refs: NonEmpty<RefsAt>,
        scope: Scope,
        timeout: time::Duration,
-
        channel: Option<chan::Sender<FetchResult>>,
+
        channel: Option<command::Responder<FetchResult>>,
    ) -> bool {
        match self.refs_status_of(rid, refs, &scope) {
            Ok(status) => {
@@ -996,19 +943,19 @@ where
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
-
        channel: Option<chan::Sender<FetchResult>>,
+
        channel: Option<command::Responder<FetchResult>>,
    ) {
        let session = {
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
            let Some(session) = self.sessions.get_mut(&from) else {
                if let Some(c) = channel {
-
                    c.send(FetchResult::Failed { reason }).ok();
+
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            };
            if !session.is_connected() {
                if let Some(c) = channel {
-
                    c.send(FetchResult::Failed { reason }).ok();
+
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            }
@@ -1024,7 +971,7 @@ where
        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);

        if let Some(c) = rejected {
-
            c.send(FetchResult::Failed {
+
            c.ok(FetchResult::Failed {
                reason: "fetch queue at capacity".to_string(),
            })
            .ok();
@@ -1091,7 +1038,7 @@ where
                    },
                };
                for responder in subscribers {
-
                    responder.send(fetch_result.clone()).ok();
+
                    responder.ok(fetch_result.clone()).ok();
                }
                match result {
                    Ok(crate::worker::fetch::FetchResult {
@@ -1362,7 +1309,7 @@ where
        // Notify orphaned responders
        for (rid, responder) in orphaned {
            responder
-
                .send(FetchResult::Failed {
+
                .ok(FetchResult::Failed {
                    reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
                })
                .ok();
added crates/radicle-protocol/src/service/command.rs
@@ -0,0 +1,227 @@
+
use std::{collections::HashSet, fmt, sync::Arc, time};
+

+
use crossbeam_channel::Receiver;
+
use crossbeam_channel::SendError;
+
use crossbeam_channel::Sender;
+
use radicle::crypto::PublicKey;
+
use radicle::node::policy::Scope;
+
use radicle::node::FetchResult;
+
use radicle::node::Seeds;
+
use radicle::node::{Address, Alias, Config, ConnectOptions};
+
use radicle::storage::refs::RefsAt;
+
use radicle_core::{NodeId, RepoId};
+
use thiserror::Error;
+

+
use super::ServiceState;
+

+
/// Function used to query internal service state.
+
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<()> + Send + Sync;
+

+
/// A result returned from processing a [`Command`].
+
///
+
/// It is a type synonym for a [`std::result::Result`]
+
pub type Result<T> = std::result::Result<T, Error>;
+

+
/// A [`Responder`] returns results after processing a service [`Command`].
+
///
+
/// To construct a [`Responder`], use [`Responder::oneshot`], which also returns its
+
/// corresponding [`Receiver`].
+
///
+
/// To send results, use either:
+
/// - [`Responder::send`]
+
/// - [`Responder::ok`]
+
/// - [`Responder::err`]
+
#[derive(Debug)]
+
pub struct Responder<T> {
+
    channel: Sender<Result<T>>,
+
}
+

+
impl<T> Responder<T> {
+
    /// Construct a new [`Responder`] and its corresponding [`Receiver`].
+
    pub fn oneshot() -> (Self, Receiver<Result<T>>) {
+
        let (sender, receiver) = crossbeam_channel::bounded(1);
+
        (Self { channel: sender }, receiver)
+
    }
+

+
    /// Send a [`Result`] to the receiver.
+
    pub fn send(self, result: Result<T>) -> std::result::Result<(), SendError<Result<T>>> {
+
        self.channel.send(result)
+
    }
+

+
    /// Send a [`Result::Ok`] to the receiver.
+
    pub fn ok(self, value: T) -> std::result::Result<(), SendError<Result<T>>> {
+
        self.send(Ok(value))
+
    }
+

+
    /// Send a [`Result::Err`] to the receiver.
+
    pub fn err<E>(self, error: E) -> std::result::Result<(), SendError<Result<T>>>
+
    where
+
        E: std::error::Error + Send + Sync + 'static,
+
    {
+
        self.send(Err(Error::other(error)))
+
    }
+
}
+

+
/// Commands sent to the service by the operator.
+
///
+
/// Each variant has a corresponding helper constructor, e.g. [`Command::Seed`]
+
/// and [`Command::seed`]. These constructors will hide the construction of the
+
/// [`Responder`], and return the corresponding [`Receiver`] to receive the
+
/// result of the command process.
+
///
+
/// If the command does not return a [`Responder`], then it will only return the
+
/// [`Command`] variant, e.g. [`Command::AnnounceInventory`].
+
pub enum Command {
+
    /// Announce repository references for given repository and namespaces to peers.
+
    AnnounceRefs(RepoId, HashSet<PublicKey>, Responder<RefsAt>),
+
    /// Announce local repositories to peers.
+
    AnnounceInventory,
+
    /// Add repository to local inventory.
+
    AddInventory(RepoId, Responder<bool>),
+
    /// Connect to node with the given address.
+
    Connect(NodeId, Address, ConnectOptions),
+
    /// Disconnect from node.
+
    Disconnect(NodeId),
+
    /// Get the node configuration.
+
    Config(Responder<Config>),
+
    /// Get the node's listen addresses.
+
    ListenAddrs(Responder<Vec<std::net::SocketAddr>>),
+
    /// Lookup seeds for the given repository in the routing table, and report
+
    /// sync status for given namespaces.
+
    Seeds(RepoId, HashSet<PublicKey>, Responder<Seeds>),
+
    /// Fetch the given repository from the network.
+
    Fetch(RepoId, NodeId, time::Duration, Responder<FetchResult>),
+
    /// Seed the given repository.
+
    Seed(RepoId, Scope, Responder<bool>),
+
    /// Unseed the given repository.
+
    Unseed(RepoId, Responder<bool>),
+
    /// Follow the given node.
+
    Follow(NodeId, Option<Alias>, Responder<bool>),
+
    /// Unfollow the given node.
+
    Unfollow(NodeId, Responder<bool>),
+
    /// Query the internal service state.
+
    QueryState(Arc<QueryState>, Sender<Result<()>>),
+
}
+

+
impl Command {
+
    pub fn announce_refs(
+
        rid: RepoId,
+
        keys: HashSet<PublicKey>,
+
    ) -> (Self, Receiver<Result<RefsAt>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::AnnounceRefs(rid, keys, responder), receiver)
+
    }
+

+
    pub fn announce_inventory() -> Self {
+
        Self::AnnounceInventory
+
    }
+

+
    pub fn add_inventory(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::AddInventory(rid, responder), receiver)
+
    }
+

+
    pub fn connect(node_id: NodeId, address: Address, options: ConnectOptions) -> Self {
+
        Self::Connect(node_id, address, options)
+
    }
+

+
    pub fn disconnect(node_id: NodeId) -> Self {
+
        Self::Disconnect(node_id)
+
    }
+

+
    pub fn config() -> (Self, Receiver<Result<Config>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::Config(responder), receiver)
+
    }
+

+
    pub fn listen_addrs() -> (Self, Receiver<Result<Vec<std::net::SocketAddr>>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::ListenAddrs(responder), receiver)
+
    }
+

+
    pub fn seeds(rid: RepoId, keys: HashSet<PublicKey>) -> (Self, Receiver<Result<Seeds>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::Seeds(rid, keys, responder), receiver)
+
    }
+

+
    pub fn fetch(
+
        rid: RepoId,
+
        node_id: NodeId,
+
        duration: time::Duration,
+
    ) -> (Self, Receiver<Result<FetchResult>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::Fetch(rid, node_id, duration, responder), receiver)
+
    }
+

+
    pub fn seed(rid: RepoId, scope: Scope) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::Seed(rid, scope, responder), receiver)
+
    }
+

+
    pub fn unseed(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::Unseed(rid, responder), receiver)
+
    }
+

+
    pub fn follow(node_id: NodeId, alias: Option<Alias>) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::Follow(node_id, alias, responder), receiver)
+
    }
+

+
    pub fn unfollow(node_id: NodeId) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::oneshot();
+
        (Self::Unfollow(node_id, responder), receiver)
+
    }
+

+
    pub fn query_state(state: Arc<QueryState>, sender: Sender<Result<()>>) -> Self {
+
        Self::QueryState(state, sender)
+
    }
+
}
+

+
impl fmt::Debug for Command {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match self {
+
            Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
+
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
+
            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
+
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
+
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
+
            Self::Config(_) => write!(f, "Config"),
+
            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
+
            Self::Seeds(id, _, _) => write!(f, "Seeds({id})"),
+
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
+
            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
+
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
+
            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
+
            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
+
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
+
        }
+
    }
+
}
+

+
/// An error that occurred when processing a service [`Command`].
+
#[non_exhaustive]
+
#[derive(Debug, Error)]
+
pub enum Error {
+
    #[error("{0}")]
+
    Other(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
+
}
+

+
impl Error {
+
    pub(super) fn other<E>(error: E) -> Self
+
    where
+
        E: std::error::Error + Send + Sync + 'static,
+
    {
+
        Self::Other(Box::new(error))
+
    }
+

+
    pub(super) fn custom(message: String) -> Self {
+
        Self::other(Custom { message })
+
    }
+
}
+

+
#[derive(Debug, Error)]
+
#[error("{message}")]
+
struct Custom {
+
    message: String,
+
}