Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
protocol: refactor to use Responder
Fintan Halpenny committed 2 months ago
commit 0d628a45e27776d97574287ee69594408f59faf8
parent b1eedd3
5 files changed +210 -106
modified CHANGELOG.md
@@ -29,6 +29,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
  "Job" API of the operating system to group child processes and their children.
- On Windows, signal handling was not supported. The `radicle-node` executable
  will now respect signal handling on Windows.
+
- Commands sent to the `Service` would never respond when it encountered errors.
+
  This would result in timeouts when commands are run from the `rad` CLI.
+
  The `Service` has now learned to return results when an error occurs which
+
  will be reported back to the user.

## Deprecations

modified crates/radicle-node/src/runtime/handle.rs
@@ -203,25 +203,25 @@ impl radicle::node::Handle for Handle {
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<Seeds, Self::Error> {
-
        let (sender, receiver) = chan::bounded(1);
+
        let (responder, receiver) = service::command::Responder::new();
        self.command(service::Command::Seeds(
            id,
            HashSet::from_iter(namespaces),
-
            sender,
+
            responder,
        ))?;
-
        receiver.recv().map_err(Error::from)
+
        Ok(receiver.recv()??)
    }

    fn config(&self) -> Result<Config, Self::Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Config(sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::new();
+
        self.command(service::Command::Config(responder))?;
+
        Ok(receiver.recv()??)
    }

    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::ListenAddrs(sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::new();
+
        self.command(service::Command::ListenAddrs(responder))?;
+
        Ok(receiver.recv()??)
    }

    fn fetch(
@@ -230,33 +230,33 @@ impl radicle::node::Handle for Handle {
        from: NodeId,
        timeout: time::Duration,
    ) -> Result<FetchResult, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Fetch(id, from, timeout, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::new();
+
        self.command(service::Command::Fetch(id, from, timeout, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Follow(id, alias, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::new();
+
        self.command(service::Command::Follow(id, alias, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Unfollow(id, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::new();
+
        self.command(service::Command::Unfollow(id, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Seed(id, scope, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::new();
+
        self.command(service::Command::Seed(id, scope, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Unseed(id, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::new();
+
        self.command(service::Command::Unseed(id, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn announce_refs_for(
@@ -264,13 +264,13 @@ impl radicle::node::Handle for Handle {
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<RefsAt, Error> {
-
        let (sender, receiver) = chan::bounded(1);
+
        let (responder, receiver) = service::command::Responder::new();
        self.command(service::Command::AnnounceRefs(
            id,
            HashSet::from_iter(namespaces),
-
            sender,
+
            responder,
        ))?;
-
        receiver.recv().map_err(Error::from)
+
        Ok(receiver.recv()??)
    }

    fn announce_inventory(&mut self) -> Result<(), Error> {
@@ -279,9 +279,9 @@ impl radicle::node::Handle for Handle {
    }

    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::AddInventory(rid, sender))?;
-
        receiver.recv().map_err(Error::from)
+
        let (responder, receiver) = service::command::Responder::new();
+
        self.command(service::Command::AddInventory(rid, responder))?;
+
        Ok(receiver.recv()??)
    }

    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
modified crates/radicle-node/src/tests.rs
@@ -8,7 +8,8 @@ use std::sync::Arc;
use std::sync::LazyLock;
use std::time;

-
use crossbeam_channel as chan;
+
use test_log::test;
+

use radicle::cob;
use radicle::identity::Visibility;
use radicle::node::address::Store as _;
@@ -414,15 +415,23 @@ fn test_seeding() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let proj_id: identity::RepoId = test::arbitrary::gen(1);

-
    let (sender, receiver) = chan::bounded(1);
-
    alice.command(Command::Seed(proj_id, policy::Scope::default(), sender));
-
    let policy_change = receiver.recv().map_err(runtime::HandleError::from).unwrap();
+
    let (cmd, receiver) = Command::seed(proj_id, policy::Scope::default());
+
    alice.command(cmd);
+
    let policy_change = receiver
+
        .recv()
+
        .map_err(runtime::HandleError::from)
+
        .unwrap()
+
        .unwrap();
    assert!(policy_change);
    assert!(alice.policies().is_seeding(&proj_id).unwrap());

-
    let (sender, receiver) = chan::bounded(1);
-
    alice.command(Command::Unseed(proj_id, sender));
-
    let policy_change = receiver.recv().map_err(runtime::HandleError::from).unwrap();
+
    let (cmd, receiver) = Command::unseed(proj_id);
+
    alice.command(cmd);
+
    let policy_change = receiver
+
        .recv()
+
        .map_err(runtime::HandleError::from)
+
        .unwrap()
+
        .unwrap();
    assert!(policy_change);
    assert!(!alice.policies().is_seeding(&proj_id).unwrap());
}
@@ -929,13 +938,13 @@ fn test_refs_announcement_followed() {
    );

    // Alice starts to track Bob.
-
    let (sender, receiver) = chan::bounded(1);
-
    alice.command(Command::Follow(
-
        bob.id,
-
        Some(node::Alias::new("bob")),
-
        sender,
-
    ));
-
    let policy_change = receiver.recv().map_err(runtime::HandleError::from).unwrap();
+
    let (cmd, receiver) = Command::follow(bob.id, Some(node::Alias::new("bob")));
+
    alice.command(cmd);
+
    let policy_change = receiver
+
        .recv()
+
        .map_err(runtime::HandleError::from)
+
        .unwrap()
+
        .unwrap();
    assert!(policy_change);

    // Bob announces refs again.
@@ -1402,11 +1411,11 @@ fn test_seed_repo_subscribe() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let rid = arbitrary::gen::<RepoId>(1);
-
    let (send, recv) = chan::bounded(1);

    alice.connect_to(&bob);
-
    alice.command(Command::Seed(rid, policy::Scope::default(), send));
-
    assert!(recv.recv().unwrap());
+
    let (cmd, recv) = Command::seed(rid, policy::Scope::default());
+
    alice.command(cmd);
+
    assert!(recv.recv().unwrap().unwrap());

    assert_matches!(
        alice.messages(bob.id).next(),
@@ -1491,16 +1500,16 @@ fn test_queued_fetch_max_capacity() {
    alice.connect_to(&bob);

    // Send the first fetch.
-
    let (send, _recv1) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid1, bob.id, DEFAULT_TIMEOUT, send));
+
    let (cmd, _recv1) = Command::fetch(rid1, bob.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Send the 2nd fetch that will be queued.
-
    let (send2, _recv2) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid2, bob.id, DEFAULT_TIMEOUT, send2));
+
    let (cmd, _recv2) = Command::fetch(rid2, bob.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Send the 3rd fetch that will be queued.
-
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid3, bob.id, DEFAULT_TIMEOUT, send3));
+
    let (cmd, _recv3) = Command::fetch(rid3, bob.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // The first fetch is initiated.
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid1);
@@ -1514,14 +1523,14 @@ fn test_queued_fetch_max_capacity() {
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::new(doc.clone())));

    // Now the 1st fetch is done, the 2nd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid2);
+
    assert_eq!(alice.fetches().next(), Some((rid2, bob.id)));
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
    alice.fetched(rid2, bob.id, Ok(fetch::FetchResult::new(doc)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid3);
+
    assert_eq!(alice.fetches().next(), Some((rid3, bob.id)));
}

#[test]
@@ -1615,16 +1624,16 @@ fn test_queued_fetch_from_command_same_rid() {
    alice.connect_to(&carol);

    // Send the first fetch.
-
    let (send, _recv1) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid1, bob.id, DEFAULT_TIMEOUT, send));
+
    let (cmd, _recv1) = Command::fetch(rid1, bob.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Send the 2nd fetch that will be queued.
-
    let (send2, _recv2) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid1, eve.id, DEFAULT_TIMEOUT, send2));
+
    let (cmd, _recv2) = Command::fetch(rid1, eve.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Send the 3rd fetch that will be queued.
-
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid1, carol.id, DEFAULT_TIMEOUT, send3));
+
    let (cmd, _recv3) = Command::fetch(rid1, carol.id, DEFAULT_TIMEOUT);
+
    alice.command(cmd);

    // Peers Alice will fetch from.
    let mut peers = [bob.id, eve.id, carol.id]
@@ -1771,29 +1780,20 @@ fn test_init_and_seed() {
    assert!(bob.get(proj_id).unwrap().is_none());

    // Bob seeds Alice's project.
-
    let (sender, receiver) = chan::bounded(1);
-
    bob.command(service::Command::Seed(
-
        proj_id,
-
        policy::Scope::default(),
-
        sender,
-
    ));
-
    assert!(receiver.recv().unwrap());
+
    let (cmd, receiver) = service::Command::seed(proj_id, policy::Scope::default());
+
    bob.command(cmd);
+
    assert!(receiver.recv().unwrap().unwrap());

    // Eve seeds Alice's project.
-
    let (sender, receiver) = chan::bounded(1);
-
    eve.command(service::Command::Seed(
-
        proj_id,
-
        policy::Scope::default(),
-
        sender,
-
    ));
-
    assert!(receiver.recv().unwrap());
+
    let (cmd, receiver) = service::Command::seed(proj_id, policy::Scope::default());
+
    eve.command(cmd);
+
    assert!(receiver.recv().unwrap().unwrap());

-
    let (send, _) = chan::bounded(1);
-
    // Alice announces her inventory.
    // We now expect Eve to fetch Alice's project from Alice.
    // Then we expect Bob to fetch Alice's project from Eve.
    alice.elapse(LocalDuration::from_secs(1)); // Make sure our announcement is fresh.
-
    alice.command(service::Command::AddInventory(proj_id, send));
+
    let (cmd, _) = service::Command::add_inventory(proj_id);
+
    alice.command(cmd);

    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

@@ -2032,13 +2032,13 @@ fn test_announcement_message_amplification() {
            continue;
        }

-
        let (tx, _) = chan::bounded(1);
        let timestamp = (*alice.clock()).into();
        alice
            .storage_mut()
            .repos
            .insert(rid, gen::<MockRepository>(1));
-
        alice.command(Command::AddInventory(rid, tx));
+
        let (cmd, _) = Command::add_inventory(rid);
+
        alice.command(cmd);

        sim.run_while([&mut alice, &mut bob, &mut eve, &mut zod, &mut tom], |s| {
            s.elapsed() < LocalDuration::from_mins(3)
modified crates/radicle-protocol/src/service.rs
@@ -350,7 +350,7 @@ pub struct Service<D, S, G> {
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
-
    fetcher: FetcherService<chan::Sender<FetchResult>>,
+
    fetcher: FetcherService<command::Responder<FetchResult>>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
@@ -800,10 +800,10 @@ where
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Config(resp) => {
-
                resp.send(self.config.clone()).ok();
+
                resp.ok(self.config.clone()).ok();
            }
            Command::ListenAddrs(resp) => {
-
                resp.send(self.listening.clone()).ok();
+
                resp.ok(self.listening.clone()).ok();
            }
            Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
                Ok(seeds) => {
@@ -813,10 +813,11 @@ where
                        "Found {} connected seed(s) and {} disconnected seed(s) for {}",
                        connected.len(), disconnected.len(),  rid
                    );
-
                    resp.send(seeds).ok();
+
                    resp.ok(seeds).ok();
                }
                Err(e) => {
                    warn!(target: "service", "Failed to get seeds for {rid}: {e}");
+
                    resp.err(e).ok();
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
@@ -827,7 +828,7 @@ where
                let seeded = self
                    .seed(&rid, scope)
                    .expect("Service::command: error seeding repository");
-
                resp.send(seeded).ok();
+
                resp.ok(seeded).ok();

                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
@@ -839,31 +840,34 @@ where
                let updated = self
                    .unseed(&id)
                    .expect("Service::command: error unseeding repository");
-
                resp.send(updated).ok();
+
                resp.ok(updated).ok();
            }
            Command::Follow(id, alias, resp) => {
                let seeded = self
                    .policies
                    .follow(&id, alias.as_ref())
                    .expect("Service::command: error following node");
-
                resp.send(seeded).ok();
+
                resp.ok(seeded).ok();
            }
            Command::Unfollow(id, resp) => {
                let updated = self
                    .policies
                    .unfollow(&id)
                    .expect("Service::command: error unfollowing node");
-
                resp.send(updated).ok();
+
                resp.ok(updated).ok();
            }
            Command::AnnounceRefs(id, namespaces, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
                    Ok(None) => {
                        warn!(target: "service", "Failed to announce refs: repository {id} not found");
+
                        resp.err(command::Error::custom(format!("repository {id} not found")))
+
                            .ok();
                        return;
                    }
                    Err(e) => {
                        warn!(target: "service", "Failed to announce refs: doc error: {e}");
+
                        resp.err(e).ok();
                        return;
                    }
                };
@@ -871,11 +875,12 @@ where
                match self.announce_own_refs(id, doc, namespaces) {
                    Ok((refs, _timestamp)) => {
                        for r in refs {
-
                            resp.send(r).ok();
+
                            resp.ok(r).ok();
                        }
                    }
                    Err(err) => {
                        warn!(target: "service", "Failed to announce refs: {err}");
+
                        resp.err(err).ok();
                    }
                }
            }
@@ -884,10 +889,11 @@ where
            }
            Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
                Ok(updated) => {
-
                    resp.send(updated).ok();
+
                    resp.ok(updated).ok();
                }
                Err(e) => {
                    warn!(target: "service", "Failed to add {rid} to inventory: {e}");
+
                    resp.err(e).ok();
                }
            },
            Command::QueryState(query, sender) => {
@@ -929,19 +935,19 @@ where
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
-
        channel: Option<chan::Sender<FetchResult>>,
+
        channel: Option<command::Responder<FetchResult>>,
    ) {
        let session = {
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
            let Some(session) = self.sessions.get_mut(&from) else {
                if let Some(c) = channel {
-
                    c.send(FetchResult::Failed { reason }).ok();
+
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            };
            if !session.is_connected() {
                if let Some(c) = channel {
-
                    c.send(FetchResult::Failed { reason }).ok();
+
                    c.ok(FetchResult::Failed { reason }).ok();
                }
                return;
            }
@@ -957,7 +963,7 @@ where
        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);

        if let Some(c) = rejected {
-
            c.send(FetchResult::Failed {
+
            c.ok(FetchResult::Failed {
                reason: "fetch queue at capacity".to_string(),
            })
            .ok();
@@ -1020,7 +1026,7 @@ where
                    },
                };
                for responder in subscribers {
-
                    responder.send(fetch_result.clone()).ok();
+
                    responder.ok(fetch_result.clone()).ok();
                }
                match result {
                    Ok(crate::worker::fetch::FetchResult {
@@ -1293,7 +1299,7 @@ where
        // Notify orphaned responders
        for (rid, responder) in orphaned {
            responder
-
                .send(FetchResult::Failed {
+
                .ok(FetchResult::Failed {
                    reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
                })
                .ok();
modified crates/radicle-protocol/src/service/command.rs
@@ -31,6 +31,7 @@ pub type Result<T> = std::result::Result<T, Error>;
/// - [`Responder::send`]
/// - [`Responder::ok`]
/// - [`Responder::err`]
+
#[derive(Debug)]
pub struct Responder<T> {
    channel: Sender<Result<T>>,
}
@@ -62,38 +63,121 @@ impl<T> Responder<T> {
}

/// Commands sent to the service by the operator.
+
///
+
/// Each variant has a corresponding helper constructor, e.g. [`Command::Seed`]
+
/// and [`Command::seed`]. These constructors will hide the construction of the
+
/// [`Responder`], and return the corresponding [`Receiver`] to receive the
+
/// result of the command process.
+
///
+
/// If the command does not return a [`Responder`], then it will only return the
+
/// [`Command`] variant, e.g. [`Command::AnnounceInventory`].
pub enum Command {
    /// Announce repository references for given repository and namespaces to peers.
-
    AnnounceRefs(RepoId, HashSet<PublicKey>, Sender<RefsAt>),
+
    AnnounceRefs(RepoId, HashSet<PublicKey>, Responder<RefsAt>),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Add repository to local inventory.
-
    AddInventory(RepoId, Sender<bool>),
+
    AddInventory(RepoId, Responder<bool>),
    /// Connect to node with the given address.
    Connect(NodeId, Address, ConnectOptions),
    /// Disconnect from node.
    Disconnect(NodeId),
    /// Get the node configuration.
-
    Config(Sender<Config>),
+
    Config(Responder<Config>),
    /// Get the node's listen addresses.
-
    ListenAddrs(Sender<Vec<std::net::SocketAddr>>),
+
    ListenAddrs(Responder<Vec<std::net::SocketAddr>>),
    /// Lookup seeds for the given repository in the routing table, and report
    /// sync status for given namespaces.
-
    Seeds(RepoId, HashSet<PublicKey>, Sender<Seeds>),
+
    Seeds(RepoId, HashSet<PublicKey>, Responder<Seeds>),
    /// Fetch the given repository from the network.
-
    Fetch(RepoId, NodeId, time::Duration, Sender<FetchResult>),
+
    Fetch(RepoId, NodeId, time::Duration, Responder<FetchResult>),
    /// Seed the given repository.
-
    Seed(RepoId, Scope, Sender<bool>),
+
    Seed(RepoId, Scope, Responder<bool>),
    /// Unseed the given repository.
-
    Unseed(RepoId, Sender<bool>),
+
    Unseed(RepoId, Responder<bool>),
    /// Follow the given node.
-
    Follow(NodeId, Option<Alias>, Sender<bool>),
+
    Follow(NodeId, Option<Alias>, Responder<bool>),
    /// Unfollow the given node.
-
    Unfollow(NodeId, Sender<bool>),
+
    Unfollow(NodeId, Responder<bool>),
    /// Query the internal service state.
    QueryState(Arc<QueryState>, Sender<Result<()>>),
}

+
impl Command {
+
    pub fn announce_refs(
+
        rid: RepoId,
+
        keys: HashSet<PublicKey>,
+
    ) -> (Self, Receiver<Result<RefsAt>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::AnnounceRefs(rid, keys, responder), receiver)
+
    }
+

+
    pub fn announce_inventory() -> Self {
+
        Self::AnnounceInventory
+
    }
+

+
    pub fn add_inventory(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::AddInventory(rid, responder), receiver)
+
    }
+

+
    pub fn connect(node_id: NodeId, address: Address, options: ConnectOptions) -> Self {
+
        Self::Connect(node_id, address, options)
+
    }
+

+
    pub fn disconnect(node_id: NodeId) -> Self {
+
        Self::Disconnect(node_id)
+
    }
+

+
    pub fn config() -> (Self, Receiver<Result<Config>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::Config(responder), receiver)
+
    }
+

+
    pub fn listen_addrs() -> (Self, Receiver<Result<Vec<std::net::SocketAddr>>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::ListenAddrs(responder), receiver)
+
    }
+

+
    pub fn seeds(rid: RepoId, keys: HashSet<PublicKey>) -> (Self, Receiver<Result<Seeds>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::Seeds(rid, keys, responder), receiver)
+
    }
+

+
    pub fn fetch(
+
        rid: RepoId,
+
        node_id: NodeId,
+
        duration: time::Duration,
+
    ) -> (Self, Receiver<Result<FetchResult>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::Fetch(rid, node_id, duration, responder), receiver)
+
    }
+

+
    pub fn seed(rid: RepoId, scope: Scope) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::Seed(rid, scope, responder), receiver)
+
    }
+

+
    pub fn unseed(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::Unseed(rid, responder), receiver)
+
    }
+

+
    pub fn follow(node_id: NodeId, alias: Option<Alias>) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::Follow(node_id, alias, responder), receiver)
+
    }
+

+
    pub fn unfollow(node_id: NodeId) -> (Self, Receiver<Result<bool>>) {
+
        let (responder, receiver) = Responder::new();
+
        (Self::Unfollow(node_id, responder), receiver)
+
    }
+

+
    pub fn query_state(state: Arc<QueryState>, sender: Sender<Result<()>>) -> Self {
+
        Self::QueryState(state, sender)
+
    }
+
}
+

impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
@@ -130,4 +214,14 @@ impl Error {
    {
        Self::Other(Box::new(error))
    }
+

+
    pub(super) fn custom(message: String) -> Self {
+
        Self::other(Custom { message })
+
    }
+
}
+

+
#[derive(Debug, Error)]
+
#[error("{message}")]
+
struct Custom {
+
    message: String,
}