Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-node src runtime handle.rs
use std::collections::HashSet;
use std::net;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{fmt, io, time};

#[cfg(unix)]
use std::os::unix::net::UnixStream;
#[cfg(windows)]
use uds_windows::UnixStream;

use crossbeam_channel as chan;
use radicle::crypto::PublicKey;
use radicle::node::events::{Event, Events};
use radicle::node::policy;
use radicle::node::{Config, NodeId};
use radicle::node::{ConnectOptions, ConnectResult, Seeds};
use radicle::storage::refs;
use serde_json::json;
use thiserror::Error;

use crate::identity::RepoId;
use crate::node::{Alias, Command, FetchResult};
use crate::profile::Home;
use crate::reactor;
use crate::runtime::Emitter;
use crate::service;
use crate::service::QueryState;
use crate::storage::refs::RefsAt;
use crate::wire;
use crate::wire::StreamId;
use crate::worker::TaskResult;

/// An error resulting from a handle method.
#[derive(Error, Debug)]
pub enum Error {
    /// The command channel is no longer connected.
    #[error("command channel is not connected")]
    ChannelDisconnected,
    /// The command returned an error.
    #[error("command failed: {0}")]
    Command(#[from] service::command::Error),
    /// The operation timed out.
    #[error("the operation timed out")]
    Timeout,
    /// An I/O error occurred.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

impl From<chan::RecvError> for Error {
    fn from(_: chan::RecvError) -> Self {
        Self::ChannelDisconnected
    }
}

impl From<chan::RecvTimeoutError> for Error {
    fn from(err: chan::RecvTimeoutError) -> Self {
        match err {
            chan::RecvTimeoutError::Timeout => Self::Timeout,
            chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
        }
    }
}

impl<T> From<chan::SendError<T>> for Error {
    fn from(_: chan::SendError<T>) -> Self {
        Self::ChannelDisconnected
    }
}

pub struct Handle {
    pub(crate) home: Home,

    /// Path to the control socket in use. Required for shutdown.
    pub(crate) socket: PathBuf,

    pub(crate) controller: reactor::Controller,

    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
    shutdown: Arc<AtomicBool>,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
}

impl Handle {
    /// Subscribe to events stream.
    pub fn events(&self) -> Events {
        Events::from(self.emitter.subscribe())
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Handle").field("home", &self.home).finish()
    }
}

impl Clone for Handle {
    fn clone(&self) -> Self {
        Self {
            home: self.home.clone(),
            socket: self.socket.clone(),
            controller: self.controller.clone(),
            shutdown: self.shutdown.clone(),
            emitter: self.emitter.clone(),
        }
    }
}

impl Handle {
    pub fn new(
        home: Home,
        socket: PathBuf,
        controller: reactor::Controller,
        emitter: Emitter<Event>,
    ) -> Self {
        Self {
            home,
            socket,
            controller,
            shutdown: Arc::default(),
            emitter,
        }
    }

    pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
        self.controller.cmd(wire::Control::Worker(result))
    }

    pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
        self.controller.cmd(wire::Control::Flush { remote, stream })
    }

    pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
        self.controller.cmd(wire::Control::User(cmd))
    }
}

impl radicle::node::Handle for Handle {
    type Sessions = Vec<radicle::node::Session>;
    type Events = Events;
    type Event = Event;
    type Error = Error;

    fn nid(&self) -> Result<NodeId, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        let query: Arc<QueryState> = Arc::new(move |state| {
            sender.send(*state.nid()).ok();
            Ok(())
        });
        let (err_sender, err_receiver) = chan::bounded(1);
        self.command(service::Command::QueryState(query, err_sender))?;
        err_receiver.recv()??;

        let nid = receiver.recv()?;

