//! The CI broker.
#![allow(clippy::result_large_err)]
use std::{
fs::write,
path::{Path, PathBuf},
process::exit,
};
use clap::Parser;
use radicle_ci_broker::{
adapter::AdapterError,
broker::{Broker, BrokerError},
config::{Config, ConfigError},
db::{Db, DbError},
ergo,
logger::{self, LogLevel},
notif::{NotificationChannel, NotificationError},
pages::{PageError, StatusPage},
queueadd::{AdderError, QueueAdderBuilder},
queueproc::{QueueError, QueueProcessorBuilder},
worker::start_thread,
};
fn main() {
if let Err(e) = fallible_main() {
logger::error("ERROR", &e);
logger::end_cib_in_error();
exit(1);
}
logger::end_cib_successfully();
}
/// Parse the command line, open the logger, load the configuration file
/// and run the subcommand chosen by the user.
///
/// # Errors
///
/// Returns [`CibError::ReadConfig`] when the configuration file named by
/// `--config` cannot be loaded; otherwise returns whatever the subcommand
/// returns.
fn fallible_main() -> Result<(), CibError> {
    let args = Args::parse();

    // The logger is opened with the minimum level from the command line
    // first, so that the start message below is already subject to it.
    logger::open(args.minimum_log_level());
    logger::start_cib();

    let config = match Config::load(&args.config) {
        Ok(config) => config,
        Err(err) => return Err(CibError::read_config(&args.config, err)),
    };
    logger::loaded_config(&config);

    args.run(&config)
}
/// The Radicle CI broker.
///
/// Listen to events from the local Radicle node, filter them, and run
/// CI for the events that get past the filter. The filter, and the
/// way CI is run, is defined in the configuration file.
#[derive(Debug, Parser)]
#[command(version = env!("VERSION"))]
struct Args {
/// Load configuration from this YAML or JSON file. (There is no default
/// location.)
#[clap(long)]
config: PathBuf,
/// Se the global log level. Log messages that on at least this
/// level get written to the standard error output.
#[clap(long, value_enum)]
log_level: Option<logger::LogLevel>,
#[clap(subcommand)]
cmd: Cmd,
}
impl Args {
    /// Dispatch to the subcommand chosen on the command line.
    ///
    /// Every subcommand receives the complete parsed arguments and the
    /// loaded configuration. Its result is returned unchanged.
    fn run(&self, config: &Config) -> Result<(), CibError> {
        match &self.cmd {
            Cmd::Adapters(cmd) => cmd.run(self, config),
            Cmd::Config(cmd) => cmd.run(self, config),
            Cmd::Insert(cmd) => cmd.run(self, config),
            Cmd::Queued(cmd) => cmd.run(self, config),
            Cmd::ProcessEvents(cmd) => cmd.run(self, config),
        }
    }

    /// Open the database that the configuration points at.
    ///
    /// # Errors
    ///
    /// A failure from [`Db::new`] is wrapped in [`CibError::Db`].
    fn open_db(&self, config: &Config) -> Result<Db, CibError> {
        Db::new(config.db()).map_err(CibError::db)
    }

    /// The minimum log level: the value of `--log-level` when given,
    /// otherwise [`LogLevel::Trace`].
    fn minimum_log_level(&self) -> LogLevel {
        match self.log_level {
            Some(level) => level,
            None => LogLevel::Trace,
        }
    }
}
// The subcommands of the program; `Args::run` dispatches on this enum.
//
// NOTE: doc comments on the variants are user-visible help text generated
// by clap. `Insert`, `Queued` and `ProcessEvents` have none here;
// presumably clap picks up the doc comments on their argument structs
// instead -- confirm before adding variant-level docs for them.
#[derive(Debug, Parser)]
enum Cmd {
    /// Show the configuration, as derived from the configuration file
    /// and built-in defaults.
    ///
    /// The configuration file location is specified with the
    /// `--config` option.
    Config(ConfigCmd),

    /// Show the list of adapters from the configuration, as JSON.
    ///
    /// The configuration file location is specified with the
    /// `--config` option.
    Adapters(AdaptersCmd),

    Insert(InsertCmd),

    Queued(QueuedCmd),

