Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
radicle-ci-broker src node_event_source.rs
//! Read node events from the local node.

use std::{
    fmt,
    path::{Path, PathBuf},
    time,
};

use radicle::{
    Profile,
    node::{Event, Handle},
};

use crate::logger;

/// Source of events from the local Radicle node.
///
/// Created with [`NodeEventSource::new`]; events are then pulled one
/// at a time with [`NodeEventSource::node_event`].
pub struct NodeEventSource {
    /// Path taken from `profile.home.path()` when the source was
    /// created. Used by the `Debug` implementation to identify the
    /// source.
    profile_path: PathBuf,
    /// Iterator over the results of the node event subscription:
    /// each item is either an event or an error from reading one.
    events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
}

impl NodeEventSource {
    /// Create a new source of node events, for a given Radicle
    /// profile.
    pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
        let socket = profile.socket();
        if !socket.exists() {
            return Err(NodeEventError::NoControlSocket(socket));
        }
        let node = radicle::Node::new(socket.clone());
        let source = match node.subscribe(time::Duration::MAX) {
            Ok(events) => Ok(Self {
                profile_path: profile.home.path().into(),
                events: Box::new(events.into_iter()),
            }),
            Err(err) => {
                logger::error("failed to subscribe to node events", &err);
                Err(NodeEventError::cannot_subscribe(&socket, err))
            }
        }?;
        logger::node_event_source_created(&source);
        Ok(source)
    }

    /// Get the next node event from an event source, without
    /// filtering. This will block until there is an event, or until
    /// there will be no more events from this source, or there's an
    /// error.
    ///
    /// A closed or broken connection to the node is not an error,
    /// it's treated as end of file.
    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
        if let Some(event) = self.events.next() {
            match event {
                Ok(event) => {
                    logger::node_event_source_got_event(&event);
                    Ok(Some(event))
                }
                Err(radicle::node::Error::Io(err))
                    if err.kind() == std::io::ErrorKind::ConnectionReset =>
                {
                    logger::event_disconnected();
                    Ok(None)
                }
                Err(err) => {
                    logger::error("error reading event from node", &err);
                    Err(NodeEventError::node(err))
                }
            }
        } else {
            logger::node_event_source_eof(self);
            Ok(None)
        }
    }
}

/// Debug output shows only the profile path; the event iterator is
/// not printed.
impl fmt::Debug for NodeEventSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let path = self.profile_path.display();
        write!(f, "NodeEventSource<path={path}>")
    }
}

/// Possible errors from accessing the local Radicle node.
#[derive(Debug, thiserror::Error)]
pub enum NodeEventError {
    /// Node control socket does not exist.
    ///
    /// The field is the socket path that was checked.
    #[error("node control socket does not exist: {0}")]
    NoControlSocket(PathBuf),

    /// Can't subscribe to node events.
    ///
    /// The first field is the path of the control socket, the second
    /// is the underlying error, exposed as this error's source.
    #[error("failed to subscribe to node events on socket {0}")]
    CannotSubscribe(
        PathBuf,
        #[source] Box<dyn std::error::Error + Send + 'static>,
    ),

    /// Some error from getting an event from the node.
    ///
    /// Marked `transparent`, so the message shown is that of the
    /// boxed error itself.
    #[error(transparent)]
    Node(#[from] Box<dyn std::error::Error + Send + 'static>),

    /// Connection to the node control socket broke.
    ///
    // NOTE(review): nothing in this module constructs this variant;
    // `NodeEventSource::node_event` reports a reset connection as
    // `Ok(None)` instead. Presumably used by callers elsewhere — confirm.
    #[error("connection to the node control socket has been lost: can't continue")]
    BrokenConnection,
}

impl NodeEventError {
    /// Build a [`NodeEventError::CannotSubscribe`] from the control
    /// socket path and the error returned by the node.
    fn cannot_subscribe(path: &Path, err: radicle::node::Error) -> Self {
        let socket = path.to_path_buf();
        let cause: Box<dyn std::error::Error + Send + 'static> = Box::new(err);
        Self::CannotSubscribe(socket, cause)
    }

    /// Wrap an error from the node in [`NodeEventError::Node`].
    fn node(err: radicle::node::Error) -> Self {
        let cause: Box<dyn std::error::Error + Send + 'static> = Box::new(err);
        Self::Node(cause)
    }
}