Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: cib reads and process events in one subcommand
Lars Wirzenius committed 1 year ago
commit d1c3f1801d7843b70d7941e1ae1cf9007a85b1ea
parent 14e2eaec015bf62f9b3472c6552aa3d5a20e3bdc
3 files changed +99 -8
modified ci-broker.md
@@ -242,16 +242,11 @@ given file broker.yaml
given file adapter.sh from dummy.sh
when I run chmod +x adapter.sh

-
when I try to run bash radenv.sh RAD_SOCKET=synt.sock cib --config broker.yaml insert
-
then command is successful
+
when I run bash radenv.sh RAD_SOCKET=synt.sock cib --config broker.yaml process-events

given an installed cibtool
-
when I run cibtool --db ci-broker.db event shutdown
when I run cibtool --db ci-broker.db event list
-
when I run cibtool --db ci-broker.db run list
-

-
when I try to run bash radenv.sh cib --config broker.yaml queued
-
then command is successful
+
then stdout is exactly ""

when I run cibtool --db ci-broker.db run list
then stdout contains "id: "xyzzy""
modified src/bin/cib.rs
@@ -10,11 +10,14 @@ use std::{
use clap::Parser;
use log::{debug, error, info};

+
use radicle::Profile;
+

use radicle_ci_broker::{
    adapter::Adapter,
    broker::{Broker, BrokerError},
    config::{Config, ConfigError},
    db::{Db, DbError},
+
    pages::{PageBuilder, PageError},
    queueadd::{AdderError, QueueAdderBuilder},
    queueproc::{QueueError, QueueProcessorBuilder},
};
@@ -60,6 +63,7 @@ impl Args {
            Cmd::Config(x) => x.run(self, config)?,
            Cmd::Insert(x) => x.run(self, config)?,
            Cmd::Queued(x) => x.run(self, config)?,
+
            Cmd::ProcessEvents(x) => x.run(self, config)?,
        }
        Ok(())
    }
@@ -74,6 +78,7 @@ enum Cmd {
    Config(ConfigCmd),
    Insert(InsertCmd),
    Queued(QueuedCmd),
+
    ProcessEvents(ProcessEventsCmd),
}

#[derive(Debug, Parser)]
@@ -152,6 +157,69 @@ impl QueuedCmd {
    }
}

+
#[derive(Debug, Parser)]
+
struct ProcessEventsCmd {}
+

+
impl ProcessEventsCmd {
+
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let adder = QueueAdderBuilder::default()
+
            .db(args.open_db(config)?)
+
            .filters(&config.filters)
+
            .push_shutdown()
+
            .build()
+
            .map_err(CibError::QueueAdder)?;
+
        let adder = adder.add_events_in_thread();
+

+
        let profile = Profile::load().map_err(CibError::profile)?;
+

+
        let db = args.open_db(config)?;
+

+
        let mut page = PageBuilder::default()
+
            .node_alias("FIXME")
+
            .runs(db.get_all_runs().map_err(CibError::db)?)
+
            .build()
+
            .map_err(CibError::page_updater)?;
+

+
        if let Some(dirname) = &config.report_dir {
+
            page.set_output_dir(dirname);
+
        }
+
        let thread = page.update_in_thread(db, &profile.config.node.alias, true);
+
        thread.join().unwrap().map_err(CibError::page_updater)?;
+

+
        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
+
        let spec =
+
            config
+
                .adapter(&config.default_adapter)
+
                .ok_or(CibError::UnknownDefaultAdapter(
+
                    config.default_adapter.clone(),
+
                ))?;
+
        let adapter = Adapter::new(&spec.command)
+
            .with_environment(spec.envs())
+
            .with_environment(spec.sensitive_envs());
+
        debug!("default adapter: {adapter:?}");
+
        broker.set_default_adapter(&adapter);
+

+
        let processor = QueueProcessorBuilder::default()
+
            .db(args.open_db(config)?)
+
            .broker(broker)
+
            .build()
+
            .map_err(CibError::process_queue)?;
+
        let processor = processor.process_in_thread();
+
        processor
+
            .join()
+
            .expect("wait for processor thread to finish")
+
            .map_err(CibError::process_queue)?;
+

+
        adder
+
            .join()
+
            .expect("wait for adder thread to finish")
+
            .map_err(CibError::add_events)?;
+

+
        debug!("cib ends");
+
        Ok(())
+
    }
+
}
+

#[derive(Debug, thiserror::Error)]
enum CibError {
    #[error("failed to read configuration file {0}")]
@@ -160,9 +228,15 @@ enum CibError {
    #[error("failed to write config as JSON to file {0}")]
    WriteConfig(PathBuf, #[source] std::io::Error),

+
    #[error("failed to look up node profile")]
+
    Profile(#[source] radicle::profile::Error),
+

    #[error("failed to use SQLite database")]
    Db(#[source] DbError),

+
    #[error("failed to create report page")]
+
    PageUpdater(#[source] PageError),
+

    #[error("failed create broker data type")]
    NewBroker(#[source] BrokerError),

@@ -188,10 +262,18 @@ impl CibError {
        Self::WriteConfig(filename.into(), e)
    }

+
    fn profile(e: radicle::profile::Error) -> Self {
+
        Self::Profile(e)
+
    }
+

    fn db(e: DbError) -> Self {
        Self::Db(e)
    }

+
    fn page_updater(e: PageError) -> Self {
+
        Self::PageUpdater(e)
+
    }
+

    fn new_broker(e: BrokerError) -> Self {
        Self::NewBroker(e)
    }
modified src/queueadd.rs
@@ -6,13 +6,14 @@ use log::{debug, info};

use crate::{
    db::{Db, DbError},
-
    event::{EventFilter, NodeEventError, NodeEventSource},
+
    event::{BrokerEvent, EventFilter, NodeEventError, NodeEventSource},
};

#[derive(Default)]
pub struct QueueAdderBuilder {
    db: Option<Db>,
    filters: Option<Vec<EventFilter>>,
+
    push_shutdown: bool,
}

impl QueueAdderBuilder {
@@ -20,6 +21,7 @@ impl QueueAdderBuilder {
        Ok(QueueAdder {
            db: self.db.ok_or(AdderError::Missing("db"))?,
            filters: self.filters.ok_or(AdderError::Missing("filters"))?,
+
            push_shutdown: self.push_shutdown,
        })
    }

@@ -32,11 +34,17 @@ impl QueueAdderBuilder {
        self.filters = Some(filters.to_vec());
        self
    }
+

+
    pub fn push_shutdown(mut self) -> Self {
+
        self.push_shutdown = true;
+
        self
+
    }
}

pub struct QueueAdder {
    filters: Vec<EventFilter>,
    db: Db,
+
    push_shutdown: bool,
}

impl QueueAdder {
@@ -72,6 +80,12 @@ impl QueueAdder {
            }
        }

+
        // Add a shutdown event to the queue so that the queue
+
        // processing thread knows to stop.
+
        if self.push_shutdown {
+
            self.db.push_queued_event(BrokerEvent::Shutdown)?;
+
        }
+

        Ok(())
    }
}