        Ok(nid)
    }

    fn is_running(&self) -> bool {
        true
    }

    fn connect(
        &mut self,
        node: NodeId,
        addr: radicle::node::Address,
        opts: ConnectOptions,
    ) -> Result<ConnectResult, Error> {
        let events = self.events();
        let timeout = opts.timeout;
        let sessions = self.sessions()?;
        let session = sessions.iter().find(|s| s.nid == node);

        if let Some(s) = session {
            if s.state.is_connected() {
                return Ok(ConnectResult::Connected);
            }
        }
        self.command(service::Command::Connect(node, addr, opts))?;

        events
            .wait(
                |e| match e {
                    Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
                    Event::PeerDisconnected { nid, reason } if nid == &node => {
                        Some(ConnectResult::Disconnected {
                            reason: reason.clone(),
                        })
                    }
                    _ => None,
                },
                timeout,
            )
            .map_err(Error::from)
    }

    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
        let events = self.events();
        self.command(service::Command::Disconnect(node))?;
        events
            .wait(
                |e| match e {
                    Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
                    _ => None,
                },
                time::Duration::MAX,
            )
            .map_err(Error::from)
    }

    fn seeds_for(
        &mut self,
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<Seeds, Self::Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::Seeds(
            id,
            HashSet::from_iter(namespaces),
            responder,
        ))?;
        Ok(receiver.recv()??)
    }

    fn config(&self) -> Result<Config, Self::Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::Config(responder))?;
        Ok(receiver.recv()??)
    }

    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::ListenAddrs(responder))?;
        Ok(receiver.recv()??)
    }

    fn fetch(
        &mut self,
        id: RepoId,
        from: NodeId,
        timeout: time::Duration,
        signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
    ) -> Result<FetchResult, Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::Fetch(
            id,
            from,
            timeout,
            signed_references_minimum_feature_level,
            responder,
        ))?;
        Ok(receiver.recv()??)
    }

    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::Follow(id, alias, responder))?;
        Ok(receiver.recv()??)
    }

    fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::Unfollow(id, responder))?;
        Ok(receiver.recv()??)
    }

    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        self.command(service::Command::Block(id, sender))?;
        receiver.recv().map_err(Error::from)
    }

    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::Seed(id, scope, responder))?;
        Ok(receiver.recv()??)
    }

    fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::Unseed(id, responder))?;
        Ok(receiver.recv()??)
    }

    fn announce_refs_for(
        &mut self,
        id: RepoId,
        namespaces: impl IntoIterator<Item = PublicKey>,
    ) -> Result<RefsAt, Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::AnnounceRefs(
            id,
            HashSet::from_iter(namespaces),
            responder,
        ))?;
        Ok(receiver.recv()??)
    }

    fn announce_inventory(&mut self) -> Result<(), Error> {
        self.command(service::Command::AnnounceInventory)
            .map_err(Error::from)
    }

    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::AddInventory(rid, responder))?;
        Ok(receiver.recv()??)
    }

    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
        Ok(self.events())
    }

    fn sessions(&self) -> Result<Self::Sessions, Error> {
        let (sender, receiver) = chan::unbounded();
        let query: Arc<QueryState> = Arc::new(move |state| {
            let sessions = state
                .sessions()
                .values()
                .map(radicle::node::Session::from)
                .collect();
            sender.send(sessions).ok();

            Ok(())
        });
        let (err_sender, err_receiver) = chan::bounded(1);
        self.command(service::Command::QueryState(query, err_sender))?;
        err_receiver.recv()??;

        let sessions = receiver.recv()?;

        Ok(sessions)
    }

    fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        let query: Arc<QueryState> = Arc::new(move |state| {
            let session = state.sessions().get(&nid).map(radicle::node::Session::from);
            sender.send(session).ok();

            Ok(())
        });
        let (err_sender, err_receiver) = chan::bounded(1);
        self.command(service::Command::QueryState(query, err_sender))?;
        err_receiver.recv()??;

        let sessions = receiver.recv()?;

        Ok(sessions)
    }

    fn shutdown(self) -> Result<(), Error> {
        // If the current value is `false`, set it to `true`, otherwise error.
        if self
            .shutdown
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            return Ok(());
        }
        // Send a shutdown request to our own control socket. This is the only way to kill the
        // control thread gracefully. Since the control thread may have called this function,
        // the control socket may already be disconnected. Ignore errors.
        UnixStream::connect(self.socket)
            .and_then(|sock| Command::Shutdown.to_writer(sock))
            .ok();

        self.controller
            .shutdown()
            .map_err(|_| Error::ChannelDisconnected)
    }

    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        let query: Arc<QueryState> = Arc::new(move |state| {
            let fetching = debug::Fetching::new(state.fetching());
            let debug = serde_json::json!({
                "outboxSize": state.outbox().len(),
                "fetching": fetching,
                "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
                    json!({
                        "host": host.to_string(),
                        "bucket": bucket
                    })
                }).collect::<Vec<_>>(),
                "events": json!({
                    "subscribers": state.emitter().subscriptions(),
                    "pending": state.emitter().pending(),
                }),
                "metrics": state.metrics(),
            });
            sender.send(debug).ok();

            Ok(())
        });
        let (err_sender, err_receiver) = chan::bounded(1);
        self.command(service::Command::QueryState(query, err_sender))?;
        err_receiver.recv()??;

        let debug = receiver.recv()?;

        Ok(debug)
    }
}

mod debug {
    //! Serialization formats for the output of [`Handle::debug`] output.

    use radicle_protocol::fetcher;
    use radicle_protocol::fetcher::FetcherState;
    use serde::Serialize;

    use super::{NodeId, RefsAt, RepoId};

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct Fetching {
        active: Vec<ActiveFetch>,
        queued: Vec<QueuedFetch>,
    }

    impl Fetching {
        pub fn new(state: &FetcherState) -> Self {
            let active = state
                .active_fetches()
                .iter()
                .map(|(rid, fetch)| ActiveFetch::new(*rid, fetch.clone()))
                .collect();
            let queued = state
                .queued_fetches()
                .iter()
                .flat_map(|(node, queue)| {
                    queue
                        .iter()
                        .map(|fetch| QueuedFetch::new(*node, fetch.clone()))
                })
                .collect();
            Self { active, queued }
        }
    }

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct ActiveFetch {
        rid: RepoId,
        from: NodeId,
        refs_at: Vec<RefsAt>,
    }

    impl ActiveFetch {
        pub fn new(rid: RepoId, fetch: fetcher::ActiveFetch) -> Self {
            Self {
                rid,
                from: fetch.from,
                refs_at: fetch.refs.into(),
            }
        }
    }

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct QueuedFetch {
        nid: NodeId,
        rid: RepoId,
        refs_at: Vec<RefsAt>,
    }

    impl QueuedFetch {
        pub fn new(node: NodeId, fetch: fetcher::QueuedFetch) -> Self {
            Self {
                nid: node,
                rid: fetch.rid,
                refs_at: fetch.refs.into(),
            }
        }
    }
}