Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-protocol src service command.rs
use std::{collections::HashSet, fmt, sync::Arc, time};

use crossbeam_channel::Receiver;
use crossbeam_channel::SendError;
use crossbeam_channel::Sender;
use radicle::crypto::PublicKey;
use radicle::node::FetchResult;
use radicle::node::Seeds;
use radicle::node::policy::Scope;
use radicle::node::{Address, Alias, Config, ConnectOptions};
use radicle::storage::refs;
use radicle::storage::refs::RefsAt;
use radicle_core::{NodeId, RepoId};
use thiserror::Error;

use super::ServiceState;

/// Function used to query internal service state.
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<()> + Send + Sync;

/// A result returned from processing a [`Command`].
///
/// It is a type synonym for a [`std::result::Result`]
pub type Result<T> = std::result::Result<T, Error>;

/// A [`Responder`] returns results after processing a service [`Command`].
///
/// To construct a [`Responder`], use [`Responder::oneshot`], which also returns its
/// corresponding [`Receiver`].
///
/// To send results, use either:
/// - [`Responder::send`]
/// - [`Responder::ok`]
/// - [`Responder::err`]
#[derive(Debug)]
pub struct Responder<T> {
    channel: Sender<Result<T>>,
}

impl<T> Responder<T> {
    /// Construct a new [`Responder`] and its corresponding [`Receiver`].
    pub fn oneshot() -> (Self, Receiver<Result<T>>) {
        let (sender, receiver) = crossbeam_channel::bounded(1);
        (Self { channel: sender }, receiver)
    }

    /// Send a [`Result`] to the receiver.
    pub fn send(self, result: Result<T>) -> std::result::Result<(), SendError<Result<T>>> {
        self.channel.send(result)
    }

    /// Send a [`Result::Ok`] to the receiver.
    pub fn ok(self, value: T) -> std::result::Result<(), SendError<Result<T>>> {
        self.send(Ok(value))
    }

    /// Send a [`Result::Err`] to the receiver.
    pub fn err<E>(self, error: E) -> std::result::Result<(), SendError<Result<T>>>
    where
        E: std::error::Error + Send + Sync + 'static,
    {
        self.send(Err(Error::other(error)))
    }
}

/// Commands sent to the service by the operator.
///
/// Each variant has a corresponding helper constructor, e.g. [`Command::Seed`]
/// and [`Command::seed`]. These constructors will hide the construction of the
/// [`Responder`], and return the corresponding [`Receiver`] to receive the
/// result of the command process.
///
/// If the command does not return a [`Responder`], then it will only return the
/// [`Command`] variant, e.g. [`Command::AnnounceInventory`].
pub enum Command {
    /// Announce repository references for given repository and namespaces to peers.
    AnnounceRefs(RepoId, HashSet<PublicKey>, Responder<RefsAt>),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Add repository to local inventory.
    AddInventory(RepoId, Responder<bool>),
    /// Connect to node with the given address.
    Connect(NodeId, Address, ConnectOptions),
    /// Disconnect from node.
    Disconnect(NodeId),
    /// Get the node configuration.
    Config(Responder<Config>),
    /// Get the node's listen addresses.
    ListenAddrs(Responder<Vec<std::net::SocketAddr>>),
    /// Lookup seeds for the given repository in the routing table, and report
    /// sync status for given namespaces.
    Seeds(RepoId, HashSet<PublicKey>, Responder<Seeds>),
    /// Fetch the given repository from the network.
    Fetch(
        RepoId,
        NodeId,
        time::Duration,
        Option<refs::FeatureLevel>,
        Responder<FetchResult>,
    ),
    /// Seed the given repository.
    Seed(RepoId, Scope, Responder<bool>),
    /// Unseed the given repository.
    Unseed(RepoId, Responder<bool>),
    /// Follow the given node.
    Follow(NodeId, Option<Alias>, Responder<bool>),
    /// Unfollow the given node.
    Unfollow(NodeId, Responder<bool>),
    /// Block the given node.
    Block(NodeId, Sender<bool>),
    /// Query the internal service state.
    QueryState(Arc<QueryState>, Sender<Result<()>>),
}

