Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: subcommand "log" to format and filter cib logs from journald
Merged liw opened 1 year ago

Add subcommand “cibtool log” to extract cib log messages from journald logs, and pretty print them. Optionally with some very rudimentary filtering based on log level, message kind, or broker run id.

Example:

sudo journalctl -S today -u radicle-ci-broker -o json | cibtool log --log-level info

Signed-off-by: Lars Wirzenius liw@liw.fi

6 files changed +375 -3 3ce84a47 9f5fff09
modified ci-broker.md
@@ -1446,6 +1446,47 @@ filters:
- !Branch "this-does-not-exist"
~~~

+
## Extract `cib` log from journald and pretty print
+

+
_Want:_ `cibtool` can extract `cib` log messages from the systemd
+
journal sub-system, and pretty print them, and optionally filter the
+
messages.
+

+
_Why:_ systemd is the common service manager for Linux systems, and it
+
needs to be convenient to extract `cib` log messages from its system
+
logging sub-system, journald. This is especially important for CI
+
broker developers who need to diagnose problems on remote `cib`
+
instances to which they don't have direct access.
+

+
_Who:_ `cib-devs`
+

+
~~~scenario
+
given a Radicle node, with CI configured with broker.yaml and adapter dummy.sh
+
given file journal.json
+

+
when I run cibtool log --journal journal.json --output x.json
+
when I run jq . x.json
+
then command is successful
+

+
when I run cibtool log --journal journal.json --broker-run-id 62c45727-a4d8-4a29-9dae-88c6e8b61655 --output x.json
+
when I run grep -c timestamp x.json
+
then stdout has one line
+
~~~
+

