Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
add abstraction for worker threads
Merged liw opened 10 months ago
8 files changed +218 -146 6c505bb0 246068c6
modified src/bin/cib.rs
@@ -22,6 +22,7 @@ use radicle_ci_broker::{
    pages::{PageError, StatusPage},
    queueadd::{AdderError, QueueAdderBuilder},
    queueproc::{QueueError, QueueProcessorBuilder},
+
    worker::start_thread,
};

fn main() {
@@ -147,7 +148,7 @@ impl InsertCmd {
            .db(args.open_db(config)?)
            .build()
            .map_err(CibError::QueueAdder)?;
-
        let thread = adder.add_events_in_thread();
+
        let thread = start_thread(adder);
        thread
            .join()
            .expect("wait for thread to finish")
@@ -195,23 +196,23 @@ impl QueuedCmd {
            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;
-
        let thread = processor.process_in_thread();
+
        let thread = start_thread(processor);
        thread
            .join()
            .expect("wait for thread to finish")
            .map_err(CibError::process_queue)?;

        let db = args.open_db(config)?;
-
        let mut page = StatusPage::default();
-
        if let Some(dirname) = config.report_dir() {
-
            page.set_output_dir(dirname);
-
        }
-
        let page_updater = page.update_in_thread(
+
        let mut page = StatusPage::new(
            run_notifications.rx().map_err(CibError::notification)?,
            profile,
            db,
            true,
        );
+
        if let Some(dirname) = config.report_dir() {
+
            page.set_output_dir(dirname);
+
        }
+
        let page_updater = start_thread(page);
        page_updater
            .join()
            .expect("wait for page updater thread to finish")
@@ -241,22 +242,22 @@ impl ProcessEventsCmd {
            .db(args.open_db(config)?)
            .build()
            .map_err(CibError::QueueAdder)?;
-
        adder.add_events_in_thread();
+
        let adder = start_thread(adder);

        let profile = Profile::load().map_err(CibError::profile)?;

        let db = args.open_db(config)?;

-
        let mut page = StatusPage::default();
-
        if let Some(dirname) = config.report_dir() {
-
            page.set_output_dir(dirname);
-
        }
-
        let page_updater = page.update_in_thread(
+
        let mut page = StatusPage::new(
            run_notification.rx().map_err(CibError::notification)?,
            profile,
            db,
            false,
        );
+
        if let Some(dirname) = config.report_dir() {
+
            page.set_output_dir(dirname);
+
        }
+
        let page_updater = start_thread(page);

        let adapters = config.to_adapters().map_err(CibError::Adapters)?;

@@ -275,12 +276,18 @@ impl ProcessEventsCmd {
            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;
-
        let processor = processor.process_in_thread();
+

+
        let processor = start_thread(processor);
        processor
            .join()
            .expect("wait for processor thread to finish")
            .map_err(CibError::process_queue)?;

+
        adder
+
            .join()
+
            .expect("wait for thread to finish")
+
            .map_err(CibError::add_events)?;
+

        // The page updating thread ends when the channel for run
        // notifications is closed by the processor thread ending.
        page_updater
modified src/bin/cibtoolcmd/report.rs
@@ -1,4 +1,4 @@
-
use radicle_ci_broker::pages::StatusPage;
+
use radicle_ci_broker::{pages::StatusPage, worker::start_thread};

use super::*;

@@ -17,16 +17,16 @@ impl Leaf for ReportCmd {

        let db = args.open_db()?;

-
        let mut page = StatusPage::default();
-
        page.set_output_dir(&self.output_dir);
-

        let mut run_notification = NotificationChannel::new_run();
-
        let thread = page.update_in_thread(
+
        let mut page = StatusPage::new(
            run_notification.rx().map_err(CibToolError::Notification)?,
            profile,
            db,
            true,
        );
+
        page.set_output_dir(&self.output_dir);
+

+
        let thread = start_thread(page);
        thread.join().unwrap()?;

        Ok(())
modified src/lib.rs
@@ -29,3 +29,4 @@ pub mod sensitive;
pub mod test;
pub mod timeoutcmd;
pub mod util;
+
pub mod worker;
modified src/logger.rs
@@ -19,7 +19,7 @@ use crate::{
    node_event_source::NodeEventSource,
    pages::PageError,
    queueadd::AdderError,
-
    queueproc::QueueError,
+
    queueproc::{MaybeShutdown, QueueError},
    run::Run,
    timeoutcmd::TimeoutError,
};
@@ -543,7 +543,7 @@ pub fn queueproc_trigger(result: &Result<Request, QueueError>) {
    );
}

-
pub fn queueproc_processed_event(result: &Result<bool, QueueError>) {
+
pub fn queueproc_processed_event(result: &Result<MaybeShutdown, QueueError>) {
    info!(
        msg_id = ?Id::QueueProcProcessedEvent,
        kind = %Kind::GotEvent,
modified src/pages.rs
@@ -12,8 +12,6 @@ use std::{
    fs::write,
    path::{Path, PathBuf},
    sync::mpsc::RecvTimeoutError,
-
    thread::{spawn, JoinHandle},
-
    time::Duration,
};

use html_page::{Element, HtmlPage, Tag};
@@ -36,6 +34,7 @@ use crate::{
    notif::NotificationReceiver,
    run::{Run, RunState, Whence},
    util::{parse_timestamp, rfc822_timestamp},
+
    worker::Worker,
};

const MAX_RSS_ENTRIES: usize = 10;
@@ -44,7 +43,6 @@ const FAILURE_RSS: &str = "failed.rss";
const UNFINISHED_RSS: &str = "unfinished.rss";
const CSS: &str = include_str!("radicle-ci.css");
const REFERESH_INTERVAL: &str = "300";
-
const UPDATE_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_JSON: &str = "status.json";

/// All possible errors returned from the status page module.
@@ -760,10 +758,17 @@ impl RssEntry {
/// of repositories for which the broker has run CI. Then there is a
/// page per such repository, with a list of CI runs for that
/// repository.
-
#[derive(Default)]
pub struct StatusPage {
    node_alias: String,
    dirname: Option<PathBuf>,
+
    args: PageArgs,
+
}
+

+
struct PageArgs {
+
    run_rx: NotificationReceiver,
+
    profile: Profile,
+
    db: Db,
+
    once: bool,
}

impl StatusPage {
@@ -771,46 +776,27 @@ impl StatusPage {
        self.dirname = Some(dirname.into());
    }

-
    pub fn update_in_thread(
-
        mut self,
-
        run_rx: NotificationReceiver,
-
        profile: Profile,
-
        db: Db,
-
        once: bool,
-
    ) -> JoinHandle<Result<(), PageError>> {
-
        match &self.dirname {
-
            None => logger::pages_directory_unset(),
-
            Some(report_dir) if !report_dir.exists() => {
-
                logger::pages_directory_does_not_exist(report_dir)
-
            }
-
            Some(_) => (),
+
    pub fn new(run_rx: NotificationReceiver, profile: Profile, db: Db, once: bool) -> Self {
+
        Self {
+
            node_alias: "".into(),
+
            dirname: None,
+
            args: PageArgs {
+
                run_rx,
+
                profile,
+
                db,
+
                once,
+
            },
        }
-

-
        self.node_alias = profile.config.alias().to_string();
-
        logger::pages_interval(UPDATE_INTERVAL);
-

-
        spawn(move || {
-
            logger::pages_start();
-
            let result = self.update_loop(run_rx, profile, db, once);
-
            logger::pages_end(&result);
-
            result
-
        })
    }

-
    fn update_loop(
-
        mut self,
-
        run_rx: NotificationReceiver,
-
        profile: Profile,
-
        db: Db,
-
        once: bool,
-
    ) -> Result<(), PageError> {
+
    fn update_loop(&mut self) -> Result<(), PageError> {
        'processing_loop: loop {
-
            self.update_and_write(&profile, &db)?;
-
            if once {
+
            self.update_and_write()?;
+
            if self.args.once {
                return Ok(());
            }

-
            match run_rx.wait_for_notification() {
+
            match self.args.run_rx.wait_for_notification() {
                Ok(_) => (),
                Err(RecvTimeoutError::Timeout) => (),
                Err(RecvTimeoutError::Disconnected) => {
@@ -821,29 +807,31 @@ impl StatusPage {
        }

        // Make sure we update reports and status JSON at least once.
-
        self.update_and_write(&profile, &db)?;
+
        self.update_and_write()?;

        Ok(())
    }

-
    fn update_and_write(&mut self, profile: &Profile, db: &Db) -> Result<(), PageError> {
+
    fn update_and_write(&mut self) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
            if dirname.exists() {
-
                let runs = db.get_all_runs()?;
+
                let runs = self.args.db.get_all_runs()?;

                // Create list of events, except ones for private
                // repositories.
-
                let events: Result<Vec<QueuedCiEvent>, PageError> = db
+
                let events: Result<Vec<QueuedCiEvent>, PageError> = self
+
                    .args
+
                    .db
                    .queued_ci_events()?
                    .iter()
-
                    .filter_map(|id| match db.get_queued_ci_event(id) {
+
                    .filter_map(|id| match self.args.db.get_queued_ci_event(id) {
                        Ok(Some(event)) => match event.event() {
                            CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
                            CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
                            | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
                            | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
                            | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
-
                                if Self::is_public_repo(profile, repo) {
+
                                if Self::is_public_repo(&self.args.profile, repo) {
                                    Some(Ok(event))
                                } else {
                                    None
@@ -939,6 +927,22 @@ impl StatusPage {
    }
}

+
impl Worker for StatusPage {
+
    const NAME: &str = "status-page";
+
    type Error = PageError;
+
    fn work(&mut self) -> Result<(), PageError> {
+
        match &self.dirname {
+
            None => logger::pages_directory_unset(),
+
            Some(report_dir) if !report_dir.exists() => {
+
                logger::pages_directory_does_not_exist(report_dir)
+
            }
+
            Some(_) => (),
+
        }
+

+
        self.update_loop()
+
    }
+
}
+

#[derive(Debug, Clone, Serialize)]
struct StatusData {
    timestamp: String,
modified src/queueadd.rs
@@ -1,5 +1,3 @@
-
use std::thread::{spawn, JoinHandle};
-

use radicle::Profile;

use crate::{
@@ -8,6 +6,7 @@ use crate::{
    db::{Db, DbError, QueueId},
    logger,
    notif::NotificationSender,
+
    worker::Worker,
};

#[derive(Default)]
@@ -41,15 +40,6 @@ pub struct QueueAdder {
}

impl QueueAdder {
-
    pub fn add_events_in_thread(self) -> JoinHandle<Result<(), AdderError>> {
-
        spawn(move || {
-
            logger::queueadd_start();
-
            let result = self.add_events();
-
            logger::queueadd_end(&result);
-
            result
-
        })
-
    }
-

    fn add_events(&self) -> Result<(), AdderError> {
        let profile = Profile::load()?;

@@ -85,6 +75,15 @@ impl QueueAdder {
    }
}

+
impl Worker for QueueAdder {
+
    const NAME: &str = "queue-adder";
+
    type Error = AdderError;
+

+
    fn work(&mut self) -> Result<(), Self::Error> {
+
        self.add_events()
+
    }
+
}
+

#[derive(Debug, thiserror::Error)]
pub enum AdderError {
    #[error("programming error: QueueAdderBuilder field {0} was not set")]
modified src/queueproc.rs
@@ -8,7 +8,7 @@ use std::{
        mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender},
        Arc, Mutex,
    },
-
    thread::{spawn, JoinHandle},
+
    thread::JoinHandle,
    time::{Duration, Instant},
};

@@ -24,6 +24,7 @@ use crate::{
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
    pull_queue::{PullQueue, PullQueueError},
+
    worker::{start_thread, Worker},
};

#[derive(Default)]
@@ -56,7 +57,7 @@ impl QueueProcessorBuilder {

        let picked_events = PullQueue::new();

-
        let mut descs = vec![];
+
        let mut processors = vec![];
        for _ in 0..concurrent_adapters {
            let broker = Broker::new(broker.db().filename(), broker.max_run_time())
                .map_err(QueueError::NewBroker)?;
@@ -67,12 +68,10 @@ impl QueueProcessorBuilder {
                adapters.clone(),
                broker,
                run_tx.clone(),
+
                picked_events.clone(),
                procssed_tx.clone(),
            );
-
            descs.push(ProcessorDescription::new(
-
                event_procssor,
-
                picked_events.clone(),
-
            ));
+
            processors.push(start_thread(event_procssor));
        }

        Ok(QueueProcessor {
@@ -82,7 +81,7 @@ impl QueueProcessorBuilder {
                .queue_len_interval
                .unwrap_or(DEFAULT_QUEUE_LEN_DURATION),
            prev_queue_len: Instant::now(),
-
            processors: descs,
+
            processors,
            processed_rx: Some(processed_rx),
            picked_events: Some(picked_events),
            current: CurrentlyPicked::default(),
@@ -135,54 +134,52 @@ impl QueueProcessorBuilder {
    }
}

+
impl Worker for QueueProcessor {
+
    const NAME: &str = "queue-processor";
+
    type Error = QueueError;
+
    fn work(&mut self) -> Result<(), QueueError> {
+
        logger::queueproc_start(self.processors.len());
+

+
        // Spawn a thread to process results from running adapters.
+
        #[allow(clippy::unwrap_used)]
+
        let adapter_results = start_thread(AdapterResults::new(
+
            self.processed_rx.take().unwrap(),
+
            self.current.clone(),
+
        ));
+

+
        // Pick events from queue, send to worker threads that run
+
        // adapters. Results are processed by thread above.
+
        let result = self.process_until_shutdown();
+

+
        logger::queueproc_end(&result);
+

+
        // Wait for worker threads to terminate. This closes all
+
        // sender ends for results channel.
+
        while let Some(proc) = self.processors.pop() {
+
            let result = proc
+
                .join()
+
                .map_err(|_| QueueError::JoinEventProcessorThread)?;
+
            logger::queueproc_processor_thread_result(result.as_ref().map(|_| ()));
+
        }
+

+
        // Wait for results processing thread to terminate.
+
        adapter_results.join().ok();
+
        Ok(())
+
    }
+
}
+

pub struct QueueProcessor {
    db: Db,
    events_rx: NotificationReceiver,
    queue_len_interval: Duration,
    prev_queue_len: Instant,
    picked_events: Option<PullQueue<Picked>>,
-
    processors: Vec<ProcessorDescription>,
+
    processors: Vec<JoinHandle<Result<(), QueueError>>>,
    processed_rx: Option<Receiver<RepoId>>,
    current: CurrentlyPicked,
}

impl QueueProcessor {
-
    pub fn process_in_thread(mut self) -> JoinHandle<Result<(), QueueError>> {
-
        spawn(move || {
-
            logger::queueproc_start(self.processors.len());
-

-
            // Spawn a thread to process results from running adapters.
-
            #[allow(clippy::unwrap_used)]
-
            let rx = self.processed_rx.take().unwrap();
-
            let mut current = self.current.clone();
-
            let results = spawn(move || {
-
                logger::queueproc_start_result_receiver();
-
                while let Ok(repoid) = rx.recv() {
-
                    current.remove(repoid);
-
                    logger::queueproc_end_result_receiver();
-
                }
-
            });
-

-
            // Pick events from queue, send to worker threads that run
-
            // adapters. Results are processed by thread above.
-
            let result = self.process_until_shutdown();
-

-
            logger::queueproc_end(&result);
-

-
            // Wait for worker threads to terminate. This closes all
-
            // sender ends for results channel.
-
            for proc in self.processors {
-
                let result = proc.join();
-
                logger::queueproc_processor_thread_result(result.as_ref().map(|_| ()));
-
            }
-

-
            // Wait for results processing thread to terminate.
-
            results.join().ok();
-

-
            Ok(())
-
        })
-
    }
-

    #[allow(clippy::unwrap_used)]
    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
        let mut done = false;
@@ -309,29 +306,6 @@ impl CurrentlyPicked {
    }
}

-
struct ProcessorDescription {
-
    processing_thread: JoinHandle<Result<bool, QueueError>>,
-
}
-

-
impl ProcessorDescription {
-
    fn new(processor: EventProcessor, mut picked_events: PullQueue<Picked>) -> Self {
-
        Self {
-
            processing_thread: spawn(move || {
-
                while let Ok(Some(picked)) = picked_events.pop() {
-
                    processor.process_picked_event(picked);
-
                }
-
                Ok(false)
-
            }),
-
        }
-
    }
-

-
    fn join(self) -> Result<bool, QueueError> {
-
        self.processing_thread
-
            .join()
-
            .map_err(|_| QueueError::JoinAdapterThread)?
-
    }
-
}
-

struct EventProcessor {
    profile: Profile,
    filters: Vec<EventFilter>,
@@ -339,6 +313,7 @@ struct EventProcessor {
    adapters: Adapters,
    broker: Broker,
    run_tx: NotificationSender,
+
    picked_events: PullQueue<Picked>,
    processed_tx: SyncSender<RepoId>,
}

@@ -351,6 +326,7 @@ impl EventProcessor {
        adapters: Adapters,
        broker: Broker,
        run_tx: NotificationSender,
+
        picked_events: PullQueue<Picked>,
        processed_tx: SyncSender<RepoId>,
    ) -> Self {
        Self {
@@ -360,6 +336,7 @@ impl EventProcessor {
            adapters,
            broker,
            run_tx,
+
            picked_events,
            processed_tx,
        }
    }
@@ -415,7 +392,7 @@ impl EventProcessor {
        let res = self.process_event(qe.event(), adapter);
        logger::queueproc_processed_event(&res);
        match res {
-
            Ok(shut_down) => done = shut_down,
+
            Ok(shut_down) => done = shut_down == MaybeShutdown::Shutdown,
            Err(QueueError::BuildTrigger(_, _)) => done = false,
            Err(err) => Err(err)?,
        }
@@ -423,10 +400,14 @@ impl EventProcessor {
        Ok(done)
    }

-
    fn process_event(&self, event: &CiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
+
    fn process_event(
+
        &self,
+
        event: &CiEvent,
+
        adapter: &Adapter,
+
    ) -> Result<MaybeShutdown, QueueError> {
        if matches!(event, CiEvent::V1(CiEventV1::Shutdown)) {
            logger::queueproc_action_shutdown();
-
            Ok(true)
+
            Ok(MaybeShutdown::Shutdown)
        } else {
            logger::queueproc_action_run(event);

@@ -442,8 +423,49 @@ impl EventProcessor {
                .execute_ci(adapter, &trigger, &self.run_tx)
                .map_err(QueueError::execute_ci)?;

-
            Ok(false)
+
            Ok(MaybeShutdown::Continue)
+
        }
+
    }
+
}
+

+
impl Worker for EventProcessor {
+
    const NAME: &str = "event-processor";
+
    type Error = QueueError;
+
    fn work(&mut self) -> Result<(), QueueError> {
+
        while let Ok(Some(picked)) = self.picked_events.pop() {
+
            self.process_picked_event(picked);
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+
pub enum MaybeShutdown {
+
    Shutdown,
+
    Continue,
+
}
+

+
struct AdapterResults {
+
    rx: Receiver<RepoId>,
+
    current: CurrentlyPicked,
+
}
+

+
impl AdapterResults {
+
    fn new(rx: Receiver<RepoId>, current: CurrentlyPicked) -> Self {
+
        Self { rx, current }
+
    }
+
}
+

+
impl Worker for AdapterResults {
+
    const NAME: &str = "adapter-result-processor";
+
    type Error = QueueError;
+
    fn work(&mut self) -> Result<(), QueueError> {
+
        logger::queueproc_start_result_receiver();
+
        while let Ok(repoid) = self.rx.recv() {
+
            self.current.remove(repoid);
+
            logger::queueproc_end_result_receiver();
+
        }
+
        Ok(())
    }
}

@@ -496,6 +518,9 @@ pub enum QueueError {
    #[error("failed to receive from channel for results of processed events")]
    RecvProcessResult,

+
    #[error("failed to wait for thread to process events to finish")]
+
    JoinEventProcessorThread,
+

    #[error("failed to wait for thread to run adapters to finish")]
    JoinAdapterThread,

added src/worker.rs
@@ -0,0 +1,36 @@
+
//! An abstraction for worker threads.
+
//!
+
//! The abstraction provides some consistency in how worker threads
+
//! are implemented and used, as well as logging. A worker thread does
+
//! what it does. The abstraction is not meant to constrain that.
+

+
use std::thread::{spawn, JoinHandle};
+

+
/// Start a new thread. Caller must capture the thread handle and
+
/// join it to wait for thread to end.
+
pub fn start_thread<W: Worker>(mut o: W) -> JoinHandle<Result<(), W::Error>> {
+
    let name = o.name();
+
    spawn(move || {
+
        eprintln!("start worker {name}");
+
        let result = o.work();
+
        eprintln!("end worker {name}: result={result:?}");
+
        result
+
    })
+
}
+

+
/// A worker thread.
+
pub trait Worker: Send + 'static {
+
    /// Name of thread, or kind of thread. Used for logging only.
+
    const NAME: &str;
+

+
    /// Type of error from this worker.
+
    type Error: std::fmt::Debug + Send;
+

+
    /// Do the work the thread is supposed to do.
+
    fn work(&mut self) -> Result<(), Self::Error>;
+

+
    /// Return name of thread as an owned string.
+
    fn name(&self) -> String {
+
        Self::NAME.to_string()
+
    }
+
}