    ProcessEvents(ProcessEventsCmd),
}
#[derive(Debug, Parser)]
struct AdaptersCmd {
/// Write output to this file. The default is to write to the
/// standard output. This option is an alternative to redirecting
/// output using shell constructs, for situations when that is not
/// an option.
#[clap(long)]
output: Option<PathBuf>,
}
impl AdaptersCmd {
    /// Build the adapters from the configuration, convert them to JSON,
    /// and emit the JSON.
    ///
    /// The JSON goes to the file named by `--output` when that option is
    /// given, and to the standard output otherwise.
    ///
    /// # Errors
    ///
    /// * [`CibError::GetAdapters`] if the adapters cannot be built.
    /// * [`CibError::AdaptersToJson`] if the conversion to JSON fails.
    /// * [`CibError::WriteConfig`] if the output file cannot be written.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let adapters = match config.to_adapters() {
            Ok(adapters) => adapters,
            Err(err) => return Err(CibError::GetAdapters(err)),
        };
        let json = match adapters.to_json() {
            Ok(json) => json,
            Err(err) => return Err(CibError::adapters_to_json(&args.config, err)),
        };

        match &self.output {
            Some(path) => {
                write(path, json.as_bytes()).map_err(|err| CibError::write_config(path, err))
            }
            None => {
                println!("{json}");
                Ok(())
            }
        }
    }
}
#[derive(Debug, Parser)]
struct ConfigCmd {
/// Write output to this file. The default is to write to the
/// standard output. This option is an alternative to redirecting
/// output using shell constructs, for situations when that is not
/// an option.
#[clap(long)]
output: Option<PathBuf>,
}
impl ConfigCmd {
    /// Convert the loaded configuration to JSON and emit it.
    ///
    /// The JSON goes to the file named by `--output` when that option is
    /// given, and to the standard output otherwise.
    ///
    /// # Errors
    ///
    /// * [`CibError::ConfigToJson`] if the conversion to JSON fails.
    /// * [`CibError::WriteConfig`] if the output file cannot be written.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let json = match config.to_json() {
            Ok(json) => json,
            Err(err) => return Err(CibError::config_to_json(&args.config, err)),
        };

        match self.output.as_ref() {
            None => println!("{json}"),
            Some(path) => {
                write(path, json.as_bytes()).map_err(|err| CibError::write_config(path, err))?
            }
        }
        Ok(())
    }
}
// Arguments of the `insert` subcommand. It has no options of its own; the
// struct exists so the subcommand has a type to hang its `run` method on.
//
// NOTE(review): presumably clap uses the doc comment below as the help
// text for this subcommand, so keep it written for end users.
/// Read events from the local node and insert them into the event
/// queue in the database.
///
/// Do not run CI based on the events. End when the connection to the
/// node is terminated by the node shutting down.
#[derive(Debug, Parser)]
struct InsertCmd {}
impl InsertCmd {
    /// Build a queue adder, run it in its own thread, and wait for that
    /// thread to finish.
    ///
    /// # Errors
    ///
    /// * [`CibError::Notification`] if the sending end of the event
    ///   notification channel cannot be obtained.
    /// * [`CibError::Db`] if the database cannot be opened.
    /// * [`CibError::QueueAdder`] if the adder cannot be built.
    /// * [`CibError::AddEvents`] if the adder thread returns an error.
    ///
    /// # Panics
    ///
    /// Panics if the adder thread cannot be joined.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        // Only the sending end of the channel is used by this subcommand.
        // The channel itself stays in this local until the function returns.
        let mut event_channel = NotificationChannel::new_event();
        let events_tx = event_channel.tx().map_err(CibError::notification)?;
        let db = args.open_db(config)?;

        let adder = QueueAdderBuilder::default()
            .events_tx(events_tx)
            .db(db)
            .build()
            .map_err(CibError::QueueAdder)?;

        let outcome = start_thread(adder)
            .join()
            .expect("wait for thread to finish");
        outcome.map_err(CibError::add_events)?;
        Ok(())
    }
}
// Arguments of the `queued` subcommand. It has no options of its own; the
// struct exists so the subcommand has a type to hang its `run` method on.
//
// NOTE(review): presumably clap uses the doc comment below as the help
// text for this subcommand, so keep it written for end users.
/// Process events from the queue in the database.
///
/// Do not read further events from the local node. End when the queue
/// is empty, or when a shutdown event is encountered in the queue.
#[derive(Debug, Parser)]
struct QueuedCmd {}
impl QueuedCmd {
    /// Run the queue processor to completion, then run the status page
    /// updater to completion.
    ///
    /// The two threads run one after the other: the page updater is only
    /// started once the processor thread has been joined.
    ///
    /// # Errors
    ///
    /// * [`CibError::Adapters`] if the adapters cannot be built.
    /// * [`CibError::NewBroker`] if the broker cannot be created.
    /// * [`CibError::Notification`] if an end of a notification channel
    ///   cannot be obtained.
    /// * [`CibError::Db`] if a database handle cannot be opened.
    /// * [`CibError::ProcessQueue`] if the processor cannot be built or
    ///   its thread returns an error.
    /// * [`CibError::Ergo`] if the Radicle wrapper cannot be created.
    /// * [`CibError::PageUpdater`] if the page updater thread returns an
    ///   error.
    ///
    /// # Panics
    ///
    /// Panics if either thread cannot be joined.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let adapters = config.to_adapters().map_err(CibError::Adapters)?;
        logger::adapter_config(config);