+
~~~{#journal.json .file .json}
+
{"__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31360;b=da4ba18425ea4e34bd0f0731f0270572;m=c3256a61;t=6290db5b772ca;x=280fbc307405cb81","_SYSTEMD_UNIT":"radicle-ci-broker.service","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_SYSTEMD_SLICE":"system.slice","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_PID":"526","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:00.276073Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_RUNTIME_SCOPE":"system","_CAP_EFFECTIVE":"0","_GID":"1001","_SELINUX_CONTEXT":"unconfined\n","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","__MONOTONIC_TIMESTAMP":"3274009185","PRIORITY":"6","_COMM":"cib","_UID":"1001","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_HOSTNAME":"radicle-ci","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","__REALTIME_TIMESTAMP":"1733988720276170","SYSLOG_IDENTIFIER":"cib","_TRANSPORT":"stdout","_EXE":"/usr/bin/cib","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","SYSLOG_FACILITY":"3"}
+
{"_GID":"1001","_RUNTIME_SCOPE":"system","_UID":"1001","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_PID":"526","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_CAP_EFFECTIVE":"0","_SELINUX_CONTEXT":"unconfined\n","_TRANSPORT":"stdout","SYSLOG_IDENTIFIER":"cib","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:01.276463Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_SYSTEMD_SLICE":"system.slice","__MONOTONIC_TIMESTAMP":"3275009597","PRIORITY":"6","SYSLOG_FACILITY":"3","_HOSTNAME":"radicle-ci","_EXE":"/usr/bin/cib","_COMM":"cib","_SYSTEMD_UNIT":"radicle-ci-broker.service","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31361;b=da4ba18425ea4e34bd0f0731f0270572;m=c334ae3d;t=6290db5c6b6a5;x=a9c49aef9ad2a509","__REALTIME_TIMESTAMP":"1733988721276581"}
+
{"__MONOTONIC_TIMESTAMP":"3276010022","SYSLOG_IDENTIFIER":"cib","_CAP_EFFECTIVE":"0","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_HOSTNAME":"radicle-ci","_EXE":"/usr/bin/cib","_UID":"1001","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_SYSTEMD_SLICE":"system.slice","_PID":"526","SYSLOG_FACILITY":"3","_COMM":"cib","PRIORITY":"6","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_SELINUX_CONTEXT":"unconfined\n","_TRANSPORT":"stdout","_SYSTEMD_UNIT":"radicle-ci-broker.service","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:02.276881Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31362;b=da4ba18425ea4e34bd0f0731f0270572;m=c343f226;t=6290db5d5fa8e;x=843418c04ac1932f","_GID":"1001","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","__REALTIME_TIMESTAMP":"1733988722277006","_RUNTIME_SCOPE":"system","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events"}
+
{"SYSLOG_FACILITY":"3","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","__REALTIME_TIMESTAMP":"1733988723277782","_CAP_EFFECTIVE":"0","_PID":"526","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_EXE":"/usr/bin/cib","_COMM":"cib","_RUNTIME_SCOPE":"system","_UID":"1001","__MONOTONIC_TIMESTAMP":"3277010798","_TRANSPORT":"stdout","_SYSTEMD_UNIT":"radicle-ci-broker.service","_SYSTEMD_SLICE":"system.slice","_SELINUX_CONTEXT":"unconfined\n","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:03.277600Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","PRIORITY":"6","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","SYSLOG_IDENTIFIER":"cib","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_GID":"1001","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31363;b=da4ba18425ea4e34bd0f0731f0270572;m=c353376e;t=6290db5e53fd6;x=f0840763406ccc13","_HOSTNAME":"radicle-ci","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572"}
+
{"_SYSTEMD_UNIT":"radicle-ci-broker.service","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31364;b=da4ba18425ea4e34bd0f0731f0270572;m=c3627c05;t=6290db5f4846c;x=5bded561567c656f","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:04.278174Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_RUNTIME_SCOPE":"system","_HOSTNAME":"radicle-ci","_GID":"1001","PRIORITY":"6","_SELINUX_CONTEXT":"unconfined\n","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","__MONOTONIC_TIMESTAMP":"3278011397","_UID":"1001","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","SYSLOG_IDENTIFIER":"cib","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_TRANSPORT":"stdout","_PID":"526","_COMM":"cib","__REALTIME_TIMESTAMP":"1733988724278380","SYSLOG_FACILITY":"3","_CAP_EFFECTIVE":"0","_SYSTEMD_SLICE":"system.slice","_EXE":"/usr/bin/cib","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e"}
+
{"PRIORITY":"6","SYSLOG_FACILITY":"3","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_TRANSPORT":"stdout","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_GID":"1001","__REALTIME_TIMESTAMP":"1733988725278778","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:05.278657Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_PID":"526","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_SYSTEMD_UNIT":"radicle-ci-broker.service","_EXE":"/usr/bin/cib","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","SYSLOG_IDENTIFIER":"cib","_HOSTNAME":"radicle-ci","_COMM":"cib","_SYSTEMD_SLICE":"system.slice","__MONOTONIC_TIMESTAMP":"3279011794","_RUNTIME_SCOPE":"system","_UID":"1001","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31367;b=da4ba18425ea4e34bd0f0731f0270572;m=c371bfd2;t=6290db603c83a;x=b50947c211c1edcb","_SELINUX_CONTEXT":"unconfined\n","_CAP_EFFECTIVE":"0"}
+
{"SYSLOG_IDENTIFIER":"cib","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_TRANSPORT":"stdout","_PID":"526","PRIORITY":"6","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_GID":"1001","_SYSTEMD_SLICE":"system.slice","__REALTIME_TIMESTAMP":"1733988726279175","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_COMM":"cib","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31368;b=da4ba18425ea4e34bd0f0731f0270572;m=c381039f;t=6290db6130c07;x=7b8169758b14d38c","_SELINUX_CONTEXT":"unconfined\n","_RUNTIME_SCOPE":"system","_EXE":"/usr/bin/cib","_CAP_EFFECTIVE":"0","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:06.279078Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_HOSTNAME":"radicle-ci","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_SYSTEMD_UNIT":"radicle-ci-broker.service","SYSLOG_FACILITY":"3","_UID":"1001","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","__MONOTONIC_TIMESTAMP":"3280012191"}
+
{"MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:07.279440Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_EXE":"/usr/bin/cib","_RUNTIME_SCOPE":"system","_TRANSPORT":"stdout","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_SELINUX_CONTEXT":"unconfined\n","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_PID":"526","_HOSTNAME":"radicle-ci","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","__REALTIME_TIMESTAMP":"1733988727279541","_SYSTEMD_UNIT":"radicle-ci-broker.service","PRIORITY":"6","_GID":"1001","_UID":"1001","SYSLOG_FACILITY":"3","_COMM":"cib","SYSLOG_IDENTIFIER":"cib","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=31369;b=da4ba18425ea4e34bd0f0731f0270572;m=c390474d;t=6290db6224fb5;x=faafb09a153285ff","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_CAP_EFFECTIVE":"0","_SYSTEMD_SLICE":"system.slice","__MONOTONIC_TIMESTAMP":"3281012557"}
+
{"_PID":"526","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_HOSTNAME":"radicle-ci","__MONOTONIC_TIMESTAMP":"3282012856","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_TRANSPORT":"stdout","SYSLOG_IDENTIFIER":"cib","_SYSTEMD_SLICE":"system.slice","_COMM":"cib","_CAP_EFFECTIVE":"0","_SELINUX_CONTEXT":"unconfined\n","__REALTIME_TIMESTAMP":"1733988728279841","_SYSTEMD_UNIT":"radicle-ci-broker.service","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_GID":"1001","SYSLOG_FACILITY":"3","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=3136a;b=da4ba18425ea4e34bd0f0731f0270572;m=c39f8ab8;t=6290db6319321;x=d04fd63cc7903199","PRIORITY":"6","_EXE":"/usr/bin/cib","MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:08.279755Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_RUNTIME_SCOPE":"system","_UID":"1001"}
+
{"MESSAGE":"{\"timestamp\":\"2024-12-12T07:32:09.280049Z\",\"level\":\"TRACE\",\"fields\":{\"message\":\"event queue length\",\"msg_id\":\"QueueProcQueueLength\",\"kind\":\"debug\",\"len\":\"0\"}}","_UID":"1001","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_HOSTNAME":"radicle-ci","_PID":"526","_TRANSPORT":"stdout","__REALTIME_TIMESTAMP":"1733988729280146","_SELINUX_CONTEXT":"unconfined\n","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_RUNTIME_SCOPE":"system","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_GID":"1001","PRIORITY":"6","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_CAP_EFFECTIVE":"0","_SYSTEMD_UNIT":"radicle-ci-broker.service","_EXE":"/usr/bin/cib","SYSLOG_IDENTIFIER":"cib","_COMM":"cib","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","_SYSTEMD_SLICE":"system.slice","SYSLOG_FACILITY":"3","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=3136b;b=da4ba18425ea4e34bd0f0731f0270572;m=c3aece29;t=6290db640d692;x=b38aefb25148da02","__MONOTONIC_TIMESTAMP":"3283013161"}
+
{"__MONOTONIC_TIMESTAMP":"1284501864","_PID":"526","__REALTIME_TIMESTAMP":"1733986730768848","_EXE":"/usr/bin/cib","__CURSOR":"s=f30566e7c34e4ba190c6c671ff826507;i=2d8f2;b=da4ba18425ea4e34bd0f0731f0270572;m=4c8ff168;t=6290d3f21f9d0;x=d9b8c7ef2053cc0f","_SYSTEMD_CGROUP":"/system.slice/radicle-ci-broker.service","_CAP_EFFECTIVE":"0","_SYSTEMD_SLICE":"system.slice","_SELINUX_CONTEXT":"unconfined\n","_CMDLINE":"/bin/cib --log-level trace --config /home/_rad/ci-broker.yaml process-events","_UID":"1001","SYSLOG_IDENTIFIER":"cib","_RUNTIME_SCOPE":"system","_BOOT_ID":"da4ba18425ea4e34bd0f0731f0270572","_MACHINE_ID":"35c654cc069c402d8cb0b34b91f12e5e","_SYSTEMD_UNIT":"radicle-ci-broker.service","_SYSTEMD_INVOCATION_ID":"949b57247eb24bb5aefcec93f049beab","SYSLOG_FACILITY":"3","_COMM":"cib","_STREAM_ID":"9c46f4f6f0074e07986a0c3769f6545e","_HOSTNAME":"radicle-ci","_GID":"1001","MESSAGE":"{\"timestamp\":\"2024-12-12T06:58:50.768787Z\",\"level\":\"INFO\",\"fields\":{\"message\":\"Finish CI run\",\"msg_id\":\"BrokerRunEnd\",\"kind\":\"finish_run\",\"run\":\"Run { broker_run_id: RunId { id: \\\"62c45727-a4d8-4a29-9dae-88c6e8b61655\\\" }, adapter_run_id: Some(RunId { id: \\\"fa233c62-51df-4812-865c-7b989915c1f3\\\" }), adapter_info_url: Some(\\\"http://radicle-ci/fa233c62-51df-4812-865c-7b989915c1f3/log.html\\\"), repo_id: RepoId(rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5), repo_name: \\\"heartwood\\\", timestamp: \\\"2024-12-12 06:58:03Z\\\", whence: Branch { name: \\\"master\\\", commit: Oid(d9c76893a144fd787654613f2bfb919613014a71), who: Some(\\\"did:key:z6MkiB8T5cBEQHnrs2MgjMVqvpSVj42X81HjKfFi2XBoMbtr (radicle-ci)\\\") }, state: Finished, result: Some(Failure) }\"},\"span\":{\"broker_run_id\":\"62c45727-a4d8-4a29-9dae-88c6e8b61655\",\"name\":\"execute_ci_run\"},\"spans\":[{\"broker_run_id\":\"62c45727-a4d8-4a29-9dae-88c6e8b61655\",\"name\":\"execute_ci_run\"}]}","_TRANSPORT":"stdout","PRIORITY":"6"}
+
~~~
+

# Acceptance criteria for logging

The CI broker writes log messages to its standard error output
modified src/bin/cibtool.rs
@@ -124,6 +124,7 @@ enum Cmd {
    Timeout(cibtoolcmd::TimeoutCmd),
    #[clap(hide = true)]
    Message(cibtoolcmd::MessageCmd),
+
    Log(cibtoolcmd::LogCmd),
}

impl Subcommand for Cmd {
@@ -136,6 +137,7 @@ impl Subcommand for Cmd {
            Self::Trigger(x) => x.run(args),
            Self::Timeout(x) => x.run(args),
            Self::Message(x) => x.run(args),
+
            Self::Log(x) => x.run(args),
        }
    }
}
@@ -374,4 +376,7 @@ enum CibToolError {

    #[error(transparent)]
    Message(#[from] cibtoolcmd::MessageError),
+

+
    #[error(transparent)]
+
    Log(#[from] cibtoolcmd::LogError),
}
added src/bin/cibtoolcmd/log.rs
@@ -0,0 +1,217 @@
+
use std::{
+
    convert::TryFrom,
+
    fs::{read, File},
+
    io::{Read, Write},
+
    path::{Path, PathBuf},
+
};
+

+
use logger::LogLevel;
+
use serde::Deserialize;
+
use serde_json::{Map, Value};
+

+
use super::*;
+

+
/// Format Radicle CI broker log files in a pretty way.
+
///
+
/// The journald output must be provided in the format produced by
+
/// `journalctl -u radicle-ci-broker -o json` (possibly with options
+
/// such as `-u radicle-ci-broker` or `-S today`), and not in ah the
+
/// default text format that's hard to parse.
+
#[derive(Debug, Parser)]
+
pub struct LogCmd {
+
    /// Read journalctl JSON output from this file. repository name.
+
    /// Default is to read from the standard input.
+
    #[clap(long)]
+
    journal: Option<PathBuf>,
+

+
    /// Include messages only if they are at least of this log level.
+
    /// Default is to allow any log level.
+
    #[clap(long)]
+
    log_level: Option<crate::logger::LogLevel>,
+

+
    /// Include messages only if they have this message kind. Can be
+
    /// used multiple times, and any of the kinds will do. Default is
+
    /// to allow every kind of message.
+
    #[clap(long)]
+
    kind: Vec<logger::Kind>,
+

+
    /// Include messages only if they relate to this CI run. The run
+
    /// ID is the broker run id. Default is to allow every CI run, and
+
    /// messages not related to a specific CI run.
+
    #[clap(long)]
+
    broker_run_id: Option<RunId>,
+

+
    /// Write output to this file. Default is to write to the standard
+
    /// output.
+
    #[clap(long)]
+
    output: Option<PathBuf>,
+
}
+

+
#[allow(clippy::unwrap_used)]
+
impl LogCmd {
+
    fn allowed(&self, msg: &Value) -> bool {
+
        self.allowed_by_log_level(msg)
+
            && self.allowed_by_kind(msg)
+
            && self.allowed_by_broker_run_id(msg)
+
    }
+

+
    fn allowed_by_log_level(&self, msg: &Value) -> bool {
+
        if let Some(wanted) = &self.log_level {
+
            if let Some(actual) = log_level(msg) {
+
                return actual >= *wanted;
+
            }
+
        }
+
        true
+
    }
+

+
    fn allowed_by_kind(&self, msg: &Value) -> bool {
+
        if !self.kind.is_empty() {
+
            if let Some(actual) = kind(msg) {
+
                if !self.kind.iter().any(|wanted| *wanted == actual) {
+
                    return false;
+
                }
+
            } else {
+
                return false;
+
            }
+
        }
+

+
        true
+
    }
+

+
    fn allowed_by_broker_run_id(&self, msg: &Value) -> bool {
+
        if let Some(wanted) = &self.broker_run_id {
+
            if let Some(actual) = broker_run_id(msg) {
+
                return actual == *wanted;
+
            } else {
+
                return false;
+
            }
+
        }
+
        true
+
    }
+
}
+

+
fn log_level(msg: &Value) -> Option<LogLevel> {
+
    map_get(msg, "level").map(|v| LogLevel::try_from(v).ok())?
+
}
+

+
fn kind(msg: &Value) -> Option<logger::Kind> {
+
    let fields = map_get(msg, "fields")?;
+
    map_get(&fields, "kind").map(|v| logger::Kind::try_from(v).ok())?
+
}
+

+
fn broker_run_id(msg: &Value) -> Option<RunId> {
+
    let span = map_get(msg, "span")?;
+
    map_get(&span, "broker_run_id").map(|v| RunId::try_from(v).ok())?
+
}
+

+
fn map_get(v: &Value, key: &str) -> Option<Value> {
+
    if let Value::Object(map) = v {
+
        map.get(key).cloned()
+
    } else {
+
        None
+
    }
+
}
+

+
fn pretty(msg: Value) -> Result<String, LogError> {
+
    serde_json::to_string_pretty(&msg).map_err(|err| LogError::JsonSer(msg, err))
+
}
+

+
impl Leaf for LogCmd {
+
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
+
        let journal = if let Some(filename) = &self.journal {
+
            JournalLines::from_file(filename)?
+
        } else {
+
            JournalLines::from_stdin()?
+
        };
+

+
        let messages = journal.flatten().filter(|msg| self.allowed(msg));
+
        if let Some(filename) = &self.output {
+
            let mut file = File::create(filename)
+
                .map_err(|err| LogError::CreateOutput(filename.into(), err))?;
+
            for msg in messages {
+
                file.write_all(format!("{}\n", pretty(msg)?).as_bytes())
+
                    .map_err(|err| LogError::WriteOutput(filename.into(), err))?;
+
            }
+
        } else {
+
            for msg in messages {
+
                println!("{}", pretty(msg)?);
+
            }
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
struct JournalLines {
+
    data: Vec<u8>,
+
    next: usize,
+
}
+

+
impl JournalLines {
+
    fn from_file(filename: &Path) -> Result<Self, LogError> {
+
        let data = read(filename).map_err(|err| LogError::Read(filename.into(), err))?;
+
        Ok(Self { data, next: 0 })
+
    }
+

+
    fn from_stdin() -> Result<Self, LogError> {
+
        let mut data = vec![];
+
        std::io::stdin()
+
            .read_to_end(&mut data)
+
            .map_err(LogError::ReadStdin)?;
+
        Ok(Self { data, next: 0 })
+
    }
+
}
+

+
impl Iterator for JournalLines {
+
    type Item = Result<serde_json::Value, LogError>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        let mut i = self.next;
+
        while i < self.data.len() {
+
            if self.data.get(i) == Some(&b'\n') {
+
                let line = String::from_utf8_lossy(&self.data[self.next..i]).to_string();
+
                self.next = i + 1;
+

+
                match serde_json::from_str::<JournalJson>(&line) {
+
                    Err(err) => return Some(Err(LogError::JsonParseJournal(line, err))),
+
                    Ok(jj) => match serde_json::from_str(&jj.message) {
+
                        Err(err) => return Some(Err(LogError::JsonParse(line, err))),
+
                        Ok(value) => return Some(Ok(value)),
+
                    },
+
                }
+
            }
+
            i += 1;
+
        }
+
        None
+
    }
+
}
+

+
#[derive(Debug, Deserialize)]
+
struct JournalJson {
+
    #[serde(rename = "MESSAGE")]
+
    message: String,
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum LogError {
+
    #[error("failed to read journal file {0}")]
+
    Read(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to read journal file from the standard input")]
+
    ReadStdin(#[source] std::io::Error),
+

+
    #[error("failed to parse a journald log line as JSON: {0:?}")]
+
    JsonParseJournal(String, #[source] serde_json::Error),
+

+
    #[error("failed to parse a log line as JSON: {0:?}")]
+
    JsonParse(String, #[source] serde_json::Error),
+

+
    #[error("failed to serialize a log line as pretty JSON: {0:?}")]
+
    JsonSer(serde_json::Value, #[source] serde_json::Error),
+

+
    #[error("failed to create output file {0}")]
+
    CreateOutput(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to write to output file {0}")]
+
    WriteOutput(PathBuf, #[source] std::io::Error),
+
}
modified src/bin/cibtoolcmd/mod.rs
@@ -26,3 +26,6 @@ pub use timeout::*;

mod message;
pub use message::*;
+

+
mod log;
+
pub use log::*;
modified src/logger.rs
@@ -4,6 +4,7 @@ use std::{path::Path, process::ExitStatus, time::Duration};

use clap::ValueEnum;
use radicle::{git::raw::Oid, identity::RepoId, node::Event};
+
use serde_json::Value;
use tracing::{debug, error, info, trace, warn, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

@@ -18,8 +19,18 @@ use crate::{
    run::Run,
};

-
// A unique type or kind for each log message.
-
enum Kind {
+
#[derive(Debug, thiserror::Error)]
+
pub enum KindError {
+
    #[error("unknown log message kind {0:?}")]
+
    Unknown(String),
+

+
    #[error("unknown log message kind {0:?}")]
+
    UnknownValue(Value),
+
}
+

+
/// A unique type or kind for each log message.
+
#[derive(Debug, Clone, ValueEnum, Eq, PartialEq)]
+
pub enum Kind {
    Debug,
    Startup,
    Shutdown,
@@ -45,6 +56,43 @@ impl std::fmt::Display for Kind {
    }
}

+
impl TryFrom<&str> for Kind {
+
    type Error = KindError;
+

+
    fn try_from(value: &str) -> Result<Self, Self::Error> {
+
        match value {
+
            "debug" => Ok(Self::Debug),
+
            "startup" => Ok(Self::Startup),
+
            "shutdown" => Ok(Self::Shutdown),
+
            "got_event" => Ok(Self::GotEvent),
+
            "start_run" => Ok(Self::StartRun),
+
            "finish_run" => Ok(Self::FinishRun),
+
            "adapter_message" => Ok(Self::AdapterMessage),
+
            _ => Err(KindError::Unknown(value.into())),
+
        }
+
    }
+
}
+

+
impl TryFrom<Value> for Kind {
+
    type Error = KindError;
+

+
    fn try_from(value: Value) -> Result<Self, Self::Error> {
+
        match value {
+
            Value::String(s) => match s.as_str() {
+
                "debug" => Ok(Self::Debug),
+
                "startup" => Ok(Self::Startup),
+
                "shutdown" => Ok(Self::Shutdown),
+
                "got_event" => Ok(Self::GotEvent),
+
                "start_run" => Ok(Self::StartRun),
+
                "finish_run" => Ok(Self::FinishRun),
+
                "adapter_message" => Ok(Self::AdapterMessage),
+
                _ => Err(KindError::Unknown(s)),
+
            },
+
            _ => Err(KindError::UnknownValue(value)),
+
        }
+
    }
+
}
+

// An identifier for the specific log message, to avoid having to
// match on message text.
#[derive(Debug, Copy, Clone)]
@@ -132,9 +180,18 @@ enum Id {
    TimeoutWaitStdoutReaderEnd,
}

+
#[derive(Debug, thiserror::Error)]
+
pub enum LogLevelError {
+
    #[error("unknown log level {0}")]
+
    UnknownLogLevel(String),
+

+
    #[error("unknown log level {0:?}")]
+
    UnknownLogLevelValue(Value),
+
}
+

// We define our own type for log levels so that we can apply
// clap::ValueEnum on it.
-
#[derive(ValueEnum, Eq, PartialEq, Copy, Clone, Debug)]
+
#[derive(ValueEnum, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug)]
pub enum LogLevel {
    Trace,
    Debug,
@@ -143,6 +200,44 @@ pub enum LogLevel {
    Error,
}

+
impl TryFrom<&str> for LogLevel {
+
    type Error = LogLevelError;
+
    fn try_from(s: &str) -> Result<Self, LogLevelError> {
+
        match s {
+
            "TRACE" => Ok(Self::Trace),
+
            "DEBUG" => Ok(Self::Debug),
+
            "INFO" => Ok(Self::Info),
+
            "WARNING" => Ok(Self::Warning),
+
            "ERROR" => Ok(Self::Error),
+
            _ => Err(LogLevelError::UnknownLogLevel(s.into())),
+
        }
+
    }
+
}
+

+
impl TryFrom<String> for LogLevel {
+
    type Error = LogLevelError;
+
    fn try_from(s: String) -> Result<Self, LogLevelError> {
+
        Self::try_from(s.as_str())
+
    }
+
}
+

+
impl TryFrom<&Value> for LogLevel {
+
    type Error = LogLevelError;
+
    fn try_from(v: &Value) -> Result<Self, LogLevelError> {
+
        match v {
+
            Value::String(s) => Self::try_from(s.as_str()),
+
            _ => Err(LogLevelError::UnknownLogLevelValue(v.clone())),
+
        }
+
    }
+
}
+

+
impl TryFrom<Value> for LogLevel {
+
    type Error = LogLevelError;
+
    fn try_from(v: Value) -> Result<Self, LogLevelError> {
+
        Self::try_from(&v)
+
    }
+
}
+

impl std::fmt::Display for LogLevel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match self {
modified src/msg.rs
@@ -17,6 +17,7 @@ use std::{
};

use serde::{Deserialize, Serialize};
+
use serde_json::Value;
use uuid::Uuid;

pub use radicle::{
@@ -75,6 +76,16 @@ impl From<&str> for RunId {
    }
}

+
impl TryFrom<Value> for RunId {
+
    type Error = ();
+
    fn try_from(id: Value) -> Result<Self, Self::Error> {
+
        match id {
+
            Value::String(s) => Ok(Self::from(s.as_str())),
+
            _ => Err(()),
+
        }
+
    }
+
}
+

impl fmt::Display for RunId {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        write!(f, "{}", self.id)