Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
refactor: store timeout in receiver, separate event/run senders
Lars Wirzenius committed 1 year ago
commit 10ee855966a11b04110349511d83cbf8aa0acd76
parent d537fd39d134550a61e8f83b1554d1235c280f2a
6 files changed +68 -23
modified src/bin/cib.rs
@@ -125,7 +125,7 @@ struct InsertCmd {}

impl InsertCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
-
        let mut events_notification = NotificationChannel::default();
+
        let mut events_notification = NotificationChannel::new_event();
        let adder = QueueAdderBuilder::default()
            .events_tx(events_notification.tx().map_err(CibError::notification)?)
            .db(args.open_db(config)?)
@@ -161,14 +161,14 @@ impl QueuedCmd {
        logger::adapter_config(config);
        broker.set_default_adapter(&adapter);

-
        let mut event_notifications = NotificationChannel::default();
+
        let mut event_notifications = NotificationChannel::new_event();
        event_notifications
            .tx()
            .map_err(CibError::notification)?
-
            .send(())
+
            .notify()
            .ok();

-
        let mut run_notifications = NotificationChannel::default();
+
        let mut run_notifications = NotificationChannel::new_run();

        let db = args.open_db(config)?;
        let processor = QueueProcessorBuilder::default()
@@ -209,8 +209,8 @@ struct ProcessEventsCmd {}

impl ProcessEventsCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
-
        let mut events_notification = NotificationChannel::default();
-
        let mut run_notification = NotificationChannel::default();
+
        let mut events_notification = NotificationChannel::new_event();
+
        let mut run_notification = NotificationChannel::new_run();

        let adder = QueueAdderBuilder::default()
            .events_tx(events_notification.tx().map_err(CibError::notification)?)
modified src/bin/cibtoolcmd/report.rs
@@ -20,7 +20,7 @@ impl Leaf for ReportCmd {
        let mut page = StatusPage::default();
        page.set_output_dir(&self.output_dir);

-
        let mut run_notification = NotificationChannel::default();
+
        let mut run_notification = NotificationChannel::new_run();
        let thread = page.update_in_thread(
            run_notification.rx().map_err(CibToolError::Notification)?,
            profile,
modified src/notif.rs
@@ -1,12 +1,46 @@
//! Notification channel between threads.

-
use std::sync::mpsc::{channel, Receiver, Sender};
+
use std::{
+
    sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender},
+
    time::Duration,
+
};
+

+
// Timeout when receiving notification about a new event.
+
const EVENT_RECV_TIMEOUT: Duration = Duration::from_secs(1);
+

+
// Maximum interval between update HTML report pages.
+
const UPDATE_IMTERVAL: Duration = Duration::from_secs(1);

/// Channel endpoint for sending notifications.
-
pub type NotificationSender = Sender<()>;
+
pub struct NotificationSender {
+
    sender: Sender<()>,
+
}
+

+
impl NotificationSender {
+
    fn new(sender: Sender<()>) -> Self {
+
        Self { sender }
+
    }
+

+
    pub fn notify(&self) -> Result<(), NotificationError> {
+
        self.sender.send(()).map_err(NotificationError::Send)
+
    }
+
}

/// Channel endpoint for receiving notifications.
-
pub type NotificationReceiver = Receiver<()>;
+
pub struct NotificationReceiver {
+
    receiver: Receiver<()>,
+
    max_wait: Duration,
+
}
+

+
impl NotificationReceiver {
+
    fn new(receiver: Receiver<()>, max_wait: Duration) -> Self {
+
        Self { receiver, max_wait }
+
    }
+

+
    pub fn wait_for_notification(&self) -> Result<(), RecvTimeoutError> {
+
        self.receiver.recv_timeout(self.max_wait)
+
    }
+
}

/// Notification channel.
///
@@ -22,12 +56,22 @@ pub struct NotificationChannel {
    rx: Option<NotificationReceiver>,
}

-
impl Default for NotificationChannel {
-
    fn default() -> Self {
+
impl NotificationChannel {
+
    /// Construct a channel for notifying about new events.
+
    pub fn new_event() -> Self {
+
        Self::new(EVENT_RECV_TIMEOUT)
+
    }
+

+
    /// Construct a channel for notifying about new CI runs.
+
    pub fn new_run() -> Self {
+
        Self::new(UPDATE_IMTERVAL)
+
    }
+

+
    fn new(max_wait: Duration) -> Self {
        let (tx, rx) = channel();
        Self {
-
            tx: Some(tx),
-
            rx: Some(rx),
+
            tx: Some(NotificationSender::new(tx)),
+
            rx: Some(NotificationReceiver::new(rx, max_wait)),
        }
    }
}
@@ -53,4 +97,7 @@ pub enum NotificationError {

    #[error("sender end point already extracted")]
    Sender,
+

+
    #[error("error sending to channel")]
+
    Send(#[source] std::sync::mpsc::SendError<()>),
}
modified src/pages.rs
@@ -545,7 +545,7 @@ impl StatusPage {
                    return Ok(());
                }

-
                match run_rx.recv_timeout(UPDATE_INTERVAL) {
+
                match run_rx.wait_for_notification() {
                    Ok(_) => (),
                    Err(RecvTimeoutError::Timeout) => (),
                    Err(RecvTimeoutError::Disconnected) => {
modified src/queueadd.rs
@@ -93,7 +93,7 @@ impl QueueAdder {

    fn push_event(&self, e: CiEvent) -> Result<(), AdderError> {
        self.db.push_queued_ci_event(e)?;
-
        self.events_tx.send(()).map_err(|_| AdderError::Send)?;
+
        self.events_tx.notify().map_err(|_| AdderError::Send)?;
        Ok(())
    }
}
modified src/queueproc.rs
@@ -5,7 +5,6 @@
use std::{
    sync::mpsc::RecvTimeoutError,
    thread::{spawn, JoinHandle},
-
    time::Duration,
};

use radicle::Profile;
@@ -19,8 +18,6 @@ use crate::{
    notif::{NotificationReceiver, NotificationSender},
};

-
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
-

#[derive(Default)]
pub struct QueueProcessorBuilder {
    db: Option<Db>,
@@ -79,12 +76,13 @@ impl QueueProcessor {
        let mut done = false;
        while !done {
            while let Some(qe) = self.pick_event()? {
+
                self.run_tx.notify()?;
                logger::queueproc_picked_event(qe.id(), &qe);
                done = self.process_event(qe.event())?;
                self.drop_event(qe.id())?;
-
                self.run_tx.send(()).map_err(|_| QueueError::NotifyRun)?;
+
                self.run_tx.notify()?;
            }
-
            match self.events_rx.recv_timeout(RECV_TIMEOUT) {
+
            match self.events_rx.wait_for_notification() {
                Ok(_) => {}
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => {
@@ -186,8 +184,8 @@ pub enum QueueError {
    #[error("failed to run CI")]
    ExecuteCi(#[source] BrokerError),

-
    #[error("failed to send notification about new run")]
-
    NotifyRun,
+
    #[error(transparent)]
+
    NotifyRun(#[from] crate::notif::NotificationError),
}

impl QueueError {