Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: subcommand "log" to format and filter cib logs from journald
Lars Wirzenius committed 1 year ago
commit 9f5fff09b01367008a2843f87f1b1f40deb85bf8
parent 3ce84a47c0300cacd98fc13766d58829570e64b1
6 files changed +375 -3
modified ci-broker.md
@@ -1446,6 +1446,47 @@ filters:
- !Branch "this-does-not-exist"
~~~

+
## Extract `cib` log from journald and pretty print
+

+
_Want:_ `cibtool` can extract `cib` log messages from the systemd
+
journal sub-system, and pretty print them, and optionally filter the
+
messages.
+

+
_Why:_ systemd is the common service manager for Linux systems, and it
+
needs to be convenient to extract `cib` log messages from its system
+
logging sub-system, journald. This is especially important for CI
+
broker developers who need to diagnose problems on remote `cib`
+
instances to which they don't have direct access.
+

+
_Who:_ `cib-devs`
+

+
~~~scenario
+
given a Radicle node, with CI configured with broker.yaml and adapter dummy.sh
+
given file journal.json
+

+
when I run cibtool log --journal journal.json --output x.json
+
when I run jq . x.json
+
then command is successful
+

+
when I run cibtool log --journal journal.json --broker-run-id 62c45727-a4d8-4a29-9dae-88c6e8b61655 --output x.json
+
when I run grep -c timestamp x.json
+
then stdout has one line
+
~~~
+

