Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
refactor internal adapter handling
Merged liw opened 1 year ago

This is all ground work towards having any number of triggers for CI runs in the config, potentially each with a different adapter.

8 files changed +148 -95 2e37eb04 ee34af59
modified ci-broker.md
@@ -858,29 +858,6 @@ when I run cibtool --db ci-broker.db event count
then stdout is exactly "1000\n"
~~~

-
## Don't insert events into queue when not allowed by filter
-

-
_Want:_ Nothing is inserted into the persistent event queue
-
then the CI broker's filter does not allow any events.
-

-
_Why:_ This is fundamental for running CI when repositories
-
in a node change.
-

-
_Who:_ `cib-devs`
-

-
~~~scenario
-
given a Radicle node, with CI configured with broker-allow-nothing.yaml and adapter dummy.sh
-
given a Git repository xyzzy in the Radicle node
-
given the Radicle node emits a refsUpdated event for xyzzy
-
when I run ./env.sh synthetic-events synt.sock event.json --log log.txt
-

-
when I try to run ./env.sh env RAD_SOCKET=synt.sock cib --config broker-allow-nothing.yaml insert
-
then command is successful
-

-
when I run cibtool --db ci-broker.db event count
-
then stdout is exactly "0\n"
-
~~~
-

## Process queued events

