Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
refactor: move node event source functionality into its own type
Lars Wirzenius committed 1 year ago
commit 54618c6250ee15cbf498874e8d304d5f5fdf0033
parent 72c6a3396fc37212dc5487c670fa67fa2d53a739
3 files changed +162 -65
modified src/event.rs
@@ -28,7 +28,7 @@
//! ```

use radicle::{
-
    node::{Event, Handle, NodeId},
+
    node::{Event, NodeId},
    prelude::RepoId,
    storage::RefUpdate,
    Profile,
@@ -39,10 +39,9 @@ use serde::{Deserialize, Serialize};
use std::{
    fs::read,
    path::{Path, PathBuf},
-
    time,
};

-
use crate::logger;
+
use crate::{event_source::NodeEventSource, logger};

/// Source of events from the local Radicle node.
///
@@ -50,7 +49,7 @@ use crate::logger;
/// filter are returned. See [`NodeEventSource::allow`] and
/// [`EventFilter`].
pub struct BrokerEventSource {
-
    events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
+
    source: NodeEventSource,
    allowed: Vec<EventFilter>,
}

@@ -58,21 +57,11 @@ impl BrokerEventSource {
    /// Create a new source of node events, for a given Radicle
    /// profile.
    pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
-
        let socket = profile.socket();
-
        if !socket.exists() {
-
            return Err(NodeEventError::NoControlSocket(socket));
-
        }
-
        let node = radicle::Node::new(socket.clone());
-
        match node.subscribe(time::Duration::MAX) {
-
            Ok(events) => Ok(Self {
-
                events: Box::new(events.into_iter()),
-
                allowed: vec![],
-
            }),
-
            Err(err) => {
-
                logger::error("failed to subscribe to node events", &err);
-
                Err(NodeEventError::CannotSubscribe(socket.clone(), err))
-
            }
-
        }
+
        let source = NodeEventSource::new(profile).map_err(NodeEventError::CannotSubscribe)?;
+
        Ok(Self {
+
            source,
+
            allowed: vec![],
+
        })
    }

    /// Add an event filter for allowed events for this event source.
@@ -94,22 +83,16 @@ impl BrokerEventSource {
    /// there will be no more events from this source, or there's an
    /// error.
    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
-
        if let Some(event) = self.events.next() {
-
            match event {
-
                Ok(event) => Ok(Some(event)),
-
                Err(radicle::node::Error::Io(err))
-
                    if err.kind() == std::io::ErrorKind::ConnectionReset =>
-
                {
-
                    logger::event_disconnected();
-
                    Err(NodeEventError::BrokenConnection)
-
                }
-
                Err(err) => {
-
                    logger::error("error reading event from node", &err);
-
                    Err(NodeEventError::Node(err))
-
                }
+
        match self.source.node_event() {
+
            Err(crate::event_source::NodeEventError::BrokenConnection) => {
+
                logger::event_disconnected();
+
                Err(NodeEventError::BrokenConnection)
            }
-
        } else {
-
            Ok(None)
+
            Err(err) => {
+
                logger::error("error reading event from node", &err);
+
                Err(NodeEventError::NodeEventError(err))
+
            }
+
            Ok(maybe_event) => Ok(maybe_event),
        }
    }

@@ -118,35 +101,32 @@ impl BrokerEventSource {
    /// no more events from this source, or there's an error.
    pub fn event(&mut self) -> Result<Vec<BrokerEvent>, NodeEventError> {
        loop {
-
            if let Some(event) = self.events.next() {
-
                match event {
-
                    Ok(event) => {
-
                        let mut result = vec![];
-
                        if let Some(broker_events) = BrokerEvent::from_event(&event) {
-
                            for e in broker_events {
-
                                if self.allowed(&e)? {
-
                                    result.push(e);
-
                                }
-
                            }
-
                            if !result.is_empty() {
-
                                return Ok(result);
+
            match self.source.node_event() {
+
                Err(crate::event_source::NodeEventError::BrokenConnection) => {
+
                    logger::event_disconnected();
+
                    return Err(NodeEventError::BrokenConnection);
+
                }
+
                Err(err) => {
+
                    logger::error("error reading event from node", &err);
+
                    return Err(NodeEventError::NodeEventError(err));
+
                }
+
                Ok(None) => {
+
                    logger::event_end();
+
                    return Ok(vec![]);
+
                }
+
                Ok(Some(event)) => {
+
                    let mut result = vec![];
+
                    if let Some(broker_events) = BrokerEvent::from_event(&event) {
+
                        for e in broker_events {
+
                            if self.allowed(&e)? {
+
                                result.push(e);
                            }
                        }
-
                    }
-
                    Err(radicle::node::Error::Io(err))
-
                        if err.kind() == std::io::ErrorKind::ConnectionReset =>
-
                    {
-
                        logger::event_disconnected();
-
                        return Err(NodeEventError::BrokenConnection);
-
                    }
-
                    Err(err) => {
-
                        logger::error("error reading event from node", &err);
-
                        return Err(NodeEventError::Node(err));
+
                        if !result.is_empty() {
+
                            return Ok(result);
+
                        }
                    }
                }
-
            } else {
-
                logger::event_end();
-
                return Ok(vec![]);
            }
        }
    }
@@ -155,6 +135,10 @@ impl BrokerEventSource {
/// Possible errors from accessing the local Radicle node.
#[derive(Debug, thiserror::Error)]
pub enum NodeEventError {
+
    /// Can't create a [`NodeEventSource`].
+
    #[error("failed to subscribe to node events")]
+
    CannotSubscribe(#[source] crate::event_source::NodeEventError),
+

    /// Regex compilation error.
    #[error("programming error in regular expression {0:?}")]
    Regex(&'static str, regex::Error),
@@ -163,13 +147,9 @@ pub enum NodeEventError {
    #[error("node control socket does not exist: {0}")]
    NoControlSocket(PathBuf),

-
    /// Can't subscribe to node events.
-
    #[error("failed to subscribe to node events on socket {0}")]
-
    CannotSubscribe(PathBuf, #[source] radicle::node::Error),
-

    /// Some error from getting an event from the node.
-
    #[error(transparent)]
-
    Node(#[from] radicle::node::Error),
+
    #[error("failed to get next event from node")]
+
    NodeEventError(#[from] crate::event_source::NodeEventError),

    /// Connection to the node control socket broke.
    #[error("connection to the node control socket broke")]
added src/event_source.rs
@@ -0,0 +1,116 @@
+
//! Read node events from the local node.
+

+
use std::{path::PathBuf, time};
+

+
use radicle::{
+
    node::{Event, Handle},
+
    Profile,
+
};
+

+
use crate::logger;
+

+
/// Source of events from the local Radicle node.
+
///
+
/// The events are filtered. Only events allowed by at least one
+
/// filter are returned. See [`NodeEventSource::allow`] and
+
/// [`EventFilter`].
+
pub struct NodeEventSource {
+
    events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
+
}
+

+
impl NodeEventSource {
+
    /// Create a new source of node events, for a given Radicle
+
    /// profile.
+
    pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
+
        let socket = profile.socket();
+
        if !socket.exists() {
+
            return Err(NodeEventError::NoControlSocket(socket));
+
        }
+
        let node = radicle::Node::new(socket.clone());
+
        match node.subscribe(time::Duration::MAX) {
+
            Ok(events) => Ok(Self {
+
                events: Box::new(events.into_iter()),
+
            }),
+
            Err(err) => {
+
                logger::error("failed to subscribe to node events", &err);
+
                Err(NodeEventError::CannotSubscribe(socket.clone(), err))
+
            }
+
        }
+
    }
+

+
    /// Get the next node event from an event source, without
+
    /// filtering. This will block until there is an event, or until
+
    /// there will be no more events from this source, or there's an
+
    /// error.
+
    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
+
        if let Some(event) = self.events.next() {
+
            match event {
+
                Ok(event) => Ok(Some(event)),
+
                Err(radicle::node::Error::Io(err))
+
                    if err.kind() == std::io::ErrorKind::ConnectionReset =>
+
                {
+
                    logger::event_disconnected();
+
                    Err(NodeEventError::BrokenConnection)
+
                }
+
                Err(err) => {
+
                    logger::error("error reading event from node", &err);
+
                    Err(NodeEventError::Node(err))
+
                }
+
            }
+
        } else {
+
            Ok(None)
+
        }
+
    }
+
}
+

+
/// Possible errors from accessing the local Radicle node.
+
#[derive(Debug, thiserror::Error)]
+
pub enum NodeEventError {
+
    /// Regex compilation error.
+
    #[error("programming error in regular expression {0:?}")]
+
    Regex(&'static str, regex::Error),
+

+
    /// Node control socket does not exist.
+
    #[error("node control socket does not exist: {0}")]
+
    NoControlSocket(PathBuf),
+

+
    /// Can't subscribe to node events.
+
    #[error("failed to subscribe to node events on socket {0}")]
+
    CannotSubscribe(PathBuf, #[source] radicle::node::Error),
+

+
    /// Some error from getting an event from the node.
+
    #[error(transparent)]
+
    Node(#[from] radicle::node::Error),
+

+
    /// Connection to the node control socket broke.
+
    #[error("connection to the node control socket broke")]
+
    BrokenConnection,
+

+
    /// Some error from parsing a repository id.
+
    #[error(transparent)]
+
    Id(#[from] radicle::identity::IdError),
+

+
    /// Some error doing input/output.
+
    #[error(transparent)]
+
    Io(#[from] std::io::Error),
+

+
    /// An error reading a filter file.
+
    #[error("failed to read filter file: {0}")]
+
    ReadFilterFile(PathBuf, #[source] std::io::Error),
+

+
    /// An error parsing JSON as filters, when read from a file.
+
    #[error("failed to parser filters file: {0}")]
+
    FiltersJsonFile(PathBuf, #[source] serde_json::Error),
+

+
    /// An error parsing YAML as filters, when read from a file.
+
    #[error("failed to parser filters file: {0}")]
+
    FiltersYamlFile(PathBuf, #[source] serde_yml::Error),
+

+
    /// An error parsing JSON as filters, from an in-memory string.
+
    #[error("failed to parser filters as JSON")]
+
    FiltersJsonString(#[source] serde_json::Error),
+

+
    /// An error parsing a Git object id as string into an Oid.
+
    #[error("failed to parse string as a Git object id: {0:?}")]
+
    ParseOid(String, #[source] radicle::git::raw::Error),
+
}
modified src/lib.rs
@@ -10,6 +10,7 @@ pub mod broker;
pub mod config;
pub mod db;
pub mod event;
+
pub mod event_source;
pub mod logger;
pub mod msg;
pub mod notif;