impl Command {
    pub fn announce_refs(
        rid: RepoId,
        keys: HashSet<PublicKey>,
    ) -> (Self, Receiver<Result<RefsAt>>) {
        let (responder, receiver) = Responder::oneshot();
        (Self::AnnounceRefs(rid, keys, responder), receiver)
    }

    pub fn announce_inventory() -> Self {
        Self::AnnounceInventory
    }

    pub fn add_inventory(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
        let (responder, receiver) = Responder::oneshot();
        (Self::AddInventory(rid, responder), receiver)
    }

    pub fn connect(node_id: NodeId, address: Address, options: ConnectOptions) -> Self {
        Self::Connect(node_id, address, options)
    }

    pub fn disconnect(node_id: NodeId) -> Self {
        Self::Disconnect(node_id)
    }

    pub fn config() -> (Self, Receiver<Result<Config>>) {
        let (responder, receiver) = Responder::oneshot();
        (Self::Config(responder), receiver)
    }

    pub fn listen_addrs() -> (Self, Receiver<Result<Vec<std::net::SocketAddr>>>) {
        let (responder, receiver) = Responder::oneshot();
        (Self::ListenAddrs(responder), receiver)
    }

    pub fn seeds(rid: RepoId, keys: HashSet<PublicKey>) -> (Self, Receiver<Result<Seeds>>) {
        let (responder, receiver) = Responder::oneshot();
        (Self::Seeds(rid, keys, responder), receiver)
    }

    pub fn fetch(
        rid: RepoId,
        node_id: NodeId,
        duration: time::Duration,
        signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
    ) -> (Self, Receiver<Result<FetchResult>>) {
        let (responder, receiver) = Responder::oneshot();
        (
            Self::Fetch(
                rid,
                node_id,
                duration,
                signed_references_minimum_feature_level,
                responder,
            ),
            receiver,
        )
    }

    pub fn seed(rid: RepoId, scope: Scope) -> (Self, Receiver<Result<bool>>) {
        let (responder, receiver) = Responder::oneshot();
        (Self::Seed(rid, scope, responder), receiver)
    }

    pub fn unseed(rid: RepoId) -> (Self, Receiver<Result<bool>>) {
        let (responder, receiver) = Responder::oneshot();
        (Self::Unseed(rid, responder), receiver)
    }

    pub fn follow(node_id: NodeId, alias: Option<Alias>) -> (Self, Receiver<Result<bool>>) {
        let (responder, receiver) = Responder::oneshot();
        (Self::Follow(node_id, alias, responder), receiver)
    }

    pub fn unfollow(node_id: NodeId) -> (Self, Receiver<Result<bool>>) {
        let (responder, receiver) = Responder::oneshot();
        (Self::Unfollow(node_id, responder), receiver)
    }

    pub fn query_state(state: Arc<QueryState>, sender: Sender<Result<()>>) -> Self {
        Self::QueryState(state, sender)
    }
}

impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AnnounceRefs(id, _, _) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::AddInventory(rid, _) => write!(f, "AddInventory({rid})"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
            Self::Seeds(id, _, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _, feature_level, _) => match feature_level {
                Some(feature_level) => write!(f, "Fetch({id}, {node} {feature_level})"),
                None => write!(f, "Fetch({id}, {node})"),
            },
            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
            Self::Block(id, _) => write!(f, "Block({id})"),
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
        }
    }
}

/// An error that occurred when processing a service [`Command`].
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum Error {
    #[error("{0}")]
    Other(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl Error {
    pub(super) fn other<E>(error: E) -> Self
    where
        E: std::error::Error + Send + Sync + 'static,
    {
        Self::Other(Box::new(error))
    }

    pub(super) fn custom(message: String) -> Self {
        Self::other(Custom { message })
    }
}

#[derive(Debug, Error)]
#[error("{message}")]
struct Custom {
    message: String,
}