Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
Add a unique, immutable broker run ID to each run, in addition to the
Merged liw opened 1 year ago

adapter run ID that may not always be available, and is not guaranteed to be unique.

Use inter-thread channels for threads to notify other threads those other threads have work to do. Event adding thread notifies event processing thread that there are new events, and the event processing thread notifies the report page generation thread that a new run has finished. In addition, the reporting thread reports after a while whether there are any new runs or not, so that the page time stamp gets updated.

13 files changed +345 -192 17828919 bdad79d4
modified ci-broker.md
@@ -230,7 +230,7 @@ when I run cibtool --db ci-broker.db event list
then stdout is exactly ""

when I run cibtool --db ci-broker.db run list --json
-
then stdout contains ""run_id": "xyzzy""
+
then stdout contains ""id": "xyzzy""
~~~


@@ -871,4 +871,7 @@ then stdout is exactly ""
when I run cibtool --db x.db run add --id x --repo rad:zwTxygwuz5LDGBq255RA2CbNGrz8 --alias x --url https://x/1 --branch main --commit f1815dde6ae406d8fe3cec0b96c4486766342716 --who x --finished --failure --timestamp 2024-07-09T02:00:00
when I run cibtool --db x.db run list --json
then stdout contains "rad:zwTxygwuz5LDGBq255RA2CbNGrz8"
+

