Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: module to insert events into event queue
Lars Wirzenius committed 1 year ago
commit 77d08d3fccff45a9aa18cef01935f0d055e232f6
parent 6e5c0d02903409df8fb744b9fd78722f729fdd05
2 files changed +57 -1
modified src/event.rs
@@ -298,7 +298,7 @@ impl TryFrom<&str> for Filters {
/// A single node event can represent many git refs having changed,
/// but that's hard to process or filter. The broker breaks up such
/// complex events to simpler ones that only affect one ref at a time.
-
#[derive(Debug, Clone, Serialize)]
+
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BrokerEvent {
    /// Request the CI broker shuts down in an orderly fashion.
    Shutdown,
added src/queueadd.rs
@@ -0,0 +1,56 @@
+
use radicle::Profile;
+

+
use log::{debug, info};
+

+
use crate::{
+
    config::Config,
+
    db::{Db, DbError},
+
    event::{BrokerEvent, NodeEventError, NodeEventSource},
+
};
+

+
pub struct QueueAdder {}
+

+
impl QueueAdder {
+
    pub fn add_events_in_thread(config: &Config, db: Db) -> Result<(), AdderError> {
+
        let profile = Profile::load()?;
+
        debug!("loaded profile {profile:#?}");
+

+
        let mut source = NodeEventSource::new(&profile)?;
+
        debug!("created node event source");
+

+
        for filter in config.filters.iter() {
+
            source.allow(filter.clone());
+
        }
+
        debug!("added filters to node event source");
+

+
        // This loop ends when there's an error, e.g., failure to read an
+
        // event from the node.
+
        'event_loop: loop {
+
            debug!("waiting for event from node");
+
            for e in source.event()? {
+
                debug!("got event {e:#?}");
+
                match e {
+
                    BrokerEvent::Shutdown => break 'event_loop,
+
                    BrokerEvent::RefChanged { .. } => {
+
                        info!("insert broker event into queue: {e:#?}");
+
                        db.push_queued_event(e)?;
+
                    }
+
                }
+
            }
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum AdderError {
+
    #[error(transparent)]
+
    Pfofile(#[from] radicle::profile::Error),
+

+
    #[error(transparent)]
+
    NodeEvent(#[from] NodeEventError),
+

+
    #[error(transparent)]
+
    Db(#[from] DbError),
+
}