Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat(src/queueproc.rs): process CI events
Lars Wirzenius committed 1 year ago
commit 1876b0f186b18da14207b0c5fc2d632eb00595ea
parent c75c87030b9f0ed0afd64711fb08a20b06685933
1 file changed +39 -21
modified src/queueproc.rs
@@ -12,8 +12,8 @@ use radicle::Profile;

use crate::{
    broker::{Broker, BrokerError},
-
    db::{Db, DbError, QueueId, QueuedEvent},
-
    event::BrokerEvent,
+
    ci_event::CiEvent,
+
    db::{Db, DbError, QueueId, QueuedCiEvent},
    logger,
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
@@ -98,13 +98,13 @@ impl QueueProcessor {
        Ok(())
    }

-
    fn pick_event(&self) -> Result<Option<QueuedEvent>, QueueError> {
-
        let ids = self.db.queued_events().map_err(QueueError::db)?;
+
    fn pick_event(&self) -> Result<Option<QueuedCiEvent>, QueueError> {
+
        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;
        logger::debug2(format!("queue in db has {} events", ids.len()));

        let mut queue = vec![];
        for id in ids.iter() {
-
            if let Some(qe) = self.db.get_queued_event(id).map_err(QueueError::db)? {
+
            if let Some(qe) = self.db.get_queued_ci_event(id).map_err(QueueError::db)? {
                queue.push(qe);
            }
        }
@@ -117,36 +117,54 @@ impl QueueProcessor {
        }
    }

-
    fn process_event(&mut self, event: &BrokerEvent) -> Result<bool, QueueError> {
+
    fn process_event(&mut self, event: &CiEvent) -> Result<bool, QueueError> {
        match event {
-
            BrokerEvent::RefChanged {
-
                rid,
-
                name: _,
-
                oid,
-
                old: _,
+
            CiEvent::Shutdown => {
+
                logger::queueproc_action_shutdown();
+
                Ok(true)
+
            }
+
            CiEvent::BranchCreated {
+
                from_node: _,
+
                repo,
+
                branch: _,
+
                tip,
            } => {
-
                logger::queueproc_action_run(rid, oid);
-

+
                logger::queueproc_action_run(repo, tip);
                let trigger = RequestBuilder::default()
                    .profile(&self.profile)
-
                    .broker_event(event)
-
                    .build_trigger()
+
                    .ci_event(event)
+
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
                    .execute_ci(&trigger)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
-
            BrokerEvent::Shutdown => {
-
                logger::queueproc_action_shutdown();
-
                Ok(true)
+
            CiEvent::BranchUpdated {
+
                from_node: _,
+
                repo,
+
                branch: _,
+
                tip,
+
                old_tip: _,
+
            } => {
+
                logger::queueproc_action_run(repo, tip);
+
                let trigger = RequestBuilder::default()
+
                    .profile(&self.profile)
+
                    .ci_event(event)
+
                    .build_trigger_from_ci_event()
+
                    .map_err(|e| QueueError::build_trigger(event, e))?;
+
                self.broker
+
                    .execute_ci(&trigger)
+
                    .map_err(QueueError::execute_ci)?;
+
                Ok(false)
            }
+
            _ => unimplemented!("unknown CI event {event:#?}"),
        }
    }

    fn drop_event(&mut self, id: &QueueId) -> Result<(), QueueError> {
        logger::queueproc_remove_event(id);
-
        self.db.remove_queued_event(id).map_err(QueueError::db)?;
+
        self.db.remove_queued_ci_event(id).map_err(QueueError::db)?;
        Ok(())
    }
}
@@ -164,7 +182,7 @@ pub enum QueueError {
    Db(#[source] DbError),

    #[error("failed to create a trigger message from broker event {0:?}")]
-
    BuildTrigger(BrokerEvent, #[source] MessageError),
+
    BuildTrigger(CiEvent, #[source] MessageError),

    #[error("failed to run CI")]
    ExecuteCi(#[source] BrokerError),
@@ -178,7 +196,7 @@ impl QueueError {
        Self::Db(e)
    }

-
    fn build_trigger(event: &BrokerEvent, err: MessageError) -> Self {
+
    fn build_trigger(event: &CiEvent, err: MessageError) -> Self {
        Self::BuildTrigger(event.clone(), err)
    }