Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
refactor: store timeout in receiver, separate event/run senders
Merged liw opened 1 year ago

Add new NotificationChannel constructors, for different kinds of channels. Store the receive timeout in the receiver; it depends on kind of channel.

Also add methods to send and receive notifications to senders and receivers. This hides the std::sync::mpsc machinery better, and makes the types easier to use.

Signed-off-by: Lars Wirzenius liw@liw.fi

feat: notify page generation about changes to runs more often

Change src/adapter.rs to send notification via the run notification channel when the adapter is spawned, and when the adapter sends a “triggered” message.

Signed-off-by: Lars Wirzenius liw@liw.fi

8 files changed +145 -44 d537fd39 0f90e97a
modified src/adapter.rs
@@ -19,6 +19,7 @@ use crate::{
    db::{Db, DbError},
    logger,
    msg::{MessageError, Request, Response},
+
    notif::NotificationSender,
    run::{Run, RunState},
};

@@ -50,11 +51,17 @@ impl Adapter {
        self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
    }

-
    pub fn run(&self, trigger: &Request, run: &mut Run, db: &Db) -> Result<(), AdapterError> {
+
    pub fn run(
+
        &self,
+
        trigger: &Request,
+
        run: &mut Run,
+
        db: &Db,
+
        run_notification: &NotificationSender,
+
    ) -> Result<(), AdapterError> {
        run.set_state(RunState::Triggered);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

-
        let x = self.run_helper(trigger, run, db);
+
        let x = self.run_helper(trigger, run, db, run_notification);

        run.set_state(RunState::Finished);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
@@ -62,7 +69,13 @@ impl Adapter {
        x
    }

-
    fn run_helper(&self, trigger: &Request, run: &mut Run, db: &Db) -> Result<(), AdapterError> {
+
    fn run_helper(
+
        &self,
+
        trigger: &Request,
+
        run: &mut Run,
+
        db: &Db,
+
        run_notification: &NotificationSender,
+
    ) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
@@ -74,6 +87,8 @@ impl Adapter {
            .spawn()
            .map_err(|e| AdapterError::SpawnAdapter(self.bin.clone(), e))?;

+
        run_notification.notify()?;
+

        // Write the request to trigger a run to the child's stdin.
        // Then close the pipe to prevent the child from trying to
        // read another message that will never be sent.
@@ -93,6 +108,7 @@ impl Adapter {
        if let Some(line) = lines.next() {
            let line = line.map_err(AdapterError::ReadLine)?;
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
+
            run_notification.notify()?;
            match resp {
                Response::Triggered { run_id, info_url } => {
                    run.set_state(RunState::Running);
@@ -111,6 +127,7 @@ impl Adapter {
        if let Some(line) = lines.next() {
            let line = line.map_err(AdapterError::ReadLine)?;
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
+
            run_notification.notify()?;
            match resp {
                Response::Finished { result } => {
                    run.set_result(result);
@@ -208,6 +225,10 @@ pub enum AdapterError {
    /// Could no update run in database.
    #[error("failed to update CI run information in database")]
    UpdateRun(#[source] DbError),
+

+
    /// Could not send notification of changes to CI runs.
+
    #[error(transparent)]
+
    Notif(#[from] crate::notif::NotificationError),
}

#[cfg(test)]
@@ -223,6 +244,7 @@ mod test {
    use crate::{
        adapter::AdapterError,
        msg::{MessageError, Response, RunResult},
+
        notif::NotificationChannel,
        run::Whence,
        test::{mock_adapter, trigger_request, TestResult},
    };
@@ -260,7 +282,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db)?;
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender)?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -280,7 +304,9 @@ echo '{"response":"finished","result":"failure"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);

        match x {
            Ok(_) => (),
@@ -307,7 +333,9 @@ exit 1

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));
@@ -327,7 +355,9 @@ kill -9 $BASHPID

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -351,7 +381,9 @@ kill -9 $BASHPID

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -373,7 +405,9 @@ kill -9 $BASHPID

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -394,7 +428,9 @@ echo '{"response":"finished","result":"success","bad":"field"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
@@ -417,7 +453,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -444,7 +482,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -463,7 +503,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -490,7 +532,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -521,7 +565,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
modified src/bin/cib.rs
@@ -125,7 +125,7 @@ struct InsertCmd {}

impl InsertCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
-
        let mut events_notification = NotificationChannel::default();
+
        let mut events_notification = NotificationChannel::new_event();
        let adder = QueueAdderBuilder::default()
            .events_tx(events_notification.tx().map_err(CibError::notification)?)
            .db(args.open_db(config)?)
@@ -161,14 +161,14 @@ impl QueuedCmd {
        logger::adapter_config(config);
        broker.set_default_adapter(&adapter);

-
        let mut event_notifications = NotificationChannel::default();
+
        let mut event_notifications = NotificationChannel::new_event();
        event_notifications
            .tx()
            .map_err(CibError::notification)?
-
            .send(())
+
            .notify()
            .ok();

-
        let mut run_notifications = NotificationChannel::default();
+
        let mut run_notifications = NotificationChannel::new_run();

        let db = args.open_db(config)?;
        let processor = QueueProcessorBuilder::default()
@@ -209,8 +209,8 @@ struct ProcessEventsCmd {}

impl ProcessEventsCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
-
        let mut events_notification = NotificationChannel::default();
-
        let mut run_notification = NotificationChannel::default();
+
        let mut events_notification = NotificationChannel::new_event();
+
        let mut run_notification = NotificationChannel::new_run();

        let adder = QueueAdderBuilder::default()
            .events_tx(events_notification.tx().map_err(CibError::notification)?)
modified src/bin/cibtoolcmd/report.rs
@@ -20,7 +20,7 @@ impl Leaf for ReportCmd {
        let mut page = StatusPage::default();
        page.set_output_dir(&self.output_dir);

-
        let mut run_notification = NotificationChannel::default();
+
        let mut run_notification = NotificationChannel::new_run();
        let thread = page.update_in_thread(
            run_notification.rx().map_err(CibToolError::Notification)?,
            profile,
modified src/broker.rs
@@ -17,6 +17,7 @@ use crate::{
    db::{Db, DbError},
    logger,
    msg::{PatchEvent, PushEvent, Request},
+
    notif::NotificationSender,
    run::{Run, Whence},
};

@@ -64,7 +65,11 @@ impl Broker {
    }

    #[allow(clippy::result_large_err)]
-
    pub fn execute_ci(&mut self, trigger: &Request) -> Result<Run, BrokerError> {
+
    pub fn execute_ci(
+
        &mut self,
+
        trigger: &Request,
+
        run_notification: &NotificationSender,
+
    ) -> Result<Run, BrokerError> {
        logger::broker_start_run(trigger);
        let run = match trigger {
            Request::Trigger {
@@ -102,7 +107,7 @@ impl Broker {
                    // We run the adapter, but if that fails, we just
                    // log the error. The `Run` value records the
                    // result of the run.
-
                    if let Err(e) = adapter.run(trigger, &mut run, &self.db) {
+
                    if let Err(e) = adapter.run(trigger, &mut run, &self.db, run_notification) {
                        logger::error("failed to run adapter or it failed to run CI", &e);
                    }

@@ -173,6 +178,7 @@ mod test {
    use super::{Adapter, Broker, RepoId};
    use crate::{
        msg::{RunId, RunResult},
+
        notif::NotificationChannel,
        run::RunState,
        test::{mock_adapter, trigger_request, TestResult},
    };
@@ -283,7 +289,9 @@ echo '{"response":"finished","result":"success"}'

        let trigger = trigger_request()?;

-
        let x = broker.execute_ci(&trigger);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = broker.execute_ci(&trigger, &sender);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
@@ -314,7 +322,9 @@ exit 1

        let trigger = trigger_request()?;

-
        let x = broker.execute_ci(&trigger);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = broker.execute_ci(&trigger, &sender);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
modified src/notif.rs
@@ -1,12 +1,46 @@
//! Notification channel between threads.

-
use std::sync::mpsc::{channel, Receiver, Sender};
+
use std::{
+
    sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender},
+
    time::Duration,
+
};
+

+
// Timeout when receiving notification about a new event.
+
const EVENT_RECV_TIMEOUT: Duration = Duration::from_secs(1);
+

+
// Maximum interval between update HTML report pages.
+
const UPDATE_IMTERVAL: Duration = Duration::from_secs(1);

/// Channel endpoint for sending notifications.
-
pub type NotificationSender = Sender<()>;
+
pub struct NotificationSender {
+
    sender: Sender<()>,
+
}
+

+
impl NotificationSender {
+
    fn new(sender: Sender<()>) -> Self {
+
        Self { sender }
+
    }
+

+
    pub fn notify(&self) -> Result<(), NotificationError> {
+
        self.sender.send(()).map_err(NotificationError::Send)
+
    }
+
}

/// Channel endpoint for receiving notifications.
-
pub type NotificationReceiver = Receiver<()>;
+
pub struct NotificationReceiver {
+
    receiver: Receiver<()>,
+
    max_wait: Duration,
+
}
+

+
impl NotificationReceiver {
+
    fn new(receiver: Receiver<()>, max_wait: Duration) -> Self {
+
        Self { receiver, max_wait }
+
    }
+

+
    pub fn wait_for_notification(&self) -> Result<(), RecvTimeoutError> {
+
        self.receiver.recv_timeout(self.max_wait)
+
    }
+
}

/// Notification channel.
///
@@ -22,12 +56,22 @@ pub struct NotificationChannel {
    rx: Option<NotificationReceiver>,
}

-
impl Default for NotificationChannel {
-
    fn default() -> Self {
+
impl NotificationChannel {
+
    /// Construct a channel for notifying about new events.
+
    pub fn new_event() -> Self {
+
        Self::new(EVENT_RECV_TIMEOUT)
+
    }
+

+
    /// Construct a channel for notifying about new CI runs.
+
    pub fn new_run() -> Self {
+
        Self::new(UPDATE_IMTERVAL)
+
    }
+

+
    fn new(max_wait: Duration) -> Self {
        let (tx, rx) = channel();
        Self {
-
            tx: Some(tx),
-
            rx: Some(rx),
+
            tx: Some(NotificationSender::new(tx)),
+
            rx: Some(NotificationReceiver::new(rx, max_wait)),
        }
    }
}
@@ -53,4 +97,7 @@ pub enum NotificationError {

    #[error("sender end point already extracted")]
    Sender,
+

+
    #[error("error sending to channel")]
+
    Send(#[source] std::sync::mpsc::SendError<()>),
}
modified src/pages.rs
@@ -545,7 +545,7 @@ impl StatusPage {
                    return Ok(());
                }

-
                match run_rx.recv_timeout(UPDATE_INTERVAL) {
+
                match run_rx.wait_for_notification() {
                    Ok(_) => (),
                    Err(RecvTimeoutError::Timeout) => (),
                    Err(RecvTimeoutError::Disconnected) => {
modified src/queueadd.rs
@@ -93,7 +93,7 @@ impl QueueAdder {

    fn push_event(&self, e: CiEvent) -> Result<(), AdderError> {
        self.db.push_queued_ci_event(e)?;
-
        self.events_tx.send(()).map_err(|_| AdderError::Send)?;
+
        self.events_tx.notify().map_err(|_| AdderError::Send)?;
        Ok(())
    }
}
modified src/queueproc.rs
@@ -5,7 +5,6 @@
use std::{
    sync::mpsc::RecvTimeoutError,
    thread::{spawn, JoinHandle},
-
    time::Duration,
};

use radicle::Profile;
@@ -19,8 +18,6 @@ use crate::{
    notif::{NotificationReceiver, NotificationSender},
};

-
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
-

#[derive(Default)]
pub struct QueueProcessorBuilder {
    db: Option<Db>,
@@ -79,12 +76,13 @@ impl QueueProcessor {
        let mut done = false;
        while !done {
            while let Some(qe) = self.pick_event()? {
+
                self.run_tx.notify()?;
                logger::queueproc_picked_event(qe.id(), &qe);
                done = self.process_event(qe.event())?;
                self.drop_event(qe.id())?;
-
                self.run_tx.send(()).map_err(|_| QueueError::NotifyRun)?;
+
                self.run_tx.notify()?;
            }
-
            match self.events_rx.recv_timeout(RECV_TIMEOUT) {
+
            match self.events_rx.wait_for_notification() {
                Ok(_) => {}
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => {
@@ -136,7 +134,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger)
+
                    .execute_ci(&trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -154,7 +152,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger)
+
                    .execute_ci(&trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -186,8 +184,8 @@ pub enum QueueError {
    #[error("failed to run CI")]
    ExecuteCi(#[source] BrokerError),

-
    #[error("failed to send notification about new run")]
-
    NotifyRun,
+
    #[error(transparent)]
+
    NotifyRun(#[from] crate::notif::NotificationError),
}

impl QueueError {