Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: have threads notify each other of needing to do something
Lars Wirzenius committed 1 year ago
commit bdad79d4427b78a974251abed19fd45b9442d032
parent 2414e342d1b45c4141a91631801b7b5e269189ff
5 files changed +119 -129
modified src/bin/cib.rs
@@ -19,6 +19,7 @@ use radicle_ci_broker::{
    broker::{Broker, BrokerError},
    config::{Config, ConfigError},
    db::{Db, DbError},
+
    notif::NotificationChannel,
    pages::{PageBuilder, PageError},
    queueadd::{AdderError, QueueAdderBuilder},
    queueproc::{QueueError, QueueProcessorBuilder},
@@ -111,7 +112,9 @@ struct InsertCmd {}

impl InsertCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let mut events_notification = NotificationChannel::default();
        let adder = QueueAdderBuilder::default()
+
            .events_tx(events_notification.tx())
            .db(args.open_db(config)?)
            .filters(&config.filters)
            .build()
@@ -146,7 +149,14 @@ impl QueuedCmd {
        debug!("default adapter: {adapter:?}");
        broker.set_default_adapter(&adapter);

+
        let mut event_notifications = NotificationChannel::default();
+
        event_notifications.tx().send(()).ok();
+

+
        let mut run_notifications = NotificationChannel::default();
+

        let processor = QueueProcessorBuilder::default()
+
            .events_rx(event_notifications.rx())
+
            .run_tx(run_notifications.tx())
            .db(db)
            .broker(broker)
            .build()
@@ -167,7 +177,11 @@ struct ProcessEventsCmd {}

impl ProcessEventsCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let mut events_notification = NotificationChannel::default();
+
        let mut run_notification = NotificationChannel::default();
+

        let adder = QueueAdderBuilder::default()
+
            .events_tx(events_notification.tx())
            .db(args.open_db(config)?)
            .filters(&config.filters)
            .push_shutdown()
@@ -188,7 +202,7 @@ impl ProcessEventsCmd {
        if let Some(dirname) = &config.report_dir {
            page.set_output_dir(dirname);
        }
-
        page.update_in_thread(db, &profile.config.node.alias, false);
+
        page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, false);

        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
        let spec =
@@ -204,6 +218,8 @@ impl ProcessEventsCmd {
        broker.set_default_adapter(&adapter);

        let processor = QueueProcessorBuilder::default()
+
            .events_rx(events_notification.rx())
+
            .run_tx(run_notification.tx())
            .db(args.open_db(config)?)
            .broker(broker)
            .build()
modified src/bin/cibtool.rs
@@ -30,6 +30,7 @@ use radicle_ci_broker::{
    db::{Db, DbError, QueueId},
    event::BrokerEvent,
    msg::{RunId, RunResult},
+
    notif::NotificationChannel,
    pages::{PageBuilder, PageError},
    run::{Run, RunState, Whence},
};
@@ -729,7 +730,10 @@ impl ReportCmd {
            .build()?;

        page.set_output_dir(&self.output_dir);
-
        let thread = page.update_in_thread(db, &profile.config.node.alias, true);
+

+
        let mut run_notification = NotificationChannel::default();
+
        let thread =
+
            page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, true);
        thread.join().unwrap()?;

        Ok(())
modified src/pages.rs
@@ -11,8 +11,8 @@ use std::{
    collections::{HashMap, HashSet},
    fs::write,
    path::{Path, PathBuf},
-
    sync::{Arc, Mutex, MutexGuard},
-
    thread::{sleep, spawn, JoinHandle},
+
    sync::mpsc::RecvTimeoutError,
+
    thread::{spawn, JoinHandle},
    time::Duration,
};

@@ -27,6 +27,7 @@ use crate::{
    db::{Db, DbError},
    event::BrokerEvent,
    msg::{RunId, RunResult},
+
    notif::NotificationReceiver,
    run::{Run, RunState, Whence},
};

@@ -86,16 +87,7 @@ impl PageBuilder {
        }
        debug!("broker database has {} CI runs", runs.len());

-
        Ok(StatusPage::new(PageData {
-
            timestamp: now()?,
-
            ci_broker_version: env!("CARGO_PKG_VERSION"),
-
            ci_broker_git_commit: env!("GIT_HEAD"),
-
            node_alias: self.node_alias.ok_or(PageError::NoAlias)?,
-
            runs,
-
            broker_event_counter: 0,
-
            latest_broker_event: None,
-
            latest_ci_run: None,
-
        }))
+
        Ok(StatusPage::new())
    }
}

@@ -477,56 +469,25 @@ impl PageData {
/// page per such repository, with a list of CI runs for that
/// repository.
pub struct StatusPage {
-
    data: Arc<Mutex<PageData>>,
    node_alias: String,
    dirname: Option<PathBuf>,
}

impl StatusPage {
-
    fn new(data: PageData) -> Self {
+
    fn new() -> Self {
        Self {
-
            data: Arc::new(Mutex::new(data)),
            node_alias: String::new(),
            dirname: None,
        }
    }

-
    fn lock(&mut self) -> Result<MutexGuard<PageData>, PageError> {
-
        self.data
-
            .lock()
-
            .map_err(|_| PageError::Lock("lock StatusPage::data"))
-
    }
-

    pub fn set_output_dir(&mut self, dirname: &Path) {
        self.dirname = Some(dirname.into());
    }

-
    pub fn update_timestamp(&mut self) -> Result<(), PageError> {
-
        let mut data = self.lock()?;
-
        data.timestamp = now()?;
-
        Ok(())
-
    }
-

-
    pub fn broker_event(&mut self, event: &BrokerEvent) -> Result<(), PageError> {
-
        let mut data = self.lock()?;
-
        data.latest_broker_event = Some(event.clone());
-
        data.broker_event_counter += 1;
-
        Ok(())
-
    }
-

-
    /// Add a new CI run to the status page.
-
    pub fn push_run(&mut self, new: Run) -> Result<(), PageError> {
-
        // We silently ignore a run until its id has been set.
-
        if let Some(id) = new.adapter_run_id() {
-
            let mut data = self.lock()?;
-
            data.latest_ci_run = Some(new.clone());
-
            data.runs.insert(id.clone(), new);
-
        }
-
        Ok(())
-
    }
-

    pub fn update_in_thread(
        mut self,
+
        run_rx: NotificationReceiver,
        db: Db,
        node_alias: &str,
        once: bool,
@@ -540,71 +501,67 @@ impl StatusPage {
            UPDATE_INTERVAL.as_secs()
        );
        spawn(move || loop {
-
            self.update_and_write(&db)?;
-
            if once {
-
                return Ok(());
+
            loop {
+
                self.update_and_write(&db)?;
+
                if once {
+
                    return Ok(());
+
                }
+

+
                match run_rx.recv_timeout(UPDATE_INTERVAL) {
+
                    Ok(_) | Err(RecvTimeoutError::Timeout) => (),
+
                    Err(RecvTimeoutError::Disconnected) => break,
+
                }
            }
-
            sleep(UPDATE_INTERVAL);
        })
    }

    fn update_and_write(&mut self, db: &Db) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
            info!("write HTML report pages to {}", dirname.display());
-
            let mut page = PageBuilder::default()
-
                .node_alias(&self.node_alias)
-
                .runs(db.get_all_runs()?)
-
                .build()?;

-
            page.write(dirname)?;
-
        }
-
        Ok(())
-
    }
+
            let runs = db.get_all_runs()?;

-
    /// Write the status page (as index.html) and per-repository pages
-
    /// (`<RID>.html`) into the directory given as an argument. The directory must exist.
-
    pub fn write(&mut self, dirname: &Path) -> Result<(), PageError> {
-
        let nameless = String::from("nameless repo");
-

-
        // We avoid writing while keeping the lock, to reduce
-
        // contention.
-
        let (status, repos) = {
-
            let data = self.lock()?;
-

-
            let status = data.status_page_as_html()?.to_string();
-

-
            let mut repos = vec![];
-
            for (_, rid) in data.repos() {
-
                let basename = rid_to_basename(rid);
-
                let filename = dirname.join(format!("{basename}.html"));
-
                let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
-
                let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
-
                repos.push((filename, repopage.to_string()));
-
            }
+
            let data = PageData {
+
                timestamp: now()?,
+
                ci_broker_version: env!("CARGO_PKG_VERSION"),
+
                ci_broker_git_commit: env!("GIT_HEAD"),
+
                node_alias: self.node_alias.clone(),
+
                runs: HashMap::from_iter(
+
                    runs.iter()
+
                        .map(|run| (run.broker_run_id().clone(), run.clone())),
+
                ),
+
                broker_event_counter: 0,
+
                latest_broker_event: None,
+
                latest_ci_run: None,
+
            };

-
            (status, repos)
-
        };
+
            let nameless = String::from("nameless repo");

-
        let filename = dirname.join("index.html");
-
        Self::write_file(&filename, &status)?;
+
            // We avoid writing while keeping the lock, to reduce
+
            // contention.
+
            let (status, repos) = {
+
                let status = data.status_page_as_html()?.to_string();

-
        for (filename, repopage) in repos {
-
            Self::write_file(&filename, &repopage)?;
-
        }
+
                let mut repos = vec![];
+
                for (_, rid) in data.repos() {
+
                    let basename = rid_to_basename(rid);
+
                    let filename = dirname.join(format!("{basename}.html"));
+
                    let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
+
                    let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
+
                    repos.push((filename, repopage.to_string()));
+
                }

-
        Ok(())
-
    }
+
                (status, repos)
+
            };

-
    /// Write the JSON status file.
-
    pub fn write_json(&mut self, filename: &Path) -> Result<(), PageError> {
-
        // We avoid writing while keeping the lock, to reduce
-
        // contention.
-
        let status = {
-
            let data = self.lock()?;
-
            StatusData::from(&*data).as_json()?
-
        };
+
            let filename = dirname.join("index.html");
+
            Self::write_file(&filename, &status)?;

-
        Self::write_file(filename, &status)
+
            for (filename, repopage) in repos {
+
                Self::write_file(&filename, &repopage)?;
+
            }
+
        }
+
        Ok(())
    }

    fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
@@ -614,16 +571,6 @@ impl StatusPage {
    }
}

-
impl Clone for StatusPage {
-
    fn clone(&self) -> Self {
-
        Self {
-
            data: Arc::clone(&self.data),
-
            node_alias: self.node_alias.clone(),
-
            dirname: self.dirname.clone(),
-
        }
-
    }
-
}
-

#[derive(Debug, Clone, Serialize)]
struct StatusData {
    timestamp: String,
modified src/queueadd.rs
@@ -7,6 +7,7 @@ use log::{debug, info};
use crate::{
    db::{Db, DbError},
    event::{BrokerEvent, EventFilter, NodeEventError, NodeEventSource},
+
    notif::NotificationSender,
};

#[derive(Default)]
@@ -14,6 +15,7 @@ pub struct QueueAdderBuilder {
    db: Option<Db>,
    filters: Option<Vec<EventFilter>>,
    push_shutdown: bool,
+
    events_tx: Option<NotificationSender>,
}

impl QueueAdderBuilder {
@@ -22,9 +24,15 @@ impl QueueAdderBuilder {
            db: self.db.ok_or(AdderError::Missing("db"))?,
            filters: self.filters.ok_or(AdderError::Missing("filters"))?,
            push_shutdown: self.push_shutdown,
+
            events_tx: self.events_tx.ok_or(AdderError::Missing("events_tx"))?,
        })
    }

+
    pub fn events_tx(mut self, tx: NotificationSender) -> Self {
+
        self.events_tx = Some(tx);
+
        self
+
    }
+

    pub fn db(mut self, db: Db) -> Self {
        self.db = Some(db);
        self
@@ -45,6 +53,7 @@ pub struct QueueAdder {
    filters: Vec<EventFilter>,
    db: Db,
    push_shutdown: bool,
+
    events_tx: NotificationSender,
}

impl QueueAdder {
@@ -76,7 +85,7 @@ impl QueueAdder {
                for e in events {
                    debug!("got event {e:#?}");
                    info!("insert broker event into queue: {e:?}");
-
                    self.db.push_queued_event(e)?;
+
                    self.push_event(e)?;
                }
            }
        }
@@ -85,11 +94,17 @@ impl QueueAdder {
        // processing thread knows to stop.
        if self.push_shutdown {
            info!("push a shutdown event into queue");
-
            self.db.push_queued_event(BrokerEvent::Shutdown)?;
+
            self.push_event(BrokerEvent::Shutdown)?;
        }

        Ok(())
    }
+

+
    fn push_event(&self, e: BrokerEvent) -> Result<(), AdderError> {
+
        self.db.push_queued_event(e)?;
+
        self.events_tx.send(()).map_err(|_| AdderError::Send)?;
+
        Ok(())
+
    }
}

#[derive(Debug, thiserror::Error)]
@@ -105,4 +120,7 @@ pub enum AdderError {

    #[error(transparent)]
    Db(#[from] DbError),
+

+
    #[error("failed to notify other thread about database change")]
+
    Send,
}
modified src/queueproc.rs
@@ -2,28 +2,25 @@

#![allow(clippy::result_large_err)]

-
use std::{
-
    thread::{sleep, spawn, JoinHandle},
-
    time::Duration,
-
};
+
use std::thread::{spawn, JoinHandle};

use log::{debug, info};
use radicle::Profile;

use crate::{
-
    broker::Broker,
-
    broker::BrokerError,
+
    broker::{Broker, BrokerError},
    db::{Db, DbError, QueueId, QueuedEvent},
    event::BrokerEvent,
    msg::{MessageError, RequestBuilder},
+
    notif::{NotificationReceiver, NotificationSender},
};

-
const WAIT_FOR_EVENTS_DURATION: Duration = Duration::from_millis(10_000);
-

#[derive(Default)]
pub struct QueueProcessorBuilder {
    db: Option<Db>,
    broker: Option<Broker>,
+
    events_rx: Option<NotificationReceiver>,
+
    run_tx: Option<NotificationSender>,
}

impl QueueProcessorBuilder {
@@ -32,9 +29,21 @@ impl QueueProcessorBuilder {
            db: self.db.ok_or(QueueError::Missing("db"))?,
            profile: Profile::load().map_err(QueueError::Profile)?,
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
+
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
+
            run_tx: self.run_tx.ok_or(QueueError::Missing("run_tx"))?,
        })
    }

+
    pub fn events_rx(mut self, rx: NotificationReceiver) -> Self {
+
        self.events_rx = Some(rx);
+
        self
+
    }
+

+
    pub fn run_tx(mut self, tx: NotificationSender) -> Self {
+
        self.run_tx = Some(tx);
+
        self
+
    }
+

    pub fn db(mut self, db: Db) -> Self {
        self.db = Some(db);
        self
@@ -50,6 +59,8 @@ pub struct QueueProcessor {
    db: Db,
    profile: Profile,
    broker: Broker,
+
    events_rx: NotificationReceiver,
+
    run_tx: NotificationSender,
}

impl QueueProcessor {
@@ -58,19 +69,14 @@ impl QueueProcessor {
    }

    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
-
        info!(
-
            "wait about {} seconds to check for events to process",
-
            WAIT_FOR_EVENTS_DURATION.as_secs()
-
        );
        let mut done = false;
-
        while !done {
+
        while !done && self.events_rx.recv().is_ok() {
            debug!("Looking for an event to process");
-
            if let Some(qe) = self.pick_event()? {
+
            while let Some(qe) = self.pick_event()? {
                info!("picked event from queue: {}: {:?}", qe.id(), qe.event());
                done = self.process_event(qe.event())?;
                self.drop_event(qe.id())?;
-
            } else {
-
                self.wait_for_events();
+
                self.run_tx.send(()).map_err(|_| QueueError::NotifyRun)?;
            }
        }

@@ -128,10 +134,6 @@ impl QueueProcessor {
        self.db.remove_queued_event(id).map_err(QueueError::db)?;
        Ok(())
    }
-

-
    fn wait_for_events(&self) {
-
        sleep(WAIT_FOR_EVENTS_DURATION);
-
    }
}

#[derive(Debug, thiserror::Error)]
@@ -151,6 +153,9 @@ pub enum QueueError {

    #[error("failed to run CI")]
    ExecuteCi(#[source] BrokerError),
+

+
    #[error("failed to send notification about new run")]
+
    NotifyRun,
}

impl QueueError {