Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
radicle-ci-broker src bin cib.rs
//! The CI broker.

#![allow(clippy::result_large_err)]

use std::{
    fs::write,
    path::{Path, PathBuf},
    process::exit,
};

use clap::Parser;
use radicle_ci_broker::{
    adapter::AdapterError,
    broker::{Broker, BrokerError},
    config::{Config, ConfigError},
    db::{Db, DbError},
    ergo,
    logger::{self, LogLevel},
    notif::{NotificationChannel, NotificationError},
    pages::{PageError, StatusPage},
    queueadd::{AdderError, QueueAdderBuilder},
    queueproc::{QueueError, QueueProcessorBuilder},
    worker::start_thread,
};

fn main() {
    if let Err(e) = fallible_main() {
        logger::error("ERROR", &e);
        logger::end_cib_in_error();
        exit(1);
    }
    logger::end_cib_successfully();
}

/// Parse the command line, set up logging, load the configuration
/// file, and run the chosen subcommand.
///
/// # Errors
///
/// Returns [`CibError::ReadConfig`] if the configuration file cannot
/// be loaded, or whatever error the subcommand itself returns.
fn fallible_main() -> Result<(), CibError> {
    let args = Args::parse();
    logger::open(args.minimum_log_level());

    // The start of the run is only logged once the minimum log level
    // from the command line is in effect.
    logger::start_cib();

    let config = match Config::load(&args.config) {
        Ok(config) => config,
        Err(err) => return Err(CibError::read_config(&args.config, err)),
    };
    logger::loaded_config(&config);

    args.run(&config)
}

// NOTE: with clap's derive API the `///` doc comments on this struct
// and its fields are used as the `--help` text, so they are
// user-facing strings, not just source documentation.
/// The Radicle CI broker.
///
/// Listen to events from the local Radicle node, filter them, and run
/// CI for the events that get past the filter. The filter, and the
/// way CI is run, is defined in the configuration file.
#[derive(Debug, Parser)]
#[command(version = env!("VERSION"))]
struct Args {
    /// Load configuration from this YAML or JSON file. (There is no default
    /// location.)
    #[clap(long)]
    config: PathBuf,

    /// Set the global log level. Log messages that are on at least this
    /// level get written to the standard error output.
    #[clap(long, value_enum)]
    log_level: Option<logger::LogLevel>,

    // The subcommand to execute; see `Cmd` for the alternatives.
    #[clap(subcommand)]
    cmd: Cmd,
}

impl Args {
    /// Dispatch to the implementation of the subcommand chosen on the
    /// command line, passing it these arguments and the loaded
    /// configuration. The subcommand's result is returned unchanged.
    fn run(&self, config: &Config) -> Result<(), CibError> {
        match &self.cmd {
            Cmd::Adapters(cmd) => cmd.run(self, config),
            Cmd::Config(cmd) => cmd.run(self, config),
            Cmd::Insert(cmd) => cmd.run(self, config),
            Cmd::Queued(cmd) => cmd.run(self, config),
            Cmd::ProcessEvents(cmd) => cmd.run(self, config),
        }
    }

    /// Open the database named in the configuration.
    ///
    /// # Errors
    ///
    /// Returns [`CibError::Db`] if the database cannot be opened.
    fn open_db(&self, config: &Config) -> Result<Db, CibError> {
        Db::new(config.db()).map_err(CibError::db)
    }

    /// The minimum log level to use: the one given with `--log-level`,
    /// or [`LogLevel::Trace`] when the option was not given.
    fn minimum_log_level(&self) -> LogLevel {
        match self.log_level {
            Some(level) => level,
            None => LogLevel::Trace,
        }
    }
}

// The subcommands of the program. Each variant wraps a struct that
// holds the options of that subcommand and implements it in a `run`
// method; `Args::run` does the dispatching.
//
// NOTE: `///` comments on the variants are picked up by clap as help
// text, so the notes added below are plain `//` comments to keep the
// `--help` output unchanged.
#[derive(Debug, Parser)]
enum Cmd {
    /// Show the configuration, as derived from the configuration file
    /// and built-in defaults.
    ///
    /// The configuration file location is specified with the
    /// `--config` option.
    Config(ConfigCmd),
    // Output the adapters from the configuration as JSON; see `AdaptersCmd`.
    Adapters(AdaptersCmd),
    // Insert node events into the event queue; see `InsertCmd`.
    Insert(InsertCmd),
    // Process events already in the queue; see `QueuedCmd`.
    Queued(QueuedCmd),
    // Insert and process events; see `ProcessEventsCmd`.
    ProcessEvents(ProcessEventsCmd),
}