+
~~~{#journal.json .file .json}
+
{"__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31360;b=da4ba18425ea4e34bd0f0731f0270572;m=c3256a61;t=6290db5b772ca;x=280fbc307405cb81","_SYSTEMD_UNIT":"radicle-ci-broker.service","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_SYSTEMD_SLICE":"system.slice","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_PID":"526","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:00.276073Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_RUNTIME_SCOPE":"system","_CAP_EFFECTIVE":"0","_GID":"1001","_SELINUX_CONTEXT":"unconfined\n","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","__MONOTONIC_TIMESTAMP":"3274009185","PRIORITY":"6","_COMM":"cib","_UID":"1001","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_HOSTNAME":"radicle-ci","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","__REALTIME_TIMESTAMP":"1733988720276170","SYSLOG_IDENTIFIER":"cib","_TRANSPORT":"stdout","_EXE":"/usr/bin/cib","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","SYSLOG_FACILITY":"3"}
+
{"_GID":"1001","_RUNTIME_SCOPE":"system","_UID":"1001","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_PID":"526","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_CAP_EFFECTIVE":"0","_SELINUX_CONTEXT":"unconfined\n","_TRANSPORT":"stdout","SYSLOG_IDENTIFIER":"cib","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:01.276463Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_SYSTEMD_SLICE":"system.slice","__MONOTONIC_TIMESTAMP":"3275009597","PRIORITY":"6","SYSLOG_FACILITY":"3","_HOSTNAME":"radicle-ci","_EXE":"/usr/bin/cib","_COMM":"cib","_SYSTEMD_UNIT":"radicle-ci-broker.service","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31361;b=da4ba18425ea4e34bd0f0731f0270572;m=c334ae3d;t=6290db5c6b6a5;x=a9c49aef9ad2a509","__REALTIME_TIMESTAMP":"1733988721276581"}
+
{"__MONOTONIC_TIMESTAMP":"3276010022","SYSLOG_IDENTIFIER":"cib","_CAP_EFFECTIVE":"0","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_HOSTNAME":"radicle-ci","_EXE":"/usr/bin/cib","_UID":"1001","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_SYSTEMD_SLICE":"system.slice","_PID":"526","SYSLOG_FACILITY":"3","_COMM":"cib","PRIORITY":"6","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_SELINUX_CONTEXT":"unconfined\n","_TRANSPORT":"stdout","_SYSTEMD_UNIT":"radicle-ci-broker.service","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:02.276881Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31362;b=da4ba18425ea4e34bd0f0731f0270572;m=c343f226;t=6290db5d5fa8e;x=843418c04ac1932f","_GID":"1001","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","__REALTIME_TIMESTAMP":"1733988722277006","_RUNTIME_SCOPE":"system","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events"}
+
{"SYSLOG_FACILITY":"3","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","__REALTIME_TIMESTAMP":"1733988723277782","_CAP_EFFECTIVE":"0","_PID":"526","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_EXE":"/usr/bin/cib","_COMM":"cib","_RUNTIME_SCOPE":"system","_UID":"1001","__MONOTONIC_TIMESTAMP":"3277010798","_TRANSPORT":"stdout","_SYSTEMD_UNIT":"radicle-ci-broker.service","_SYSTEMD_SLICE":"system.slice","_SELINUX_CONTEXT":"unconfined\n","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:03.277600Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","PRIORITY":"6","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","SYSLOG_IDENTIFIER":"cib","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_GID":"1001","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31363;b=da4ba18425ea4e34bd0f0731f0270572;m=c353376e;t=6290db5e53fd6;x=f0840763406ccc13","_HOSTNAME":"radicle-ci","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572"}
+
{"_SYSTEMD_UNIT":"radicle-ci-broker.service","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31364;b=da4ba18425ea4e34bd0f0731f0270572;m=c3627c05;t=6290db5f4846c;x=5bded561567c656f","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:04.278174Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_RUNTIME_SCOPE":"system","_HOSTNAME":"radicle-ci","_GID":"1001","PRIORITY":"6","_SELINUX_CONTEXT":"unconfined\n","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","__MONOTONIC_TIMESTAMP":"3278011397","_UID":"1001","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","SYSLOG_IDENTIFIER":"cib","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_TRANSPORT":"stdout","_PID":"526","_COMM":"cib","__REALTIME_TIMESTAMP":"1733988724278380","SYSLOG_FACILITY":"3","_CAP_EFFECTIVE":"0","_SYSTEMD_SLICE":"system.slice","_EXE":"/usr/bin/cib","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e"}
+
{"PRIORITY":"6","SYSLOG_FACILITY":"3","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_TRANSPORT":"stdout","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_GID":"1001","__REALTIME_TIMESTAMP":"1733988725278778","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:05.278657Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_PID":"526","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_SYSTEMD_UNIT":"radicle-ci-broker.service","_EXE":"/usr/bin/cib","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","SYSLOG_IDENTIFIER":"cib","_HOSTNAME":"radicle-ci","_COMM":"cib","_SYSTEMD_SLICE":"system.slice","__MONOTONIC_TIMESTAMP":"3279011794","_RUNTIME_SCOPE":"system","_UID":"1001","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31367;b=da4ba18425ea4e34bd0f0731f0270572;m=c371bfd2;t=6290db603c83a;x=b50947c211c1edcb","_SELINUX_CONTEXT":"unconfined\n","_CAP_EFFECTIVE":"0"}
+
{"SYSLOG_IDENTIFIER":"cib","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_TRANSPORT":"stdout","_PID":"526","PRIORITY":"6","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_GID":"1001","_SYSTEMD_SLICE":"system.slice","__REALTIME_TIMESTAMP":"1733988726279175","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_COMM":"cib","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31368;b=da4ba18425ea4e34bd0f0731f0270572;m=c381039f;t=6290db6130c07;x=7b8169758b14d38c","_SELINUX_CONTEXT":"unconfined\n","_RUNTIME_SCOPE":"system","_EXE":"/usr/bin/cib","_CAP_EFFECTIVE":"0","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:06.279078Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_HOSTNAME":"radicle-ci","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_SYSTEMD_UNIT":"radicle-ci-broker.service","SYSLOG_FACILITY":"3","_UID":"1001","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","__MONOTONIC_TIMESTAMP":"3280012191"}
+
{"MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:07.279440Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_EXE":"/usr/bin/cib","_RUNTIME_SCOPE":"system","_TRANSPORT":"stdout","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_SELINUX_CONTEXT":"unconfined\n","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_PID":"526","_HOSTNAME":"radicle-ci","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","__REALTIME_TIMESTAMP":"1733988727279541","_SYSTEMD_UNIT":"radicle-ci-broker.service","PRIORITY":"6","_GID":"1001","_UID":"1001","SYSLOG_FACILITY":"3","_COMM":"cib","SYSLOG_IDENTIFIER":"cib","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31369;b=da4ba18425ea4e34bd0f0731f0270572;m=c390474d;t=6290db6224fb5;x=faafb09a153285ff","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_CAP_EFFECTIVE":"0","_SYSTEMD_SLICE":"system.slice","__MONOTONIC_TIMESTAMP":"3281012557"}
+
{"_PID":"526","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_HOSTNAME":"radicle-ci","__MONOTONIC_TIMESTAMP":"3282012856","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_TRANSPORT":"stdout","SYSLOG_IDENTIFIER":"cib","_SYSTEMD_SLICE":"system.slice","_COMM":"cib","_CAP_EFFECTIVE":"0","_SELINUX_CONTEXT":"unconfined\n","__REALTIME_TIMESTAMP":"1733988728279841","_SYSTEMD_UNIT":"radicle-ci-broker.service","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_GID":"1001","SYSLOG_FACILITY":"3","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=3136a;b=da4ba18425ea4e34bd0f0731f0270572;m=c39f8ab8;t=6290db6319321;x=d04fd63cc7903199","PRIORITY":"6","_EXE":"/usr/bin/cib","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:08.279755Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_RUNTIME_SCOPE":"system","_UID":"1001"}
+
{"MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:09.280049Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_UID":"1001","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_HOSTNAME":"radicle-ci","_PID":"526","_TRANSPORT":"stdout","__REALTIME_TIMESTAMP":"1733988729280146","_SELINUX_CONTEXT":"unconfined\n","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_RUNTIME_SCOPE":"system","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_GID":"1001","PRIORITY":"6","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_CAP_EFFECTIVE":"0","_SYSTEMD_UNIT":"radicle-ci-broker.service","_EXE":"/usr/bin/cib","SYSLOG_IDENTIFIER":"cib","_COMM":"cib","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_SYSTEMD_SLICE":"system.slice","SYSLOG_FACILITY":"3","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=3136b;b=da4ba18425ea4e34bd0f0731f0270572;m=c3aece29;t=6290db640d692;x=b38aefb25148da02","__MONOTONIC_TIMESTAMP":"3283013161"}
+
{"__MONOTONIC_TIMESTAMP":"1284501864","_PID":"526","__REALTIME_TIMESTAMP":"1733986730768848","_EXE":"/usr/bin/cib","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=2d8f2;b=da4ba18425ea4e34bd0f0731f0270572;m=4c8ff168;t=6290d3f21f9d0;x=d9b8c7ef2053cc0f","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_CAP_EFFECTIVE":"0","_SYSTEMD_SLICE":"system.slice","_SELINUX_CONTEXT":"unconfined\n","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_UID":"1001","SYSLOG_IDENTIFIER":"cib","_RUNTIME_SCOPE":"system","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_SYSTEMD_UNIT":"radicle-ci-broker.service","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","SYSLOG_FACILITY":"3","_COMM":"cib","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_HOSTNAME":"radicle-ci","_GID":"1001","MESSAGE":"{\"timestamp\":\"2024-12-12T06:58:50.768787Z\",\"level\":\"INFO\",\"fields\":{\"message\":\"Finish CI run\",\"msg_id\":\"BrokerRunEnd\",\"kind\":\"finish_run\",\"run\":\"Run { broker_run_id: RunId { id: \\\"62c45727-a4d8-4a29-9dae-88c6e8b61655\\\" }, adapter_run_id: Some(RunId { id: \\\"fa233c62-51df-4812-865c-7b989915c1f3\\\" }), adapter_info_url: Some(\\\"http://radicle-ci/fa233c62-51df-4812-865c-7b989915c1f3/log.html\\\"), repo_id: RepoId(rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5), repo_name: \\\"heartwood\\\", timestamp: \\\"2024-12-12 06:58:03Z\\\", whence: Branch { name: \\\"master\\\", commit: Oid(d9c76893a144fd787654613f2bfb919613014a71), who: Some(\\\"did:key:z6MkiB8T5cBEQHnrs2MgjMVqvpSVj42X81HjKfFi2XBoMbtr (radicle-ci)\\\") }, state: Finished, result: Some(Failure) }\"},\"span\":{\"broker_run_id\":\"62c45727-a4d8-4a29-9dae-88c6e8b61655\",\"name\":\"execute_ci_run\"},\"spans\":[{\"broker_run_id\":\"62c45727-a4d8-4a29-9dae-88c6e8b61655\",\"name\":\"execute_ci_run\"}]}","_TRANSPORT":"stdout","PRIORITY":"6"}
+
~~~
+

# Acceptance criteria for logging

The CI broker writes log messages to its standard error output
modified src/bin/cibtool.rs
@@ -124,6 +124,7 @@ enum Cmd {
    Timeout(cibtoolcmd::TimeoutCmd),
    #[clap(hide = true)]
    Message(cibtoolcmd::MessageCmd),
+
    Log(cibtoolcmd::LogCmd),
}

impl Subcommand for Cmd {
@@ -136,6 +137,7 @@ impl Subcommand for Cmd {
            Self::Trigger(x) => x.run(args),
            Self::Timeout(x) => x.run(args),
            Self::Message(x) => x.run(args),
+
            Self::Log(x) => x.run(args),
        }
    }
}
@@ -374,4 +376,7 @@ enum CibToolError {

    #[error(transparent)]
    Message(#[from] cibtoolcmd::MessageError),
+

+
    #[error(transparent)]
+
    Log(#[from] cibtoolcmd::LogError),
}
added src/bin/cibtoolcmd/log.rs
@@ -0,0 +1,217 @@
+
use std::{
+
    convert::TryFrom,
+
    fs::{read, File},
+
    io::{Read, Write},
+
    path::{Path, PathBuf},
+
};
+

+
use logger::LogLevel;
+
use serde::Deserialize;
+
use serde_json::{Map, Value};
+

+
use super::*;
+

+
/// Format Radicle CI broker log files in a pretty way.
+
///
+
/// The journald output must be provided in the format produced by
+
/// `journalctl -u radicle-ci-broker -o json` (possibly with options
+
/// such as `-u radicle-ci-broker` or `-S today`), and not in ah the
+
/// default text format that's hard to parse.
+
#[derive(Debug, Parser)]
+
pub struct LogCmd {
+
    /// Read journalctl JSON output from this file. repository name.
+
    /// Default is to read from the standard input.
+
    #[clap(long)]
+
    journal: Option<PathBuf>,
+

+
    /// Include messages only if they are at least of this log level.
+
    /// Default is to allow any log level.
+
    #[clap(long)]
+
    log_level: Option<crate::logger::LogLevel>,
+

+
    /// Include messages only if they have this message kind. Can be
+
    /// used multiple times, and any of the kinds will do. Default is
+
    /// to allow every kind of message.
+
    #[clap(long)]
+
    kind: Vec<logger::Kind>,
+

+
    /// Include messages only if they relate to this CI run. The run
+
    /// ID is the broker run id. Default is to allow every CI run, and
+
    /// messages not related to a specific CI run.
+
    #[clap(long)]
+
    broker_run_id: Option<RunId>,
+

+
    /// Write output to this file. Default is to write to the standard
+
    /// output.
+
    #[clap(long)]
+
    output: Option<PathBuf>,
+
}
+

+
#[allow(clippy::unwrap_used)]
+
impl LogCmd {
+
    fn allowed(&self, msg: &Value) -> bool {
+
        self.allowed_by_log_level(msg)
+
            && self.allowed_by_kind(msg)
+
            && self.allowed_by_broker_run_id(msg)
+
    }
+

+
    fn allowed_by_log_level(&self, msg: &Value) -> bool {
+
        if let Some(wanted) = &self.log_level {
+
            if let Some(actual) = log_level(msg) {
+
                return actual >= *wanted;
+
            }
+
        }
+
        true
+
    }
+

+
    fn allowed_by_kind(&self, msg: &Value) -> bool {
+
        if !self.kind.is_empty() {
+
            if let Some(actual) = kind(msg) {
+
                if !self.kind.iter().any(|wanted| *wanted == actual) {
+
                    return false;
+
                }
+
            } else {
+
                return false;
+
            }
+
        }
+

+
        true
+
    }
+

+
    fn allowed_by_broker_run_id(&self, msg: &Value) -> bool {
+
        if let Some(wanted) = &self.broker_run_id {
+
            if let Some(actual) = broker_run_id(msg) {
+
                return actual == *wanted;
+
            } else {
+
                return false;
+
            }
+
        }
+
        true
+
    }
+
}
+

+
fn log_level(msg: &Value) -> Option<LogLevel> {
+
    map_get(msg, "level").map(|v| LogLevel::try_from(v).ok())?
+
}
+

+
fn kind(msg: &Value) -> Option<logger::Kind> {
+
    let fields = map_get(msg, "fields")?;
+
    map_get(&fields, "kind").map(|v| logger::Kind::try_from(v).ok())?
+
}
+

+
fn broker_run_id(msg: &Value) -> Option<RunId> {
+
    let span = map_get(msg, "span")?;
+
    map_get(&span, "broker_run_id").map(|v| RunId::try_from(v).ok())?
+
}
+

+
fn map_get(v: &Value, key: &str) -> Option<Value> {
+
    if let Value::Object(map) = v {
+
        map.get(key).cloned()
+
    } else {
+
        None
+
    }
+
}
+

+
fn pretty(msg: Value) -> Result<String, LogError> {
+
    serde_json::to_string_pretty(&msg).map_err(|err| LogError::JsonSer(msg, err))
+
}
+

+
impl Leaf for LogCmd {
+
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
+
        let journal = if let Some(filename) = &self.journal {
+
            JournalLines::from_file(filename)?
+
        } else {
+
            JournalLines::from_stdin()?
+
        };
+

+
        let messages = journal.flatten().filter(|msg| self.allowed(msg));
+
        if let Some(filename) = &self.output {
+
            let mut file = File::create(filename)
+
                .map_err(|err| LogError::CreateOutput(filename.into(), err))?;
+
            for msg in messages {
+
                file.write_all(format!("{}\n", pretty(msg)?).as_bytes())
+
                    .map_err(|err| LogError::WriteOutput(filename.into(), err))?;
+
            }
+
        } else {
+
            for msg in messages {
+
                println!("{}", pretty(msg)?);
+
            }
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
struct JournalLines {
+
    data: Vec<u8>,
+
    next: usize,
+
}
+

+
impl JournalLines {
+
    fn from_file(filename: &Path) -> Result<Self, LogError> {
+
        let data = read(filename).map_err(|err| LogError::Read(filename.into(), err))?;
+
        Ok(Self { data, next: 0 })
+
    }
+

+
    fn from_stdin() -> Result<Self, LogError> {
+
        let mut data = vec![];
+
        std::io::stdin()
+
            .read_to_end(&mut data)
+
            .map_err(LogError::ReadStdin)?;
+
        Ok(Self { data, next: 0 })
+
    }
+
}
+

+
impl Iterator for JournalLines {
+
    type Item = Result<serde_json::Value, LogError>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        let mut i = self.next;
+
        while i < self.data.len() {
+
            if self.data.get(i) == Some(&b'\n') {
+
                let line = String::from_utf8_lossy(&self.data[self.next..i]).to_string();
+
                self.next = i + 1;
+

+
                match serde_json::from_str::<JournalJson>(&line) {
+
                    Err(err) => return Some(Err(LogError::JsonParseJournal(line, err))),
+
                    Ok(jj) => match serde_json::from_str(&jj.message) {
+
                        Err(err) => return Some(Err(LogError::JsonParse(line, err))),
+
                        Ok(value) => return Some(Ok(value)),
+
                    },
+
                }
+
            }
+
            i += 1;
+
        }
+
        None
+
    }
+
}
+

+
#[derive(Debug, Deserialize)]
+
struct JournalJson {
+
    #[serde(rename = "MESSAGE")]
+
    message: String,
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum LogError {
+
    #[error("failed to read journal file {0}")]
+
    Read(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to read journal file from the standard input")]
+
    ReadStdin(#[source] std::io::Error),
+

+
    #[error("failed to parse a journald log line as JSON: {0:?}")]
+
    JsonParseJournal(String, #[source] serde_json::Error),
+

+
    #[error("failed to parse a log line as JSON: {0:?}")]
+
    JsonParse(String, #[source] serde_json::Error),
+

+
    #[error("failed to serialize a log line as pretty JSON: {0:?}")]
+
    JsonSer(serde_json::Value, #[source] serde_json::Error),
+

+
    #[error("failed to create output file {0}")]
+
    CreateOutput(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to write to output file {0}")]
+
    WriteOutput(PathBuf, #[source] std::io::Error),
+
}
modified src/bin/cibtoolcmd/mod.rs
@@ -26,3 +26,6 @@ pub use timeout::*;

mod message;
pub use message::*;
+

+
mod log;
+
pub use log::*;
modified src/logger.rs
@@ -4,6 +4,7 @@ use std::{path::Path, process::ExitStatus, time::Duration};

use clap::ValueEnum;
use radicle::{git::raw::Oid, identity::RepoId, node::Event};
+
use serde_json::Value;
use tracing::{debug, error, info, trace, warn, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

@@ -18,8 +19,18 @@ use crate::{
    run::Run,
};

-
// A unique type or kind for each log message.
-
enum Kind {
+
#[derive(Debug, thiserror::Error)]
+
pub enum KindError {
+
    #[error("unknown log message kind {0:?}")]
+
    Unknown(String),
+

+
    #[error("unknown log message kind {0:?}")]
+
    UnknownValue(Value),
+
}
+

+
/// A unique type or kind for each log message.
+
#[derive(Debug, Clone, ValueEnum, Eq, PartialEq)]
+
pub enum Kind {
    Debug,
    Startup,
    Shutdown,
@@ -45,6 +56,43 @@ impl std::fmt::Display for Kind {
    }
}

+
impl TryFrom<&str> for Kind {
+
    type Error = KindError;
+

+
    fn try_from(value: &str) -> Result<Self, Self::Error> {
+
        match value {
+
            "debug" => Ok(Self::Debug),
+
            "startup" => Ok(Self::Startup),
+
            "shutdown" => Ok(Self::Shutdown),
+
            "got_event" => Ok(Self::GotEvent),
+
            "start_run" => Ok(Self::StartRun),
+
            "finish_run" => Ok(Self::FinishRun),
+
            "adapter_message" => Ok(Self::AdapterMessage),
+
            _ => Err(KindError::Unknown(value.into())),
+
        }
+
    }
+
}
+

+
impl TryFrom<Value> for Kind {
+
    type Error = KindError;
+

+
    fn try_from(value: Value) -> Result<Self, Self::Error> {
+
        match value {
+
            Value::String(s) => match s.as_str() {
+
                "debug" => Ok(Self::Debug),
+
                "startup" => Ok(Self::Startup),
+
                "shutdown" => Ok(Self::Shutdown),
+
                "got_event" => Ok(Self::GotEvent),
+
                "start_run" => Ok(Self::StartRun),
+
                "finish_run" => Ok(Self::FinishRun),
+
                "adapter_message" => Ok(Self::AdapterMessage),
+
                _ => Err(KindError::Unknown(s)),
+
            },
+
            _ => Err(KindError::UnknownValue(value)),
+
        }
+
    }
+
}
+

// An identifier for the specific log message, to avoid having to
// match on message text.
#[derive(Debug, Copy, Clone)]
@@ -132,9 +180,18 @@ enum Id {
    TimeoutWaitStdoutReaderEnd,
}

+
#[derive(Debug, thiserror::Error)]
+
pub enum LogLevelError {
+
    #[error("unknown log level {0}")]
+
    UnknownLogLevel(String),
+

+
    #[error("unknown log level {0:?}")]
+
    UnknownLogLevelValue(Value),
+
}
+

// We define our own type for log levels so that we can apply
// clap::ValueEnum on it.
-
#[derive(ValueEnum, Eq, PartialEq, Copy, Clone, Debug)]
+
#[derive(ValueEnum, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug)]
pub enum LogLevel {
    Trace,
    Debug,
@@ -143,6 +200,44 @@ pub enum LogLevel {
    Error,
}

+
impl TryFrom<&str> for LogLevel {
+
    type Error = LogLevelError;
+
    fn try_from(s: &str) -> Result<Self, LogLevelError> {
+
        match s {
+
            "TRACE" => Ok(Self::Trace),
+
            "DEBUG" => Ok(Self::Debug),
+
            "INFO" => Ok(Self::Info),
+
            "WARNING" => Ok(Self::Warning),
+
            "ERROR" => Ok(Self::Error),
+
            _ => Err(LogLevelError::UnknownLogLevel(s.into())),
+
        }
+
    }
+
}
+

+
impl TryFrom<String> for LogLevel {
+
    type Error = LogLevelError;
+
    fn try_from(s: String) -> Result<Self, LogLevelError> {
+
        Self::try_from(s.as_str())
+
    }
+
}
+

+
impl TryFrom<&Value> for LogLevel {
+
    type Error = LogLevelError;
+
    fn try_from(v: &Value) -> Result<Self, LogLevelError> {
+
        match v {
+
            Value::String(s) => Self::try_from(s.as_str()),
+
            _ => Err(LogLevelError::UnknownLogLevelValue(v.clone())),
+
        }
+
    }
+
}
+

+
impl TryFrom<Value> for LogLevel {
+
    type Error = LogLevelError;
+
    fn try_from(v: Value) -> Result<Self, LogLevelError> {
+
        Self::try_from(&v)
+
    }
+
}
+

impl std::fmt::Display for LogLevel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match self {
modified src/msg.rs
@@ -17,6 +17,7 @@ use std::{
};

use serde::{Deserialize, Serialize};
+
use serde_json::Value;
use uuid::Uuid;

pub use radicle::{
@@ -75,6 +76,16 @@ impl From<&str> for RunId {
    }
}

+
impl TryFrom<Value> for RunId {
+
    type Error = ();
+
    fn try_from(id: Value) -> Result<Self, Self::Error> {
+
        match id {
+
            Value::String(s) => Ok(Self::from(s.as_str())),
+
            _ => Err(()),
+
        }
+
    }
+
}
+

impl fmt::Display for RunId {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        write!(f, "{}", self.id)