Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
refactor: Broker::execute_ci is given adapter to use
Lars Wirzenius committed 1 year ago
commit 73c533580af998fcd86140508b6513d057bf3851
parent 2e37eb04f4800797d28b8b5d23ba30d7c32863dc
3 files changed +31 -38
modified src/bin/cib.rs
@@ -175,8 +175,8 @@ impl QueuedCmd {
        let adapter = config.default_adapter()?;
        logger::adapter_config(config);

-
        let broker = Broker::new(config.db(), config.max_run_time(), &adapter)
-
            .map_err(CibError::new_broker)?;
+
        let broker =
+
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;

        let mut event_notifications = NotificationChannel::new_event();
        event_notifications
@@ -194,6 +194,7 @@ impl QueuedCmd {
            .db(db)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
+
            .default_adapter(&adapter)
            .build()
            .map_err(CibError::process_queue)?;
        let thread = processor.process_in_thread();
@@ -262,8 +263,8 @@ impl ProcessEventsCmd {

        let adapter = config.default_adapter()?;

-
        let broker = Broker::new(config.db(), config.max_run_time(), &adapter)
-
            .map_err(CibError::new_broker)?;
+
        let broker =
+
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;

        let processor = QueueProcessorBuilder::default()
            .events_rx(events_notification.rx().map_err(CibError::notification)?)
@@ -271,6 +272,7 @@ impl ProcessEventsCmd {
            .db(args.open_db(config)?)
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
+
            .default_adapter(&adapter)
            .build()
            .map_err(CibError::process_queue)?;
        let processor = processor.process_in_thread();
modified src/broker.rs
@@ -28,21 +28,15 @@ use crate::{
/// node, and executes the appropriate adapter to run CI on the
/// repository.
pub struct Broker {
-
    default_adapter: Adapter,
    max_run_time: Duration,
    db: Db,
}

impl Broker {
    #[allow(clippy::result_large_err)]
-
    pub fn new(
-
        db_filename: &Path,
-
        max_run_time: Duration,
-
        default_adapter: &Adapter,
-
    ) -> Result<Self, BrokerError> {
+
    pub fn new(db_filename: &Path, max_run_time: Duration) -> Result<Self, BrokerError> {
        logger::broker_db(db_filename);
        Ok(Self {
-
            default_adapter: default_adapter.clone(),
            max_run_time,
            db: Db::new(db_filename)?,
        })
@@ -53,19 +47,15 @@ impl Broker {
        Ok(self.db.get_all_runs()?)
    }

-
    fn default_adapter(&self) -> &Adapter {
-
        &self.default_adapter
-
    }
-

    #[allow(clippy::result_large_err)]
    pub fn execute_ci(
        &mut self,
+
        adapter: &Adapter,
        trigger: &Request,
        run_notification: &NotificationSender,
    ) -> Result<Run, BrokerError> {
        let broker_run_id = RunId::default();
        let span = span!(Level::TRACE, "execute_ci_run", %broker_run_id,).entered();
-
        let adapter = self.default_adapter();
        let run = span
            .in_scope(|| self.execute_helper(adapter, broker_run_id, trigger, run_notification))?;
        Ok(run)
@@ -193,7 +183,7 @@ mod test {
    use std::{path::Path, time::Duration};
    use tempfile::tempdir;

-
    use super::{Adapter, Broker};
+
    use super::Broker;
    use crate::{
        msg::{RunId, RunResult},
        notif::NotificationChannel,
@@ -201,18 +191,8 @@ mod test {
        test::{mock_adapter, trigger_request, TestResult},
    };

-
    fn broker(filename: &Path, adapter: &Adapter) -> anyhow::Result<Broker> {
-
        Ok(Broker::new(filename, Duration::from_secs(1), adapter)?)
-
    }
-

-
    #[test]
-
    fn has_adapter() -> TestResult<()> {
-
        let tmp = tempdir()?;
-
        let db = tmp.path().join("db.db");
-
        let adapter = Adapter::default();
-
        let broker = broker(&db, &adapter)?;
-
        assert_eq!(broker.default_adapter(), &adapter);
-
        Ok(())
+
    fn broker(filename: &Path) -> anyhow::Result<Broker> {
+
        Ok(Broker::new(filename, Duration::from_secs(1))?)
    }

    #[test]
@@ -229,13 +209,13 @@ echo '{"response":"finished","result":"success"}'

        let tmp = tempdir()?;
        let db = tmp.path().join("db.db");
-
        let mut broker = broker(&db, &adapter)?;
+
        let mut broker = broker(&db)?;

        let trigger = trigger_request()?;

        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = broker.execute_ci(&trigger, &sender);
+
        let x = broker.execute_ci(&adapter, &trigger, &sender);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
@@ -261,13 +241,13 @@ exit 1

        let tmp = tempdir()?;
        let db = tmp.path().join("db.db");
-
        let mut broker = broker(&db, &adapter)?;
+
        let mut broker = broker(&db)?;

        let trigger = trigger_request()?;

        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = broker.execute_ci(&trigger, &sender);
+
        let x = broker.execute_ci(&adapter, &trigger, &sender);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
modified src/queueproc.rs
@@ -11,6 +11,7 @@ use std::{
use radicle::Profile;

use crate::{
+
    adapter::Adapter,
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
    db::{Db, DbError, QueueId, QueuedCiEvent},
@@ -23,6 +24,7 @@ use crate::{
pub struct QueueProcessorBuilder {
    db: Option<Db>,
    broker: Option<Broker>,
+
    default_adapter: Option<Adapter>,
    events_rx: Option<NotificationReceiver>,
    run_tx: Option<NotificationSender>,
    queue_len_interval: Option<Duration>,
@@ -36,6 +38,9 @@ impl QueueProcessorBuilder {
            db: self.db.ok_or(QueueError::Missing("db"))?,
            profile: Profile::load().map_err(QueueError::Profile)?,
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
+
            default_adapter: self
+
                .default_adapter
+
                .ok_or(QueueError::Missing("default_adapter"))?,
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
            run_tx: self.run_tx.ok_or(QueueError::Missing("run_tx"))?,
            queue_len_interval: self
@@ -69,12 +74,18 @@ impl QueueProcessorBuilder {
        self.broker = Some(broker);
        self
    }
+

+
    pub fn default_adapter(mut self, adapter: &Adapter) -> Self {
+
        self.default_adapter = Some(adapter.clone());
+
        self
+
    }
}

pub struct QueueProcessor {
    db: Db,
    profile: Profile,
    broker: Broker,
+
    default_adapter: Adapter,
    events_rx: NotificationReceiver,
    run_tx: NotificationSender,
    queue_len_interval: Duration,
@@ -161,7 +172,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -179,7 +190,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -196,7 +207,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -213,7 +224,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -233,7 +244,7 @@ impl QueueProcessor {
                logger::debug2(format!("got trigger {trigger:?}"));
                let trigger = trigger?;
                self.broker
-
                    .execute_ci(&trigger, &self.run_tx)
+
                    .execute_ci(&self.default_adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                logger::debug("executed ci");
                Ok(false)