Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
refactor: allow picking many adapters
Lars Wirzenius committed 1 year ago
commit 7ebe081334f233906291a716cd578b571487c03c
parent ba644e14d61f983547663e5d51fa2fe946eb9b3b
2 files changed +45 -21
modified src/logger.rs
@@ -9,6 +9,7 @@ use tracing::{debug, error, info, trace, warn, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

use crate::{
+
    adapter::Adapter,
    ci_event::CiEvent,
    ci_event_source::CiEventSource,
    config::Config,
@@ -430,12 +431,13 @@ pub fn queueproc_queue_length(len: usize) {
    );
}

-
pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent) {
+
pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent, adapter: &Adapter) {
    info!(
        msg_id = ?Id::QueueProcPickedEvent,
        kind = %Kind::GotEvent,
        ?id,
        ?event,
+
        ?adapter,
        "picked event from queue"
    );
}
modified src/queueproc.rs
@@ -110,23 +110,28 @@ impl QueueProcessor {
        logger::queueproc_start();
        let mut done = false;
        while !done {
+
            let mut first_error = None;
            while let Some(qe) = self.pick_event()? {
-
                if self.event_is_allowed(qe.event()) {
-
                    self.run_tx.notify()?;
-
                    logger::queueproc_picked_event(qe.id(), &qe);
-
                    let res = self.process_event(qe.event());
-
                    logger::queueproc_processed_event(&res);
-
                    match res {
-
                        Ok(shut_down) => done = shut_down,
-
                        Err(QueueError::BuildTrigger(_, _)) => done = false,
-
                        Err(err) => Err(err)?,
+
                if let Some(adapters) = self.pick_adapters(qe.event()) {
+
                    for adapter in adapters {
+
                        match self.run_adapter(&qe, &adapter) {
+
                            Err(err) => {
+
                                if first_error.is_none() {
+
                                    first_error = Some(err)
+
                                }
+
                            }
+
                            Ok(finished) => done = finished,
+
                        }
                    }
-
                    self.drop_event(qe.id())?;
-
                    self.run_tx.notify()?;
                } else {
                    self.drop_event(qe.id())?;
                }
            }
+

+
            if let Some(err) = first_error {
+
                return Err(err);
+
            }
+

            match self.events_rx.wait_for_notification() {
                Ok(_) => {}
                Err(RecvTimeoutError::Timeout) => {}
@@ -141,6 +146,23 @@ impl QueueProcessor {
        Ok(())
    }

+
    fn run_adapter(&mut self, qe: &QueuedCiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
+
        let mut done = false;
+

+
        self.run_tx.notify()?;
+
        logger::queueproc_picked_event(qe.id(), qe, adapter);
+
        let res = self.process_event(qe.event(), adapter);
+
        logger::queueproc_processed_event(&res);
+
        match res {
+
            Ok(shut_down) => done = shut_down,
+
            Err(QueueError::BuildTrigger(_, _)) => done = false,
+
            Err(err) => Err(err)?,
+
        }
+
        self.drop_event(qe.id())?;
+
        self.run_tx.notify()?;
+
        Ok(done)
+
    }
+

    fn pick_event(&mut self) -> Result<Option<QueuedCiEvent>, QueueError> {
        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;

@@ -165,16 +187,16 @@ impl QueueProcessor {
        }
    }

-
    fn event_is_allowed(&self, e: &CiEvent) -> bool {
+
    fn pick_adapters(&self, e: &CiEvent) -> Option<Vec<Adapter>> {
        for filter in self.filters.iter() {
            if filter.allows(e) {
-
                return true;
+
                return Some(vec![self.default_adapter.clone()]);
            }
        }
-
        false
+
        None
    }

-
    fn process_event(&mut self, event: &CiEvent) -> Result<bool, QueueError> {
+
    fn process_event(&mut self, event: &CiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
        logger::debug2(format!("queproc::process_event: called; event={event:#?}"));
        let x = match event {
            CiEvent::V1(CiEventV1::Shutdown) => {
@@ -194,7 +216,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -212,7 +234,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -229,7 +251,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -246,7 +268,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -266,7 +288,7 @@ impl QueueProcessor {
                logger::debug2(format!("got trigger {trigger:?}"));
                let trigger = trigger?;
                self.broker
-
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
+
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                logger::debug("executed ci");
                Ok(false)