// Options for the subcommand that outputs the configured adapters as
// JSON. (The `///` comment on the field is clap help text.)
#[derive(Debug, Parser)]
struct AdaptersCmd {
    /// Write output to this file. The default is to write to the
    /// standard output. This option is an alternative to redirecting
    /// output using shell constructs, for situations when that is not
    /// an option.
    #[clap(long)]
    output: Option<PathBuf>,
}

impl AdaptersCmd {
    /// Convert the adapters from the configuration to JSON, then write
    /// the JSON to the `--output` file if one was given, or print it
    /// to the standard output otherwise.
    ///
    /// # Errors
    ///
    /// Returns a [`CibError`] if the adapters cannot be obtained from
    /// the configuration, cannot be converted to JSON, or the output
    /// file cannot be written.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let adapters = config.to_adapters().map_err(CibError::GetAdapters)?;
        let text = match adapters.to_json() {
            Ok(text) => text,
            Err(err) => return Err(CibError::adapters_to_json(&args.config, err)),
        };

        match self.output.as_deref() {
            Some(filename) => write(filename, text.as_bytes())
                .map_err(|err| CibError::write_config(filename, err)),
            None => {
                println!("{text}");
                Ok(())
            }
        }
    }
}

// Options for the subcommand that shows the configuration as JSON.
// (The `///` comment on the field is clap help text.)
#[derive(Debug, Parser)]
struct ConfigCmd {
    /// Write output to this file. The default is to write to the
    /// standard output. This option is an alternative to redirecting
    /// output using shell constructs, for situations when that is not
    /// an option.
    #[clap(long)]
    output: Option<PathBuf>,
}

impl ConfigCmd {
    /// Convert the loaded configuration to JSON, then write the JSON
    /// to the `--output` file if one was given, or print it to the
    /// standard output otherwise.
    ///
    /// # Errors
    ///
    /// Returns a [`CibError`] if the configuration cannot be converted
    /// to JSON or the output file cannot be written.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let text = match config.to_json() {
            Ok(text) => text,
            Err(err) => return Err(CibError::config_to_json(&args.config, err)),
        };

        match &self.output {
            Some(filename) => write(filename, text.as_bytes())
                .map_err(|err| CibError::write_config(filename, err)),
            None => {
                println!("{text}");
                Ok(())
            }
        }
    }
}

/// Read events from the local node and insert them into the event
/// queue in the database.
///
/// Do not run CI based on the events. End when the connection to the
/// node is terminated by the node shutting down.
#[derive(Debug, Parser)]
struct InsertCmd {}

impl InsertCmd {
    /// Build a queue adder that uses the sending end of a fresh event
    /// notification channel and its own database handle, run it in a
    /// thread, and wait for that thread to finish.
    ///
    /// # Errors
    ///
    /// Returns a [`CibError`] if the notification channel, database or
    /// queue adder cannot be set up, or if the adder thread returns an
    /// error.
    ///
    /// # Panics
    ///
    /// Panics if joining the adder thread fails.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let mut events_notification = NotificationChannel::new_event();

        // Acquire the builder inputs first, in the same order as they
        // are handed to the builder.
        let events_tx = events_notification.tx().map_err(CibError::notification)?;
        let db = args.open_db(config)?;

        let adder = QueueAdderBuilder::default()
            .events_tx(events_tx)
            .db(db)
            .build()
            .map_err(CibError::QueueAdder)?;

        let handle = start_thread(adder);
        let outcome = handle.join().expect("wait for thread to finish");
        outcome.map_err(CibError::add_events)?;
        Ok(())
    }
}

/// Process events from the queue in the database.
///
/// Do not read further events from the local node. End when the queue
/// is empty, or when a shutdown event is encountered in the queue.
#[derive(Debug, Parser)]
struct QueuedCmd {}

