Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
refactor: use worker abstraction for queueproc
Lars Wirzenius committed 10 months ago
commit 9a45e3fcc8e34937b28ff6b1b4e5dd7189831314
parent a1de334
2 files changed +84 -69
modified src/bin/cib.rs
@@ -196,7 +196,7 @@ impl QueuedCmd {
            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;
-
        let thread = processor.process_in_thread();
+
        let thread = start_thread(processor);
        thread
            .join()
            .expect("wait for thread to finish")
@@ -277,7 +277,7 @@ impl ProcessEventsCmd {
            .build()
            .map_err(CibError::process_queue)?;

-
        let processor = processor.process_in_thread();
+
        let processor = start_thread(processor);
        processor
            .join()
            .expect("wait for processor thread to finish")
modified src/queueproc.rs
@@ -8,7 +8,7 @@ use std::{
        mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender},
        Arc, Mutex,
    },
-
    thread::{spawn, JoinHandle},
+
    thread::JoinHandle,
    time::{Duration, Instant},
};

@@ -24,6 +24,7 @@ use crate::{
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
    pull_queue::{PullQueue, PullQueueError},
+
    worker::{start_thread, Worker},
};

#[derive(Default)]
@@ -56,7 +57,7 @@ impl QueueProcessorBuilder {

        let picked_events = PullQueue::new();

-
        let mut descs = vec![];
+
        let mut processors = vec![];
        for _ in 0..concurrent_adapters {
            let broker = Broker::new(broker.db().filename(), broker.max_run_time())
                .map_err(QueueError::NewBroker)?;
@@ -67,12 +68,10 @@ impl QueueProcessorBuilder {
                adapters.clone(),
                broker,
                run_tx.clone(),
+
                picked_events.clone(),
                procssed_tx.clone(),
            );
-
            descs.push(ProcessorDescription::new(
-
                event_procssor,
-
                picked_events.clone(),
-
            ));
+
            processors.push(start_thread(event_procssor));
        }

        Ok(QueueProcessor {
@@ -82,7 +81,7 @@ impl QueueProcessorBuilder {
                .queue_len_interval
                .unwrap_or(DEFAULT_QUEUE_LEN_DURATION),
            prev_queue_len: Instant::now(),
-
            processors: descs,
+
            processors,
            processed_rx: Some(processed_rx),
            picked_events: Some(picked_events),
            current: CurrentlyPicked::default(),
@@ -135,54 +134,52 @@ impl QueueProcessorBuilder {
    }
}

+
impl Worker for QueueProcessor {
+
    const NAME: &str = "queue-processor";
+
    type Error = QueueError;
+
    fn work(&mut self) -> Result<(), QueueError> {
+
        logger::queueproc_start(self.processors.len());
+

+
        // Spawn a thread to process results from running adapters.
+
        #[allow(clippy::unwrap_used)]
+
        let adapter_results = start_thread(AdapterResults::new(
+
            self.processed_rx.take().unwrap(),
+
            self.current.clone(),
+
        ));
+

+
        // Pick events from queue, send to worker threads that run
+
        // adapters. Results are processed by thread above.
+
        let result = self.process_until_shutdown();
+

+
        logger::queueproc_end(&result);
+

+
        // Wait for worker threads to terminate. This closes all
+
        // sender ends for results channel.
+
        while let Some(proc) = self.processors.pop() {
+
            let result = proc
+
                .join()
+
                .map_err(|_| QueueError::JoinEventProcessorThread)?;
+
            logger::queueproc_processor_thread_result(result.as_ref().map(|_| ()));
+
        }
+

+
        // Wait for results processing thread to terminate.
+
        adapter_results.join().ok();
+
        Ok(())
+
    }
+
}
+

pub struct QueueProcessor {
    db: Db,
    events_rx: NotificationReceiver,
    queue_len_interval: Duration,
    prev_queue_len: Instant,
    picked_events: Option<PullQueue<Picked>>,
-
    processors: Vec<ProcessorDescription>,
+
    processors: Vec<JoinHandle<Result<(), QueueError>>>,
    processed_rx: Option<Receiver<RepoId>>,
    current: CurrentlyPicked,
}

impl QueueProcessor {
-
    pub fn process_in_thread(mut self) -> JoinHandle<Result<(), QueueError>> {
-
        spawn(move || {
-
            logger::queueproc_start(self.processors.len());
-

-
            // Spawn a thread to process results from running adapters.
-
            #[allow(clippy::unwrap_used)]
-
            let rx = self.processed_rx.take().unwrap();
-
            let mut current = self.current.clone();
-
            let results = spawn(move || {
-
                logger::queueproc_start_result_receiver();
-
                while let Ok(repoid) = rx.recv() {
-
                    current.remove(repoid);
-
                    logger::queueproc_end_result_receiver();
-
                }
-
            });
-

-
            // Pick events from queue, send to worker threads that run
-
            // adapters. Results are processed by thread above.
-
            let result = self.process_until_shutdown();
-

-
            logger::queueproc_end(&result);
-

-
            // Wait for worker threads to terminate. This closes all
-
            // sender ends for results channel.
-
            for proc in self.processors {
-
                let result = proc.join();
-
                logger::queueproc_processor_thread_result(result.as_ref().map(|_| ()));
-
            }
-

-
            // Wait for results processing thread to terminate.
-
            results.join().ok();
-

-
            Ok(())
-
        })
-
    }
-

    #[allow(clippy::unwrap_used)]
    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
        let mut done = false;
@@ -309,29 +306,6 @@ impl CurrentlyPicked {
    }
}

-
struct ProcessorDescription {
-
    processing_thread: JoinHandle<Result<bool, QueueError>>,
-
}
-

-
impl ProcessorDescription {
-
    fn new(processor: EventProcessor, mut picked_events: PullQueue<Picked>) -> Self {
-
        Self {
-
            processing_thread: spawn(move || {
-
                while let Ok(Some(picked)) = picked_events.pop() {
-
                    processor.process_picked_event(picked);
-
                }
-
                Ok(false)
-
            }),
-
        }
-
    }
-

-
    fn join(self) -> Result<bool, QueueError> {
-
        self.processing_thread
-
            .join()
-
            .map_err(|_| QueueError::JoinAdapterThread)?
-
    }
-
}
-

struct EventProcessor {
    profile: Profile,
    filters: Vec<EventFilter>,
@@ -339,6 +313,7 @@ struct EventProcessor {
    adapters: Adapters,
    broker: Broker,
    run_tx: NotificationSender,
+
    picked_events: PullQueue<Picked>,
    processed_tx: SyncSender<RepoId>,
}

@@ -351,6 +326,7 @@ impl EventProcessor {
        adapters: Adapters,
        broker: Broker,
        run_tx: NotificationSender,
+
        picked_events: PullQueue<Picked>,
        processed_tx: SyncSender<RepoId>,
    ) -> Self {
        Self {
@@ -360,6 +336,7 @@ impl EventProcessor {
            adapters,
            broker,
            run_tx,
+
            picked_events,
            processed_tx,
        }
    }
@@ -447,6 +424,41 @@ impl EventProcessor {
    }
}

+
impl Worker for EventProcessor {
+
    const NAME: &str = "event-processor";
+
    type Error = QueueError;
+
    fn work(&mut self) -> Result<(), QueueError> {
+
        while let Ok(Some(picked)) = self.picked_events.pop() {
+
            self.process_picked_event(picked);
+
        }
+
        Ok(())
+
    }
+
}
+

+
struct AdapterResults {
+
    rx: Receiver<RepoId>,
+
    current: CurrentlyPicked,
+
}
+

+
impl AdapterResults {
+
    fn new(rx: Receiver<RepoId>, current: CurrentlyPicked) -> Self {
+
        Self { rx, current }
+
    }
+
}
+

+
impl Worker for AdapterResults {
+
    const NAME: &str = "adapter-result-processor";
+
    type Error = QueueError;
+
    fn work(&mut self) -> Result<(), QueueError> {
+
        logger::queueproc_start_result_receiver();
+
        while let Ok(repoid) = self.rx.recv() {
+
            self.current.remove(repoid);
+
            logger::queueproc_end_result_receiver();
+
        }
+
        Ok(())
+
    }
+
}
+

#[derive(Debug, Clone)]
struct Picked {
    qe: QueuedCiEvent,
@@ -496,6 +508,9 @@ pub enum QueueError {
    #[error("failed to receive from channel for results of processed events")]
    RecvProcessResult,

+
    #[error("failed to wait for thread to process events to finish")]
+
    JoinEventProcessorThread,
+

    #[error("failed to wait for thread to run adapters to finish")]
    JoinAdapterThread,