Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
fix cib logic for when to process next and stop processing queue
Merged liw opened 6 months ago
5 files changed +92 -35 6bcf8140 7b865840
modified Makefile
@@ -8,6 +8,7 @@ check:

build:
	cargo build --all-targets
+
	subplot docgen ci-broker.subplot -o ci-broker.html

fast-test: build
	timeout 60 cargo test -- --skip upgrades $(TESTS)
modified ci-broker.md
@@ -575,6 +575,25 @@ then stdout contains ""id": "xyzzy""
~~~


+
## Processes empty event queue successfully
+

+
_Want:_ CI broker does nothing, but successfully, if asked to
+
process an empty event queue.
+

+
_Why:_This is an important corner case for `cib queued`.
+

+
_Who:_ `cib-devs`
+

+
~~~scenario
+
given a Radicle node, with CI configured with broker-with-triggers.yaml and adapter dummy.sh
+

+
when I run ./env.sh cib --config broker-with-triggers.yaml queued
+

+
when I run cibtool --db ci-broker.db run list
+
then stdout is exactly ""
+
~~~
+

+

## Handles adapter failing on a successful run

_Want:_ If the adapter fails, the CI broker creates a job COB and
modified src/db.rs
@@ -487,7 +487,7 @@ impl<'a> Stmt<'a> {
}

/// An identifier for an event in the event queue in the database.
-
#[derive(Clone, Debug, Serialize, Deserialize)]
+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct QueueId {
    id: String,
}
modified src/logger.rs
@@ -4,8 +4,8 @@ use std::{path::Path, process::ExitStatus, time::Duration};

use clap::ValueEnum;
use serde_json::Value;
-
use tracing::{Level, debug, error, info, trace, warn};
-
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
+
use tracing::{debug, error, info, trace, warn, Level};
+
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use uuid::Uuid;