        let broker =
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;

        // Send one event notification up front. The sender is a temporary
        // that is dropped right after this statement, and a failure to
        // notify is deliberately ignored.
        // NOTE(review): presumably this makes the processor look at the
        // queue even though nothing adds events here -- confirm.
        let mut event_channel = NotificationChannel::new_event();
        event_channel
            .tx()
            .map_err(CibError::notification)?
            .notify()
            .ok();

        // Phase 1: process the queue.
        let mut run_channel = NotificationChannel::new_run();
        let queue_db = args.open_db(config)?;
        let events_rx = event_channel.rx().map_err(CibError::notification)?;
        let run_tx = run_channel.tx().map_err(CibError::notification)?;
        let processor = QueueProcessorBuilder::default()
            .events_rx(events_rx)
            .run_tx(run_tx)
            .db(queue_db)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
            .filters(config.filters())
            .triggers(&config.to_triggers())
            .adapters(&adapters)
            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;
        start_thread(processor)
            .join()
            .expect("wait for thread to finish")
            .map_err(CibError::process_queue)?;

        // Phase 2: update the status pages, using a second database handle.
        // NOTE(review): the last argument is `true` here but `false` in
        // `ProcessEventsCmd::run`; its meaning is defined by
        // `StatusPage::new` -- confirm there.
        let page_db = args.open_db(config)?;
        let run_rx = run_channel.rx().map_err(CibError::notification)?;
        let radicle = ergo::Radicle::new().map_err(CibError::ergo)?;
        let mut page = StatusPage::new(run_rx, radicle, page_db, true);
        if let Some(description) = config.description() {
            page.set_description(description);
        }
        if let Some(report_dir) = config.report_dir() {
            page.set_output_dir(report_dir);
        }
        start_thread(page)
            .join()
            .expect("wait for page updater thread to finish")
            .map_err(CibError::PageUpdater)?;

        Ok(())
    }
}
// Arguments of the subcommand that runs CI normally. It has no options of
// its own.
//
// NOTE(review): presumably clap uses the doc comment below as the help
// text for this subcommand, so keep it written for end users.
/// Read events from the local node, insert them into the event queue
/// in the database, then process events in the queue by running CI
/// based on each event.
///
/// This is the subcommand to run CI normally.
///
/// This is a combination of the subcommands `insert` and `queued`.
#[derive(Debug, Parser)]
struct ProcessEventsCmd {}
impl ProcessEventsCmd {
    /// Start the queue adder, the status page updater and the queue
    /// processor, each in its own thread, then wait for all three.
    ///
    /// The threads are joined in the order processor, adder, page updater.
    ///
    /// # Errors
    ///
    /// * [`CibError::Notification`] if an end of a notification channel
    ///   cannot be obtained.
    /// * [`CibError::Db`] if a database handle cannot be opened.
    /// * [`CibError::QueueAdder`] if the adder cannot be built.
    /// * [`CibError::Ergo`] if the Radicle wrapper cannot be created.
    /// * [`CibError::Adapters`] if the adapters cannot be built.
    /// * [`CibError::NewBroker`] if the broker cannot be created.
    /// * [`CibError::ProcessQueue`] if the processor cannot be built or
    ///   its thread returns an error.
    /// * [`CibError::AddEvents`] if the adder thread returns an error.
    /// * [`CibError::PageUpdater`] if the page updater thread returns an
    ///   error.
    ///
    /// # Panics
    ///
    /// Panics if any of the threads cannot be joined.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let mut event_channel = NotificationChannel::new_event();
        let mut run_channel = NotificationChannel::new_run();

        // Thread 1: the queue adder, holding the sending end of the event
        // channel and its own database handle.
        let events_tx = event_channel.tx().map_err(CibError::notification)?;
        let adder_db = args.open_db(config)?;
        let adder = QueueAdderBuilder::default()
            .events_tx(events_tx)
            .db(adder_db)
            .build()
            .map_err(CibError::QueueAdder)?;
        let adder_thread = start_thread(adder);

