use std::collections::HashSet;
use std::net;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{fmt, io, time};
#[cfg(unix)]
use std::os::unix::net::UnixStream;
#[cfg(windows)]
use uds_windows::UnixStream;
use crossbeam_channel as chan;
use radicle::crypto::PublicKey;
use radicle::node::events::{Event, Events};
use radicle::node::policy;
use radicle::node::{Config, NodeId};
use radicle::node::{ConnectOptions, ConnectResult, Seeds};
use radicle::storage::refs;
use serde_json::json;
use thiserror::Error;
use crate::identity::RepoId;
use crate::node::{Alias, Command, FetchResult};
use crate::profile::Home;
use crate::reactor;
use crate::runtime::Emitter;
use crate::service;
use crate::service::QueryState;
use crate::storage::refs::RefsAt;
use crate::wire;
use crate::wire::StreamId;
use crate::worker::TaskResult;
/// An error resulting from a handle method.
#[derive(Error, Debug)]
pub enum Error {
/// The command channel is no longer connected.
#[error("command channel is not connected")]
ChannelDisconnected,
/// The command returned an error.
#[error("command failed: {0}")]
Command(#[from] service::command::Error),
/// The operation timed out.
#[error("the operation timed out")]
Timeout,
/// An I/O error occurred.
#[error(transparent)]
Io(#[from] std::io::Error),
}
impl From<chan::RecvError> for Error {
fn from(_: chan::RecvError) -> Self {
Self::ChannelDisconnected
}
}
impl From<chan::RecvTimeoutError> for Error {
fn from(err: chan::RecvTimeoutError) -> Self {
match err {
chan::RecvTimeoutError::Timeout => Self::Timeout,
chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
}
}
}
impl<T> From<chan::SendError<T>> for Error {
fn from(_: chan::SendError<T>) -> Self {
Self::ChannelDisconnected
}
}
pub struct Handle {
pub(crate) home: Home,
/// Path to the control socket in use. Required for shutdown.
pub(crate) socket: PathBuf,
pub(crate) controller: reactor::Controller,
/// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
shutdown: Arc<AtomicBool>,
/// Publishes events to subscribers.
emitter: Emitter<Event>,
}
impl Handle {
/// Subscribe to events stream.
pub fn events(&self) -> Events {
Events::from(self.emitter.subscribe())
}
}
impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Handle").field("home", &self.home).finish()
}
}
impl Clone for Handle {
fn clone(&self) -> Self {
Self {
home: self.home.clone(),
socket: self.socket.clone(),
controller: self.controller.clone(),
shutdown: self.shutdown.clone(),
emitter: self.emitter.clone(),
}
}
}
impl Handle {
pub fn new(
home: Home,
socket: PathBuf,
controller: reactor::Controller,
emitter: Emitter<Event>,
) -> Self {
Self {
home,
socket,
controller,
shutdown: Arc::default(),
emitter,
}
}
pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
self.controller.cmd(wire::Control::Worker(result))
}
pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
self.controller.cmd(wire::Control::Flush { remote, stream })
}
pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
self.controller.cmd(wire::Control::User(cmd))
}
}
impl radicle::node::Handle for Handle {
type Sessions = Vec<radicle::node::Session>;
type Events = Events;
type Event = Event;
type Error = Error;
fn nid(&self) -> Result<NodeId, Self::Error> {
let (sender, receiver) = chan::bounded(1);
let query: Arc<QueryState> = Arc::new(move |state| {
sender.send(*state.nid()).ok();
Ok(())
});
let (err_sender, err_receiver) = chan::bounded(1);
self.command(service::Command::QueryState(query, err_sender))?;
err_receiver.recv()??;
let nid = receiver.recv()?;
Ok(nid)
}
fn is_running(&self) -> bool {
true
}
fn connect(
&mut self,
node: NodeId,
addr: radicle::node::Address,
opts: ConnectOptions,
) -> Result<ConnectResult, Error> {
let events = self.events();
let timeout = opts.timeout;
let sessions = self.sessions()?;
let session = sessions.iter().find(|s| s.nid == node);
if let Some(s) = session {
if s.state.is_connected() {
return Ok(ConnectResult::Connected);
}
}
self.command(service::Command::Connect(node, addr, opts))?;
events
.wait(
|e| match e {
Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
Event::PeerDisconnected { nid, reason } if nid == &node => {
Some(ConnectResult::Disconnected {
reason: reason.clone(),
})
}
_ => None,
},
timeout,
)
.map_err(Error::from)
}
fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
let events = self.events();
self.command(service::Command::Disconnect(node))?;
events
.wait(
|e| match e {
Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
_ => None,
},
time::Duration::MAX,
)
.map_err(Error::from)
}
fn seeds_for(
&mut self,
id: RepoId,
namespaces: impl IntoIterator<Item = PublicKey>,
) -> Result<Seeds, Self::Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::Seeds(
id,
HashSet::from_iter(namespaces),
responder,
))?;
Ok(receiver.recv()??)
}
fn config(&self) -> Result<Config, Self::Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::Config(responder))?;
Ok(receiver.recv()??)
}
fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::ListenAddrs(responder))?;
Ok(receiver.recv()??)
}
fn fetch(
&mut self,
id: RepoId,
from: NodeId,
timeout: time::Duration,
signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
) -> Result<FetchResult, Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::Fetch(
id,
from,
timeout,
signed_references_minimum_feature_level,
responder,
))?;
Ok(receiver.recv()??)
}
fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::Follow(id, alias, responder))?;
Ok(receiver.recv()??)
}
fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::Unfollow(id, responder))?;
Ok(receiver.recv()??)
}
fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
let (sender, receiver) = chan::bounded(1);
self.command(service::Command::Block(id, sender))?;
receiver.recv().map_err(Error::from)
}
fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::Seed(id, scope, responder))?;
Ok(receiver.recv()??)
}
fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::Unseed(id, responder))?;
Ok(receiver.recv()??)
}
fn announce_refs_for(
&mut self,
id: RepoId,
namespaces: impl IntoIterator<Item = PublicKey>,
) -> Result<RefsAt, Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::AnnounceRefs(
id,
HashSet::from_iter(namespaces),
responder,
))?;
Ok(receiver.recv()??)
}
fn announce_inventory(&mut self) -> Result<(), Error> {
self.command(service::Command::AnnounceInventory)
.map_err(Error::from)
}
fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
let (responder, receiver) = service::command::Responder::oneshot();
self.command(service::Command::AddInventory(rid, responder))?;
Ok(receiver.recv()??)
}
fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
Ok(self.events())
}
fn sessions(&self) -> Result<Self::Sessions, Error> {
let (sender, receiver) = chan::unbounded();
let query: Arc<QueryState> = Arc::new(move |state| {
let sessions = state
.sessions()
.values()
.map(radicle::node::Session::from)
.collect();
sender.send(sessions).ok();
Ok(())
});
let (err_sender, err_receiver) = chan::bounded(1);
self.command(service::Command::QueryState(query, err_sender))?;
err_receiver.recv()??;
let sessions = receiver.recv()?;
Ok(sessions)
}
fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
let (sender, receiver) = chan::bounded(1);
let query: Arc<QueryState> = Arc::new(move |state| {
let session = state.sessions().get(&nid).map(radicle::node::Session::from);
sender.send(session).ok();
Ok(())
});
let (err_sender, err_receiver) = chan::bounded(1);
self.command(service::Command::QueryState(query, err_sender))?;
err_receiver.recv()??;
let sessions = receiver.recv()?;
Ok(sessions)
}
fn shutdown(self) -> Result<(), Error> {
// If the current value is `false`, set it to `true`, otherwise error.
if self
.shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return Ok(());
}
// Send a shutdown request to our own control socket. This is the only way to kill the
// control thread gracefully. Since the control thread may have called this function,
// the control socket may already be disconnected. Ignore errors.
UnixStream::connect(self.socket)
.and_then(|sock| Command::Shutdown.to_writer(sock))
.ok();
self.controller
.shutdown()
.map_err(|_| Error::ChannelDisconnected)
}
fn debug(&self) -> Result<serde_json::Value, Self::Error> {
let (sender, receiver) = chan::bounded(1);
let query: Arc<QueryState> = Arc::new(move |state| {
let fetching = debug::Fetching::new(state.fetching());
let debug = serde_json::json!({
"outboxSize": state.outbox().len(),
"fetching": fetching,
"rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
json!({
"host": host.to_string(),
"bucket": bucket
})
}).collect::<Vec<_>>(),
"events": json!({
"subscribers": state.emitter().subscriptions(),
"pending": state.emitter().pending(),
}),
"metrics": state.metrics(),
});
sender.send(debug).ok();
Ok(())
});
let (err_sender, err_receiver) = chan::bounded(1);
self.command(service::Command::QueryState(query, err_sender))?;
err_receiver.recv()??;
let debug = receiver.recv()?;
Ok(debug)
}
}
mod debug {
//! Serialization formats for the output of [`Handle::debug`] output.
use radicle_protocol::fetcher;
use radicle_protocol::fetcher::FetcherState;
use serde::Serialize;
use super::{NodeId, RefsAt, RepoId};
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Fetching {
active: Vec<ActiveFetch>,
queued: Vec<QueuedFetch>,
}
impl Fetching {
pub fn new(state: &FetcherState) -> Self {
let active = state
.active_fetches()
.iter()
.map(|(rid, fetch)| ActiveFetch::new(*rid, fetch.clone()))
.collect();
let queued = state
.queued_fetches()
.iter()
.flat_map(|(node, queue)| {
queue
.iter()
.map(|fetch| QueuedFetch::new(*node, fetch.clone()))
})
.collect();
Self { active, queued }
}
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ActiveFetch {
rid: RepoId,
from: NodeId,
refs_at: Vec<RefsAt>,
}
impl ActiveFetch {
pub fn new(rid: RepoId, fetch: fetcher::ActiveFetch) -> Self {
Self {
rid,
from: fetch.from,
refs_at: fetch.refs.into(),
}
}
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct QueuedFetch {
nid: NodeId,
rid: RepoId,
refs_at: Vec<RefsAt>,
}
impl QueuedFetch {
pub fn new(node: NodeId, fetch: fetcher::QueuedFetch) -> Self {
Self {
nid: node,
rid: fetch.rid,
refs_at: fetch.refs.into(),
}
}
}
}