Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: add optional "triggers" to config
Merged liw opened 1 year ago
8 files changed +213 -46 ee34af59 aaf68737
modified ci-broker.md
@@ -57,6 +57,43 @@ filters:
  - !Branch "main"
~~~

+
~~~{#broker-with-triggers.yaml .file .yaml}
+
db: ci-broker.db
+
report_dir: reports
+
queue_len_interval: 1min
+
adapters:
+
  mcadapterface:
+
    command: ./adapter.sh
+
    env:
+
      RADICLE_NATIVE_CI: native-ci.yaml
+
    sensitive_env:
+
      API_KEY: xyzzy
+
triggers:
+
  - adapter: mcadapterface
+
    filters:
+
      - !Branch "main"
+
~~~
+

+
~~~{#broker-with-two-triggers.yaml .file .yaml}
+
db: ci-broker.db
+
report_dir: reports
+
queue_len_interval: 1min
+
adapters:
+
  mcadapterface:
+
    command: ./adapter.sh
+
    env:
+
      RADICLE_NATIVE_CI: native-ci.yaml
+
    sensitive_env:
+
      API_KEY: xyzzy
+
triggers:
+
  - adapter: mcadapterface
+
    filters:
+
      - !Branch "main"
+
  - adapter: mcadapterface
+
    filters:
+
      - !Branch "main"
+
~~~
+

~~~{#broker-allow-nothing.yaml .file .yaml}
db: ci-broker.db
report_dir: reports
@@ -293,12 +330,12 @@ nothing else has a hope of working.
_Who:_ `cib-devs`

~~~scenario
-
given a Radicle node, with CI configured with broker.yaml and adapter dummy.sh
+
given a Radicle node, with CI configured with broker-with-triggers.yaml and adapter dummy.sh
given a Git repository xyzzy in the Radicle node
given the Radicle node emits a refsUpdated event for xyzzy
when I run ./env.sh synthetic-events synt.sock event.json --log log.txt
given a directory reports
-
when I run ./env.sh cib --config broker.yaml process-events
+
when I run ./env.sh cib --config broker-with-triggers.yaml process-events

then stderr contains "CibStart"
then stderr contains "CibConfig"
@@ -315,6 +352,38 @@ then stdout contains ""id": "xyzzy""
~~~


+
## Runs adapters for all matching triggers
+

+
_Want:_ CI broker can run its adapter.
+

+
_Why:_ This is obviously necessary. If this doesn't work,
+
nothing else has a hope of working.
+

+
_Who:_ `cib-devs`
+

+
~~~scenario
+
given a Radicle node, with CI configured with broker-with-two-triggers.yaml and adapter dummy.sh
+
given a Git repository xyzzy in the Radicle node
+
given the Radicle node emits a refsUpdated event for xyzzy
+
when I run ./env.sh synthetic-events synt.sock event.json --log log.txt
+
given a directory reports
+
when I run ./env.sh cib --config broker-with-two-triggers.yaml process-events
+

+
then stderr contains "CibStart"
+
then stderr contains "CibConfig"
+
then stderr contains "CibEndSuccess"
+
then file reports/index.html exists
+
then file reports/status.json exists
+
then file reports/index.rss exists
+

+
when I run cibtool --db ci-broker.db event list
+
then stdout is empty
+

+
when I run cibtool --db ci-broker.db run list
+
then stdout has 2 lines containing "xyzzy"
+
~~~
+

+

## Runs adapter on each type of event

_Want:_ CI broker runs the adapter for each type of CI event.
modified ci-broker.yaml
@@ -25,6 +25,11 @@
    rust:
      function: stdout_has_one_line

+
- then: "stdout has {n:uint} lines containing \"{text:text}\""
+
  impl:
+
    rust:
+
      function: stdout_has_n_lines_containing
+

- then: "stdout is empty"
  impl:
    rust:
modified src/adapter.rs
@@ -34,32 +34,36 @@ const NOT_EXITED: i32 = 999;
#[derive(Clone)]
pub struct Adapters {
    adapters: HashMap<String, Adapter>,
-
    default_adapter: String,
+
    default_adapter: Option<String>,
}

impl Adapters {
    pub fn new(
        adapters: &HashMap<String, AdapterConfig>,
-
        default_adapter: &str,
+
        default_adapter: Option<&str>,
    ) -> Result<Self, AdapterError> {
-
        if !adapters.contains_key(default_adapter) {
-
            Err(AdapterError::NoDefaultAdapter)
-
        } else {
-
            Ok(Self {
-
                adapters: HashMap::from_iter(
-
                    adapters
-
                        .iter()
-
                        .map(|(k, v)| (k.to_string(), Adapter::from(v))),
-
                ),
-
                default_adapter: default_adapter.into(),
-
            })
+
        if let Some(default) = default_adapter {
+
            if !adapters.contains_key(default) {
+
                return Err(AdapterError::NoDefaultAdapter);
+
            }
        }
+

+
        Ok(Self {
+
            adapters: HashMap::from_iter(
+
                adapters
+
                    .iter()
+
                    .map(|(k, v)| (k.to_string(), Adapter::from(v))),
+
            ),
+
            default_adapter: default_adapter.map(|s| s.into()),
+
        })
    }

-
    pub fn default_adapter(&self) -> &Adapter {
-
        // We KNOW the default adapter is in the map so the unwrap is safe.
-
        #[allow(clippy::unwrap_used)]
-
        self.adapters.get(&self.default_adapter).unwrap()
+
    pub fn default_adapter(&self) -> Option<&Adapter> {
+
        if let Some(default) = &self.default_adapter {
+
            self.adapters.get(default)
+
        } else {
+
            None
+
        }
    }

    pub fn get(&self, name: &str) -> Option<&Adapter> {
modified src/bin/cib.rs
@@ -195,6 +195,7 @@ impl QueuedCmd {
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
            .filters(config.filters())
+
            .triggers(&config.to_triggers())
            .adapters(&adapters)
            .build()
            .map_err(CibError::process_queue)?;
@@ -273,6 +274,7 @@ impl ProcessEventsCmd {
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
            .filters(config.filters())
+
            .triggers(&config.to_triggers())
            .adapters(&adapters)
            .build()
            .map_err(CibError::process_queue)?;
modified src/config.rs
@@ -11,8 +11,7 @@ use serde::{Deserialize, Serialize};

use crate::{
    adapter::{Adapter, AdapterError, Adapters},
-
    filter::EventFilter,
-
    logger,
+
    filter::{EventFilter, Trigger},
    sensitive::Sensitive,
};

@@ -21,9 +20,10 @@ const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;

#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
-
    default_adapter: String,
+
    default_adapter: Option<String>,
    adapters: HashMap<String, AdapterConfig>,
-
    filters: Vec<EventFilter>,
+
    filters: Option<Vec<EventFilter>>,
+
    triggers: Option<Vec<TriggerConfig>>,
    report_dir: Option<PathBuf>,
    status_update_interval_seconds: Option<u64>,
    db: PathBuf,
@@ -57,19 +57,20 @@ impl Config {
    }

    pub fn to_adapters(&self) -> Result<Adapters, AdapterError> {
-
        Adapters::new(&self.adapters, &self.default_adapter)
+
        Adapters::new(&self.adapters, self.default_adapter.as_deref())
    }

    pub fn filters(&self) -> &[EventFilter] {
-
        &self.filters
+
        self.filters.as_deref().unwrap_or(&[])
    }

-
    pub fn default_adapter(&self) -> Result<Adapter, ConfigError> {
-
        logger::adapter_config(self);
-
        self.adapters
-
            .get(&self.default_adapter)
-
            .map(Adapter::from)
-
            .ok_or(ConfigError::NoDefaultAdapter)
+
    pub fn to_triggers(&self) -> Vec<Trigger> {
+
        self.triggers
+
            .as_deref()
+
            .unwrap_or(&[])
+
            .iter()
+
            .map(Trigger::from)
+
            .collect()
    }

    pub fn status_page_update_interval(&self) -> u64 {
@@ -120,6 +121,12 @@ impl From<&AdapterConfig> for Adapter {
    }
}

+
#[derive(Debug, Serialize, Deserialize)]
+
pub struct TriggerConfig {
+
    pub adapter: String,
+
    pub filters: Vec<EventFilter>,
+
}
+

/// All possible errors from configuration handling.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
modified src/filter.rs
@@ -7,9 +7,35 @@ use radicle_git_ext::Oid;

use crate::{
    ci_event::{CiEvent, CiEventV1},
+
    config::TriggerConfig,
    logger,
};

+
#[derive(Clone)]
+
pub struct Trigger {
+
    adapter: String,
+
    filters: Vec<EventFilter>,
+
}
+

+
impl Trigger {
+
    pub fn adapter(&self) -> &str {
+
        &self.adapter
+
    }
+

+
    pub fn allows(&self, e: &CiEvent) -> bool {
+
        self.filters.iter().any(|filter| filter.allows(e))
+
    }
+
}
+

+
impl From<&TriggerConfig> for Trigger {
+
    fn from(config: &TriggerConfig) -> Self {
+
        Self {
+
            adapter: config.adapter.clone(),
+
            filters: config.filters.clone(),
+
        }
+
    }
+
}
+

/// A Boolean expression for filtering broker events.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
modified src/queueproc.rs
@@ -15,7 +15,7 @@ use crate::{
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
    db::{Db, DbError, QueueId, QueuedCiEvent},
-
    filter::EventFilter,
+
    filter::{EventFilter, Trigger},
    logger,
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
@@ -26,6 +26,7 @@ pub struct QueueProcessorBuilder {
    db: Option<Db>,
    broker: Option<Broker>,
    filters: Option<Vec<EventFilter>>,
+
    triggers: Option<Vec<Trigger>>,
    adapters: Option<Adapters>,
    events_rx: Option<NotificationReceiver>,
    run_tx: Option<NotificationSender>,
@@ -41,6 +42,7 @@ impl QueueProcessorBuilder {
            profile: Profile::load().map_err(QueueError::Profile)?,
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
            filters: self.filters.ok_or(QueueError::Missing("filters"))?,
+
            triggers: self.triggers.ok_or(QueueError::Missing("triggers"))?,
            adapters: self.adapters.ok_or(QueueError::Missing("adapters"))?,
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
            run_tx: self.run_tx.ok_or(QueueError::Missing("run_tx"))?,
@@ -81,6 +83,11 @@ impl QueueProcessorBuilder {
        self
    }

+
    pub fn triggers(mut self, triggers: &[Trigger]) -> Self {
+
        self.triggers = Some(triggers.to_vec());
+
        self
+
    }
+

    pub fn adapters(mut self, adapters: &Adapters) -> Self {
        self.adapters = Some(adapters.clone());
        self
@@ -91,6 +98,7 @@ pub struct QueueProcessor {
    db: Db,
    profile: Profile,
    filters: Vec<EventFilter>,
+
    triggers: Vec<Trigger>,
    broker: Broker,
    adapters: Adapters,
    events_rx: NotificationReceiver,
@@ -110,19 +118,25 @@ impl QueueProcessor {
        while !done {
            let mut first_error = None;
            while let Some(qe) = self.pick_event()? {
-
                if let Some(adapters) = self.pick_adapters(qe.event()) {
-
                    for adapter in adapters {
-
                        match self.run_adapter(&qe, &adapter) {
-
                            Err(err) => {
-
                                if first_error.is_none() {
-
                                    first_error = Some(err)
+
                match self.pick_adapters(qe.event()) {
+
                    Err(err) => {
+
                        if first_error.is_none() {
+
                            first_error = Some(err);
+
                        }
+
                    }
+
                    Ok(None) => self.drop_event(qe.id())?,
+
                    Ok(Some(adapters)) => {
+
                        for adapter in adapters {
+
                            match self.run_adapter(&qe, &adapter) {
+
                                Err(err) => {
+
                                    if first_error.is_none() {
+
                                        first_error = Some(err)
+
                                    }
                                }
+
                                Ok(finished) => done = finished,
                            }
-
                            Ok(finished) => done = finished,
                        }
                    }
-
                } else {
-
                    self.drop_event(qe.id())?;
                }
            }

@@ -185,13 +199,33 @@ impl QueueProcessor {
        }
    }

-
    fn pick_adapters(&self, e: &CiEvent) -> Option<Vec<Adapter>> {
-
        for filter in self.filters.iter() {
-
            if filter.allows(e) {
-
                return Some(vec![self.adapters.default_adapter().clone()]);
+
    fn pick_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
+
        let mut adapters = vec![];
+

+
        if self.filters.iter().any(|filter| filter.allows(e)) {
+
            if let Some(default) = self.adapters.default_adapter() {
+
                adapters.push(default.clone());
+
            } else {
+
                return Err(QueueError::NoDefaultAdapter);
            }
        }
-
        None
+

+
        for trigger in self.triggers.iter() {
+
            if trigger.allows(e) {
+
                let name = trigger.adapter().to_string();
+
                let adapter = self
+
                    .adapters
+
                    .get(&name)
+
                    .ok_or(QueueError::UnknownAdapter(name))?;
+
                adapters.push(adapter.clone());
+
            }
+
        }
+

+
        if adapters.is_empty() {
+
            Ok(None)
+
        } else {
+
            Ok(Some(adapters))
+
        }
    }

    fn process_event(&mut self, event: &CiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
@@ -322,6 +356,12 @@ pub enum QueueError {

    #[error(transparent)]
    NotifyRun(#[from] crate::notif::NotificationError),
+

+
    #[error("trigger refers to unknown adapter {0}")]
+
    UnknownAdapter(String),
+

+
    #[error("no default adapter specified in configuration")]
+
    NoDefaultAdapter,
}

impl QueueError {
modified src/subplot.rs
@@ -398,6 +398,20 @@ fn stdout_has_one_line(runcmd: &Runcmd) {
#[step]
#[context(SubplotContext)]
#[context(Runcmd)]
+
fn stdout_has_n_lines_containing(runcmd: &Runcmd, n: usize, text: &str) {
+
    let linecount = runcmd
+
        .stdout_as_string()
+
        .lines()
+
        .filter(|line| line.contains(text))
+
        .count();
+
    if linecount != n {
+
        throw!(format!("stdout had {linecount} lines, expected {n}"));
+
    }
+
}
+

+
#[step]
+
#[context(SubplotContext)]
+
#[context(Runcmd)]
fn stdout_is_empty(runcmd: &Runcmd) {
    let stdout = runcmd.stdout_as_string();
    if !stdout.is_empty() {