+
when I run cibtool --db x.db run list --adapter-run-id abracadabra
+
then stdout is exactly ""
~~~
modified src/adapter.rs
@@ -18,6 +18,7 @@ use std::{
use log::{debug, error};

use crate::{
+
    db::{Db, DbError},
    msg::{MessageError, Request, Response},
    run::{Run, RunState},
};
@@ -50,15 +51,21 @@ impl Adapter {
        self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
    }

-
    pub fn run(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
+
    pub fn run(&self, trigger: &Request, run: &mut Run, db: &Db) -> Result<(), AdapterError> {
        debug!("running adapter");
+

        run.set_state(RunState::Triggered);
-
        let x = self.run_helper(trigger, run);
+
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
+

+
        let x = self.run_helper(trigger, run, db);
+

        run.set_state(RunState::Finished);
+
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
+

        x
    }

-
    fn run_helper(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
+
    fn run_helper(&self, trigger: &Request, run: &mut Run, db: &Db) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
@@ -99,6 +106,7 @@ impl Adapter {
                    if let Some(url) = info_url {
                        run.set_adapter_info_url(&url);
                    }
+
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
                _ => return Err(AdapterError::NotTriggered(resp)),
            }
@@ -113,6 +121,7 @@ impl Adapter {
            match resp {
                Response::Finished { result } => {
                    run.set_result(result);
+
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
                _ => return Err(AdapterError::NotFinished(resp)),
            }
@@ -204,18 +213,22 @@ pub enum AdapterError {
    /// Too many messages from adapter.
    #[error("adapter sent too many messages: first extra is {0:#?}")]
    TooMany(Response),
+

+
    /// Could no update run in database.
+
    #[error("failed to update CI run information in database")]
+
    UpdateRun(#[source] DbError),
}

#[cfg(test)]
mod test {
    use std::{fs::write, io::ErrorKind};

-
    use tempfile::tempdir;
+
    use tempfile::{tempdir, NamedTempFile};

    use radicle::git::Oid;
    use radicle::prelude::RepoId;

-
    use super::{Adapter, Run};
+
    use super::{Adapter, Db, Run};
    use crate::{
        adapter::AdapterError,
        msg::{MessageError, Response, RunResult},
@@ -223,6 +236,12 @@ mod test {
        test::{log_in_tests, mock_adapter, trigger_request, TestResult},
    };

+
    fn db() -> anyhow::Result<Db> {
+
        let tmp = NamedTempFile::new()?;
+
        let db = Db::new(tmp.path())?;
+
        Ok(db)
+
    }
+

    fn run() -> anyhow::Result<Run> {
        Ok(Run::new(
            RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?,
@@ -250,8 +269,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db)?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -271,8 +291,9 @@ echo '{"response":"finished","result":"failure"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);

        match x {
            Ok(_) => (),
@@ -299,8 +320,9 @@ exit 1
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));
@@ -320,8 +342,9 @@ kill -9 $BASHPID
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -345,8 +368,9 @@ kill -9 $BASHPID
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -368,8 +392,9 @@ kill -9 $BASHPID
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -390,8 +415,9 @@ echo '{"response":"finished","result":"success","bad":"field"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
@@ -414,8 +440,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -442,8 +469,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -462,8 +490,9 @@ echo '{"response":"finished","result":"success"}'
        let tmp = tempdir()?;
        let bin = tmp.path().join("adapter.sh");

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -490,8 +519,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        write(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -522,8 +552,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
modified src/bin/cib.rs
@@ -19,6 +19,7 @@ use radicle_ci_broker::{
    broker::{Broker, BrokerError},
    config::{Config, ConfigError},
    db::{Db, DbError},
+
    notif::NotificationChannel,
    pages::{PageBuilder, PageError},
    queueadd::{AdderError, QueueAdderBuilder},
    queueproc::{QueueError, QueueProcessorBuilder},
@@ -111,7 +112,9 @@ struct InsertCmd {}

impl InsertCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let mut events_notification = NotificationChannel::default();
        let adder = QueueAdderBuilder::default()
+
            .events_tx(events_notification.tx())
            .db(args.open_db(config)?)
            .filters(&config.filters)
            .build()
@@ -146,7 +149,14 @@ impl QueuedCmd {
        debug!("default adapter: {adapter:?}");
        broker.set_default_adapter(&adapter);

+
        let mut event_notifications = NotificationChannel::default();
+
        event_notifications.tx().send(()).ok();
+

+
        let mut run_notifications = NotificationChannel::default();
+

        let processor = QueueProcessorBuilder::default()
+
            .events_rx(event_notifications.rx())
+
            .run_tx(run_notifications.tx())
            .db(db)
            .broker(broker)
            .build()
@@ -167,7 +177,11 @@ struct ProcessEventsCmd {}

impl ProcessEventsCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let mut events_notification = NotificationChannel::default();
+
        let mut run_notification = NotificationChannel::default();
+

        let adder = QueueAdderBuilder::default()
+
            .events_tx(events_notification.tx())
            .db(args.open_db(config)?)
            .filters(&config.filters)
            .push_shutdown()
@@ -188,7 +202,7 @@ impl ProcessEventsCmd {
        if let Some(dirname) = &config.report_dir {
            page.set_output_dir(dirname);
        }
-
        page.update_in_thread(db, &profile.config.node.alias, false);
+
        page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, false);

        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
        let spec =
@@ -204,6 +218,8 @@ impl ProcessEventsCmd {
        broker.set_default_adapter(&adapter);

        let processor = QueueProcessorBuilder::default()
+
            .events_rx(events_notification.rx())
+
            .run_tx(run_notification.tx())
            .db(args.open_db(config)?)
            .broker(broker)
            .build()
modified src/bin/cibtool.rs
@@ -30,6 +30,7 @@ use radicle_ci_broker::{
    db::{Db, DbError, QueueId},
    event::BrokerEvent,
    msg::{RunId, RunResult},
+
    notif::NotificationChannel,
    pages::{PageBuilder, PageError},
    run::{Run, RunState, Whence},
};
@@ -551,9 +552,9 @@ enum RunSubCmd {

#[derive(Parser)]
struct AddRun {
-
    /// Set the run ID.
+
    /// Set the adapter run ID.
    #[clap(long)]
-
    id: RunId,
+
    id: Option<RunId>,

    /// Set alias of node that performed the CI run.
    #[clap(long)]
@@ -617,7 +618,10 @@ impl AddRun {
            who: self.who.clone(),
        };
        let mut run = Run::new(self.repo, &self.alias, whence, self.timestamp.clone());
-
        run.set_adapter_run_id(RunId::default());
+

+
        let id = self.id.clone().unwrap_or_default();
+
        run.set_adapter_run_id(id);
+

        if let Some(url) = &self.url {
            run.set_adapter_info_url(url);
        }
@@ -628,7 +632,7 @@ impl AddRun {
            RunResult::Failure
        });

-
        db.push_run(run)?;
+
        db.push_run(&run)?;

        Ok(())
    }
@@ -638,6 +642,9 @@ impl AddRun {
struct ListRuns {
    #[clap(long)]
    json: bool,
+

+
    #[clap(long)]
+
    adapter_run_id: Option<RunId>,
}

impl ListRuns {
@@ -645,16 +652,22 @@ impl ListRuns {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;

+
        let runs = if let Some(wanted) = &self.adapter_run_id {
+
            db.find_runs(wanted)?
+
        } else {
+
            db.get_all_runs()?
+
        };
+

        if self.json {
-
            let runs: Vec<RunInfo> = db.get_all_runs()?.iter().map(RunInfo::from).collect();
            println!(
                "{}",
                serde_json::to_string_pretty(&runs).map_err(CibToolError::RunToJson)?
            );
        } else {
-
            for run in db.get_all_runs()? {
+
            for run in runs {
                println!(
-
                    "{}",
+
                    "{} {}",
+
                    run.broker_run_id(),
                    run.adapter_run_id()
                        .map(|id| id.to_string())
                        .unwrap_or("unknown".into())
@@ -717,7 +730,10 @@ impl ReportCmd {
            .build()?;

        page.set_output_dir(&self.output_dir);
-
        let thread = page.update_in_thread(db, &profile.config.node.alias, true);
+

+
        let mut run_notification = NotificationChannel::default();
+
        let thread =
+
            page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, true);
        thread.join().unwrap()?;

        Ok(())
modified src/broker.rs
@@ -17,7 +17,7 @@ use radicle::prelude::RepoId;
use crate::{
    adapter::Adapter,
    db::{Db, DbError},
-
    msg::{PatchEvent, PushEvent, Request, RunId},
+
    msg::{PatchEvent, PushEvent, Request},
    run::{Run, Whence},
};

@@ -68,7 +68,7 @@ impl Broker {
    pub fn execute_ci(&mut self, trigger: &Request) -> Result<Run, BrokerError> {
        info!("Start CI run");
        debug!("Start CI run on {trigger:#?}");
-
        let mut run = match trigger {
+
        let run = match trigger {
            Request::Trigger {
                common,
                push,
@@ -99,12 +99,13 @@ impl Broker {
                    };

                    let mut run = Run::new(*rid, &common.repository.name, whence, now()?);
+
                    self.db.push_run(&run)?;

                    // We run the adapter, but if that fails, we just
                    // log the error. The `Run` value records the
                    // result of the run.
                    debug!("broker runs adapter");
-
                    if let Err(e) = adapter.run(trigger, &mut run) {
+
                    if let Err(e) = adapter.run(trigger, &mut run, &self.db) {
                        error!("failed to run adapter or it failed to run CI: {e}");
                        let mut e = e.source();
                        while let Some(source) = e {
@@ -122,14 +123,7 @@ impl Broker {
        };
        info!("Finish CI run: {run:?}");

-
        // If the adapter never gave us a run ID, it has not been set.
-
        // In that case, we invent one so that it can be used by the
-
        // database as a key.
-
        if run.adapter_run_id().is_none() {
-
            run.set_adapter_run_id(RunId::default());
-
        }
-

-
        self.db.push_run(run.clone())?;
+
        self.db.update_run(&run)?;

        Ok(run)
    }
modified src/db.rs
@@ -60,7 +60,7 @@ impl Db {
        const TABLES: &[&str] = &[
            "CREATE TABLE IF NOT EXISTS counter_test (counter INT)",
            "CREATE TABLE IF NOT EXISTS event_queue (id TEXT PRIMARY KEY, timestamp TEXT, event TEXT)",
-
            "CREATE TABLE IF NOT EXISTS ci_runs (run_id TEXT PRIMARY KEY, json TEXT)",
+
            "CREATE TABLE IF NOT EXISTS ci_runs (broker_run_id TEXT PRIMARY KEY, json TEXT)",
        ];

        for table in TABLES.iter() {
@@ -294,7 +294,7 @@ impl Db {

    /// Return list of CI runs currently in the database.
    pub fn list_runs(&self) -> Result<Vec<RunId>, DbError> {
-
        let mut select = self.prepare("SELECT run_id FROM ci_runs")?;
+
        let mut select = self.prepare("SELECT broker_run_id FROM ci_runs")?;

        let mut run_ids = vec![];

@@ -304,7 +304,7 @@ impl Db {
                Ok(State::Row) => {
                    let run_id: String = select
                        .stmt
-
                        .read("run_id")
+
                        .read("broker_run_id")
                        .map_err(|e| DbError::get_run(&select.sql, e))?;
                    let run_id = RunId::from(run_id.as_str());
                    run_ids.push(run_id);
@@ -354,7 +354,7 @@ impl Db {

    /// Return a specific CI run, given is id, if one exists.
    pub fn get_run(&self, id: &RunId) -> Result<Option<Run>, DbError> {
-
        let mut select = self.prepare("SELECT json FROM ci_runs WHERE run_id = :id")?;
+
        let mut select = self.prepare("SELECT json FROM ci_runs WHERE broker_run_id = :id")?;
        select
            .stmt
            .bind((":id", id.to_string().as_str()))
@@ -387,13 +387,27 @@ impl Db {
        Ok(run)
    }

+
    /// Return a list of broker run IDs that their adapter run ID set
+
    /// to a given value.
+
    pub fn find_runs(&self, adapter_runid: &RunId) -> Result<Vec<Run>, DbError> {
+
        let runs = self
+
            .get_all_runs()?
+
            .iter()
+
            .filter(|run| run.adapter_run_id() == Some(adapter_runid))
+
            .cloned()
+
            .collect();
+

+
        Ok(runs)
+
    }
+

    /// Add a new CI run to the database, returning its id.
-
    pub fn push_run(&self, run: Run) -> Result<RunId, DbError> {
-
        let id = run.adapter_run_id().ok_or(DbError::without_id())?.clone();
+
    pub fn push_run(&self, run: &Run) -> Result<RunId, DbError> {
+
        let id = run.broker_run_id().clone();

        let json = serde_json::to_string(&run).map_err(DbError::event_to_json)?;

-
        let mut insert = self.prepare("INSERT INTO ci_runs (run_id, json) VALUES (:id, :json)")?;
+
        let mut insert =
+
            self.prepare("INSERT INTO ci_runs (broker_run_id, json) VALUES (:id, :json)")?;
        insert
            .stmt
            .bind((":id", id.to_string().as_str()))
@@ -411,6 +425,31 @@ impl Db {
        Ok(id)
    }

+
    /// Update a CI run in the database.
+
    pub fn update_run(&self, run: &Run) -> Result<(), DbError> {
+
        let id = run.broker_run_id().clone();
+

+
        let json = serde_json::to_string(&run).map_err(DbError::event_to_json)?;
+

+
        let mut update =
+
            self.prepare("UPDATE ci_runs SET json = :json WHERE broker_run_id = :id")?;
+
        update
+
            .stmt
+
            .bind((":id", id.to_string().as_str()))
+
            .map_err(|e| DbError::bind(&update.sql, e))?;
+
        update
+
            .stmt
+
            .bind((":json", json.as_str()))
+
            .map_err(|e| DbError::bind(&update.sql, e))?;
+

+
        match update.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::update_run(&update.sql, e)),
+
        }
+

+
        Ok(())
+
    }
+

    /// Remove a CI run from database, given its id. It's OK if the run is
    /// not in the database, that is just silently ignored.
    pub fn remove_run(&self, id: &RunId) -> Result<(), DbError> {
@@ -584,12 +623,12 @@ pub enum DbError {
    #[error("failed to retrieve a CI run from database")]
    GetRun(String, #[source] sqlite::Error),

-
    #[error("programming error: pushed run does not have its run ID set")]
-
    WithoutId,
-

    #[error("failed to insert a CI run into database")]
    PushRun(String, #[source] sqlite::Error),

+
    #[error("failed to update a CI run in database")]
+
    UpdateRun(String, #[source] sqlite::Error),
+

    #[error("failed to remove a CI run from database")]
    RemoveRun(String, #[source] sqlite::Error),
}
@@ -679,14 +718,14 @@ impl DbError {
        Self::GetRun(sql.into(), e)
    }

-
    fn without_id() -> Self {
-
        Self::WithoutId
-
    }
-

    fn push_run(sql: &str, e: sqlite::Error) -> Self {
        Self::PushRun(sql.into(), e)
    }

+
    fn update_run(sql: &str, e: sqlite::Error) -> Self {
+
        Self::UpdateRun(sql.into(), e)
+
    }
+

    fn remove_run(sql: &str, e: sqlite::Error) -> Self {
        Self::RemoveRun(sql.into(), e)
    }
modified src/lib.rs
@@ -11,6 +11,7 @@ pub mod config;
pub mod db;
pub mod event;
pub mod msg;
+
pub mod notif;
pub mod pages;
pub mod queueadd;
pub mod queueproc;
modified src/msg.rs
@@ -77,6 +77,13 @@ impl fmt::Display for RunId {
    }
}

+
impl RunId {
+
    /// Return representation of identifier as a string slice.
+
    pub fn as_str(&self) -> &str {
+
        &self.id
+
    }
+
}
+

/// The result of a CI run.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
added src/notif.rs
@@ -0,0 +1,47 @@
+
//! Notification channel between threads.
+

+
use std::sync::mpsc::{channel, Receiver, Sender};
+

+
/// Channel endpoint for sending notifications.
+
pub type NotificationSender = Sender<()>;
+

+
/// Channel endpoint for receiving notifications.
+
pub type NotificationReceiver = Receiver<()>;
+

+
/// Notification channel.
+
///
+
/// The notification channel allows one thread to notify another that
+
/// the other thread has some work to do. The notification carries no
+
/// other information: the receiver is supposed to know where it can
+
/// get whatever data it needs to what it needs to do.
+
///
+
/// The point of this is to make sure threads in the CI broker
+
/// exchange data only via the database, where it is persistent.
+
pub struct NotificationChannel {
+
    tx: Option<NotificationSender>,
+
    rx: Option<NotificationReceiver>,
+
}
+

+
impl Default for NotificationChannel {
+
    fn default() -> Self {
+
        let (tx, rx) = channel();
+
        Self {
+
            tx: Some(tx),
+
            rx: Some(rx),
+
        }
+
    }
+
}
+

+
impl NotificationChannel {
+
    /// Return the transmit endpoint of the notification channel. This
+
    /// can only be called once.
+
    pub fn tx(&mut self) -> NotificationSender {
+
        self.tx.take().unwrap()
+
    }
+

+
    /// Return the receive endpoint of the notification channel. This
+
    /// can only be called once.
+
    pub fn rx(&mut self) -> NotificationReceiver {
+
        self.rx.take().unwrap()
+
    }
+
}
modified src/pages.rs
@@ -11,8 +11,8 @@ use std::{
    collections::{HashMap, HashSet},
    fs::write,
    path::{Path, PathBuf},
-
    sync::{Arc, Mutex, MutexGuard},
-
    thread::{sleep, spawn, JoinHandle},
+
    sync::mpsc::RecvTimeoutError,
+
    thread::{spawn, JoinHandle},
    time::Duration,
};

@@ -27,6 +27,7 @@ use crate::{
    db::{Db, DbError},
    event::BrokerEvent,
    msg::{RunId, RunResult},
+
    notif::NotificationReceiver,
    run::{Run, RunState, Whence},
};

@@ -86,16 +87,7 @@ impl PageBuilder {
        }
        debug!("broker database has {} CI runs", runs.len());

-
        Ok(StatusPage::new(PageData {
-
            timestamp: now()?,
-
            ci_broker_version: env!("CARGO_PKG_VERSION"),
-
            ci_broker_git_commit: env!("GIT_HEAD"),
-
            node_alias: self.node_alias.ok_or(PageError::NoAlias)?,
-
            runs,
-
            broker_event_counter: 0,
-
            latest_broker_event: None,
-
            latest_ci_run: None,
-
        }))
+
        Ok(StatusPage::new())
    }
}

@@ -197,8 +189,25 @@ impl PageData {
                item.push_child(Element::new(Tag::Br));
                item.push_child(
                    Element::new(Tag::Span)
+
                        .with_text("latest run at ")
                        .with_text(run.timestamp())
                        .with_child(Element::new(Tag::Br))
+
                        .with_text("broker run ID: ")
+
                        .with_child(
+
                            Element::new(Tag::Code)
+
                                .with_class("broker_run_id")
+
                                .with_text(run.broker_run_id().as_str()),
+
                        )
+
                        .with_child(Element::new(Tag::Br))
+
                        .with_text("adapter run ID: ")
+
                        .with_child(if let Some(id) = run.adapter_run_id() {
+
                            Element::new(Tag::Code)
+
                                .with_class("adapter_run_id")
+
                                .with_text(id.as_str())
+
                        } else {
+
                            Element::new(Tag::Span).with_text("unknown")
+
                        })
+
                        .with_child(Element::new(Tag::Br))
                        .with_text(" ")
                        .with_child(Self::whence_as_html(run.whence())),
                );
@@ -359,16 +368,6 @@ impl PageData {
                }
            };

-
            let link = if let Some(id) = run.adapter_run_id() {
-
                Element::new(Tag::A)
-
                    .with_attribute("href", &format!("{}/log.html", id))
-
                    .with_text("log")
-
            } else {
-
                Element::new(Tag::Span)
-
                    .with_class("missing_log")
-
                    .with_text("no log yet")
-
            };
-

            let info_url = if let Some(url) = run.adapter_info_url() {
                Element::new(Tag::Span).with_child(
                    Element::new(Tag::A)
@@ -382,14 +381,29 @@ impl PageData {

            list.push_child(
                Element::new(Tag::Li)
+
                    .with_text("at ")
                    .with_text(run.timestamp())
-
                    .with_text(" ")
+
                    .with_text("; currently ")
                    .with_child(current)
-
                    .with_text(" ")
-
                    .with_child(link)
                    .with_text("; ")
                    .with_child(info_url)
                    .with_child(Element::new(Tag::Br))
+
                    .with_text("broker run ID: ")
+
                    .with_child(
+
                        Element::new(Tag::Code)
+
                            .with_class("broker_run_id")
+
                            .with_text(run.broker_run_id().as_str()),
+
                    )
+
                    .with_child(Element::new(Tag::Br))
+
                    .with_text("adapter run ID: ")
+
                    .with_child(if let Some(id) = run.adapter_run_id() {
+
                        Element::new(Tag::Code)
+
                            .with_class("adapter_run_id")
+
                            .with_text(id.as_str())
+
                    } else {
+
                        Element::new(Tag::Span).with_text("unknown")
+
                    })
+
                    .with_child(Element::new(Tag::Br))
                    .with_child(Self::whence_as_html(run.whence())),
            );
        }
@@ -455,56 +469,25 @@ impl PageData {
/// page per such repository, with a list of CI runs for that
/// repository.
pub struct StatusPage {
-
    data: Arc<Mutex<PageData>>,
    node_alias: String,
    dirname: Option<PathBuf>,
}

impl StatusPage {
-
    fn new(data: PageData) -> Self {
+
    fn new() -> Self {
        Self {
-
            data: Arc::new(Mutex::new(data)),
            node_alias: String::new(),
            dirname: None,
        }
    }

-
    fn lock(&mut self) -> Result<MutexGuard<PageData>, PageError> {
-
        self.data
-
            .lock()
-
            .map_err(|_| PageError::Lock("lock StatusPage::data"))
-
    }
-

    pub fn set_output_dir(&mut self, dirname: &Path) {
        self.dirname = Some(dirname.into());
    }

-
    pub fn update_timestamp(&mut self) -> Result<(), PageError> {
-
        let mut data = self.lock()?;
-
        data.timestamp = now()?;
-
        Ok(())
-
    }
-

-
    pub fn broker_event(&mut self, event: &BrokerEvent) -> Result<(), PageError> {
-
        let mut data = self.lock()?;
-
        data.latest_broker_event = Some(event.clone());
-
        data.broker_event_counter += 1;
-
        Ok(())
-
    }
-

-
    /// Add a new CI run to the status page.
-
    pub fn push_run(&mut self, new: Run) -> Result<(), PageError> {
-
        // We silently ignore a run until its id has been set.
-
        if let Some(id) = new.adapter_run_id() {
-
            let mut data = self.lock()?;
-
            data.latest_ci_run = Some(new.clone());
-
            data.runs.insert(id.clone(), new);
-
        }
-
        Ok(())
-
    }
-

    pub fn update_in_thread(
        mut self,
+
        run_rx: NotificationReceiver,
        db: Db,
        node_alias: &str,
        once: bool,
@@ -518,71 +501,67 @@ impl StatusPage {
            UPDATE_INTERVAL.as_secs()
        );
        spawn(move || loop {
-
            self.update_and_write(&db)?;
-
            if once {
-
                return Ok(());
+
            loop {
+
                self.update_and_write(&db)?;
+
                if once {
+
                    return Ok(());
+
                }
+

+
                match run_rx.recv_timeout(UPDATE_INTERVAL) {
+
                    Ok(_) | Err(RecvTimeoutError::Timeout) => (),
+
                    Err(RecvTimeoutError::Disconnected) => break,
+
                }
            }
-
            sleep(UPDATE_INTERVAL);
        })
    }

    fn update_and_write(&mut self, db: &Db) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
            info!("write HTML report pages to {}", dirname.display());
-
            let mut page = PageBuilder::default()
-
                .node_alias(&self.node_alias)
-
                .runs(db.get_all_runs()?)
-
                .build()?;

-
            page.write(dirname)?;
-
        }
-
        Ok(())
-
    }
+
            let runs = db.get_all_runs()?;

-
    /// Write the status page (as index.html) and per-repository pages
-
    /// (`<RID>.html`) into the directory given as an argument. The directory must exist.
-
    pub fn write(&mut self, dirname: &Path) -> Result<(), PageError> {
-
        let nameless = String::from("nameless repo");
-

-
        // We avoid writing while keeping the lock, to reduce
-
        // contention.
-
        let (status, repos) = {
-
            let data = self.lock()?;
-

-
            let status = data.status_page_as_html()?.to_string();
-

-
            let mut repos = vec![];
-
            for (_, rid) in data.repos() {
-
                let basename = rid_to_basename(rid);
-
                let filename = dirname.join(format!("{basename}.html"));
-
                let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
-
                let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
-
                repos.push((filename, repopage.to_string()));
-
            }
+
            let data = PageData {
+
                timestamp: now()?,
+
                ci_broker_version: env!("CARGO_PKG_VERSION"),
+
                ci_broker_git_commit: env!("GIT_HEAD"),
+
                node_alias: self.node_alias.clone(),
+
                runs: HashMap::from_iter(
+
                    runs.iter()
+
                        .map(|run| (run.broker_run_id().clone(), run.clone())),
+
                ),
+
                broker_event_counter: 0,
+
                latest_broker_event: None,
+
                latest_ci_run: None,
+
            };

-
            (status, repos)
-
        };
+
            let nameless = String::from("nameless repo");

-
        let filename = dirname.join("index.html");
-
        Self::write_file(&filename, &status)?;
+
            // We avoid writing while keeping the lock, to reduce
+
            // contention.
+
            let (status, repos) = {
+
                let status = data.status_page_as_html()?.to_string();

-
        for (filename, repopage) in repos {
-
            Self::write_file(&filename, &repopage)?;
-
        }
+
                let mut repos = vec![];
+
                for (_, rid) in data.repos() {
+
                    let basename = rid_to_basename(rid);
+
                    let filename = dirname.join(format!("{basename}.html"));
+
                    let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
+
                    let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
+
                    repos.push((filename, repopage.to_string()));
+
                }

-
        Ok(())
-
    }
+
                (status, repos)
+
            };

-
    /// Write the JSON status file.
-
    pub fn write_json(&mut self, filename: &Path) -> Result<(), PageError> {
-
        // We avoid writing while keeping the lock, to reduce
-
        // contention.
-
        let status = {
-
            let data = self.lock()?;
-
            StatusData::from(&*data).as_json()?
-
        };
+
            let filename = dirname.join("index.html");
+
            Self::write_file(&filename, &status)?;

-
        Self::write_file(filename, &status)
+
            for (filename, repopage) in repos {
+
                Self::write_file(&filename, &repopage)?;
+
            }
+
        }
+
        Ok(())
    }

    fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
@@ -592,16 +571,6 @@ impl StatusPage {
    }
}

-
impl Clone for StatusPage {
-
    fn clone(&self) -> Self {
-
        Self {
-
            data: Arc::clone(&self.data),
-
            node_alias: self.node_alias.clone(),
-
            dirname: self.dirname.clone(),
-
        }
-
    }
-
}
-

#[derive(Debug, Clone, Serialize)]
struct StatusData {
    timestamp: String,
modified src/queueadd.rs
@@ -7,6 +7,7 @@ use log::{debug, info};
use crate::{
    db::{Db, DbError},
    event::{BrokerEvent, EventFilter, NodeEventError, NodeEventSource},
+
    notif::NotificationSender,
};

#[derive(Default)]
@@ -14,6 +15,7 @@ pub struct QueueAdderBuilder {
    db: Option<Db>,
    filters: Option<Vec<EventFilter>>,
    push_shutdown: bool,
+
    events_tx: Option<NotificationSender>,
}

impl QueueAdderBuilder {
@@ -22,9 +24,15 @@ impl QueueAdderBuilder {
            db: self.db.ok_or(AdderError::Missing("db"))?,
            filters: self.filters.ok_or(AdderError::Missing("filters"))?,
            push_shutdown: self.push_shutdown,
+
            events_tx: self.events_tx.ok_or(AdderError::Missing("events_tx"))?,
        })
    }

+
    pub fn events_tx(mut self, tx: NotificationSender) -> Self {
+
        self.events_tx = Some(tx);
+
        self
+
    }
+

    pub fn db(mut self, db: Db) -> Self {
        self.db = Some(db);
        self
@@ -45,6 +53,7 @@ pub struct QueueAdder {
    filters: Vec<EventFilter>,
    db: Db,
    push_shutdown: bool,
+
    events_tx: NotificationSender,
}

impl QueueAdder {
@@ -76,7 +85,7 @@ impl QueueAdder {
                for e in events {
                    debug!("got event {e:#?}");
                    info!("insert broker event into queue: {e:?}");
-
                    self.db.push_queued_event(e)?;
+
                    self.push_event(e)?;
                }
            }
        }
@@ -85,11 +94,17 @@ impl QueueAdder {
        // processing thread knows to stop.
        if self.push_shutdown {
            info!("push a shutdown event into queue");
-
            self.db.push_queued_event(BrokerEvent::Shutdown)?;
+
            self.push_event(BrokerEvent::Shutdown)?;
        }

        Ok(())
    }
+

+
    fn push_event(&self, e: BrokerEvent) -> Result<(), AdderError> {
+
        self.db.push_queued_event(e)?;
+
        self.events_tx.send(()).map_err(|_| AdderError::Send)?;
+
        Ok(())
+
    }
}

#[derive(Debug, thiserror::Error)]
@@ -105,4 +120,7 @@ pub enum AdderError {

    #[error(transparent)]
    Db(#[from] DbError),
+

+
    #[error("failed to notify other thread about database change")]
+
    Send,
}
modified src/queueproc.rs
@@ -2,28 +2,25 @@

#![allow(clippy::result_large_err)]

-
use std::{
-
    thread::{sleep, spawn, JoinHandle},
-
    time::Duration,
-
};
+
use std::thread::{spawn, JoinHandle};

use log::{debug, info};
use radicle::Profile;

use crate::{
-
    broker::Broker,
-
    broker::BrokerError,
+
    broker::{Broker, BrokerError},
    db::{Db, DbError, QueueId, QueuedEvent},
    event::BrokerEvent,
    msg::{MessageError, RequestBuilder},
+
    notif::{NotificationReceiver, NotificationSender},
};

-
const WAIT_FOR_EVENTS_DURATION: Duration = Duration::from_millis(10_000);
-

#[derive(Default)]
pub struct QueueProcessorBuilder {
    db: Option<Db>,
    broker: Option<Broker>,
+
    events_rx: Option<NotificationReceiver>,
+
    run_tx: Option<NotificationSender>,
}

impl QueueProcessorBuilder {
@@ -32,9 +29,21 @@ impl QueueProcessorBuilder {
            db: self.db.ok_or(QueueError::Missing("db"))?,
            profile: Profile::load().map_err(QueueError::Profile)?,
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
+
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
+
            run_tx: self.run_tx.ok_or(QueueError::Missing("run_tx"))?,
        })
    }

+
    pub fn events_rx(mut self, rx: NotificationReceiver) -> Self {
+
        self.events_rx = Some(rx);
+
        self
+
    }
+

+
    pub fn run_tx(mut self, tx: NotificationSender) -> Self {
+
        self.run_tx = Some(tx);
+
        self
+
    }
+

    pub fn db(mut self, db: Db) -> Self {
        self.db = Some(db);
        self
@@ -50,6 +59,8 @@ pub struct QueueProcessor {
    db: Db,
    profile: Profile,
    broker: Broker,
+
    events_rx: NotificationReceiver,
+
    run_tx: NotificationSender,
}

impl QueueProcessor {
@@ -58,19 +69,14 @@ impl QueueProcessor {
    }

    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
-
        info!(
-
            "wait about {} seconds to check for events to process",
-
            WAIT_FOR_EVENTS_DURATION.as_secs()
-
        );
        let mut done = false;
-
        while !done {
+
        while !done && self.events_rx.recv().is_ok() {
            debug!("Looking for an event to process");
-
            if let Some(qe) = self.pick_event()? {
+
            while let Some(qe) = self.pick_event()? {
                info!("picked event from queue: {}: {:?}", qe.id(), qe.event());
                done = self.process_event(qe.event())?;
                self.drop_event(qe.id())?;
-
            } else {
-
                self.wait_for_events();
+
                self.run_tx.send(()).map_err(|_| QueueError::NotifyRun)?;
            }
        }

@@ -128,10 +134,6 @@ impl QueueProcessor {
        self.db.remove_queued_event(id).map_err(QueueError::db)?;
        Ok(())
    }
-

-
    fn wait_for_events(&self) {
-
        sleep(WAIT_FOR_EVENTS_DURATION);
-
    }
}

#[derive(Debug, thiserror::Error)]
@@ -151,6 +153,9 @@ pub enum QueueError {

    #[error("failed to run CI")]
    ExecuteCi(#[source] BrokerError),
+

+
    #[error("failed to send notification about new run")]
+
    NotifyRun,
}

impl QueueError {
modified src/run.rs
@@ -9,6 +9,7 @@ use crate::msg::{Revision, RunId, RunResult};

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Run {
+
    broker_run_id: RunId,
    adapter_run_id: Option<RunId>,
    adapter_info_url: Option<String>,
    repo_id: RepoId,
@@ -23,6 +24,7 @@ impl Run {
    /// Create a new run.
    pub fn new(repo_id: RepoId, alias: &str, whence: Whence, timestamp: String) -> Self {
        Self {
+
            broker_run_id: RunId::default(),
            adapter_run_id: None,
            adapter_info_url: None,
            repo_id,
@@ -54,6 +56,11 @@ impl Run {
        &self.whence
    }

+
    /// Return the run id assigned by the CI broker itself. This always exists.
+
    pub fn broker_run_id(&self) -> &RunId {
+
        &self.broker_run_id
+
    }
+

    /// Set the run id assigned by the adapter.
    pub fn set_adapter_run_id(&mut self, run_id: RunId) {
        assert!(self.adapter_run_id.is_none());