Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: rewrite ci-broker.rs as cib.rs
Lars Wirzenius committed 1 year ago
commit 003e9012792694586d59764c8db2d59c38a3cd7d
parent 86884cbe433a967c3da5b0d9df0223a0ea60e527
2 files changed +206 -170
deleted src/bin/ci-broker.rs
@@ -1,170 +0,0 @@
-
use std::{
-
    error::Error,
-
    path::{Path, PathBuf},
-
    process::exit,
-
    thread::{sleep, spawn},
-
    time::Duration,
-
};
-

-
use log::{debug, error, info};
-

-
use radicle::prelude::Profile;
-
use radicle_ci_broker::msg::MessageError;
-
use radicle_ci_broker::{
-
    adapter::Adapter,
-
    broker::Broker,
-
    config::Config,
-
    error::BrokerError,
-
    event::{BrokerEvent, NodeEventSource},
-
    msg::RequestBuilder,
-
    pages::{PageBuilder, PageError, StatusPage},
-
};
-

-
fn main() {
-
    if let Err(e) = fallible_main() {
-
        eprintln!("ERROR: {}", e);
-
        let mut e = e.source();
-
        while let Some(source) = e {
-
            eprintln!("caused by: {}", source);
-
            e = source.source();
-
        }
-
        exit(1);
-
    }
-
}
-

-
fn fallible_main() -> Result<(), BrokerError> {
-
    pretty_env_logger::init();
-
    info!("Radicle CI broker starts");
-

-
    let mut args = std::env::args().skip(1);
-
    let filename: PathBuf = if let Some(filename) = args.next() {
-
        PathBuf::from(filename)
-
    } else {
-
        return Err(BrokerError::Usage);
-
    };
-

-
    let config = Config::load(&filename)?;
-
    debug!("loaded configuration: {:#?}", config);
-

-
    let mut broker = Broker::new(config.db())?;
-
    debug!(
-
        "created broker, db has {} CI runs",
-
        broker.all_runs()?.len()
-
    );
-

-
    // FIXME: this is broken. the config file only lists how to invoke
-
    // each adapter, not what adapter to use for each repo.
-
    // for (rid, spec) in config.adapters.iter() {
-
    //     debug!("setting adapter for {rid:?} to {spec:#?}");
-
    //     let rid = RepoId::from_urn(rid).map_err(|e| BrokerError::BadRepoId(rid.into(), e))?;
-
    //     let adapter = Adapter::new(&spec.command).with_environment(spec.envs());
-
    //     broker.set_repository_adapter(&rid, &adapter);
-
    // }
-
    // debug!("set per-repository adapters");
-

-
    let spec =
-
        config
-
            .adapter(&config.default_adapter)
-
            .ok_or(BrokerError::UnknownDefaultAdapter(
-
                config.default_adapter.clone(),
-
            ))?;
-
    let adapter = Adapter::new(&spec.command)
-
        .with_environment(spec.envs())
-
        .with_environment(spec.sensitive_envs());
-
    broker.set_default_adapter(&adapter);
-
    debug!("set default adapter");
-

-
    let profile = Profile::load()?;
-
    debug!("loaded profile {profile:#?}");
-

-
    let mut source = NodeEventSource::new(&profile)?;
-
    debug!("created node event source");
-

-
    for filter in config.filters.iter() {
-
        source.allow(filter.clone());
-
    }
-
    debug!("added filters to node event source");
-

-
    // Spawn a thread that updates the status pages.
-
    let mut page = PageBuilder::default()
-
        .node_alias(&profile.config.node.alias)
-
        .runs(broker.all_runs()?)
-
        .build()?;
-
    let page2 = page.clone();
-
    let report_dir = if let Some(dir) = &config.report_dir {
-
        dir.to_path_buf()
-
    } else {
-
        PathBuf::from(".")
-
    };
-
    let report_dir2 = report_dir.clone();
-
    let interval = Duration::from_secs(config.status_page_update_interval());
-
    let status_thread = spawn(move || status_updater(report_dir2, page2, interval));
-
    debug!(
-
        "started thread to update status pages in the background: {:?}",
-
        status_thread.thread().id()
-
    );
-

-
    // This loop ends when there's an error, e.g., failure to read an
-
    // event from the node.
-
    'event_loop: loop {
-
        debug!("waiting for event from node");
-
        for e in source.event()? {
-
            match e {
-
                BrokerEvent::Shutdown => break 'event_loop,
-
                BrokerEvent::RefChanged { .. } => {
-
                    info!("broker event {e:#?}");
-
                    let result = RequestBuilder::default()
-
                        .profile(&profile)
-
                        .broker_event(&e)
-
                        .build_trigger();
-
                    match result {
-
                        Ok(req) => {
-
                            if let Err(e) = broker.execute_ci(&req, &mut page) {
-
                                error!("failed to run adapter, or adapter failed to run CI: {e}");
-
                            }
-
                        }
-
                        Err(MessageError::NoEventHandler) => {
-
                            debug!("no handler found for the specific event");
-
                            continue;
-
                        }
-
                        Err(e) => {
-
                            return Err(e.into());
-
                        }
-
                    }
-
                }
-
            }
-
        }
-
    }
