Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: filter queued events when they are processed
Lars Wirzenius committed 1 year ago
commit ba644e14d61f983547663e5d51fa2fe946eb9b3b
parent 73c533580af998fcd86140508b6513d057bf3851
4 files changed +36 -50
modified ci-broker.md
@@ -858,29 +858,6 @@ when I run cibtool --db ci-broker.db event count
then stdout is exactly "1000\n"
~~~

-
## Don't insert events into queue when not allowed by filter
-

-
_Want:_ Nothing is inserted into the persistent event queue
-
then the CI broker's filter does not allow any events.
-

-
_Why:_ This is fundamental for running CI when repositories
-
in a node change.
-

-
_Who:_ `cib-devs`
-

-
~~~scenario
-
given a Radicle node, with CI configured with broker-allow-nothing.yaml and adapter dummy.sh
-
given a Git repository xyzzy in the Radicle node
-
given the Radicle node emits a refsUpdated event for xyzzy
-
when I run ./env.sh synthetic-events synt.sock event.json --log log.txt
-

-
when I try to run ./env.sh env RAD_SOCKET=synt.sock cib --config broker-allow-nothing.yaml insert
-
then command is successful
-

-
when I run cibtool --db ci-broker.db event count
-
then stdout is exactly "0\n"
-
~~~
-

## Process queued events

