Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
Add tools for recording, conversing, filtering events
Merged liw opened 1 year ago
4 files changed +359 -4 e5873fd0 c7b0b393
modified ci-broker.md
@@ -1124,6 +1124,106 @@ then stderr contains "rad:z3byzFpcfbMJBp4tKYyuuTZiP8WUB"

~~~

+
## Record node events
+

+
_What:_ Node operator can record node events into a file.
+

+
_Why:_ This can be helpful for remote debugging, it's very helpful for
+
CI broker development to see what events actually happen, and it's
+
useful for gathering data for trying out event filters.
+

+
_Who:_ `cib-devs`, `node-ops`
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given an installed synthetic-events
+
given file refsfetched.json
+
given file set-rid
+
when I run bash radenv.sh env HOME=../homedir python3 set-rid refsfetched.json testy
+
when I run synthetic-events synt.sock refsfetched.json --log log.txt
+

+
given an installed cibtool
+
when I run bash radenv.sh cibtool event record --output events.json
+
then file events.json contains ""type":"refsFetched""
+
~~~
+

+
## Convert recorded node events into broker events
+

+
_What:_ Node operator can see what broke events are created from node
+
events.
+

+
_Why:_ This is helpful so that node operators can see what broker
+
events are created from node events, which may have been previously
+
recorded. It's also helpful for CI broker developers as a development
+
tool.
+

+
_Who:_ `cib-dev`, `node-ops`
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given an installed synthetic-events
+
given file refsfetched.json
+
given file set-rid
+
when I run bash radenv.sh env HOME=../homedir python3 set-rid refsfetched.json testy
+
when I run synthetic-events synt.sock refsfetched.json --log log.txt
+

+
given an installed cibtool
+
when I run bash radenv.sh cibtool event record --output node-events.json
+
when I run bash radenv.sh cibtool event broker --output broker-events.json node-events.json
+
then file broker-events.json contains "RefChanged""
+
~~~
+

+

+
## Filter recorded broker events
+

+
_What:_ Node operator can see what broker events an event filter
+
allow.
+

+
_Why:_ This is helpful so that node operators can see verify their
+
event filter works as they expect.
+

+
_Who:_ `cib-dev`, `node-ops`
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given an installed synthetic-events
+
given file refsfetched.json
+
given file set-rid
+
when I run bash radenv.sh env HOME=../homedir python3 set-rid refsfetched.json testy
+
when I run synthetic-events synt.sock refsfetched.json --log log.txt
+

+
given an installed cibtool
+
when I run bash radenv.sh cibtool event record --output node-events.json
+
when I run bash radenv.sh cibtool event broker --output broker-events.json node-events.json
+

+
given file allow.yaml
+
when I run cibtool event filter  allow.yaml broker-events.json
+
then stdout contains "RefChanged"
+

+
given file deny.yaml
+
when I run cibtool event filter deny.yaml broker-events.json
+
then stdout is exactly ""
+
~~~
+

