Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
Refactor to prepare for new broker events
Merged liw opened 1 year ago

This makes naming clearer, and splits out the node event source into its own type. This is all in preparation for making big changes to how broker events are constructed, and what their structure is, so that we can have events that are more aimed at the kinds of things CI runs for different purposes need.

8 files changed +202 -123 c7b0b393 b8a9aa6d
modified src/bin/cibtool.rs
@@ -30,7 +30,7 @@ use radicle_git_ext::Oid;
use radicle_ci_broker::{
    broker::BrokerError,
    db::{Db, DbError, QueueId, QueuedEvent},
-
    event::{BrokerEvent, NodeEventError},
+
    event::{BrokerEvent, BrokerEventError},
    logger,
    msg::{RunId, RunResult},
    notif::NotificationChannel,
@@ -314,10 +314,10 @@ enum CibToolError {
    NoDb,

    #[error("failed to subscribe to node events")]
-
    EventSubscribe(#[source] radicle_ci_broker::event::NodeEventError),
+
    EventSubscribe(#[source] radicle_ci_broker::node_event_source::NodeEventError),

    #[error("failed to get next node event")]
-
    GetNodeEvent(#[source] radicle_ci_broker::event::NodeEventError),
+
    GetNodeEvent(#[source] radicle_ci_broker::node_event_source::NodeEventError),

    #[error("failed to create file for node events: {0}")]
    CreateEventsFile(PathBuf, #[source] std::io::Error),
@@ -344,8 +344,11 @@ enum CibToolError {
    CreateBrokerEventsFile(PathBuf, #[source] std::io::Error),

    #[error("failed to read filters from YAML file {0}")]
-
    ReadFilters(PathBuf, #[source] radicle_ci_broker::event::NodeEventError),
+
    ReadFilters(
+
        PathBuf,
+
        #[source] radicle_ci_broker::event::BrokerEventError,
+
    ),

    #[error("failed to check if event is allowed: {0:#?}")]
-
    EventIsAllowed(BrokerEvent, #[source] NodeEventError),
+
    EventIsAllowed(BrokerEvent, #[source] BrokerEventError),
}
modified src/bin/cibtoolcmd/event.rs
@@ -1,6 +1,6 @@
use std::io::Write;

-
use radicle_ci_broker::event::{EventFilter, NodeEventSource};
+
use radicle_ci_broker::{event::EventFilter, node_event_source::NodeEventSource};

use super::*;

modified src/broker.rs
@@ -135,7 +135,7 @@ pub enum BrokerError {

    /// Error from an node event subscriber.
    #[error(transparent)]
-
    NodeEvent(#[from] crate::event::NodeEventError),
+
    NodeEvent(#[from] crate::event::BrokerEventError),

    /// Error from Radicle.
    #[error(transparent)]
modified src/event.rs
@@ -1,6 +1,8 @@
-
//! Read node events from the local node, filter into broker events.
+
//! Get events from local node.
//!
-
//! [`NodeEventSource`] is an event listener.
+
//! [`BrokerEventSource`] subscribes to the local node for node
+
//! events, creates them to corresponding sets of broker events, and
+
//! returns the events allowed by the configured filters.
//!
//! Events can be filtered based on various criteria and can be
//! combined with logical operators. Filters can be read from a JSON
@@ -28,7 +30,7 @@
//! ```

use radicle::{
-
    node::{Event, Handle, NodeId},
+
    node::{Event, NodeId},
    prelude::RepoId,
    storage::RefUpdate,
    Profile,
@@ -39,40 +41,32 @@ use serde::{Deserialize, Serialize};
use std::{
    fs::read,
    path::{Path, PathBuf},
-
    time,
};

-
use crate::logger;
+
use crate::{
+
    logger,
+
    node_event_source::{NodeEventError, NodeEventSource},
+
};

/// Source of events from the local Radicle node.
///
/// The events are filtered. Only events allowed by at least one
-
/// filter are returned. See [`NodeEventSource::allow`] and
+
/// filter are returned. See [`BrokerEventSource::allow`] and
/// [`EventFilter`].
-
pub struct NodeEventSource {
-
    events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
+
pub struct BrokerEventSource {
+
    source: NodeEventSource,
    allowed: Vec<EventFilter>,
}

-
impl NodeEventSource {
+
impl BrokerEventSource {
    /// Create a new source of node events, for a given Radicle
    /// profile.
-
    pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
-
        let socket = profile.socket();
-
        if !socket.exists() {
-
            return Err(NodeEventError::NoControlSocket(socket));
-
        }
-
        let node = radicle::Node::new(socket.clone());
-
        match node.subscribe(time::Duration::MAX) {
-
            Ok(events) => Ok(Self {
-
                events: Box::new(events.into_iter()),
-
                allowed: vec![],
-
            }),
-
            Err(err) => {
-
                logger::error("failed to subscribe to node events", &err);
-
                Err(NodeEventError::CannotSubscribe(socket.clone(), err))
-
            }
-
        }
+
    pub fn new(profile: &Profile) -> Result<Self, BrokerEventError> {
+
        let source = NodeEventSource::new(profile).map_err(BrokerEventError::CannotSubscribe)?;
+
        Ok(Self {
+
            source,
+
            allowed: vec![],
+
        })
    }

    /// Add an event filter for allowed events for this event source.
@@ -80,7 +74,7 @@ impl NodeEventSource {
        self.allowed.push(filter);
    }

-
    fn allowed(&self, event: &BrokerEvent) -> Result<bool, NodeEventError> {
+
    fn allowed(&self, event: &BrokerEvent) -> Result<bool, BrokerEventError> {
        for filter in self.allowed.iter() {
            if !event.is_allowed(filter)? {
                return Ok(false);
@@ -89,64 +83,37 @@ impl NodeEventSource {
        Ok(true)
    }

-
    /// Get the next node event from an event source, without
-
    /// filtering. This will block until there is an event, or until
-
    /// there will be no more events from this source, or there's an
-
    /// error.
-
    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
-
        if let Some(event) = self.events.next() {
-
            match event {
-
                Ok(event) => Ok(Some(event)),
-
                Err(radicle::node::Error::Io(err))
-
                    if err.kind() == std::io::ErrorKind::ConnectionReset =>
-
                {
+
    /// Get the allowed next event from an event source. This will
+
    /// block until there is an allowed event, or until there will be
+
    /// no more events from this source, or there's an error.
+
    pub fn event(&mut self) -> Result<Vec<BrokerEvent>, BrokerEventError> {
+
        loop {
+
            match self.source.node_event() {
+
                Err(NodeEventError::BrokenConnection) => {
                    logger::event_disconnected();
-
                    Err(NodeEventError::BrokenConnection)
+
                    return Err(BrokerEventError::BrokenConnection);
                }
                Err(err) => {
                    logger::error("error reading event from node", &err);
-
                    Err(NodeEventError::Node(err))
+
                    return Err(BrokerEventError::NodeEventError(err));
                }
-
            }
-
        } else {
-
            Ok(None)
-
        }
-
    }
-

-
    /// Get the allowed next event from an event source. This will
-
    /// block until there is an allowed event, or until there will be
-
    /// no more events from this source, or there's an error.
-
    pub fn event(&mut self) -> Result<Vec<BrokerEvent>, NodeEventError> {
-
        loop {
-
            if let Some(event) = self.events.next() {
-
                match event {
-
                    Ok(event) => {
-
                        let mut result = vec![];
-
                        if let Some(broker_events) = BrokerEvent::from_event(&event) {
-
                            for e in broker_events {
-
                                if self.allowed(&e)? {
-
                                    result.push(e);
-
                                }
-
                            }
-
                            if !result.is_empty() {
-
                                return Ok(result);
+
                Ok(None) => {
+
                    logger::event_end();
+
                    return Ok(vec![]);
+
                }
+
                Ok(Some(event)) => {
+
                    let mut result = vec![];
+
                    if let Some(broker_events) = BrokerEvent::from_event(&event) {
+
                        for e in broker_events {
+
                            if self.allowed(&e)? {
+
                                result.push(e);
                            }
                        }
-
                    }
-
                    Err(radicle::node::Error::Io(err))
-
                        if err.kind() == std::io::ErrorKind::ConnectionReset =>
-
                    {
-
                        logger::event_disconnected();
-
                        return Err(NodeEventError::BrokenConnection);
-
                    }
-
                    Err(err) => {
-
                        logger::error("error reading event from node", &err);
-
                        return Err(NodeEventError::Node(err));
+
                        if !result.is_empty() {
+
                            return Ok(result);
+
                        }
                    }
                }
-
            } else {
-
                logger::event_end();
-
                return Ok(vec![]);
            }
        }
    }
@@ -154,22 +121,18 @@ impl NodeEventSource {

/// Possible errors from accessing the local Radicle node.
#[derive(Debug, thiserror::Error)]
-
pub enum NodeEventError {
+
pub enum BrokerEventError {
+
    /// Can't create a [`NodeEventSource`].
+
    #[error("failed to subscribe to node events")]
+
    CannotSubscribe(#[source] NodeEventError),
+

    /// Regex compilation error.
    #[error("programming error in regular expression {0:?}")]
    Regex(&'static str, regex::Error),

-
    /// Node control socket does not exist.
-
    #[error("node control socket does not exist: {0}")]
-
    NoControlSocket(PathBuf),
-

-
    /// Can't subscribe to node events.
-
    #[error("failed to subscribe to node events on socket {0}")]
-
    CannotSubscribe(PathBuf, #[source] radicle::node::Error),
-

    /// Some error from getting an event from the node.
-
    #[error(transparent)]
-
    Node(#[from] radicle::node::Error),
+
    #[error("failed to get next event from node")]
+
    NodeEventError(#[from] NodeEventError),

    /// Connection to the node control socket broke.
    #[error("connection to the node control socket broke")]
@@ -179,10 +142,6 @@ pub enum NodeEventError {
    #[error(transparent)]
    Id(#[from] radicle::identity::IdError),

-
    /// Some error doing input/output.
-
    #[error(transparent)]
-
    Io(#[from] std::io::Error),
-

    /// An error reading a filter file.
    #[error("failed to read filter file: {0}")]
    ReadFilterFile(PathBuf, #[source] std::io::Error),
@@ -246,12 +205,12 @@ pub enum EventFilter {

impl EventFilter {
    /// Create a filter for a repository.
-
    pub fn repository(rid: &str) -> Result<Self, NodeEventError> {
+
    pub fn repository(rid: &str) -> Result<Self, BrokerEventError> {
        Ok(Self::Repository(RepoId::from_urn(rid)?))
    }

    /// Create a filter for a git ref that ends with a string.
-
    pub fn glob(pattern: &str) -> Result<Self, NodeEventError> {
+
    pub fn glob(pattern: &str) -> Result<Self, BrokerEventError> {
        Ok(Self::RefSuffix(pattern.into()))
    }

@@ -277,20 +236,20 @@ impl EventFilter {
    /// instead of a `Filter`.
    ///
    /// See the module description for an example of the file content.
-
    pub fn from_file(filename: &Path) -> Result<Vec<Self>, NodeEventError> {
+
    pub fn from_file(filename: &Path) -> Result<Vec<Self>, BrokerEventError> {
        let filters =
-
            read(filename).map_err(|e| NodeEventError::ReadFilterFile(filename.into(), e))?;
+
            read(filename).map_err(|e| BrokerEventError::ReadFilterFile(filename.into(), e))?;
        let filters: Filters = serde_json::from_slice(&filters)
-
            .map_err(|e| NodeEventError::FiltersJsonFile(filename.into(), e))?;
+
            .map_err(|e| BrokerEventError::FiltersJsonFile(filename.into(), e))?;
        Ok(filters.filters)
    }

    /// Read filters from a YAML file.
-
    pub fn from_yaml_file(filename: &Path) -> Result<Vec<Self>, NodeEventError> {
+
    pub fn from_yaml_file(filename: &Path) -> Result<Vec<Self>, BrokerEventError> {
        let filters =
-
            read(filename).map_err(|e| NodeEventError::ReadFilterFile(filename.into(), e))?;
+
            read(filename).map_err(|e| BrokerEventError::ReadFilterFile(filename.into(), e))?;
        let filters: Filters = serde_yml::from_slice(&filters)
-
            .map_err(|e| NodeEventError::FiltersYamlFile(filename.into(), e))?;
+
            .map_err(|e| BrokerEventError::FiltersYamlFile(filename.into(), e))?;
        Ok(filters.filters)
    }
}
@@ -304,10 +263,10 @@ pub struct Filters {
}

impl TryFrom<&str> for Filters {
-
    type Error = NodeEventError;
+
    type Error = BrokerEventError;

    fn try_from(s: &str) -> Result<Self, Self::Error> {
-
        serde_json::from_str(s).map_err(NodeEventError::FiltersJsonString)
+
        serde_json::from_str(s).map_err(BrokerEventError::FiltersJsonString)
    }
}

@@ -384,12 +343,12 @@ impl BrokerEvent {
    }

    /// Is this broker event allowed by a filter?
-
    pub fn is_allowed(&self, filter: &EventFilter) -> Result<bool, NodeEventError> {
+
    pub fn is_allowed(&self, filter: &EventFilter) -> Result<bool, BrokerEventError> {
        let res = self.is_allowed_helper(filter)?;
        Ok(res)
    }

-
    fn is_allowed_helper(&self, filter: &EventFilter) -> Result<bool, NodeEventError> {
+
    fn is_allowed_helper(&self, filter: &EventFilter) -> Result<bool, BrokerEventError> {
        let allowed = match self {
            Self::Shutdown => true,
            Self::RefChanged {
@@ -407,14 +366,14 @@ impl BrokerEvent {
                    EventFilter::AnyPatch => matches!(parsed, Some(ParsedRef::Patch(_))),
                    EventFilter::Patch(wanted) => {
                        let oid = Oid::try_from(wanted.as_str())
-
                            .map_err(|e| NodeEventError::ParseOid(wanted.into(), e))?;
+
                            .map_err(|e| BrokerEventError::ParseOid(wanted.into(), e))?;
                        parsed == Some(ParsedRef::Patch(oid))
                    }
                    EventFilter::AnyPatchRef => matches!(parsed, Some(ParsedRef::Patch(_))),
                    EventFilter::AnyPushRef => matches!(parsed, Some(ParsedRef::Push(_))),
                    EventFilter::PatchRef(wanted) => {
                        let oid = Oid::try_from(wanted.as_str())
-
                            .map_err(|e| NodeEventError::ParseOid(wanted.into(), e))?;
+
                            .map_err(|e| BrokerEventError::ParseOid(wanted.into(), e))?;
                        parsed == Some(ParsedRef::Patch(oid))
                    }
                    EventFilter::And(conds) => conds
@@ -442,7 +401,7 @@ impl BrokerEvent {

    /// Extract the NID from the RefString.
    /// The RefString will start with `refs/namespaces/<nid>/...`
-
    pub fn nid(&self) -> Result<Option<NodeId>, NodeEventError> {
+
    pub fn nid(&self) -> Result<Option<NodeId>, BrokerEventError> {
        if let Some(name) = self.name() {
            Ok(parse_nid_from_refstring(name)?)
        } else {
@@ -450,7 +409,7 @@ impl BrokerEvent {
        }
    }

-
    pub fn patch_id(&self) -> Result<Option<Oid>, NodeEventError> {
+
    pub fn patch_id(&self) -> Result<Option<Oid>, BrokerEventError> {
        if let Some(name) = self.name() {
            if let Some(ParsedRef::Patch(oid)) = parse_ref(name)? {
                return Ok(Some(oid));
@@ -462,9 +421,9 @@ impl BrokerEvent {

/// Extract the NID from a the ref string in a repository.
/// The RefString should start with `refs/namespaces/<nid>/...`
-
pub fn parse_nid_from_refstring(name: &RefString) -> Result<Option<NodeId>, NodeEventError> {
+
pub fn parse_nid_from_refstring(name: &RefString) -> Result<Option<NodeId>, BrokerEventError> {
    const PAT: &str = r"^refs/namespaces/(?P<nid>[^/]+)/";
-
    let pat = Regex::new(PAT).map_err(|e| NodeEventError::Regex(PAT, e))?;
+
    let pat = Regex::new(PAT).map_err(|e| BrokerEventError::Regex(PAT, e))?;
    if let Some(captures) = pat.captures(name.as_str()) {
        if let Some(m) = captures.name("nid") {
            if let Ok(parsed) = m.as_str().parse() {
@@ -555,20 +514,20 @@ pub enum ParsedRef {
///     }
/// }
/// ```
-
pub fn parse_ref(s: &str) -> Result<Option<ParsedRef>, NodeEventError> {
+
pub fn parse_ref(s: &str) -> Result<Option<ParsedRef>, BrokerEventError> {
    const PAT_PATCH: &str = r"^refs/namespaces/[^/]+/refs/heads/patches/([^/]+)$";
-
    let patch_re = Regex::new(PAT_PATCH).map_err(|e| NodeEventError::Regex(PAT_PATCH, e))?;
+
    let patch_re = Regex::new(PAT_PATCH).map_err(|e| BrokerEventError::Regex(PAT_PATCH, e))?;
    if let Some(patch_captures) = patch_re.captures(s) {
        if let Some(patch_id) = patch_captures.get(1) {
            let patch_id_str = patch_id.as_str();
            let oid = Oid::try_from(patch_id_str)
-
                .map_err(|e| NodeEventError::ParseOid(patch_id_str.into(), e))?;
+
                .map_err(|e| BrokerEventError::ParseOid(patch_id_str.into(), e))?;
            return Ok(Some(ParsedRef::Patch(oid)));
        }
    }

    const PAT_BRANCH: &str = r"^refs/namespaces/[^/]+/refs/heads/(.+)$";
-
    let push_re = Regex::new(PAT_BRANCH).map_err(|e| NodeEventError::Regex(PAT_BRANCH, e))?;
+
    let push_re = Regex::new(PAT_BRANCH).map_err(|e| BrokerEventError::Regex(PAT_BRANCH, e))?;
    if let Some(push_captures) = push_re.captures(s) {
        if let Some(branch) = push_captures.get(1) {
            return Ok(Some(ParsedRef::Push(branch.as_str().to_string())));
modified src/lib.rs
@@ -12,6 +12,7 @@ pub mod db;
pub mod event;
pub mod logger;
pub mod msg;
+
pub mod node_event_source;
pub mod notif;
pub mod pages;
pub mod queueadd;
modified src/msg.rs
@@ -769,7 +769,7 @@ pub enum MessageError {

    /// Error from event module.
    #[error(transparent)]
-
    EventError(#[from] crate::event::NodeEventError),
+
    EventError(#[from] crate::event::BrokerEventError),

    /// Error from Radicle repository.
    #[error(transparent)]
added src/node_event_source.rs
@@ -0,0 +1,116 @@
+
//! Read node events from the local node.
+

+
use std::{path::PathBuf, time};
+

+
use radicle::{
+
    node::{Event, Handle},
+
    Profile,
+
};
+

+
use crate::logger;
+

+
/// Source of events from the local Radicle node.
+
///
+
/// The events are filtered. Only events allowed by at least one
+
/// filter are returned. See [`NodeEventSource::allow`] and
+
/// [`EventFilter`].
+
pub struct NodeEventSource {
+
    events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
+
}
+

+
impl NodeEventSource {
+
    /// Create a new source of node events, for a given Radicle
+
    /// profile.
+
    pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
+
        let socket = profile.socket();
+
        if !socket.exists() {
+
            return Err(NodeEventError::NoControlSocket(socket));
+
        }
+
        let node = radicle::Node::new(socket.clone());
+
        match node.subscribe(time::Duration::MAX) {
+
            Ok(events) => Ok(Self {
+
                events: Box::new(events.into_iter()),
+
            }),
+
            Err(err) => {
+
                logger::error("failed to subscribe to node events", &err);
+
                Err(NodeEventError::CannotSubscribe(socket.clone(), err))
+
            }
+
        }
+
    }
+

+
    /// Get the next node event from an event source, without
+
    /// filtering. This will block until there is an event, or until
+
    /// there will be no more events from this source, or there's an
+
    /// error.
+
    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
+
        if let Some(event) = self.events.next() {
+
            match event {
+
                Ok(event) => Ok(Some(event)),
+
                Err(radicle::node::Error::Io(err))
+
                    if err.kind() == std::io::ErrorKind::ConnectionReset =>
+
                {
+
                    logger::event_disconnected();
+
                    Err(NodeEventError::BrokenConnection)
+
                }
+
                Err(err) => {
+
                    logger::error("error reading event from node", &err);
+
                    Err(NodeEventError::Node(err))
+
                }
+
            }
+
        } else {
+
            Ok(None)
+
        }
+
    }
+
}
+

+
/// Possible errors from accessing the local Radicle node.
+
#[derive(Debug, thiserror::Error)]
+
pub enum NodeEventError {
+
    /// Regex compilation error.
+
    #[error("programming error in regular expression {0:?}")]
+
    Regex(&'static str, regex::Error),
+

+
    /// Node control socket does not exist.
+
    #[error("node control socket does not exist: {0}")]
+
    NoControlSocket(PathBuf),
+

+
    /// Can't subscribe to node events.
+
    #[error("failed to subscribe to node events on socket {0}")]
+
    CannotSubscribe(PathBuf, #[source] radicle::node::Error),
+

+
    /// Some error from getting an event from the node.
+
    #[error(transparent)]
+
    Node(#[from] radicle::node::Error),
+

+
    /// Connection to the node control socket broke.
+
    #[error("connection to the node control socket broke")]
+
    BrokenConnection,
+

+
    /// Some error from parsing a repository id.
+
    #[error(transparent)]
+
    Id(#[from] radicle::identity::IdError),
+

+
    /// Some error doing input/output.
+
    #[error(transparent)]
+
    Io(#[from] std::io::Error),
+

+
    /// An error reading a filter file.
+
    #[error("failed to read filter file: {0}")]
+
    ReadFilterFile(PathBuf, #[source] std::io::Error),
+

+
    /// An error parsing JSON as filters, when read from a file.
+
    #[error("failed to parser filters file: {0}")]
+
    FiltersJsonFile(PathBuf, #[source] serde_json::Error),
+

+
    /// An error parsing YAML as filters, when read from a file.
+
    #[error("failed to parser filters file: {0}")]
+
    FiltersYamlFile(PathBuf, #[source] serde_yml::Error),
+

+
    /// An error parsing JSON as filters, from an in-memory string.
+
    #[error("failed to parser filters as JSON")]
+
    FiltersJsonString(#[source] serde_json::Error),
+

+
    /// An error parsing a Git object id as string into an Oid.
+
    #[error("failed to parse string as a Git object id: {0:?}")]
+
    ParseOid(String, #[source] radicle::git::raw::Error),
+
}
modified src/queueadd.rs
@@ -4,7 +4,7 @@ use radicle::Profile;

use crate::{
    db::{Db, DbError},
-
    event::{BrokerEvent, EventFilter, NodeEventError, NodeEventSource},
+
    event::{BrokerEvent, BrokerEventError, BrokerEventSource, EventFilter},
    logger,
    notif::NotificationSender,
};
@@ -65,7 +65,7 @@ impl QueueAdder {

        let profile = Profile::load()?;

-
        let mut source = NodeEventSource::new(&profile)?;
+
        let mut source = BrokerEventSource::new(&profile)?;

        for filter in self.filters.iter() {
            source.allow(filter.clone());
@@ -112,7 +112,7 @@ pub enum AdderError {
    Profile(#[from] radicle::profile::Error),

    #[error(transparent)]
-
    NodeEvent(#[from] NodeEventError),
+
    NodeEvent(#[from] BrokerEventError),

    #[error(transparent)]
    Db(#[from] DbError),