impl QueuedCmd {
    /// Run the queue processor to completion, then run the status
    /// page updater to completion.
    ///
    /// The two threads run one after the other: the page updater is
    /// only created and started after the processor thread has been
    /// joined.
    ///
    /// # Errors
    ///
    /// Returns a [`CibError`] if the adapters, broker, notification
    /// channels, database, processor or Radicle wrapper cannot be set
    /// up, or if either thread returns an error.
    ///
    /// # Panics
    ///
    /// Panics if joining either thread fails.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let adapters = config.to_adapters().map_err(CibError::Adapters)?;
        logger::adapter_config(config);

        let broker =
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;

        // Send one event notification up front. A failure to notify is
        // deliberately ignored (`.ok()`).
        // NOTE(review): presumably this makes the processor look at
        // the events already in the queue, as no adder thread is
        // running here to notify it — confirm in `queueproc`.
        let mut event_notifications = NotificationChannel::new_event();
        event_notifications
            .tx()
            .map_err(CibError::notification)?
            .notify()
            .ok();

        let mut run_notifications = NotificationChannel::new_run();

        // This database handle is moved into the processor.
        let db = args.open_db(config)?;
        let processor = QueueProcessorBuilder::default()
            .events_rx(event_notifications.rx().map_err(CibError::notification)?)
            .run_tx(run_notifications.tx().map_err(CibError::notification)?)
            .db(db)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
            .filters(config.filters())
            .triggers(&config.to_triggers())
            .adapters(&adapters)
            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;
        let thread = start_thread(processor);
        thread
            .join()
            .expect("wait for thread to finish")
            .map_err(CibError::process_queue)?;

        // A second database handle, for the status page.
        // NOTE(review): the meaning of the final `true` argument is
        // not visible here (`ProcessEventsCmd` passes `false`) — check
        // `StatusPage::new`.
        let db = args.open_db(config)?;
        let mut page = StatusPage::new(
            run_notifications.rx().map_err(CibError::notification)?,
            ergo::Radicle::new().map_err(CibError::ergo)?,
            db,
            true,
        );
        // Description and output directory are only set when the
        // configuration provides them.
        if let Some(desc) = config.description() {
            page.set_description(desc);
        }
        if let Some(dirname) = config.report_dir() {
            page.set_output_dir(dirname);
        }
        let page_updater = start_thread(page);
        page_updater
            .join()
            .expect("wait for page updater thread to finish")
            .map_err(CibError::PageUpdater)?;

        Ok(())
    }
}

// NOTE: the `///` comment below doubles as the help text clap shows
// for this subcommand, so its wording is user-facing.
/// Read events from the local node, insert them into the event queue
/// in the database, then process events in the queue by running CI
/// based on each event.
///
/// This is the subcommand to run CI normally.
///
/// This is a combination of the subcommands `insert` and `queued`.
#[derive(Debug, Parser)]
struct ProcessEventsCmd {}

impl ProcessEventsCmd {
    /// Start the queue adder, status page updater and queue processor
    /// threads, then wait for them to finish.
    ///
    /// The threads are started in the order adder, page updater,
    /// processor, and joined in the order processor, adder, page
    /// updater. Each of them gets its own database handle.
    ///
    /// # Errors
    ///
    /// Returns a [`CibError`] if any of the set-up steps fail or if
    /// any of the threads returns an error. On such an early return
    /// the threads already started are not joined.
    ///
    /// # Panics
    ///
    /// Panics if joining any of the threads fails.
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        // Two notification channels: one for events (adder to
        // processor) and one for runs (processor to page updater), as
        // wired up below.
        let mut events_notification = NotificationChannel::new_event();
        let mut run_notification = NotificationChannel::new_run();

        // The adder gets the sending end of the event channel.
        let adder = QueueAdderBuilder::default()
            .events_tx(events_notification.tx().map_err(CibError::notification)?)
            .db(args.open_db(config)?)
            .build()
            .map_err(CibError::QueueAdder)?;
        let adder = start_thread(adder);

        let db = args.open_db(config)?;

