Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat(src/queueadd.rs): add CI events to queue
Lars Wirzenius committed 1 year ago
commit c75c87030b9f0ed0afd64711fb08a20b06685933
parent 42d6184967e3e184f044aa468ab5fe0532bd457e
1 file changed +14 -12
modified src/queueadd.rs
@@ -3,8 +3,10 @@ use std::thread::{spawn, JoinHandle};
use radicle::Profile;

use crate::{
+
    ci_event::CiEvent,
+
    ci_event_source::{CiEventSource, CiEventSourceError},
    db::{Db, DbError},
-
    event::{BrokerEvent, BrokerEventError, BrokerEventSource, EventFilter},
+
    filter::EventFilter,
    logger,
    notif::NotificationSender,
};
@@ -65,11 +67,7 @@ impl QueueAdder {

        let profile = Profile::load()?;

-
        let mut source = BrokerEventSource::new(&profile)?;
-

-
        for filter in self.filters.iter() {
-
            source.allow(filter.clone());
-
        }
+
        let mut source = CiEventSource::new(&profile)?;

        // This loop ends when there's an error, e.g., failure to read an
        // event from the node.
@@ -80,8 +78,12 @@ impl QueueAdder {
                break 'event_loop;
            } else {
                for e in events {
-
                    logger::queueadd_push_event(&e);
-
                    self.push_event(e)?;
+
                    for filter in self.filters.iter() {
+
                        if filter.allows(&e) {
+
                            logger::queueadd_push_event(&e);
+
                            self.push_event(e.clone())?;
+
                        }
+
                    }
                }
            }
        }
@@ -89,15 +91,15 @@ impl QueueAdder {
        // Add a shutdown event to the queue so that the queue
        // processing thread knows to stop.
        if self.push_shutdown {
-
            self.push_event(BrokerEvent::Shutdown)?;
+
            self.push_event(CiEvent::Shutdown)?;
        }

        logger::queueadd_end();
        Ok(())
    }

-
    fn push_event(&self, e: BrokerEvent) -> Result<(), AdderError> {
-
        self.db.push_queued_event(e)?;
+
    fn push_event(&self, e: CiEvent) -> Result<(), AdderError> {
+
        self.db.push_queued_ci_event(e)?;
        self.events_tx.send(()).map_err(|_| AdderError::Send)?;
        Ok(())
    }
@@ -112,7 +114,7 @@ pub enum AdderError {
    Profile(#[from] radicle::profile::Error),

    #[error(transparent)]
-
    NodeEvent(#[from] BrokerEventError),
+
    CiEvent(#[from] CiEventSourceError),

    #[error(transparent)]
    Db(#[from] DbError),