_Want:_ It's possible to run the CI broker in a mode where it
modified src/bin/cib.rs
@@ -149,7 +149,6 @@ impl InsertCmd {
        let adder = QueueAdderBuilder::default()
            .events_tx(events_notification.tx().map_err(CibError::notification)?)
            .db(args.open_db(config)?)
-
            .filters(config.filters())
            .build()
            .map_err(CibError::QueueAdder)?;
        let thread = adder.add_events_in_thread();
@@ -194,6 +193,7 @@ impl QueuedCmd {
            .db(db)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
+
            .filters(config.filters())
            .default_adapter(&adapter)
            .build()
            .map_err(CibError::process_queue)?;
@@ -241,7 +241,6 @@ impl ProcessEventsCmd {
        let adder = QueueAdderBuilder::default()
            .events_tx(events_notification.tx().map_err(CibError::notification)?)
            .db(args.open_db(config)?)
-
            .filters(config.filters())
            .build()
            .map_err(CibError::QueueAdder)?;
        adder.add_events_in_thread();
@@ -272,6 +271,7 @@ impl ProcessEventsCmd {
            .db(args.open_db(config)?)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
+
            .filters(config.filters())
            .default_adapter(&adapter)
            .build()
            .map_err(CibError::process_queue)?;
modified src/queueadd.rs
@@ -6,7 +6,6 @@ use crate::{
    ci_event::CiEvent,
    ci_event_source::{CiEventSource, CiEventSourceError},
    db::{Db, DbError},
-
    filter::EventFilter,
    logger,
    notif::NotificationSender,
};
@@ -14,7 +13,6 @@ use crate::{
#[derive(Default)]
pub struct QueueAdderBuilder {
    db: Option<Db>,
-
    filters: Option<Vec<EventFilter>>,
    events_tx: Option<NotificationSender>,
}

@@ -22,7 +20,6 @@ impl QueueAdderBuilder {
    pub fn build(self) -> Result<QueueAdder, AdderError> {
        Ok(QueueAdder {
            db: self.db.ok_or(AdderError::Missing("db"))?,
-
            filters: self.filters.ok_or(AdderError::Missing("filters"))?,
            events_tx: self.events_tx.ok_or(AdderError::Missing("events_tx"))?,
        })
    }
@@ -36,15 +33,9 @@ impl QueueAdderBuilder {
        self.db = Some(db);
        self
    }
-

-
    pub fn filters(mut self, filters: &[EventFilter]) -> Self {
-
        self.filters = Some(filters.to_vec());
-
        self
-
    }
}

pub struct QueueAdder {
-
    filters: Vec<EventFilter>,
    db: Db,
    events_tx: NotificationSender,
}
@@ -76,12 +67,8 @@ impl QueueAdder {
                }
                Ok(Some(events)) => {
                    for e in events {
-
                        for filter in self.filters.iter() {
-
                            if filter.allows(&e) {
-
                                logger::queueadd_push_event(&e);
-
                                self.push_event(e.clone())?;
-
                            }
-
                        }
+
                        logger::queueadd_push_event(&e);
+
                        self.push_event(e.clone())?;
                    }
                }
            }
modified src/queueproc.rs
@@ -15,6 +15,7 @@ use crate::{
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
    db::{Db, DbError, QueueId, QueuedCiEvent},
+
    filter::EventFilter,
    logger,
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
@@ -24,6 +25,7 @@ use crate::{
pub struct QueueProcessorBuilder {
    db: Option<Db>,
    broker: Option<Broker>,
+
    filters: Option<Vec<EventFilter>>,
    default_adapter: Option<Adapter>,
    events_rx: Option<NotificationReceiver>,
    run_tx: Option<NotificationSender>,
@@ -38,6 +40,7 @@ impl QueueProcessorBuilder {
            db: self.db.ok_or(QueueError::Missing("db"))?,
            profile: Profile::load().map_err(QueueError::Profile)?,
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
+
            filters: self.filters.ok_or(QueueError::Missing("filters"))?,
            default_adapter: self
                .default_adapter
                .ok_or(QueueError::Missing("default_adapter"))?,
@@ -75,6 +78,11 @@ impl QueueProcessorBuilder {
        self
    }

+
    pub fn filters(mut self, filters: &[EventFilter]) -> Self {
+
        self.filters = Some(filters.to_vec());
+
        self
+
    }
+

    pub fn default_adapter(mut self, adapter: &Adapter) -> Self {
        self.default_adapter = Some(adapter.clone());
        self
@@ -84,6 +92,7 @@ impl QueueProcessorBuilder {
pub struct QueueProcessor {
    db: Db,
    profile: Profile,
+
    filters: Vec<EventFilter>,
    broker: Broker,
    default_adapter: Adapter,
    events_rx: NotificationReceiver,
@@ -102,17 +111,21 @@ impl QueueProcessor {
        let mut done = false;
        while !done {
            while let Some(qe) = self.pick_event()? {
-
                self.run_tx.notify()?;
-
                logger::queueproc_picked_event(qe.id(), &qe);
-
                let res = self.process_event(qe.event());
-
                logger::queueproc_processed_event(&res);
-
                match res {
-
                    Ok(shut_down) => done = shut_down,
-
                    Err(QueueError::BuildTrigger(_, _)) => done = false,
-
                    Err(err) => Err(err)?,
+
                if self.event_is_allowed(qe.event()) {
+
                    self.run_tx.notify()?;
+
                    logger::queueproc_picked_event(qe.id(), &qe);
+
                    let res = self.process_event(qe.event());
+
                    logger::queueproc_processed_event(&res);
+
                    match res {
+
                        Ok(shut_down) => done = shut_down,
+
                        Err(QueueError::BuildTrigger(_, _)) => done = false,
+
                        Err(err) => Err(err)?,
+
                    }
+
                    self.drop_event(qe.id())?;
+
                    self.run_tx.notify()?;
+
                } else {
+
                    self.drop_event(qe.id())?;
                }
-
                self.drop_event(qe.id())?;
-
                self.run_tx.notify()?;
            }
            match self.events_rx.wait_for_notification() {
                Ok(_) => {}
@@ -152,6 +165,15 @@ impl QueueProcessor {
        }
    }

+
    fn event_is_allowed(&self, e: &CiEvent) -> bool {
+
        for filter in self.filters.iter() {
+
            if filter.allows(e) {
+
                return true;
+
            }
+
        }
+
        false
+
    }
+

    fn process_event(&mut self, event: &CiEvent) -> Result<bool, QueueError> {
        logger::debug2(format!("queproc::process_event: called; event={event:#?}"));
        let x = match event {