Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: add cibtool subcommand for recording node events
Lars Wirzenius committed 1 year ago
commit 795dc1c97aa39e91f906a33d4ccb26449ea8248a
parent e5873fd0ea6e14351cf470ab7983740dde5ac004
4 files changed +129 -2
modified ci-broker.md
@@ -1124,6 +1124,32 @@ then stderr contains "rad:z3byzFpcfbMJBp4tKYyuuTZiP8WUB"

~~~

+
## Record node events
+

+
_What:_ Node operator can record node events into a file.
+

+
_Why:_ This can be helpful for remote debugging, it's very helpful for
+
CI broker development to see what events actually happen, and it's
+
useful for gathering data for trying out event filters.
+

+
_Who:_ `cib-devs`, `node-ops`
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given an installed synthetic-events
+
given file refsfetched.json
+
given file set-rid
+
when I run bash radenv.sh env HOME=../homedir python3 set-rid refsfetched.json testy
+
when I run synthetic-events synt.sock refsfetched.json --log log.txt
+

+
given an installed cibtool
+
when I run bash radenv.sh cibtool event record --output events.json
+
then file events.json contains ""type":"refsFetched""
+
~~~
+

# Acceptance criteria for logging

The CI broker writes log messages to its standard error output
modified src/bin/cibtool.rs
@@ -31,6 +31,7 @@ use radicle_ci_broker::{
    broker::BrokerError,
    db::{Db, DbError, QueueId, QueuedEvent},
    event::BrokerEvent,
+
    logger,
    msg::{RunId, RunResult},
    notif::NotificationChannel,
    pages::PageError,
@@ -41,6 +42,7 @@ use radicle_ci_broker::{
mod cibtoolcmd;

fn main() {
+
    let _logger = logger::open();
    if let Err(e) = fallible_main() {
        eprintln!("ERROR: {}", e);
        let mut e = e.source();
@@ -86,7 +88,7 @@ struct Args {
    /// Name of the SQLite database file. The file will be created if
    /// it does not already exist. Locking is handled automatically.
    #[clap(long)]
-
    db: PathBuf,
+
    db: Option<PathBuf>,

    #[clap(subcommand)]
    cmd: Cmd,
@@ -94,7 +96,11 @@ struct Args {

impl Args {
    fn open_db(&self) -> Result<Db, CibToolError> {
-
        Ok(Db::new(&self.db)?)
+
        if let Some(filename) = &self.db {
+
            Ok(Db::new(filename)?)
+
        } else {
+
            Err(CibToolError::NoDb)
+
        }
    }
}

@@ -175,6 +181,7 @@ impl Subcommand for EventCmd {
            EventSubCmd::Count(x) => x.run(args)?,
            EventSubCmd::Show(x) => x.run(args)?,
            EventSubCmd::Remove(x) => x.run(args)?,
+
            EventSubCmd::Record(x) => x.run(args)?,
        }
        Ok(())
    }
@@ -190,6 +197,7 @@ enum EventSubCmd {
    Shutdown(cibtoolcmd::Shutdown),
    Show(cibtoolcmd::ShowEvent),
    Remove(cibtoolcmd::RemoveEvent),
+
    Record(cibtoolcmd::RecordEvents),
}

#[derive(Parser)]
@@ -265,6 +273,9 @@ enum CibToolError {
    #[error("failed to serialize broker event to JSON: {0:#?}")]
    EventToJson(BrokerEvent, #[source] serde_json::Error),

+
    #[error("failed to serialize node event to JSON: {0:#?}")]
+
    NodeEevnetToJson(radicle::node::Event, #[source] serde_json::Error),
+

    #[error("failed to serialize list of stored broker event to JSON")]
    EventListToJson(#[source] serde_json::Error),

@@ -294,4 +305,19 @@ enum CibToolError {

    #[error(transparent)]
    Util(#[from] UtilError),
+

+
    #[error("no database file specified with the --db option")]
+
    NoDb,
+

+
    #[error("failed to subscribe to node events")]
+
    EventSubscribe(#[source] radicle_ci_broker::event::NodeEventError),
+

+
    #[error("failed to get next node event")]
+
    GetNodeEvent(#[source] radicle_ci_broker::event::NodeEventError),
+

+
    #[error("failed to create file for node events: {0}")]
+
    CreateEventsFile(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to write node event to file {0}")]
+
    WriteEvent(PathBuf, #[source] std::io::Error),
}
modified src/bin/cibtoolcmd/event.rs
@@ -1,3 +1,7 @@
+
use std::io::Write;
+

+
use radicle_ci_broker::event::NodeEventSource;
+

use super::*;

/// List events in the queue, waiting to be processed.
@@ -293,3 +297,50 @@ impl Leaf for Shutdown {
        Ok(())
    }
}
+

+
/// Record node events from the node.
+
///
+
/// The events are written to the standard output or to the specified
+
/// file, as one JSON object per line.
+
#[derive(Parser)]
+
pub struct RecordEvents {
+
    /// Write events to this file.
+
    #[clap(long)]
+
    output: Option<PathBuf>,
+
}
+

+
impl Leaf for RecordEvents {
+
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
+
        let profile = util::load_profile()?;
+
        let mut source = NodeEventSource::new(&profile).map_err(CibToolError::EventSubscribe)?;
+
        let mut file = if let Some(filename) = &self.output {
+
            Some(
+
                std::fs::File::create(filename)
+
                    .map_err(|e| CibToolError::CreateEventsFile(filename.into(), e))?,
+
            )
+
        } else {
+
            None
+
        };
+

+
        loop {
+
            match source.node_event() {
+
                Err(e) => return Err(CibToolError::GetNodeEvent(e)),
+
                Ok(None) => break,
+
                Ok(Some(event)) => {
+
                    println!("got event {event:?}");
+
                    let json = serde_json::to_string(&event)
+
                        .map_err(|e| CibToolError::NodeEevnetToJson(event.clone(), e))?;
+
                    if let Some(file) = &mut file {
+
                        file.write_all(json.as_bytes()).map_err(|e| {
+
                            CibToolError::WriteEvent(self.output.as_ref().unwrap().clone(), e)
+
                        })?;
+
                    } else {
+
                        println!("{json}");
+
                    }
+
                }
+
            }
+
        }
+

+
        Ok(())
+
    }
+
}
modified src/event.rs
@@ -89,6 +89,30 @@ impl NodeEventSource {
        Ok(true)
    }

+
    /// Get the next node event from an event source, without
+
    /// filtering. This will block until there is an event, or until
+
    /// there will be no more events from this source, or there's an
+
    /// error.
+
    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
+
        if let Some(event) = self.events.next() {
+
            match event {
+
                Ok(event) => Ok(Some(event)),
+
                Err(radicle::node::Error::Io(err))
+
                    if err.kind() == std::io::ErrorKind::ConnectionReset =>
+
                {
+
                    logger::event_disconnected();
+
                    Err(NodeEventError::BrokenConnection)
+
                }
+
                Err(err) => {
+
                    logger::error("error reading event from node", &err);
+
                    Err(NodeEventError::Node(err))
+
                }
+
            }
+
        } else {
+
            Ok(None)
+
        }
+
    }
+

    /// Get the allowed next event from an event source. This will
    /// block until there is an allowed event, or until there will be
    /// no more events from this source, or there's an error.