Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
refactor: use worker abstraction in queueadd
Lars Wirzenius committed 10 months ago
commit 663de6013f628cef4565da2b103a49ff31869aaf
parent beb35a1
2 files changed +19 -13
modified src/bin/cib.rs
@@ -22,6 +22,7 @@ use radicle_ci_broker::{
    pages::{PageError, StatusPage},
    queueadd::{AdderError, QueueAdderBuilder},
    queueproc::{QueueError, QueueProcessorBuilder},
+
    worker::start_thread,
};

fn main() {
@@ -147,7 +148,7 @@ impl InsertCmd {
            .db(args.open_db(config)?)
            .build()
            .map_err(CibError::QueueAdder)?;
-
        let thread = adder.add_events_in_thread();
+
        let thread = start_thread(adder);
        thread
            .join()
            .expect("wait for thread to finish")
@@ -241,7 +242,7 @@ impl ProcessEventsCmd {
            .db(args.open_db(config)?)
            .build()
            .map_err(CibError::QueueAdder)?;
-
        adder.add_events_in_thread();
+
        let adder = start_thread(adder);

        let profile = Profile::load().map_err(CibError::profile)?;

@@ -275,12 +276,18 @@ impl ProcessEventsCmd {
            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;
+

        let processor = processor.process_in_thread();
        processor
            .join()
            .expect("wait for processor thread to finish")
            .map_err(CibError::process_queue)?;

+
        adder
+
            .join()
+
            .expect("wait for thread to finish")
+
            .map_err(CibError::add_events)?;
+

        // The page updating thread ends when the channel for run
        // notifications is closed by the processor thread ending.
        page_updater
modified src/queueadd.rs
@@ -1,5 +1,3 @@
-
use std::thread::{spawn, JoinHandle};
-

use radicle::Profile;

use crate::{
@@ -8,6 +6,7 @@ use crate::{
    db::{Db, DbError, QueueId},
    logger,
    notif::NotificationSender,
+
    worker::Worker,
};

#[derive(Default)]
@@ -41,15 +40,6 @@ pub struct QueueAdder {
}

impl QueueAdder {
-
    pub fn add_events_in_thread(self) -> JoinHandle<Result<(), AdderError>> {
-
        spawn(move || {
-
            logger::queueadd_start();
-
            let result = self.add_events();
-
            logger::queueadd_end(&result);
-
            result
-
        })
-
    }
-

    fn add_events(&self) -> Result<(), AdderError> {
        let profile = Profile::load()?;

@@ -85,6 +75,15 @@ impl QueueAdder {
    }
}

+
impl Worker for QueueAdder {
+
    const NAME: &str = "queue-adder";
+
    type Error = AdderError;
+

+
    fn work(&mut self) -> Result<(), Self::Error> {
+
        self.add_events()
+
    }
+
}
+

#[derive(Debug, thiserror::Error)]
pub enum AdderError {
    #[error("programming error: QueueAdderBuilder field {0} was not set")]