_Want:_ It's possible to run the CI broker in a mode where it
modified src/adapter.rs
@@ -18,6 +18,7 @@ use std::{
//use tracing::Instrument;

use crate::{
+
    config::AdapterConfig,
    db::{Db, DbError},
    logger,
    msg::{MessageError, Request, Response},
@@ -29,6 +30,43 @@ use crate::{

const NOT_EXITED: i32 = 999;

+
/// The set of all configured adapters.
+
#[derive(Clone)]
+
pub struct Adapters {
+
    adapters: HashMap<String, Adapter>,
+
    default_adapter: String,
+
}
+

+
impl Adapters {
+
    pub fn new(
+
        adapters: &HashMap<String, AdapterConfig>,
+
        default_adapter: &str,
+
    ) -> Result<Self, AdapterError> {
+
        if !adapters.contains_key(default_adapter) {
+
            Err(AdapterError::NoDefaultAdapter)
+
        } else {
+
            Ok(Self {
+
                adapters: HashMap::from_iter(
+
                    adapters
+
                        .iter()
+
                        .map(|(k, v)| (k.to_string(), Adapter::from(v))),
+
                ),
+
                default_adapter: default_adapter.into(),
+
            })
+
        }
+
    }
+

+
    pub fn default_adapter(&self) -> &Adapter {
+
        // We KNOW the default adapter is in the map so the unwrap is safe.
+
        #[allow(clippy::unwrap_used)]
+
        self.adapters.get(&self.default_adapter).unwrap()
+
    }
+

+
    pub fn get(&self, name: &str) -> Option<&Adapter> {
+
        self.adapters.get(name)
+
    }
+
}
+

/// An external executable that runs CI on request.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Adapter {
@@ -227,6 +265,10 @@ impl MaybeResult {

#[derive(Debug, thiserror::Error)]
pub enum AdapterError {
+
    /// No default adapter.
+
    #[error("the default adapter is not defined in the configuration")]
+
    NoDefaultAdapter,
+

    /// Error from [`TimeoutCommand`] or [`RunningProcess`].
    #[error(transparent)]
    TimeoutCommand(#[from] crate::timeoutcmd::TimeoutError),
modified src/bin/cib.rs
@@ -13,6 +13,7 @@ use clap::Parser;
use radicle::Profile;

use radicle_ci_broker::{
+
    adapter::AdapterError,
    broker::{Broker, BrokerError},
    config::{Config, ConfigError},
    db::{Db, DbError},
@@ -149,7 +150,6 @@ impl InsertCmd {
        let adder = QueueAdderBuilder::default()
            .events_tx(events_notification.tx().map_err(CibError::notification)?)
            .db(args.open_db(config)?)
-
            .filters(config.filters())
            .build()
            .map_err(CibError::QueueAdder)?;
        let thread = adder.add_events_in_thread();
@@ -172,11 +172,11 @@ impl QueuedCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let profile = Profile::load().map_err(CibError::profile)?;

-
        let adapter = config.default_adapter()?;
+
        let adapters = config.to_adapters().map_err(CibError::Adapters)?;
        logger::adapter_config(config);

-
        let broker = Broker::new(config.db(), config.max_run_time(), &adapter)
-
            .map_err(CibError::new_broker)?;
+
        let broker =
+
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;

        let mut event_notifications = NotificationChannel::new_event();
        event_notifications
@@ -194,6 +194,8 @@ impl QueuedCmd {
            .db(db)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
+
            .filters(config.filters())
+
            .adapters(&adapters)
            .build()
            .map_err(CibError::process_queue)?;
        let thread = processor.process_in_thread();
@@ -240,7 +242,6 @@ impl ProcessEventsCmd {
        let adder = QueueAdderBuilder::default()
            .events_tx(events_notification.tx().map_err(CibError::notification)?)
            .db(args.open_db(config)?)
-
            .filters(config.filters())
            .build()
            .map_err(CibError::QueueAdder)?;
        adder.add_events_in_thread();
@@ -260,10 +261,10 @@ impl ProcessEventsCmd {
            false,
        );

-
        let adapter = config.default_adapter()?;
+
        let adapters = config.to_adapters().map_err(CibError::Adapters)?;

-
        let broker = Broker::new(config.db(), config.max_run_time(), &adapter)
-
            .map_err(CibError::new_broker)?;
+
        let broker =
+
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;

        let processor = QueueProcessorBuilder::default()
            .events_rx(events_notification.rx().map_err(CibError::notification)?)
@@ -271,6 +272,8 @@ impl ProcessEventsCmd {
            .db(args.open_db(config)?)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
+
            .filters(config.filters())
+
            .adapters(&adapters)
            .build()
            .map_err(CibError::process_queue)?;
        let processor = processor.process_in_thread();
@@ -322,8 +325,8 @@ enum CibError {
    #[error("failed to add events to queue")]
    AddEvents(#[source] AdderError),

-
    #[error(transparent)]
-
    DefaultAdapter(#[from] ConfigError),
+
    #[error("could not construct list of adapters from configuration")]
+
    Adapters(#[source] AdapterError),

    #[error("programming error: failed to set up inter-thread notification channel")]
    Notification(#[source] NotificationError),
modified src/broker.rs
@@ -28,21 +28,15 @@ use crate::{
/// node, and executes the appropriate adapter to run CI on the
/// repository.
pub struct Broker {
-
    default_adapter: Adapter,
    max_run_time: Duration,
    db: Db,
}

impl Broker {
    #[allow(clippy::result_large_err)]
-
    pub fn new(
-
        db_filename: &Path,
-
        max_run_time: Duration,
-
        default_adapter: &Adapter,
-
    ) -> Result<Self, BrokerError> {
+
    pub fn new(db_filename: &Path, max_run_time: Duration) -> Result<Self, BrokerError> {
        logger::broker_db(db_filename);
        Ok(Self {
-
            default_adapter: default_adapter.clone(),
            max_run_time,
            db: Db::new(db_filename)?,
        })
@@ -53,19 +47,15 @@ impl Broker {
        Ok(self.db.get_all_runs()?)
    }

-
    fn default_adapter(&self) -> &Adapter {
-
        &self.default_adapter
-
    }
-

    #[allow(clippy::result_large_err)]
    pub fn execute_ci(
        &mut self,
+
        adapter: &Adapter,
        trigger: &Request,
        run_notification: &NotificationSender,
    ) -> Result<Run, BrokerError> {
        let broker_run_id = RunId::default();
        let span = span!(Level::TRACE, "execute_ci_run", %broker_run_id,).entered();
-
        let adapter = self.default_adapter();
        let run = span
            .in_scope(|| self.execute_helper(adapter, broker_run_id, trigger, run_notification))?;
        Ok(run)
@@ -193,7 +183,7 @@ mod test {
    use std::{path::Path, time::Duration};
    use tempfile::tempdir;

-
    use super::{Adapter, Broker};
+
    use super::Broker;
    use crate::{
        msg::{RunId, RunResult},
        notif::NotificationChannel,
@@ -201,18 +191,8 @@ mod test {
        test::{mock_adapter, trigger_request, TestResult},
    };

-
    fn broker(filename: &Path, adapter: &Adapter) -> anyhow::Result<Broker> {
-
        Ok(Broker::new(filename, Duration::from_secs(1), adapter)?)
-
    }
-

-
    #[test]
-
    fn has_adapter() -> TestResult<()> {
-
        let tmp = tempdir()?;
-
        let db = tmp.path().join("db.db");
-
        let adapter = Adapter::default();
-
        let broker = broker(&db, &adapter)?;
-
        assert_eq!(broker.default_adapter(), &adapter);
-
        Ok(())
+
    fn broker(filename: &Path) -> anyhow::Result<Broker> {
+
        Ok(Broker::new(filename, Duration::from_secs(1))?)
    }

    #[test]
@@ -229,13 +209,13 @@ echo '{"response":"finished","result":"success"}'

        let tmp = tempdir()?;
        let db = tmp.path().join("db.db");
-
        let mut broker = broker(&db, &adapter)?;
+
        let mut broker = broker(&db)?;

        let trigger = trigger_request()?;

        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = broker.execute_ci(&trigger, &sender);
+
        let x = broker.execute_ci(&adapter, &trigger, &sender);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
@@ -261,13 +241,13 @@ exit 1

        let tmp = tempdir()?;
        let db = tmp.path().join("db.db");
-
        let mut broker = broker(&db, &adapter)?;
+
        let mut broker = broker(&db)?;

        let trigger = trigger_request()?;

        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = broker.execute_ci(&trigger, &sender);
+
        let x = broker.execute_ci(&adapter, &trigger, &sender);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
modified src/config.rs
@@ -9,7 +9,12 @@ use std::{
use duration_str::deserialize_duration;
use serde::{Deserialize, Serialize};

-
use crate::{adapter::Adapter, filter::EventFilter, logger, sensitive::Sensitive};
+
use crate::{
+
    adapter::{Adapter, AdapterError, Adapters},
+
    filter::EventFilter,
+
    logger,
+
    sensitive::Sensitive,
+
};

const DEFAULT_MAX_RUN_TIME: Duration = Duration::from_secs(3600);
const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;
@@ -51,6 +56,10 @@ impl Config {
        self.report_dir.as_deref()
    }

+
    pub fn to_adapters(&self) -> Result<Adapters, AdapterError> {
+
        Adapters::new(&self.adapters, &self.default_adapter)
+
    }
+

    pub fn filters(&self) -> &[EventFilter] {
        &self.filters
    }
modified src/logger.rs
@@ -9,6 +9,7 @@ use tracing::{debug, error, info, trace, warn, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

use crate::{
+
    adapter::Adapter,
    ci_event::CiEvent,
    ci_event_source::CiEventSource,
    config::Config,
@@ -430,12 +431,13 @@ pub fn queueproc_queue_length(len: usize) {
    );
}

-
pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent) {
+
pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent, adapter: &Adapter) {
    info!(
        msg_id = ?Id::QueueProcPickedEvent,
        kind = %Kind::GotEvent,
        ?id,
        ?event,
+
        ?adapter,
        "picked event from queue"
    );
}
modified src/queueadd.rs
@@ -6,7 +6,6 @@ use crate::{
    ci_event::CiEvent,
    ci_event_source::{CiEventSource, CiEventSourceError},
    db::{Db, DbError},
-
    filter::EventFilter,
    logger,
    notif::NotificationSender,
};
@@ -14,7 +13,6 @@ use crate::{
#[derive(Default)]
pub struct QueueAdderBuilder {
    db: Option<Db>,
-
    filters: Option<Vec<EventFilter>>,
    events_tx: Option<NotificationSender>,
}

@@ -22,7 +20,6 @@ impl QueueAdderBuilder {
    pub fn build(self) -> Result<QueueAdder, AdderError> {
        Ok(QueueAdder {
            db: self.db.ok_or(AdderError::Missing("db"))?,
-
            filters: self.filters.ok_or(AdderError::Missing("filters"))?,
            events_tx: self.events_tx.ok_or(AdderError::Missing("events_tx"))?,
        })
    }
@@ -36,15 +33,9 @@ impl QueueAdderBuilder {
        self.db = Some(db);
        self
    }
-

-
    pub fn filters(mut self, filters: &[EventFilter]) -> Self {
-
        self.filters = Some(filters.to_vec());
-
        self
-
    }
}

pub struct QueueAdder {
-
    filters: Vec<EventFilter>,
    db: Db,
    events_tx: NotificationSender,
}
@@ -76,12 +67,8 @@ impl QueueAdder {
                }
                Ok(Some(events)) => {
                    for e in events {
-
                        for filter in self.filters.iter() {
-
                            if filter.allows(&e) {
-
                                logger::queueadd_push_event(&e);
-
                                self.push_event(e.clone())?;
-
                            }
-
                        }
+
                        logger::queueadd_push_event(&e);
+
                        self.push_event(e.clone())?;
                    }
                }
            }
modified src/queueproc.rs
@@ -11,9 +11,11 @@ use std::{
use radicle::Profile;

use crate::{
+
    adapter::{Adapter, Adapters},
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
    db::{Db, DbError, QueueId, QueuedCiEvent},
+
    filter::EventFilter,
    logger,
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
@@ -23,6 +25,8 @@ use crate::{
pub struct QueueProcessorBuilder {
    db: Option<Db>,
    broker: Option<Broker>,
+
    filters: Option<Vec<EventFilter>>,
+
    adapters: Option<Adapters>,
    events_rx: Option<NotificationReceiver>,
    run_tx: Option<NotificationSender>,
    queue_len_interval: Option<Duration>,
@@ -36,6 +40,8 @@ impl QueueProcessorBuilder {
            db: self.db.ok_or(QueueError::Missing("db"))?,
            profile: Profile::load().map_err(QueueError::Profile)?,
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
+
            filters: self.filters.ok_or(QueueError::Missing("filters"))?,
+
            adapters: self.adapters.ok_or(QueueError::Missing("adapters"))?,
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
            run_tx: self.run_tx.ok_or(QueueError::Missing("run_tx"))?,
            queue_len_interval: self
@@ -69,12 +75,24 @@ impl QueueProcessorBuilder {
        self.broker = Some(broker);
        self
    }
+

+
    pub fn filters(mut self, filters: &[EventFilter]) -> Self {
+
        self.filters = Some(filters.to_vec());
+
        self
+
    }
+

+
    pub fn adapters(mut self, adapters: &Adapters) -> Self {
+
        self.adapters = Some(adapters.clone());
+
        self
+
    }
}

pub struct QueueProcessor {
    db: Db,
    profile: Profile,
+
    filters: Vec<EventFilter>,
    broker: Broker,
+
    adapters: Adapters,
    events_rx: NotificationReceiver,
    run_tx: NotificationSender,
    queue_len_interval: Duration,
@@ -90,19 +108,28 @@ impl QueueProcessor {
        logger::queueproc_start();
        let mut done = false;
        while !done {
+
            let mut first_error = None;
            while let Some(qe) = self.pick_event()? {
-
                self.run_tx.notify()?;
-
                logger::queueproc_picked_event(qe.id(), &qe);
-
                let res = self.process_event(qe.event());
-
                logger::queueproc_processed_event(&res);
-
                match res {
-
                    Ok(shut_down) => done = shut_down,
-
                    Err(QueueError::BuildTrigger(_, _)) => done = false,
-
                    Err(err) => Err(err)?,
+
                if let Some(adapters) = self.pick_adapters(qe.event()) {
+
                    for adapter in adapters {
+
                        match self.run_adapter(&qe, &adapter) {
+
                            Err(err) => {
+
                                if first_error.is_none() {
+
                                    first_error = Some(err)
+
                                }
+
                            }
+
                            Ok(finished) => done = finished,
+
                        }
+
                    }
+
                } else {
+
                    self.drop_event(qe.id())?;
                }
-
                self.drop_event(qe.id())?;
-
                self.run_tx.notify()?;
            }
+

+
            if let Some(err) = first_error {
+
                return Err(err);
+
            }
+

            match self.events_rx.wait_for_notification() {
                Ok(_) => {}
                Err(RecvTimeoutError::Timeout) => {}
@@ -117,6 +144,23 @@ impl QueueProcessor {
        Ok(())
    }

+
    fn run_adapter(&mut self, qe: &QueuedCiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
+
        let mut done = false;
+

+
        self.run_tx.notify()?;
+
        logger::queueproc_picked_event(qe.id(), qe, adapter);
+
        let res = self.process_event(qe.event(), adapter);
+
        logger::queueproc_processed_event(&res);
+
        match res {
+
            Ok(shut_down) => done = shut_down,
+
            Err(QueueError::BuildTrigger(_, _)) => done = false,
+
            Err(err) => Err(err)?,
+
        }
+
        self.drop_event(qe.id())?;
+
        self.run_tx.notify()?;
+
        Ok(done)
+
    }
+

    fn pick_event(&mut self) -> Result<Option<QueuedCiEvent>, QueueError> {
        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;

@@ -141,7 +185,16 @@ impl QueueProcessor {
        }
    }

-
    fn process_event(&mut self, event: &CiEvent) -> Result<bool, QueueError> {
+
    fn pick_adapters(&self, e: &CiEvent) -> Option<Vec<Adapter>> {
+
        for filter in self.filters.iter() {
+
            if filter.allows(e) {
+
                return Some(vec![self.adapters.default_adapter().clone()]);
+
            }
+
        }
+
        None
+
    }
+

+
    fn process_event(&mut self, event: &CiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
        logger::debug2(format!("queproc::process_event: called; event={event:#?}"));
        let x = match event {
            CiEvent::V1(CiEventV1::Shutdown) => {
@@ -161,7 +214,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -179,7 +232,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -196,7 +249,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -213,7 +266,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -233,7 +286,7 @@ impl QueueProcessor {
                logger::debug2(format!("got trigger {trigger:?}"));
                let trigger = trigger?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                logger::debug("executed ci");
                Ok(false)