Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: allow setting how often the event queue length is logged
Merged liw opened 1 year ago

The configuration file can have a field queue_len_interval, and defaults to 10 seconds. This should reduce log spam.

Signed-off-by: Lars Wirzenius liw@liw.fi

3 files changed +33 -5 c108cf11 cadb933d
modified ci-broker.md
@@ -45,6 +45,7 @@ the developers and maintainers of the CI broker.
db: ci-broker.db
report_dir: reports
default_adapter: mcadapterface
+
queue_len_interval: 1min
adapters:
  mcadapterface:
    command: ./adapter.sh
modified src/config.rs
@@ -19,19 +19,28 @@ const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;
pub struct Config {
    pub default_adapter: String,
    pub adapters: HashMap<String, Adapter>,
-
    #[serde(deserialize_with = "deserialize_duration")]
-
    #[serde(default = "default_max_run_time")]
-
    pub max_run_time: Duration,
    pub filters: Vec<EventFilter>,
    pub report_dir: Option<PathBuf>,
    pub status_update_interval_seconds: Option<u64>,
    pub db: PathBuf,
+

+
    #[serde(deserialize_with = "deserialize_duration")]
+
    #[serde(default = "default_max_run_time")]
+
    pub max_run_time: Duration,
+

+
    #[serde(deserialize_with = "deserialize_duration")]
+
    #[serde(default = "default_queue_len_interval")]
+
    pub queue_len_interval: Duration,
}

fn default_max_run_time() -> Duration {
    DEFAULT_MAX_RUN_TIME
}

+
fn default_queue_len_interval() -> Duration {
+
    DEFAULT_MAX_RUN_TIME
+
}
+

impl Config {
    pub fn load(filename: &Path) -> Result<Self, ConfigError> {
        let config =
modified src/queueproc.rs
@@ -5,6 +5,7 @@
use std::{
    sync::mpsc::RecvTimeoutError,
    thread::{spawn, JoinHandle},
+
    time::{Duration, Instant},
};

use radicle::Profile;
@@ -24,8 +25,11 @@ pub struct QueueProcessorBuilder {
    broker: Option<Broker>,
    events_rx: Option<NotificationReceiver>,
    run_tx: Option<NotificationSender>,
+
    queue_len_interval: Option<Duration>,
}

+
const DEFAULT_QUEUE_LEN_DURATION: Duration = Duration::from_secs(10);
+

impl QueueProcessorBuilder {
    pub fn build(self) -> Result<QueueProcessor, QueueError> {
        Ok(QueueProcessor {
@@ -34,6 +38,10 @@ impl QueueProcessorBuilder {
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
            run_tx: self.run_tx.ok_or(QueueError::Missing("run_tx"))?,
+
            queue_len_interval: self
+
                .queue_len_interval
+
                .unwrap_or(DEFAULT_QUEUE_LEN_DURATION),
+
            prev_queue_len: Instant::now(),
        })
    }

@@ -52,6 +60,11 @@ impl QueueProcessorBuilder {
        self
    }

+
    pub fn queue_len_interval(mut self, interval: Duration) -> Self {
+
        self.queue_len_interval = Some(interval);
+
        self
+
    }
+

    pub fn broker(mut self, broker: Broker) -> Self {
        self.broker = Some(broker);
        self
@@ -64,6 +77,8 @@ pub struct QueueProcessor {
    broker: Broker,
    events_rx: NotificationReceiver,
    run_tx: NotificationSender,
+
    queue_len_interval: Duration,
+
    prev_queue_len: Instant,
}

impl QueueProcessor {
@@ -102,9 +117,12 @@ impl QueueProcessor {
        Ok(())
    }

-
    fn pick_event(&self) -> Result<Option<QueuedCiEvent>, QueueError> {
+
    fn pick_event(&mut self) -> Result<Option<QueuedCiEvent>, QueueError> {
        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;
-
        logger::queueproc_queue_length(ids.len());
+
        if self.prev_queue_len.elapsed() > self.queue_len_interval {
+
            logger::queueproc_queue_length(ids.len());
+
            self.prev_queue_len = Instant::now();
+
        }

        let mut queue = vec![];
        for id in ids.iter() {