//! Read node events from the local node.
use std::{
fmt,
path::{Path, PathBuf},
time,
};
use radicle::{
Profile,
node::{Event, Handle},
};
use crate::logger;
/// Source of events from the local Radicle node.
///
/// Holds the stream of events obtained by subscribing to the node,
/// together with the path of the profile the source was created for.
pub struct NodeEventSource {
// Path of the Radicle profile home, taken from the profile given to
// `new`. It is what the `Debug` implementation prints.
profile_path: PathBuf,
// Stream of node events. Each item is either an event or the error
// the node connection reported while reading one.
events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
}
impl NodeEventSource {
/// Create a new source of node events, for a given Radicle
/// profile.
pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
let socket = profile.socket();
if !socket.exists() {
return Err(NodeEventError::NoControlSocket(socket));
}
let node = radicle::Node::new(socket.clone());
let source = match node.subscribe(time::Duration::MAX) {
Ok(events) => Ok(Self {
profile_path: profile.home.path().into(),
events: Box::new(events.into_iter()),
}),
Err(err) => {
logger::error("failed to subscribe to node events", &err);
Err(NodeEventError::cannot_subscribe(&socket, err))
}
}?;
logger::node_event_source_created(&source);
Ok(source)
}
/// Get the next node event from an event source, without
/// filtering. This will block until there is an event, or until
/// there will be no more events from this source, or there's an
/// error.
///
/// A closed or broken connection to the node is not an error,
/// it's treated as end of file.
pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
if let Some(event) = self.events.next() {
match event {
Ok(event) => {
logger::node_event_source_got_event(&event);
Ok(Some(event))
}
Err(radicle::node::Error::Io(err))
if err.kind() == std::io::ErrorKind::ConnectionReset =>
{
logger::event_disconnected();
Ok(None)
}
Err(err) => {
logger::error("error reading event from node", &err);
Err(NodeEventError::node(err))
}
}
} else {
logger::node_event_source_eof(self);
Ok(None)
}
}
}
impl fmt::Debug for NodeEventSource {
    /// Render the source as `NodeEventSource<path=…>`, showing only the
    /// profile path; the event stream itself is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shown_path = self.profile_path.display();
        f.write_fmt(format_args!("NodeEventSource<path={}>", shown_path))
    }
}
/// Possible errors from accessing the local Radicle node.
#[derive(Debug, thiserror::Error)]
pub enum NodeEventError {
/// Node control socket does not exist.
///
/// Carries the path of the socket that was looked for.
#[error("node control socket does not exist: {0}")]
NoControlSocket(PathBuf),
/// Can't subscribe to node events.
///
/// Carries the path of the control socket and, as the error source,
/// the underlying error from the subscription attempt.
#[error("failed to subscribe to node events on socket {0}")]
CannotSubscribe(
PathBuf,
#[source] Box<dyn std::error::Error + Send + 'static>,
),
/// Some error from getting an event from the node.
///
/// The variant is transparent: its message and source are those of
/// the boxed error it wraps. A boxed error can also be converted into
/// this variant with `From`/`?`.
#[error(transparent)]
Node(#[from] Box<dyn std::error::Error + Send + 'static>),
/// Connection to the node control socket broke.
// NOTE(review): nothing in this file constructs this variant;
// presumably it is used by callers elsewhere in the crate - confirm.
#[error("connection to the node control socket has been lost: can't continue")]
BrokenConnection,
}
impl NodeEventError {
fn cannot_subscribe(path: &Path, err: radicle::node::Error) -> Self {
Self::CannotSubscribe(path.into(), Box::new(err))
}
fn node(err: radicle::node::Error) -> Self {
Self::Node(Box::new(err))
}
}