use std::{
convert::TryFrom,
fs::{File, read},
io::{Read, Write},
path::{Path, PathBuf},
};
use clap::ValueEnum;
use logger::LogLevel;
use serde::Deserialize;
use serde_json::Value;
use super::*;
/// Format Radicle CI broker log files in a pretty way.
///
/// The journald output must be provided in the format produced by
/// `journalctl -u radicle-ci-broker -o json` (possibly with options
/// such as `-u radicle-ci-broker` or `-S today`), and not in ah the
/// default text format that's hard to parse.
#[derive(Debug, Parser)]
pub struct LogCmd {
/// Input is this format.
#[clap(long)]
format: LogKind,
/// One JSON object per line.
#[clap(long)]
jsonl: bool,
/// Include messages only if they are at least of this log level.
/// Default is to allow any log level.
#[clap(long)]
log_level: Option<crate::logger::LogLevel>,
/// Include messages only if they have this message kind. Can be
/// used multiple times, and any of the kinds will do. Default is
/// to allow every kind of message.
#[clap(long)]
kind: Vec<logger::Kind>,
/// Include messages only if they relate to this CI run. The run
/// ID is the broker run id. Default is to allow every CI run, and
/// messages not related to a specific CI run.
#[clap(long)]
broker_run_id: Option<RunId>,
/// Write output to this file. Default is to write to the standard
/// output.
#[clap(long)]
output: Option<PathBuf>,
/// Read input from this file. Default is to read from the
/// standard input.
log_file: Option<PathBuf>,
}
#[allow(clippy::unwrap_used)]
impl LogCmd {
fn allowed(&self, msg: &Value) -> bool {
self.allowed_by_log_level(msg)
&& self.allowed_by_kind(msg)
&& self.allowed_by_broker_run_id(msg)
}
fn allowed_by_log_level(&self, msg: &Value) -> bool {
if let Some(wanted) = &self.log_level
&& let Some(actual) = log_level(msg)
{
return actual >= *wanted;
}
true
}
fn allowed_by_kind(&self, msg: &Value) -> bool {
if !self.kind.is_empty() {
if let Some(actual) = kind(msg) {
if !self.kind.contains(&actual) {
return false;
}
} else {
return false;
}
}
true
}
fn allowed_by_broker_run_id(&self, msg: &Value) -> bool {
if let Some(wanted) = &self.broker_run_id {
if let Some(actual) = broker_run_id(msg) {
return actual == *wanted;
} else {
return false;
}
}
true
}
fn write_pretty(&self, mut out: impl Write, journal: JournalLines) -> Result<(), LogError> {
let messages = journal.flatten().filter(|msg| self.allowed(msg));
for msg in messages {
out.write_all(format!("{}\n", pretty(msg)?).as_bytes())
.map_err(LogError::Write)?;
}
Ok(())
}
fn write_jsonl(&self, mut out: impl Write, journal: JournalLines) -> Result<(), LogError> {
let messages = journal.flatten().filter(|msg| self.allowed(msg));
for msg in messages {
out.write_all(format!("{}\n", jsonl(msg)?).as_bytes())
.map_err(LogError::Write)?;
}
Ok(())
}
}
fn log_level(msg: &Value) -> Option<LogLevel> {
map_get(msg, "level").map(|v| LogLevel::try_from(v).ok())?
}
fn kind(msg: &Value) -> Option<logger::Kind> {
let fields = map_get(msg, "fields")?;
map_get(&fields, "kind").map(|v| logger::Kind::try_from(v).ok())?
}
fn broker_run_id(msg: &Value) -> Option<RunId> {
let span = map_get(msg, "span")?;
map_get(&span, "broker_run_id").map(|v| RunId::try_from(v).ok())?
}
fn map_get(v: &Value, key: &str) -> Option<Value> {
if let Value::Object(map) = v {
map.get(key).cloned()
} else {
None
}
}
fn pretty(msg: Value) -> Result<String, LogError> {
serde_json::to_string_pretty(&msg).map_err(|err| LogError::JsonSer(msg, err))
}
fn jsonl(msg: Value) -> Result<String, LogError> {
serde_json::to_string(&msg).map_err(|err| LogError::JsonSer(msg, err))
}
impl Leaf for LogCmd {
fn run(&self, _args: &Args) -> Result<(), CibToolError> {
let journal = match self.format {
LogKind::Cib => {
if let Some(filename) = &self.log_file {
JournalLines::cib_from_file(filename)?
} else {
JournalLines::cib_from_stdin()?
}
}
LogKind::Journald => {
if let Some(filename) = &self.log_file {
JournalLines::journal_from_file(filename)?
} else {
JournalLines::journal_from_stdin()?
}
}
};
if let Some(filename) = &self.output {
let file = File::create(filename)
.map_err(|err| LogError::CreateOutput(filename.into(), err))?;
if self.jsonl {
self.write_jsonl(file, journal)?;
} else {
self.write_pretty(file, journal)?;
}
} else {
let out = std::io::stdout();
if self.jsonl {
self.write_jsonl(out, journal)?;
} else {
self.write_pretty(out, journal)?;
}
}
Ok(())
}
}
struct JournalLines {
kind: LogKind,
data: Vec<u8>,
next: usize,
}
impl JournalLines {
fn read_file(filename: &Path) -> Result<Vec<u8>, LogError> {
read(filename).map_err(|err| LogError::Read(filename.into(), err))
}
fn read_stdin() -> Result<Vec<u8>, LogError> {
let mut data = vec![];
std::io::stdin()
.read_to_end(&mut data)
.map_err(LogError::ReadStdin)?;
Ok(data)
}
fn journal_from_file(filename: &Path) -> Result<Self, LogError> {
let data = Self::read_file(filename)?;
Ok(Self {
data,
next: 0,
kind: LogKind::Journald,
})
}
fn journal_from_stdin() -> Result<Self, LogError> {
let data = Self::read_stdin()?;
Ok(Self {
data,
next: 0,
kind: LogKind::Journald,
})
}
fn parse_journal_line(line: String) -> Result<Value, LogError> {
let jj: JournalJson = serde_json::from_str(&line)
.map_err(|err| LogError::JsonParseJournal(line.clone(), err))?;
serde_json::from_str(&jj.message).map_err(|err| LogError::JsonParse(line, err))
}
fn cib_from_file(filename: &Path) -> Result<Self, LogError> {
let data = Self::read_file(filename)?;
Ok(Self {
data,
next: 0,
kind: LogKind::Cib,
})
}
fn cib_from_stdin() -> Result<Self, LogError> {
let data = Self::read_stdin()?;
Ok(Self {
data,
next: 0,
kind: LogKind::Cib,
})
}
fn parse_cib_line(line: String) -> Result<Value, LogError> {
serde_json::from_str(&line).map_err(|err| LogError::JsonParseCib(line.clone(), err))
}
}
impl Iterator for JournalLines {
type Item = Result<serde_json::Value, LogError>;
fn next(&mut self) -> Option<Self::Item> {
let mut i = self.next;
while i < self.data.len() {
if self.data.get(i) == Some(&b'\n') {
let line = String::from_utf8_lossy(&self.data[self.next..i]).to_string();
self.next = i + 1;
match self.kind {
LogKind::Cib => return Some(Self::parse_cib_line(line)),
LogKind::Journald => return Some(Self::parse_journal_line(line)),
}
}
i += 1;
}
None
}
}
#[derive(Debug, Copy, Clone, ValueEnum)]
enum LogKind {
Journald,
Cib,
}
#[derive(Debug, Deserialize)]
struct JournalJson {
#[serde(rename = "MESSAGE")]
message: String,
}
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct CibJson {
name: String,
value: Value,
}
#[derive(Debug, thiserror::Error)]
pub enum LogError {
#[error("failed to read journal file {0}")]
Read(PathBuf, #[source] std::io::Error),
#[error("failed to read journal file from the standard input")]
ReadStdin(#[source] std::io::Error),
#[error("failed to parse a journald log line as JSON: {0:?}")]
JsonParseJournal(String, #[source] serde_json::Error),
#[error("failed to parse a log line as JSON: {0:?}")]
JsonParse(String, #[source] serde_json::Error),
#[error("failed to parse a cib log line as JSON: {0:?}")]
JsonParseCib(String, #[source] serde_json::Error),
#[error("failed to serialize a log line as pretty JSON: {0:?}")]
JsonSer(serde_json::Value, #[source] serde_json::Error),
#[error("failed to create output file {0}")]
CreateOutput(PathBuf, #[source] std::io::Error),
#[error("failed to write to output")]
Write(#[source] std::io::Error),
}