+
~~~{#allow.yaml .file .yaml}
+
filters:
+
- !Branch "main"
+
~~~
+

+
~~~{#deny.yaml .file .yaml}
+
filters:
+
- !Branch "this-does-not-exist"
+
~~~
+

# Acceptance criteria for logging

The CI broker writes log messages to its standard error output
modified src/bin/cibtool.rs
@@ -30,7 +30,8 @@ use radicle_git_ext::Oid;
use radicle_ci_broker::{
    broker::BrokerError,
    db::{Db, DbError, QueueId, QueuedEvent},
-
    event::BrokerEvent,
+
    event::{BrokerEvent, NodeEventError},
+
    logger,
    msg::{RunId, RunResult},
    notif::NotificationChannel,
    pages::PageError,
@@ -41,6 +42,7 @@ use radicle_ci_broker::{
mod cibtoolcmd;

fn main() {
+
    let _logger = logger::open();
    if let Err(e) = fallible_main() {
        eprintln!("ERROR: {}", e);
        let mut e = e.source();
@@ -86,7 +88,7 @@ struct Args {
    /// Name of the SQLite database file. The file will be created if
    /// it does not already exist. Locking is handled automatically.
    #[clap(long)]
-
    db: PathBuf,
+
    db: Option<PathBuf>,

    #[clap(subcommand)]
    cmd: Cmd,
@@ -94,7 +96,11 @@ struct Args {

impl Args {
    fn open_db(&self) -> Result<Db, CibToolError> {
-
        Ok(Db::new(&self.db)?)
+
        if let Some(filename) = &self.db {
+
            Ok(Db::new(filename)?)
+
        } else {
+
            Err(CibToolError::NoDb)
+
        }
    }
}

@@ -175,6 +181,9 @@ impl Subcommand for EventCmd {
            EventSubCmd::Count(x) => x.run(args)?,
            EventSubCmd::Show(x) => x.run(args)?,
            EventSubCmd::Remove(x) => x.run(args)?,
+
            EventSubCmd::Record(x) => x.run(args)?,
+
            EventSubCmd::Broker(x) => x.run(args)?,
+
            EventSubCmd::Filter(x) => x.run(args)?,
        }
        Ok(())
    }
@@ -190,6 +199,9 @@ enum EventSubCmd {
    Shutdown(cibtoolcmd::Shutdown),
    Show(cibtoolcmd::ShowEvent),
    Remove(cibtoolcmd::RemoveEvent),
+
    Record(cibtoolcmd::RecordEvents),
+
    Broker(cibtoolcmd::BrokerEvents),
+
    Filter(cibtoolcmd::FilterEvents),
}

#[derive(Parser)]
@@ -265,6 +277,9 @@ enum CibToolError {
    #[error("failed to serialize broker event to JSON: {0:#?}")]
    EventToJson(BrokerEvent, #[source] serde_json::Error),

+
    #[error("failed to serialize node event to JSON: {0:#?}")]
+
    NodeEevnetToJson(radicle::node::Event, #[source] serde_json::Error),
+

    #[error("failed to serialize list of stored broker event to JSON")]
    EventListToJson(#[source] serde_json::Error),

@@ -294,4 +309,43 @@ enum CibToolError {

    #[error(transparent)]
    Util(#[from] UtilError),
+

+
    #[error("no database file specified with the --db option")]
+
    NoDb,
+

+
    #[error("failed to subscribe to node events")]
+
    EventSubscribe(#[source] radicle_ci_broker::event::NodeEventError),
+

+
    #[error("failed to get next node event")]
+
    GetNodeEvent(#[source] radicle_ci_broker::event::NodeEventError),
+

+
    #[error("failed to create file for node events: {0}")]
+
    CreateEventsFile(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to write node event to file {0}")]
+
    WriteEvent(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to read node events from file {0}")]
+
    ReadEvents(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to read broker events from file {0}")]
+
    ReadBrokerEvents(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to read node events as UTF8 from file {0}")]
+
    NodeEventNotUtf8(PathBuf, #[source] std::string::FromUtf8Error),
+

+
    #[error("failed to read broker events as UTF8 from file {0}")]
+
    BrokerEventNotUtf8(PathBuf, #[source] std::string::FromUtf8Error),
+

+
    #[error("failed to read node events as JSON from file {0}")]
+
    JsonToNodeEvent(PathBuf, #[source] serde_json::Error),
+

+
    #[error("failed to create file for broker events: {0}")]
+
    CreateBrokerEventsFile(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to read filters from YAML file {0}")]
+
    ReadFilters(PathBuf, #[source] radicle_ci_broker::event::NodeEventError),
+

+
    #[error("failed to check if event is allowed: {0:#?}")]
+
    EventIsAllowed(BrokerEvent, #[source] NodeEventError),
}
modified src/bin/cibtoolcmd/event.rs
@@ -1,3 +1,7 @@
+
use std::io::Write;
+

+
use radicle_ci_broker::event::{EventFilter, NodeEventSource};
+

use super::*;

/// List events in the queue, waiting to be processed.
@@ -293,3 +297,163 @@ impl Leaf for Shutdown {
        Ok(())
    }
}
+

+
/// Record node events from the node.
+
///
+
/// The events are written to the standard output or to the specified
+
/// file, as one JSON object per line.
+
#[derive(Parser)]
+
pub struct RecordEvents {
+
    /// Write events to this file.
+
    #[clap(long)]
+
    output: Option<PathBuf>,
+
}
+

+
impl Leaf for RecordEvents {
+
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
+
        let profile = util::load_profile()?;
+
        let mut source = NodeEventSource::new(&profile).map_err(CibToolError::EventSubscribe)?;
+
        let mut file = if let Some(filename) = &self.output {
+
            Some(
+
                std::fs::File::create(filename)
+
                    .map_err(|e| CibToolError::CreateEventsFile(filename.into(), e))?,
+
            )
+
        } else {
+
            None
+
        };
+

+
        loop {
+
            match source.node_event() {
+
                Err(e) => return Err(CibToolError::GetNodeEvent(e)),
+
                Ok(None) => break,
+
                Ok(Some(event)) => {
+
                    println!("got event {event:?}");
+
                    let mut json = serde_json::to_string(&event)
+
                        .map_err(|e| CibToolError::NodeEevnetToJson(event.clone(), e))?;
+
                    if let Some(file) = &mut file {
+
                        json.push('\n');
+
                        file.write_all(json.as_bytes()).map_err(|e| {
+
                            CibToolError::WriteEvent(self.output.as_ref().unwrap().clone(), e)
+
                        })?;
+
                    } else {
+
                        println!("{json}");
+
                    }
+
                }
+
            }
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
/// Convert node events into broker events.
+
///
+
/// Node events are read from the specified file. Note that one node
+
/// event can result in any number of broker events.
+
///
+
/// The events are written to the standard output or to the specified
+
/// file, as one JSON object per line.
+
#[derive(Parser)]
+
pub struct BrokerEvents {
+
    /// Write broker events to this file.
+
    #[clap(long)]
+
    output: Option<PathBuf>,
+

+
    /// Read node events from this file.
+
    input: PathBuf,
+
}
+

+
impl Leaf for BrokerEvents {
+
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
+
        let bytes = std::fs::read(&self.input)
+
            .map_err(|e| CibToolError::ReadEvents(self.input.clone(), e))?;
+
        let text = String::from_utf8(bytes)
+
            .map_err(|e| CibToolError::NodeEventNotUtf8(self.input.clone(), e))?;
+

+
        let mut node_events = vec![];
+
        for line in text.lines() {
+
            let event: radicle::node::Event = serde_json::from_str(line)
+
                .map_err(|e| CibToolError::JsonToNodeEvent(self.input.clone(), e))?;
+
            node_events.push(event);
+
        }
+

+
        let mut broker_events = vec![];
+
        for node_event in node_events.iter() {
+
            if let Some(mut bes) = BrokerEvent::from_event(node_event) {
+
                broker_events.append(&mut bes);
+
            }
+
        }
+

+
        let mut jsons = vec![];
+
        for be in broker_events.iter() {
+
            jsons.push(
+
                serde_json::to_string(be)
+
                    .map_err(|err| CibToolError::EventToJson(be.clone(), err))?,
+
            );
+
        }
+

+
        if let Some(filename) = &self.output {
+
            let mut file = std::fs::File::create(filename)
+
                .map_err(|e| CibToolError::CreateBrokerEventsFile(filename.into(), e))?;
+
            for json in jsons {
+
                let json = format!("{json}\n");
+
                file.write_all(json.as_bytes()).map_err(|e| {
+
                    CibToolError::WriteEvent(self.output.as_ref().unwrap().clone(), e)
+
                })?;
+
            }
+
        } else {
+
            for json in jsons {
+
                println!("{json}");
+
            }
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
/// Filter broker events recorded in a file.
+
///
+
/// Those broker events allowed by the filter are written to the
+
/// standard output, as one JSON object per line.
+
#[derive(Parser)]
+
pub struct FilterEvents {
+
    /// Read event filter from this YAML file.
+
    filters: PathBuf,
+

+
    /// Read broker events from this file.
+
    input: PathBuf,
+
}
+

+
impl Leaf for FilterEvents {
+
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
+
        let filters = EventFilter::from_yaml_file(&self.filters)
+
            .map_err(|e| CibToolError::ReadFilters(self.filters.clone(), e))?;
+

+
        let bytes = std::fs::read(&self.input)
+
            .map_err(|e| CibToolError::ReadBrokerEvents(self.input.clone(), e))?;
+
        let text = String::from_utf8(bytes)
+
            .map_err(|e| CibToolError::BrokerEventNotUtf8(self.input.clone(), e))?;
+

+
        let mut broker_events = vec![];
+
        for line in text.lines() {
+
            let event: BrokerEvent = serde_json::from_str(line)
+
                .map_err(|e| CibToolError::JsonToNodeEvent(self.input.clone(), e))?;
+
            broker_events.push(event);
+
        }
+

+
        for event in broker_events.iter() {
+
            for filter in filters.iter() {
+
                if event
+
                    .is_allowed(filter)
+
                    .map_err(|e| CibToolError::EventIsAllowed(event.clone(), e))?
+
                {
+
                    let json = serde_json::to_string_pretty(event)
+
                        .map_err(|e| CibToolError::EventToJson(event.clone(), e))?;
+
                    println!("{json}");
+
                }
+
            }
+
        }
+

+
        Ok(())
+
    }
+
}
modified src/event.rs
@@ -89,6 +89,30 @@ impl NodeEventSource {
        Ok(true)
    }

+
    /// Get the next node event from an event source, without
+
    /// filtering. This will block until there is an event, or until
+
    /// there will be no more events from this source, or there's an
+
    /// error.
+
    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
+
        if let Some(event) = self.events.next() {
+
            match event {
+
                Ok(event) => Ok(Some(event)),
+
                Err(radicle::node::Error::Io(err))
+
                    if err.kind() == std::io::ErrorKind::ConnectionReset =>
+
                {
+
                    logger::event_disconnected();
+
                    Err(NodeEventError::BrokenConnection)
+
                }
+
                Err(err) => {
+
                    logger::error("error reading event from node", &err);
+
                    Err(NodeEventError::Node(err))
+
                }
+
            }
+
        } else {
+
            Ok(None)
+
        }
+
    }
+

    /// Get the allowed next event from an event source. This will
    /// block until there is an allowed event, or until there will be
    /// no more events from this source, or there's an error.
@@ -167,6 +191,10 @@ pub enum NodeEventError {
    #[error("failed to parser filters file: {0}")]
    FiltersJsonFile(PathBuf, #[source] serde_json::Error),

+
    /// An error parsing YAML as filters, when read from a file.
+
    #[error("failed to parser filters file: {0}")]
+
    FiltersYamlFile(PathBuf, #[source] serde_yml::Error),
+

    /// An error parsing JSON as filters, from an in-memory string.
    #[error("failed to parser filters as JSON")]
    FiltersJsonString(#[source] serde_json::Error),
@@ -256,6 +284,15 @@ impl EventFilter {
            .map_err(|e| NodeEventError::FiltersJsonFile(filename.into(), e))?;
        Ok(filters.filters)
    }
+

+
    /// Read filters from a YAML file.
+
    pub fn from_yaml_file(filename: &Path) -> Result<Vec<Self>, NodeEventError> {
+
        let filters =
+
            read(filename).map_err(|e| NodeEventError::ReadFilterFile(filename.into(), e))?;
+
        let filters: Filters = serde_yml::from_slice(&filters)
+
            .map_err(|e| NodeEventError::FiltersYamlFile(filename.into(), e))?;
+
        Ok(filters.filters)
+
    }
}

/// A set of filters for [`NodeEventSource`] to use. This struct
@@ -347,7 +384,7 @@ impl BrokerEvent {
    }

    /// Is this broker event allowed by a filter?
-
    fn is_allowed(&self, filter: &EventFilter) -> Result<bool, NodeEventError> {
+
    pub fn is_allowed(&self, filter: &EventFilter) -> Result<bool, NodeEventError> {
        let res = self.is_allowed_helper(filter)?;
        Ok(res)
    }