Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
radicle-ci-broker src bin cibtoolcmd log.rs
use std::{
    convert::TryFrom,
    fs::{File, read},
    io::{Read, Write},
    path::{Path, PathBuf},
};

use clap::ValueEnum;
use logger::LogLevel;
use serde::Deserialize;
use serde_json::Value;

use super::*;

/// Format Radicle CI broker log files in a pretty way.
///
/// The journald output must be provided in the format produced by
/// `journalctl -u radicle-ci-broker -o json` (possibly with options
/// such as `-u radicle-ci-broker` or `-S today`), and not in the
/// default text format that's hard to parse.
// NOTE: the `///` comments on this struct and its fields double as the
// command line help text, so wording changes here are user visible.
#[derive(Debug, Parser)]
pub struct LogCmd {
    /// Input is in this format.
    #[clap(long)]
    format: LogKind,

    /// One JSON object per line.
    #[clap(long)]
    jsonl: bool,

    /// Include messages only if they are at least of this log level.
    /// Default is to allow any log level.
    #[clap(long)]
    log_level: Option<crate::logger::LogLevel>,

    /// Include messages only if they have this message kind. Can be
    /// used multiple times, and any of the kinds will do. Default is
    /// to allow every kind of message.
    #[clap(long)]
    kind: Vec<logger::Kind>,

    /// Include messages only if they relate to this CI run. The run
    /// ID is the broker run id. Default is to allow every CI run, and
    /// messages not related to a specific CI run.
    #[clap(long)]
    broker_run_id: Option<RunId>,

    /// Write output to this file. Default is to write to the standard
    /// output.
    #[clap(long)]
    output: Option<PathBuf>,

    /// Read input from this file. Default is to read from the
    /// standard input.
    // No `long` attribute here, unlike the other fields, so clap takes
    // this one as a positional argument.
    log_file: Option<PathBuf>,
}

#[allow(clippy::unwrap_used)]
impl LogCmd {
    fn allowed(&self, msg: &Value) -> bool {
        self.allowed_by_log_level(msg)
            && self.allowed_by_kind(msg)
            && self.allowed_by_broker_run_id(msg)
    }

    fn allowed_by_log_level(&self, msg: &Value) -> bool {
        if let Some(wanted) = &self.log_level
            && let Some(actual) = log_level(msg)
        {
            return actual >= *wanted;
        }
        true
    }

    fn allowed_by_kind(&self, msg: &Value) -> bool {
        if !self.kind.is_empty() {
            if let Some(actual) = kind(msg) {
                if !self.kind.contains(&actual) {
                    return false;
                }
            } else {
                return false;
            }
        }

        true
    }

    fn allowed_by_broker_run_id(&self, msg: &Value) -> bool {
        if let Some(wanted) = &self.broker_run_id {
            if let Some(actual) = broker_run_id(msg) {
                return actual == *wanted;
            } else {
                return false;
            }
        }
        true
    }

    fn write_pretty(&self, mut out: impl Write, journal: JournalLines) -> Result<(), LogError> {
        let messages = journal.flatten().filter(|msg| self.allowed(msg));
        for msg in messages {
            out.write_all(format!("{}\n", pretty(msg)?).as_bytes())
                .map_err(LogError::Write)?;
        }

        Ok(())
    }

    fn write_jsonl(&self, mut out: impl Write, journal: JournalLines) -> Result<(), LogError> {
        let messages = journal.flatten().filter(|msg| self.allowed(msg));
        for msg in messages {
            out.write_all(format!("{}\n", jsonl(msg)?).as_bytes())
                .map_err(LogError::Write)?;
        }

        Ok(())
    }
}

/// Extract the log level from the top-level `level` field of `msg`.
///
/// Returns `None` if the field is missing or its value can't be
/// converted into a `LogLevel`.
fn log_level(msg: &Value) -> Option<LogLevel> {
    let raw = map_get(msg, "level")?;
    LogLevel::try_from(raw).ok()
}

/// Extract the message kind from `fields.kind` of `msg`.
///
/// Returns `None` if either level of the lookup is missing or the
/// value can't be converted into a `logger::Kind`.
fn kind(msg: &Value) -> Option<logger::Kind> {
    let raw = map_get(msg, "fields").and_then(|fields| map_get(&fields, "kind"))?;
    logger::Kind::try_from(raw).ok()
}