-

-
    update_report_page(&report_dir, &mut page)?;
-

-
    Ok(())
-
}
-

-
fn update_report_page(dirname: &Path, page: &mut StatusPage) -> Result<(), PageError> {
-
    if dirname.exists() {
-
        page.update_timestamp();
-
        if let Err(e) = page.write(dirname) {
-
            eprintln!(
-
                "ERROR: failed to update report pages in {}: {e}",
-
                dirname.display()
-
            );
-
            return Err(e);
-
        }
-
    }
-
    Ok(())
-
}
-

-
fn status_updater(dirname: PathBuf, mut page: StatusPage, interval: Duration) {
-
    let filename = dirname.join("status.json");
-
    loop {
-
        page.update_timestamp();
-
        if dirname.exists() {
-
            if let Err(e) = page.write_json(&filename) {
-
                eprintln!("ERROR: failed to update {}: {e}", filename.display());
-
            }
-
            update_report_page(&dirname, &mut page).ok();
-
        }
-
        sleep(interval);
-
    }
-
}
added src/bin/cib.rs
@@ -0,0 +1,206 @@
+
#![allow(clippy::result_large_err)]
+

+
use std::{
+
    error::Error,
+
    fs::write,
+
    path::{Path, PathBuf},
+
    process::exit,
+
};
+

+
use clap::Parser;
+
use log::{debug, error, info};
+

+
use radicle_ci_broker::{
+
    adapter::Adapter,
+
    broker::Broker,
+
    config::{Config, ConfigError},
+
    db::{Db, DbError},
+
    error::BrokerError,
+
    pages::{PageBuilder, PageError},
+
    queueadd::{AdderError, QueueAdder},
+
    queueproc::{QueueError, QueueProcessorBuilder},
+
};
+

+
fn main() {
+
    if let Err(e) = fallible_main() {
+
        error!("ERROR: {}", e);
+
        let mut e = e.source();
+
        while let Some(source) = e {
+
            error!("caused by: {}", source);
+
            e = source.source();
+
        }
+
        exit(1);
+
    }
+
}
+

+
fn fallible_main() -> Result<(), CibError> {
+
    let args = Args::parse();
+

+
    pretty_env_logger::init_custom_env("RADICLE_CI_BROKER_LOG");
+
    info!("Radicle CI broker starts");
+

+
    let config = Config::load(&args.config).map_err(|e| CibError::read_config(&args.config, e))?;
+
    debug!("loaded configuration: {:#?}", config);
+

+
    args.run(&config)?;
+

+
    Ok(())
+
}
+

+
#[derive(Debug, Parser)]
+
struct Args {
+
    #[clap(long)]
+
    config: PathBuf,
+

+
    #[clap(subcommand)]
+
    cmd: Cmd,
+
}
+

