Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
fix(src/queueproc.rs): logic for when to stop processing events
Lars Wirzenius committed 6 months ago
commit 7b865840e80022fcf9279bad2ffe353812a10079
parent 59ef2c7
2 files changed +42 -28
modified src/db.rs
@@ -487,7 +487,7 @@ impl<'a> Stmt<'a> {
}

/// An identifier for an event in the event queue in the database.
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct QueueId {
    id: String,
}
modified src/queueproc.rs
@@ -4,18 +4,18 @@

use std::{
    collections::HashSet,
-    sync::{Arc, Mutex},
-    thread::spawn,
+    sync::{mpsc::RecvTimeoutError, Arc, Mutex},
+    thread::{sleep, spawn},
    time::{Duration, Instant},
};

-use radicle::{Profile, identity::RepoId};
+use radicle::{identity::RepoId, Profile};

use crate::{
    adapter::{Adapter, Adapters},
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
-    db::{Db, DbError, QueuedCiEvent},
+    db::{Db, DbError, QueueId, QueuedCiEvent},
    filter::{EventFilter, Trigger},
    logger,
    msg::{MessageError, RequestBuilder},
@@ -23,6 +23,8 @@ use crate::{
    worker::Worker,
};

+const SLEEP_WHEN_BUSY: Duration = Duration::from_secs(1);
+
#[derive(Default)]
pub struct QueueProcessorBuilder {
    db: Option<Db>,
@@ -143,14 +145,14 @@ impl QueueProcessor {
        let mut handles = vec![];

        loop {
-            let mut queue_was_emptied = false;
+            let mut queue = self.db.queued_ci_events().map_err(QueueError::db)?;

            // If we may spawn another adapter, pick an event from the queue
            // and run the adapters.
-            if expecting_new_events && handles.len() < self.concurrent_adapters {
-                match self.pick_event() {
+            if handles.len() < self.concurrent_adapters {
+                match self.pick_event(&mut queue) {
                    Ok(Some(qe)) => {
-                        // Remove picked event from queue so we doh't re-process it. If we don't
+                        // Remove picked event from queue so we don't re-process it. If we don't
                        // do this, and we crash when processing the event, we'll re-process it
                        // again and again. It seems better to discard an event, and skip running
                        // CI, rather then getting stuck on a specific event.
@@ -164,12 +166,12 @@ impl QueueProcessor {
                                let h = spawn(move || p.pick_and_process_one(qe, adapters));
                                handles.push((repoid, h));
                            }
-                            Ok(None) => (),
-                            Err(_) => (),
+                            Ok(None) => {}
+                            Err(_) => {}
                        }
                    }
-                    Ok(None) => queue_was_emptied = true,
-                    Err(_) => (),
+                    Ok(None) => {}
+                    Err(_) => {}
                }
            }

@@ -178,15 +180,15 @@ impl QueueProcessor {
            let mut h2 = vec![];
            for (repoid, h) in handles {
                if h.is_finished() {
+                    logger::queueproc_finished_run(&repoid);
                    if let Some(repoid) = repoid {
                        self.current.remove(repoid);
                    }
                    match h.join() {
-                        Err(err) => eprintln!("thread join error {err:?}"),
+                        Err(_) => logger::queueproc_thread_join(),
                        Ok(Err(_)) => (),
                        Ok(Ok(MaybeShutdown::Shutdown)) => {
                            expecting_new_events = false;
-                            queue_was_emptied = true;
                        }
                        Ok(Ok(MaybeShutdown::Continue)) => (),
                    }
@@ -199,16 +201,22 @@ impl QueueProcessor {
            // If we didn't empty the event queue, but we're still
            // expecting new events, wait for a new event. This prevents
            // a busy loop.
-            if expecting_new_events && queue_was_emptied {
+            if expecting_new_events && queue.is_empty() {
                match self.events_rx.wait_for_notification() {
-                    Err(_) => {
+                    Ok(_) => {}
+
+                    Err(RecvTimeoutError::Timeout) => {}
+
+                    Err(RecvTimeoutError::Disconnected) => {
+                        logger::queueproc_channel_disconnect();
                        expecting_new_events = false;
                    }
-                    Ok(_) => queue_was_emptied = false,
                }
+            } else if handles.len() >= self.concurrent_adapters {
+                // Avoid a busy loop when as many adapters are running
+                // as we're allowed to run at once.
+                sleep(SLEEP_WHEN_BUSY);
            }

-            if handles.is_empty() && !expecting_new_events && queue_was_emptied {
+            if handles.is_empty() && !expecting_new_events && queue.is_empty() {
                break;
            }
        }
@@ -225,9 +233,7 @@ impl QueueProcessor {
        })
    }

-    fn pick_event(&mut self) -> Result<Option<QueuedCiEvent>, QueueError> {
-        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;
-
+    fn pick_event(&mut self, ids: &mut Vec<QueueId>) -> Result<Option<QueuedCiEvent>, QueueError> {
        let elapsed = self.prev_queue_len.elapsed();
        if elapsed > self.queue_len_interval {
            logger::queueproc_queue_length(ids.len());
@@ -243,12 +249,12 @@ impl QueueProcessor {
        queue.sort_by_cached_key(|qe| qe.timestamp().to_string());

        // Remove the repositories for which CI is currently running.
-        let ids = self.current.list();
+        let current_repos = self.current.list();
        queue = queue
            .iter()
            .filter(|qe| {
                if let Some(repoid) = qe.event().repository() {
-                    !ids.contains(repoid)
+                    !current_repos.contains(repoid)
                } else {
                    true
                }
@@ -257,7 +263,16 @@ impl QueueProcessor {
            .collect();

        if let Some(qe) = queue.first() {
-            eprintln!("picked event: {qe:?}");
+            logger::queueproc_picked_event(qe.id(), qe);
+
+            // Remove picked event from queue.
+            for i in 0..current_repos.len() {
+                if ids.get(i) == Some(qe.id()) {
+                    ids.remove(i);
+                    break;
+                }
+            }
+
            Ok(Some(qe.clone()))
        } else {
            Ok(None)
@@ -268,8 +283,7 @@ impl QueueProcessor {
        logger::queueproc_remove_event(qe);
        self.db
            .remove_queued_ci_event(qe.id())
-            .map_err(QueueError::db)?;
-        Ok(())
+            .map_err(QueueError::db)
    }

    fn matching_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
@@ -330,7 +344,7 @@ impl Processor {
    ) -> Result<MaybeShutdown, QueueError> {
        for adapter in adapters.iter() {
            self.run_tx.notify()?;
-            logger::queueproc_picked_event(qe.id(), &qe, adapter);
+            logger::queueproc_processing_event(qe.event(), adapter);
            match qe.event() {
                CiEvent::V1(CiEventV1::Shutdown) => {
                    logger::queueproc_action_shutdown();