/// Extract the broker run ID from `span.broker_run_id` of `msg`.
///
/// Returns `None` if either level of the lookup is missing or the
/// value can't be converted into a `RunId`.
fn broker_run_id(msg: &Value) -> Option<RunId> {
    let raw = map_get(msg, "span").and_then(|span| map_get(&span, "broker_run_id"))?;
    RunId::try_from(raw).ok()
}

fn map_get(v: &Value, key: &str) -> Option<Value> {
    if let Value::Object(map) = v {
        map.get(key).cloned()
    } else {
        None
    }
}

/// Serialize `msg` as pretty-printed JSON.
///
/// On failure the message itself is returned inside
/// `LogError::JsonSer`, together with the serializer error.
fn pretty(msg: Value) -> Result<String, LogError> {
    match serde_json::to_string_pretty(&msg) {
        Ok(text) => Ok(text),
        Err(err) => Err(LogError::JsonSer(msg, err)),
    }
}

/// Serialize `msg` as compact JSON.
///
/// On failure the message itself is returned inside
/// `LogError::JsonSer`, together with the serializer error.
fn jsonl(msg: Value) -> Result<String, LogError> {
    match serde_json::to_string(&msg) {
        Ok(text) => Ok(text),
        Err(err) => Err(LogError::JsonSer(msg, err)),
    }
}

impl Leaf for LogCmd {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
        let journal = match self.format {
            LogKind::Cib => {
                if let Some(filename) = &self.log_file {
                    JournalLines::cib_from_file(filename)?
                } else {
                    JournalLines::cib_from_stdin()?
                }
            }
            LogKind::Journald => {
                if let Some(filename) = &self.log_file {
                    JournalLines::journal_from_file(filename)?
                } else {
                    JournalLines::journal_from_stdin()?
                }
            }
        };

        if let Some(filename) = &self.output {
            let file = File::create(filename)
                .map_err(|err| LogError::CreateOutput(filename.into(), err))?;
            if self.jsonl {
                self.write_jsonl(file, journal)?;
            } else {
                self.write_pretty(file, journal)?;
            }
        } else {
            let out = std::io::stdout();
            if self.jsonl {
                self.write_jsonl(out, journal)?;
            } else {
                self.write_pretty(out, journal)?;
            }
        }

        Ok(())
    }
}

/// Log input held entirely in memory and iterated one line at a time.
///
/// Each line is parsed according to `kind` when the iterator yields it.
struct JournalLines {
    /// Which line format the buffered input is parsed as.
    kind: LogKind,
    /// The complete raw input, as read from a file or the standard input.
    data: Vec<u8>,
    /// Byte offset into `data` at which the next unread line starts.
    next: usize,
}

impl JournalLines {
    fn read_file(filename: &Path) -> Result<Vec<u8>, LogError> {
        read(filename).map_err(|err| LogError::Read(filename.into(), err))
    }

    fn read_stdin() -> Result<Vec<u8>, LogError> {
        let mut data = vec![];
        std::io::stdin()
            .read_to_end(&mut data)
            .map_err(LogError::ReadStdin)?;
        Ok(data)
    }

    fn journal_from_file(filename: &Path) -> Result<Self, LogError> {
        let data = Self::read_file(filename)?;
        Ok(Self {
            data,
            next: 0,
            kind: LogKind::Journald,
        })
    }

    fn journal_from_stdin() -> Result<Self, LogError> {
        let data = Self::read_stdin()?;
        Ok(Self {
            data,
            next: 0,
            kind: LogKind::Journald,
        })
    }

    fn parse_journal_line(line: String) -> Result<Value, LogError> {
        let jj: JournalJson = serde_json::from_str(&line)
            .map_err(|err| LogError::JsonParseJournal(line.clone(), err))?;
        serde_json::from_str(&jj.message).map_err(|err| LogError::JsonParse(line, err))
    }

    fn cib_from_file(filename: &Path) -> Result<Self, LogError> {
        let data = Self::read_file(filename)?;
        Ok(Self {
            data,
            next: 0,
            kind: LogKind::Cib,
        })
    }