        // Thread 2: the status page updater, holding the receiving end of
        // the run channel and its own database handle.
        let page_db = args.open_db(config)?;
        let run_rx = run_channel.rx().map_err(CibError::notification)?;
        let radicle = ergo::Radicle::new().map_err(CibError::ergo)?;
        let mut page = StatusPage::new(run_rx, radicle, page_db, false);
        if let Some(description) = config.description() {
            page.set_description(description);
        }
        if let Some(report_dir) = config.report_dir() {
            page.set_output_dir(report_dir);
        }
        let page_thread = start_thread(page);

        // Thread 3: the queue processor, holding the receiving end of the
        // event channel and the sending end of the run channel.
        // NOTE(review): `QueuedCmd::run` calls `logger::adapter_config`
        // after building the adapters; this function does not -- confirm
        // whether that difference is intentional.
        let adapters = config.to_adapters().map_err(CibError::Adapters)?;
        let broker =
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;
        let events_rx = event_channel.rx().map_err(CibError::notification)?;
        let run_tx = run_channel.tx().map_err(CibError::notification)?;
        let processor_db = args.open_db(config)?;
        let processor = QueueProcessorBuilder::default()
            .events_rx(events_rx)
            .run_tx(run_tx)
            .db(processor_db)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
            .filters(config.filters())
            .triggers(&config.to_triggers())
            .adapters(&adapters)
            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;
        let processor_thread = start_thread(processor);

        processor_thread
            .join()
            .expect("wait for processor thread to finish")
            .map_err(CibError::process_queue)?;

        adder_thread
            .join()
            .expect("wait for thread to finish")
            .map_err(CibError::add_events)?;

        // The page updating thread ends when the channel for run
        // notifications is closed by the processor thread ending.
        page_thread
            .join()
            .expect("wait for page updater thread to finish")
            .map_err(CibError::PageUpdater)?;

        Ok(())
    }
}
#[derive(Debug, thiserror::Error)]
enum CibError {
#[error("failed to load ergonomic Radicle wrapper")]
Ergo(#[source] ergo::ErgoError),
#[error("failed to read configuration file {0}")]
ReadConfig(PathBuf, #[source] ConfigError),
#[error("failed to write config as JSON to file {0}")]
WriteConfig(PathBuf, #[source] std::io::Error),
#[error("failed to convert config as JSON")]
ConfigToJson(PathBuf, #[source] ConfigError),
#[error("failed to convert adapters as JSON")]
AdaptersToJson(PathBuf, #[source] AdapterError),
#[error("failed to use SQLite database")]
Db(#[source] DbError),
#[error("failed create broker data type")]
NewBroker(#[source] BrokerError),
#[error("failed to add node events into queue")]
QueueAdder(#[source] AdderError),
#[error("failed to process events from queue")]
ProcessQueue(#[source] QueueError),
#[error("thread to update report pages failed")]
PageUpdater(#[source] PageError),
#[error("failed to add events to queue")]
AddEvents(#[source] AdderError),
#[error("could not construct list of adapters from configuration")]
Adapters(#[source] AdapterError),
#[error("could not get list of adapters from configuration")]
GetAdapters(#[source] AdapterError),
#[error("programming error: failed to set up inter-thread notification channel")]
Notification(#[source] NotificationError),
}
impl CibError {
fn read_config(filename: &Path, e: ConfigError) -> Self {
Self::ReadConfig(filename.into(), e)
}
fn write_config(filename: &Path, e: std::io::Error) -> Self {
Self::WriteConfig(filename.into(), e)
}
fn config_to_json(filename: &Path, e: ConfigError) -> Self {
Self::ConfigToJson(filename.into(), e)
}
fn adapters_to_json(filename: &Path, e: AdapterError) -> Self {
Self::AdaptersToJson(filename.into(), e)
}
fn ergo(e: ergo::ErgoError) -> Self {
Self::Ergo(e)
}
fn db(e: DbError) -> Self {
Self::Db(e)
}
fn new_broker(e: BrokerError) -> Self {
Self::NewBroker(e)
}
fn process_queue(e: QueueError) -> Self {
Self::ProcessQueue(e)
}
fn add_events(e: AdderError) -> Self {
Self::AddEvents(e)
}
fn notification(e: NotificationError) -> Self {
Self::Notification(e)
}
}