+
impl Args {
+
    fn run(&self, config: &Config) -> Result<(), CibError> {
+
        match &self.cmd {
+
            Cmd::Config(x) => x.run(self, config)?,
+
            Cmd::Insert(x) => x.run(self, config)?,
+
            Cmd::Queued(x) => x.run(self, config)?,
+
        }
+
        Ok(())
+
    }
+

+
    fn open_db(&self, config: &Config) -> Result<Db, CibError> {
+
        Db::new(&config.db).map_err(CibError::db)
+
    }
+
}
+

+
#[derive(Debug, Parser)]
+
enum Cmd {
+
    Config(ConfigCmd),
+
    Insert(InsertCmd),
+
    Queued(QueuedCmd),
+
}
+

+
#[derive(Debug, Parser)]
+
struct ConfigCmd {
+
    #[clap(long)]
+
    output: Option<PathBuf>,
+
}
+

+
impl ConfigCmd {
+
    fn run(&self, _args: &Args, config: &Config) -> Result<(), CibError> {
+
        let json = config.to_json();
+

+
        if let Some(output) = &self.output {
+
            write(output, json.as_bytes()).map_err(|e| CibError::write_config(output, e))?;
+
        } else {
+
            println!("{json}");
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, Parser)]
+
struct InsertCmd {}
+

+
impl InsertCmd {
+
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let db = args.open_db(config)?;
+
        QueueAdder::add_events_in_thread(config, db).map_err(CibError::QueueAdder)?;
+
        debug!("cib ends");
+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, Parser)]
+
struct QueuedCmd {}
+

+
impl QueuedCmd {
+
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let db = args.open_db(config)?;
+

+
        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
+
        let spec =
+
            config
+
                .adapter(&config.default_adapter)
+
                .ok_or(CibError::UnknownDefaultAdapter(
+
                    config.default_adapter.clone(),
+
                ))?;
+
        let adapter = Adapter::new(&spec.command)
+
            .with_environment(spec.envs())
+
            .with_environment(spec.sensitive_envs());
+
        debug!("default adapter: {adapter:?}");
+
        broker.set_default_adapter(&adapter);
+

+
        let page = PageBuilder::default()
+
            .node_alias("dummy")
+
            .build()
+
            .map_err(CibError::status_page)?;
+

+
        let processor = QueueProcessorBuilder::default()
+
            .db(db)
+
            .broker(broker)
+
            .page(page)
+
            .build()
+
            .map_err(CibError::process_queue)?;
+
        let thread = processor.process_in_thread();
+
        thread
+
            .join()
+
            .expect("wait for thread to finish")
+
            .map_err(CibError::process_queue)?;
+

+
        debug!("cib ends");
+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
enum CibError {
+
    #[error("failed to read configuration file {0}")]
+
    ReadConfig(PathBuf, #[source] ConfigError),
+

+
    #[error("failed to write config as JSON to file {0}")]
+
    WriteConfig(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to use SQLite database")]
+
    Db(#[source] DbError),
+

+
    #[error("failed create broker data type")]
+
    NewBroker(#[source] BrokerError),
+

+
    #[error("failed to update status page data structure")]
+
    StatusPage(#[source] PageError),
+

+
    #[error("failed to add node events into queue")]
+
    QueueAdder(#[source] AdderError),
+

+
    #[error("failed to process events from queue")]
+
    ProcessQueue(#[source] QueueError),
+

+
    #[error("default adapter is not in list of adapters")]
+
    UnknownDefaultAdapter(String),
+
}
+

+
impl CibError {
+
    fn read_config(filename: &Path, e: ConfigError) -> Self {
+
        Self::ReadConfig(filename.into(), e)
+
    }
+

+
    fn write_config(filename: &Path, e: std::io::Error) -> Self {
+
        Self::WriteConfig(filename.into(), e)
+
    }
+

+
    fn db(e: DbError) -> Self {
+
        Self::Db(e)
+
    }
+

+
    fn new_broker(e: BrokerError) -> Self {
+
        Self::NewBroker(e)
+
    }
+

+
    fn status_page(e: PageError) -> Self {
+
        Self::StatusPage(e)
+
    }
+

+
    fn process_queue(e: QueueError) -> Self {
+
        Self::ProcessQueue(e)
+
    }
+
}