    fn cib_from_stdin() -> Result<Self, LogError> {
        let data = Self::read_stdin()?;
        Ok(Self {
            data,
            next: 0,
            kind: LogKind::Cib,
        })
    }

    fn parse_cib_line(line: String) -> Result<Value, LogError> {
        serde_json::from_str(&line).map_err(|err| LogError::JsonParseCib(line.clone(), err))
    }
}

impl Iterator for JournalLines {
    type Item = Result<serde_json::Value, LogError>;

    /// Yield the next line of the input, parsed according to `kind`.
    ///
    /// Lines are separated by `\n`; the separator is not part of the
    /// line. Bytes that are not valid UTF-8 are replaced rather than
    /// rejected. A line that fails to parse is yielded as an `Err`,
    /// and iteration continues with the line after it. Returns `None`
    /// once all input has been consumed.
    fn next(&mut self) -> Option<Self::Item> {
        let rest = self.data.get(self.next..)?;
        if rest.is_empty() {
            return None;
        }

        // A final line that lacks a terminating newline is still a
        // line: take everything that is left, so that the last record
        // of the input is not silently lost.
        let (raw, consumed) = match rest.iter().position(|&byte| byte == b'\n') {
            Some(len) => (&rest[..len], len + 1),
            None => (rest, rest.len()),
        };
        let line = String::from_utf8_lossy(raw).into_owned();
        self.next += consumed;

        Some(match self.kind {
            LogKind::Cib => Self::parse_cib_line(line),
            LogKind::Journald => Self::parse_journal_line(line),
        })
    }
}

// The input formats that can be selected with `--format`.
//
// NOTE: plain `//` comments on purpose: this type derives `ValueEnum`,
// and doc comments here could end up in the generated help text.
#[derive(Debug, Copy, Clone, ValueEnum)]
enum LogKind {
    // Each line is a journald JSON record; the actual log message is
    // the JSON text inside its `MESSAGE` field.
    Journald,
    // Each line is itself the JSON log message.
    Cib,
}

/// The part of a journald JSON record that this tool reads.
#[derive(Debug, Deserialize)]
struct JournalJson {
    /// The `MESSAGE` field of the record. Its content is parsed as
    /// JSON again to get the actual log message.
    #[serde(rename = "MESSAGE")]
    message: String,
}

/// A name/value pair that can be deserialized from JSON.
// NOTE(review): nothing in the visible part of this file uses this
// type, hence the `dead_code` allowance; confirm whether it is still
// needed.
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct CibJson {
    name: String,
    value: Value,
}

/// Errors from reading, parsing, and writing log files.
#[derive(Debug, thiserror::Error)]
pub enum LogError {
    /// The named input file could not be read.
    #[error("failed to read journal file {0}")]
    Read(PathBuf, #[source] std::io::Error),

    /// The standard input could not be read.
    #[error("failed to read journal file from the standard input")]
    ReadStdin(#[source] std::io::Error),

    /// An input line is not a valid journald JSON record. Carries the
    /// offending line.
    #[error("failed to parse a journald log line as JSON: {0:?}")]
    JsonParseJournal(String, #[source] serde_json::Error),

    /// The `MESSAGE` of a journald record is not valid JSON. Carries
    /// the whole input line, not just the message.
    #[error("failed to parse a log line as JSON: {0:?}")]
    JsonParse(String, #[source] serde_json::Error),

    /// A line of a CI broker log is not valid JSON. Carries the
    /// offending line.
    #[error("failed to parse a cib log line as JSON: {0:?}")]
    JsonParseCib(String, #[source] serde_json::Error),

    /// A message could not be serialized for output. This is used for
    /// both the pretty and the one-object-per-line output, even though
    /// the text only mentions the former.
    #[error("failed to serialize a log line as pretty JSON: {0:?}")]
    JsonSer(serde_json::Value, #[source] serde_json::Error),

    /// The named output file could not be created.
    #[error("failed to create output file {0}")]
    CreateOutput(PathBuf, #[source] std::io::Error),

    /// Writing to the output file or the standard output failed.
    #[error("failed to write to output")]
    Write(#[source] std::io::Error),
}