Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: add a thread for adding events to queue
Lars Wirzenius committed 1 year ago
commit e4573ec3fbb7579d3691348df8de8efc9a23c8e0
parent 0db9dfe34cee8b24574ab9449ee4310950c36f47
2 files changed +60 -9
modified src/bin/cib.rs
@@ -17,7 +17,7 @@ use radicle_ci_broker::{
    db::{Db, DbError},
    error::BrokerError,
    pages::{PageBuilder, PageError},
-
    queueadd::{AdderError, QueueAdder},
+
    queueadd::{AdderError, QueueAdderBuilder},
    queueproc::{QueueError, QueueProcessorBuilder},
};

@@ -103,8 +103,16 @@ struct InsertCmd {}

impl InsertCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
-
        let db = args.open_db(config)?;
-
        QueueAdder::add_events_in_thread(config, db).map_err(CibError::QueueAdder)?;
+
        let adder = QueueAdderBuilder::default()
+
            .db(args.open_db(config)?)
+
            .filters(&config.filters)
+
            .build()
+
            .map_err(CibError::QueueAdder)?;
+
        let thread = adder.add_events_in_thread();
+
        thread
+
            .join()
+
            .expect("wait for thread to finish")
+
            .map_err(CibError::add_events)?;
        debug!("cib ends");
        Ok(())
    }
@@ -175,6 +183,9 @@ enum CibError {
    #[error("failed to process events from queue")]
    ProcessQueue(#[source] QueueError),

+
    #[error("failed to add events to queue")]
+
    AddEvents(#[source] AdderError),
+

    #[error("default adapter is not in list of adapters")]
    UnknownDefaultAdapter(String),
}
@@ -203,4 +214,8 @@ impl CibError {
    fn process_queue(e: QueueError) -> Self {
        Self::ProcessQueue(e)
    }
+

+
    fn add_events(e: AdderError) -> Self {
+
        Self::AddEvents(e)
+
    }
}
modified src/queueadd.rs
@@ -1,24 +1,57 @@
+
use std::thread::{spawn, JoinHandle};
+

use radicle::Profile;

use log::{debug, info};

use crate::{
-
    config::Config,
    db::{Db, DbError},
-
    event::{BrokerEvent, NodeEventError, NodeEventSource},
+
    event::{BrokerEvent, EventFilter, NodeEventError, NodeEventSource},
};

-
pub struct QueueAdder {}
+
#[derive(Default)]
+
pub struct QueueAdderBuilder {
+
    db: Option<Db>,
+
    filters: Option<Vec<EventFilter>>,
+
}
+

+
impl QueueAdderBuilder {
+
    pub fn build(self) -> Result<QueueAdder, AdderError> {
+
        Ok(QueueAdder {
+
            db: self.db.ok_or(AdderError::Missing("db"))?,
+
            filters: self.filters.ok_or(AdderError::Missing("filters"))?,
+
        })
+
    }
+

+
    pub fn db(mut self, db: Db) -> Self {
+
        self.db = Some(db);
+
        self
+
    }
+

+
    pub fn filters(mut self, filters: &[EventFilter]) -> Self {
+
        self.filters = Some(filters.to_vec());
+
        self
+
    }
+
}
+

+
pub struct QueueAdder {
+
    filters: Vec<EventFilter>,
+
    db: Db,
+
}

impl QueueAdder {
-
    pub fn add_events_in_thread(config: &Config, db: Db) -> Result<(), AdderError> {
+
    pub fn add_events_in_thread(self) -> JoinHandle<Result<(), AdderError>> {
+
        spawn(move || self.add_events())
+
    }
+

+
    pub fn add_events(&self) -> Result<(), AdderError> {
        let profile = Profile::load()?;
        debug!("loaded profile {profile:#?}");

        let mut source = NodeEventSource::new(&profile)?;
        debug!("created node event source");

-
        for filter in config.filters.iter() {
+
        for filter in self.filters.iter() {
            source.allow(filter.clone());
        }
        debug!("added filters to node event source");
@@ -33,7 +66,7 @@ impl QueueAdder {
                    BrokerEvent::Shutdown => break 'event_loop,
                    BrokerEvent::RefChanged { .. } => {
                        info!("insert broker event into queue: {e:#?}");
-
                        db.push_queued_event(e)?;
+
                        self.db.push_queued_event(e)?;
                    }
                }
            }
@@ -45,6 +78,9 @@ impl QueueAdder {

#[derive(Debug, thiserror::Error)]
pub enum AdderError {
+
    #[error("programming error: QueueAdderBuilder field {0} was not set")]
+
    Missing(&'static str),
+

    #[error(transparent)]
    Pfofile(#[from] radicle::profile::Error),