        // The status page gets the receiving end of the run channel.
        // NOTE(review): the meaning of the final `false` argument is
        // not visible here (`QueuedCmd` passes `true`) — check
        // `StatusPage::new`.
        let mut page = StatusPage::new(
            run_notification.rx().map_err(CibError::notification)?,
            ergo::Radicle::new().map_err(CibError::ergo)?,
            db,
            false,
        );
        // Description and output directory are only set when the
        // configuration provides them.
        if let Some(desc) = config.description() {
            page.set_description(desc);
        }
        if let Some(dirname) = config.report_dir() {
            page.set_output_dir(dirname);
        }
        let page_updater = start_thread(page);

        let adapters = config.to_adapters().map_err(CibError::Adapters)?;

        let broker =
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;

        // The processor gets the receiving end of the event channel
        // and the sending end of the run channel.
        let processor = QueueProcessorBuilder::default()
            .events_rx(events_notification.rx().map_err(CibError::notification)?)
            .run_tx(run_notification.tx().map_err(CibError::notification)?)
            .db(args.open_db(config)?)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
            .filters(config.filters())
            .triggers(&config.to_triggers())
            .adapters(&adapters)
            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;

        let processor = start_thread(processor);
        processor
            .join()
            .expect("wait for processor thread to finish")
            .map_err(CibError::process_queue)?;

        adder
            .join()
            .expect("wait for thread to finish")
            .map_err(CibError::add_events)?;

        // The page updating thread ends when the channel for run
        // notifications is closed by the processor thread ending.
        page_updater
            .join()
            .expect("wait for page updater thread to finish")
            .map_err(CibError::PageUpdater)?;

        Ok(())
    }
}

#[derive(Debug, thiserror::Error)]
enum CibError {
    #[error("failed to load ergonomic Radicle wrapper")]
    Ergo(#[source] ergo::ErgoError),

    #[error("failed to read configuration file {0}")]
    ReadConfig(PathBuf, #[source] ConfigError),

    #[error("failed to write config as JSON to file {0}")]
    WriteConfig(PathBuf, #[source] std::io::Error),

    #[error("failed to convert config as JSON")]
    ConfigToJson(PathBuf, #[source] ConfigError),

    #[error("failed to convert adapters as JSON")]
    AdaptersToJson(PathBuf, #[source] AdapterError),

    #[error("failed to use SQLite database")]
    Db(#[source] DbError),

    #[error("failed create broker data type")]
    NewBroker(#[source] BrokerError),

    #[error("failed to add node events into queue")]
    QueueAdder(#[source] AdderError),

    #[error("failed to process events from queue")]
    ProcessQueue(#[source] QueueError),

    #[error("thread to update report pages failed")]
    PageUpdater(#[source] PageError),

    #[error("failed to add events to queue")]
    AddEvents(#[source] AdderError),

    #[error("could not construct list of adapters from configuration")]
    Adapters(#[source] AdapterError),

    #[error("could not get list of adapters from configuration")]
    GetAdapters(#[source] AdapterError),

    #[error("programming error: failed to set up inter-thread notification channel")]
    Notification(#[source] NotificationError),
}

impl CibError {
    fn read_config(filename: &Path, e: ConfigError) -> Self {
        Self::ReadConfig(filename.into(), e)
    }

    fn write_config(filename: &Path, e: std::io::Error) -> Self {
        Self::WriteConfig(filename.into(), e)
    }

    fn config_to_json(filename: &Path, e: ConfigError) -> Self {
        Self::ConfigToJson(filename.into(), e)
    }

    fn adapters_to_json(filename: &Path, e: AdapterError) -> Self {
        Self::AdaptersToJson(filename.into(), e)
    }

    fn ergo(e: ergo::ErgoError) -> Self {
        Self::Ergo(e)
    }

    fn db(e: DbError) -> Self {
        Self::Db(e)
    }

    fn new_broker(e: BrokerError) -> Self {
        Self::NewBroker(e)
    }

    fn process_queue(e: QueueError) -> Self {
        Self::ProcessQueue(e)
    }

    fn add_events(e: AdderError) -> Self {
        Self::AddEvents(e)
    }

    fn notification(e: NotificationError) -> Self {
        Self::Notification(e)
    }
}