use radicle::{git::Oid, identity::RepoId, node::Event, patch::PatchId};
@@ -165,12 +165,14 @@ enum Id {
    QueueProcDisconnected,
    QueueProcEnd,
    QueueProcFilterDecision,
+
    QueueProcFinishedRun,
    QueueProcPickedEvent,
    QueueProcProcessingEvent,
    QueueProcProcessedEvent,
    QueueProcQueueLength,
    QueueProcRemoveEvent,
    QueueProcStart,
+
    QueueProcThreadJoin,
    QueueProcTrigger,
    QueueProcProcessorResult,
    QueueProcWorkerResult,
@@ -282,6 +284,10 @@ pub fn open(level: LogLevel) {
        .init();
}

+
pub fn info(msg: String) {
+
    info!(msg)
+
}
+

pub fn start_cib() {
    info!(
        msg_id = ?Id::CibStart,
@@ -489,22 +495,22 @@ pub fn queueproc_predicate_decision(event: &CiEvent, filter: &EventFilter, allow
    );
}

-
pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent, adapter: &Adapter) {
+
pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent) {
    info!(
        msg_id = ?Id::QueueProcPickedEvent,
        kind = %Kind::GotEvent,
        ?id,
        ?event,
-
        ?adapter,
-
        "running adapter on event picked from queue"
+
        "picked an event from event queue"
    );
}

-
pub fn queueproc_processing_event(event: &CiEvent) {
+
pub fn queueproc_processing_event(event: &CiEvent, adapter: &Adapter) {
    info!(
        msg_id = ?Id::QueueProcProcessingEvent,
        kind = %Kind::GotEvent,
        ?event,
+
        ?adapter,
        "processing event"
    );
}
@@ -527,6 +533,14 @@ pub fn queueproc_processor_thread_result(result: Result<(), &QueueError>) {
    );
}

+
pub fn queueproc_thread_join() {
+
    info!(
+
        msg_id = ?Id::QueueProcThreadJoin,
+
        kind = %Kind::FinishRun,
+
        "joining thread failed"
+
    );
+
}
+

pub fn queueproc_trigger(result: &Result<Request, QueueError>) {
    info!(
        msg_id = ?Id::QueueProcTrigger,
@@ -554,6 +568,15 @@ pub fn queueproc_remove_event(event: &QueuedCiEvent) {
    );
}

+
pub fn queueproc_finished_run(repoid: &Option<RepoId>) {
+
    info!(
+
        msg_id = ?Id::QueueProcFinishedRun,
+
        kind = %Kind::FinishRun,
+
        ?repoid,
+
        "finished run"
+
    );
+
}
+

pub fn queueproc_action_run(event: &CiEvent) {
    info!(
        msg_id = ?Id::QueueProcActionRun,
@@ -749,7 +772,7 @@ pub fn adapter_stdout_line(line: &str) {
        msg_id = ?Id::AdapterStdoutLine,
        kind = %Kind::AdapterMessage,
        ?line,
-
        "Action: shutdown"
+
        "adapter stdout line"
    );
}

modified src/queueproc.rs
@@ -4,18 +4,18 @@

use std::{
    collections::HashSet,
-
    sync::{Arc, Mutex},
-
    thread::spawn,
+
    sync::{mpsc::RecvTimeoutError, Arc, Mutex},
+
    thread::{sleep, spawn},
    time::{Duration, Instant},
};

-
use radicle::{Profile, identity::RepoId};
+
use radicle::{identity::RepoId, Profile};

use crate::{
    adapter::{Adapter, Adapters},
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
-
    db::{Db, DbError, QueuedCiEvent},
+
    db::{Db, DbError, QueueId, QueuedCiEvent},
    filter::{EventFilter, Trigger},
    logger,
    msg::{MessageError, RequestBuilder},
@@ -23,6 +23,8 @@ use crate::{
    worker::Worker,
};

+
const SLEEP_WHEN_BUSY: Duration = Duration::from_secs(1);
+

#[derive(Default)]
pub struct QueueProcessorBuilder {
    db: Option<Db>,
@@ -143,14 +145,14 @@ impl QueueProcessor {
        let mut handles = vec![];

        loop {
-
            let mut queue_was_emptied = false;
+
            let mut queue = self.db.queued_ci_events().map_err(QueueError::db)?;

            // If we may spawn another adapter, pick an event from the queue
            // and run the adapters.
-
            if expecting_new_events && handles.len() < self.concurrent_adapters {
-
                match self.pick_event() {
+
            if handles.len() < self.concurrent_adapters {
+
                match self.pick_event(&mut queue) {
                    Ok(Some(qe)) => {
-
                        // Remove picked event from queue so we doh't re-process it. If we don't
+
                        // Remove picked event from queue so we don't re-process it. If we don't
                        // do this, and we crash when processing the event, we'll re-process it
                        // again and again. It seems better to discard an event, and skip running
                        // CI, rather then getting stuck on a specific event.
@@ -164,12 +166,12 @@ impl QueueProcessor {
                                let h = spawn(move || p.pick_and_process_one(qe, adapters));
                                handles.push((repoid, h));
                            }
-
                            Ok(None) => (),
-
                            Err(_) => (),
+
                            Ok(None) => {}
+
                            Err(_) => {}
                        }
                    }
-
                    Ok(None) => queue_was_emptied = true,
-
                    Err(_) => (),
+
                    Ok(None) => {}
+
                    Err(_) => {}
                }
            }

@@ -178,15 +180,15 @@ impl QueueProcessor {
            let mut h2 = vec![];
            for (repoid, h) in handles {
                if h.is_finished() {
+
                    logger::queueproc_finished_run(&repoid);
                    if let Some(repoid) = repoid {
                        self.current.remove(repoid);
                    }
                    match h.join() {
-
                        Err(err) => eprintln!("thread join error {err:?}"),
+
                        Err(_) => logger::queueproc_thread_join(),
                        Ok(Err(_)) => (),
                        Ok(Ok(MaybeShutdown::Shutdown)) => {
                            expecting_new_events = false;
-
                            queue_was_emptied = true;
                        }
                        Ok(Ok(MaybeShutdown::Continue)) => (),
                    }
@@ -199,16 +201,22 @@ impl QueueProcessor {
            // If we didn't empty the event queue, but we're still
            // expecting new events, wait for a new event. This prevents
            // a busy loop.
-
            if expecting_new_events && queue_was_emptied {
+
            if expecting_new_events && queue.is_empty() {
                match self.events_rx.wait_for_notification() {
-
                    Err(_) => {
+
                    Ok(_) => {}
+
                    Err(RecvTimeoutError::Timeout) => {}
+
                    Err(RecvTimeoutError::Disconnected) => {
+
                        logger::queueproc_channel_disconnect();
                        expecting_new_events = false;
                    }
-
                    Ok(_) => queue_was_emptied = false,
                }
+
            } else if handles.len() >= self.concurrent_adapters {
+
                // Avoid a busy loop when as many adapters are running
+
                // as we're allowed to run at once.
+
                sleep(SLEEP_WHEN_BUSY);
            }

-
            if handles.is_empty() && !expecting_new_events && queue_was_emptied {
+
            if handles.is_empty() && !expecting_new_events && queue.is_empty() {
                break;
            }
        }
@@ -225,9 +233,7 @@ impl QueueProcessor {
        })
    }

-
    fn pick_event(&mut self) -> Result<Option<QueuedCiEvent>, QueueError> {
-
        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;
-

+
    fn pick_event(&mut self, ids: &mut Vec<QueueId>) -> Result<Option<QueuedCiEvent>, QueueError> {
        let elapsed = self.prev_queue_len.elapsed();
        if elapsed > self.queue_len_interval {
            logger::queueproc_queue_length(ids.len());
@@ -243,12 +249,12 @@ impl QueueProcessor {
        queue.sort_by_cached_key(|qe| qe.timestamp().to_string());

        // Remove the repositories for which CI is currently running.
-
        let ids = self.current.list();
+
        let current_repos = self.current.list();
        queue = queue
            .iter()
            .filter(|qe| {
                if let Some(repoid) = qe.event().repository() {
-
                    !ids.contains(repoid)
+
                    !current_repos.contains(repoid)
                } else {
                    true
                }
@@ -257,7 +263,16 @@ impl QueueProcessor {
            .collect();

        if let Some(qe) = queue.first() {
-
            eprintln!("picked event: {qe:?}");
+
            logger::queueproc_picked_event(qe.id(), qe);
+

+
            // Remove picked event from queue.
+
            for i in 0..current_repos.len() {
+
                if ids.get(i) == Some(qe.id()) {
+
                    ids.remove(i);
+
                    break;
+
                }
+
            }
+

            Ok(Some(qe.clone()))
        } else {
            Ok(None)
@@ -268,8 +283,7 @@ impl QueueProcessor {
        logger::queueproc_remove_event(qe);
        self.db
            .remove_queued_ci_event(qe.id())
-
            .map_err(QueueError::db)?;
-
        Ok(())
+
            .map_err(QueueError::db)
    }

    fn matching_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
@@ -330,7 +344,7 @@ impl Processor {
    ) -> Result<MaybeShutdown, QueueError> {
        for adapter in adapters.iter() {
            self.run_tx.notify()?;
-
            logger::queueproc_picked_event(qe.id(), &qe, adapter);
+
            logger::queueproc_processing_event(qe.event(), adapter);
            match qe.event() {
                CiEvent::V1(CiEventV1::Shutdown) => {
                    logger::queueproc_action_shutdown();