Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
refactor(src/queueproc.rs): simplify thread structure
Lars Wirzenius committed 6 months ago
commit dd9347b8acdce0022f0d8eb6378de849ba830834
parent 5379b67
2 files changed +180 -267
modified Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "radicle-ci-broker"
version = "0.21.0"
-
edition = "2021"
+
edition = "2024"
rust-version = "1.85"
authors = ["Lars Wirzenius <liw@liw.fi>", "cloudhead <cloudhead@radicle.xyz>"]
description = "adds integration to CI engines or systems to a Radicle node"
modified src/queueproc.rs
@@ -4,15 +4,12 @@

use std::{
    collections::HashSet,
-
    sync::{
-
        mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender},
-
        Arc, Mutex,
-
    },
-
    thread::JoinHandle,
+
    sync::{Arc, Mutex},
+
    thread::spawn,
    time::{Duration, Instant},
};

-
use radicle::{identity::RepoId, Profile};
+
use radicle::{Profile, identity::RepoId};

use crate::{
    adapter::{Adapter, Adapters},
@@ -23,8 +20,7 @@ use crate::{
    logger,
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
-
    pull_queue::{PullQueue, PullQueueError},
-
    worker::{start_thread, Worker},
+
    worker::Worker,
};

#[derive(Default)]
@@ -53,37 +49,21 @@ impl QueueProcessorBuilder {
        let concurrent_adapters = self
            .concurrent_adapters
            .ok_or(QueueError::Missing("concurrent_adapters"))?;
-
        let (procssed_tx, processed_rx) = sync_channel(1);
-

-
        let picked_events = PullQueue::new();
-

-
        let mut processors = vec![];
-
        for _ in 0..concurrent_adapters {
-
            let broker = Broker::new(broker.db().filename(), broker.max_run_time())
-
                .map_err(QueueError::NewBroker)?;
-
            let event_procssor = EventProcessor::new(
-
                profile.clone(),
-
                filters.clone(),
-
                triggers.clone(),
-
                adapters.clone(),
-
                broker,
-
                run_tx.clone(),
-
                picked_events.clone(),
-
                procssed_tx.clone(),
-
            );
-
            processors.push(start_thread(event_procssor));
-
        }

        Ok(QueueProcessor {
+
            profile,
+
            broker,
+
            filters,
+
            triggers,
+
            adapters,
            db: self.db.ok_or(QueueError::Missing("db"))?,
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
            queue_len_interval: self
                .queue_len_interval
                .unwrap_or(DEFAULT_QUEUE_LEN_DURATION),
            prev_queue_len: Instant::now(),
-
            processors,
-
            processed_rx: Some(processed_rx),
-
            picked_events: Some(picked_events),
+
            concurrent_adapters,
+
            run_tx,
            current: CurrentlyPicked::default(),
        })
    }
@@ -134,93 +114,117 @@ impl QueueProcessorBuilder {
    }
}

-
impl Worker for QueueProcessor {
-
    const NAME: &str = "queue-processor";
-
    type Error = QueueError;
-
    fn work(&mut self) -> Result<(), QueueError> {
-
        logger::queueproc_start(self.processors.len());
-

-
        // Spawn a thread to process results from running adapters.
-
        #[allow(clippy::unwrap_used)]
-
        let adapter_results = start_thread(AdapterResults::new(
-
            self.processed_rx.take().unwrap(),
-
            self.current.clone(),
-
        ));
-

-
        // Pick events from queue, send to worker threads that run
-
        // adapters. Results are processed by thread above.
-
        let result = self.process_until_shutdown();
-

-
        logger::queueproc_end(&result);
-

-
        // Wait for worker threads to terminate. This closes all
-
        // sender ends for results channel.
-
        while let Some(proc) = self.processors.pop() {
-
            let result = proc
-
                .join()
-
                .map_err(|_| QueueError::JoinEventProcessorThread)?;
-
            logger::queueproc_processor_thread_result(result.as_ref().map(|_| ()));
-
        }
-

-
        // Wait for results processing thread to terminate.
-
        adapter_results.join().ok();
-
        Ok(())
-
    }
-
}
-

+
// The queue processor gets events from the event queue in
+
// the database, and processes them concurrently by running
+
// the appropriate adapters. To avoid busy looping, the
+
// `events_rx` channel is used to receive notification that
+
// an event has been added to the database. Processing will
+
// end when the channel is closed and the queue is empty,
+
// or a "shutdown" event is encountered. In case of shutdown,
+
// any currently running adapters will be allowed to finish.
pub struct QueueProcessor {
+
    profile: Profile,
    db: Db,
+
    broker: Broker,
+
    filters: Vec<EventFilter>,
+
    triggers: Vec<Trigger>,
+
    adapters: Adapters,
+
    concurrent_adapters: usize,
    events_rx: NotificationReceiver,
    queue_len_interval: Duration,
    prev_queue_len: Instant,
-
    picked_events: Option<PullQueue<Picked>>,
-
    processors: Vec<JoinHandle<Result<(), QueueError>>>,
-
    processed_rx: Option<Receiver<RepoId>>,
+
    run_tx: NotificationSender,
    current: CurrentlyPicked,
}

impl QueueProcessor {
-
    #[allow(clippy::unwrap_used)]
    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
-
        let mut done = false;
-
        let mut picked_events = self.picked_events.take().unwrap();
-
        while !done {
-
            // Process all events currently in the event queue, if a
-
            // filter allows it.
-
            loop {
-
                if let Some(qe) = self.pick_event()? {
-
                    let picked = Picked::new(qe.clone());
-
                    self.current.insert(picked.qe.event().repository());
-
                    picked_events.push(picked).map_err(QueueError::PullQueue)?;
-

-
                    // We always drop the picked event as soon as it has
-
                    // been pushed to a processing thread, so that if
-
                    // anything goes wrong, we're less likely try to
-
                    // process the same event again. It's better for the
-
                    // failure to be logged and let the humans deal with
-
                    // whatever complicated failure situation is
-
                    // happening.
-
                    self.drop_event(&qe)?;
-
                } else if self.current.is_empty() {
-
                    break;
+
        let mut expecting_new_events = true;
+
        let mut handles = vec![];
+

+
        loop {
+
            let mut queue_was_emptied = false;
+

+
            // If we may spawn another adapter, pick an event from the queue
+
            // and run the adapters.
+
            if expecting_new_events && handles.len() < self.concurrent_adapters {
+
                match self.pick_event() {
+
                    Ok(Some(qe)) => {
+
                        // Remove picked event from queue so we don't re-process it. If we don't
+
                        // do this, and we crash when processing the event, we'll re-process it
+
                        // again and again. It seems better to discard an event, and skip running
+
                        // CI, rather than getting stuck on a specific event.
+
                        self.drop_event(&qe)?;
+

+
                        match self.matching_adapters(qe.event()) {
+
                            Ok(Some(adapters)) => {
+
                                let p = self.processor()?;
+
                                let repoid = qe.event().repository().copied();
+
                                self.current.insert(qe.event().repository());
+
                                let h = spawn(move || p.pick_and_process_one(qe, adapters));
+
                                handles.push((repoid, h));
+
                            }
+
                            Ok(None) => (),
+
                            Err(_) => (),
+
                        }
+
                    }
+
                    Ok(None) => queue_was_emptied = true,
+
                    Err(_) => (),
                }
            }

-
            // Wait for a notification of new events in the queue.
-
            // This prevents the loop from being a busy loop.
-
            match self.events_rx.wait_for_notification() {
-
                Ok(_) => {}
-
                Err(RecvTimeoutError::Timeout) => {}
-
                Err(RecvTimeoutError::Disconnected) => {
-
                    logger::queueproc_channel_disconnect();
-
                    done = true;
+
            // Wait for any threads processing events that have finished. Remove
+
            // them from the list of currently running threads.
+
            let mut h2 = vec![];
+
            for (repoid, h) in handles {
+
                if h.is_finished() {
+
                    if let Some(repoid) = repoid {
+
                        self.current.remove(repoid);
+
                    }
+
                    match h.join() {
+
                        Err(err) => eprintln!("thread join error {err:?}"),
+
                        Ok(Err(_)) => (),
+
                        Ok(Ok(MaybeShutdown::Shutdown)) => {
+
                            expecting_new_events = false;
+
                            queue_was_emptied = true;
+
                        }
+
                        Ok(Ok(MaybeShutdown::Continue)) => (),
+
                    }
+
                } else {
+
                    h2.push((repoid, h));
                }
            }
+
            handles = h2;
+

+
            // If we didn't empty the event queue, but we're still
+
            // expecting new events, wait for a new event. This prevents
+
            // a busy loop.
+
            if expecting_new_events && queue_was_emptied {
+
                match self.events_rx.wait_for_notification() {
+
                    Err(_) => {
+
                        expecting_new_events = false;
+
                    }
+
                    Ok(_) => queue_was_emptied = false,
+
                }
+
            }
+

+
            if handles.is_empty() && !expecting_new_events && queue_was_emptied {
+
                break;
+
            }
        }

        Ok(())
    }

+
    fn processor(&self) -> Result<Processor, QueueError> {
+
        Ok(Processor {
+
            profile: self.profile.clone(),
+
            broker: Broker::new(self.db.filename(), self.broker.max_run_time())
+
                .map_err(QueueError::NewBroker)?,
+
            run_tx: self.run_tx.clone(),
+
        })
+
    }
+

    fn pick_event(&mut self) -> Result<Option<QueuedCiEvent>, QueueError> {
        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;

@@ -238,8 +242,7 @@ impl QueueProcessor {
        }
        queue.sort_by_cached_key(|qe| qe.timestamp().to_string());

-
        // If we can access the set of repositories for which CI is
-
        // currently running, remove those repositories from the list.
+
        // Remove the repositories for which CI is currently running.
        let ids = self.current.list();
        queue = queue
            .iter()
@@ -254,6 +257,7 @@ impl QueueProcessor {
            .collect();

        if let Some(qe) = queue.first() {
+
            eprintln!("picked event: {qe:?}");
            Ok(Some(qe.clone()))
        } else {
            Ok(None)
@@ -267,93 +271,6 @@ impl QueueProcessor {
            .map_err(QueueError::db)?;
        Ok(())
    }
-
}
-

-
#[derive(Default, Clone)]
-
struct CurrentlyPicked {
-
    set: Arc<Mutex<HashSet<RepoId>>>,
-
}
-

-
impl CurrentlyPicked {
-
    fn insert(&mut self, repoid: Option<&RepoId>) {
-
        if let Some(repoid) = repoid {
-
            if let Ok(mut set) = self.set.lock() {
-
                set.insert(*repoid);
-
            }
-
        }
-
    }
-

-
    fn remove(&mut self, repoid: RepoId) {
-
        if let Ok(mut set) = self.set.lock() {
-
            set.remove(&repoid);
-
        }
-
    }
-

-
    fn list(&self) -> Vec<RepoId> {
-
        if let Ok(set) = self.set.lock() {
-
            set.iter().copied().collect()
-
        } else {
-
            vec![]
-
        }
-
    }
-

-
    fn is_empty(&self) -> bool {
-
        if let Ok(set) = self.set.lock() {
-
            set.is_empty()
-
        } else {
-
            false
-
        }
-
    }
-
}
-

-
struct EventProcessor {
-
    profile: Profile,
-
    filters: Vec<EventFilter>,
-
    triggers: Vec<Trigger>,
-
    adapters: Adapters,
-
    broker: Broker,
-
    run_tx: NotificationSender,
-
    picked_events: PullQueue<Picked>,
-
    processed_tx: SyncSender<RepoId>,
-
}
-

-
impl EventProcessor {
-
    #[allow(clippy::too_many_arguments)]
-
    fn new(
-
        profile: Profile,
-
        filters: Vec<EventFilter>,
-
        triggers: Vec<Trigger>,
-
        adapters: Adapters,
-
        broker: Broker,
-
        run_tx: NotificationSender,
-
        picked_events: PullQueue<Picked>,
-
        processed_tx: SyncSender<RepoId>,
-
    ) -> Self {
-
        Self {
-
            profile,
-
            filters,
-
            triggers,
-
            adapters,
-
            broker,
-
            run_tx,
-
            picked_events,
-
            processed_tx,
-
        }
-
    }
-

-
    fn process_picked_event(&self, picked: Picked) {
-
        let res = self.matching_adapters(picked.qe.event());
-
        logger::queueproc_worker_thread_result(res.as_ref().map(|_| ()));
-
        if let Ok(Some(adapters)) = res {
-
            for adapter in adapters {
-
                self.run_adapter(&picked.qe, &adapter).ok();
-
            }
-
        }
-

-
        if let Some(repoid) = picked.qe.event().repository() {
-
            self.processed_tx.send(*repoid).ok();
-
        }
-
    }

    fn matching_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
        let mut adapters = vec![];
@@ -383,108 +300,107 @@ impl EventProcessor {
            Ok(Some(adapters))
        }
    }
-

-
    fn run_adapter(&self, qe: &QueuedCiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
-
        let mut done = false;
-

-
        self.run_tx.notify()?;
-
        logger::queueproc_picked_event(qe.id(), qe, adapter);
-
        let res = self.process_event(qe.event(), adapter);
-
        logger::queueproc_processed_event(&res);
-
        match res {
-
            Ok(shut_down) => done = shut_down == MaybeShutdown::Shutdown,
-
            Err(QueueError::BuildTrigger(_, _)) => done = false,
-
            Err(err) => Err(err)?,
-
        }
-
        self.run_tx.notify()?;
-
        Ok(done)
-
    }
-

-
    fn process_event(
-
        &self,
-
        event: &CiEvent,
-
        adapter: &Adapter,
-
    ) -> Result<MaybeShutdown, QueueError> {
-
        if matches!(event, CiEvent::V1(CiEventV1::Shutdown)) {
-
            logger::queueproc_action_shutdown();
-
            Ok(MaybeShutdown::Shutdown)
-
        } else {
-
            logger::queueproc_action_run(event);
-

-
            let trigger = RequestBuilder::default()
-
                .profile(&self.profile)
-
                .ci_event(event)
-
                .build_trigger_from_ci_event()
-
                .map_err(|e| QueueError::build_trigger(event, e));
-
            logger::queueproc_trigger(&trigger);
-
            let trigger = trigger?;
-

-
            self.broker
-
                .execute_ci(adapter, &trigger, &self.run_tx)
-
                .map_err(QueueError::execute_ci)?;
-

-
            Ok(MaybeShutdown::Continue)
-
        }
-
    }
}

-
impl Worker for EventProcessor {
-
    const NAME: &str = "event-processor";
+
impl Worker for QueueProcessor {
+
    const NAME: &str = "queue-processor";
    type Error = QueueError;
    fn work(&mut self) -> Result<(), QueueError> {
-
        while let Ok(Some(picked)) = self.picked_events.pop() {
-
            self.process_picked_event(picked);
-
        }
+
        // Pick events from queue, send to worker threads that run
+
        // adapters. Results are processed by thread above.
+
        let result = self.process_until_shutdown();
+

+
        logger::queueproc_end(&result);
+

        Ok(())
    }
}

-
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
-
pub enum MaybeShutdown {
-
    Shutdown,
-
    Continue,
+
struct Processor {
+
    profile: Profile,
+
    broker: Broker,
+
    run_tx: NotificationSender,
}

-
struct AdapterResults {
-
    rx: Receiver<RepoId>,
-
    current: CurrentlyPicked,
-
}
+
impl Processor {
+
    fn pick_and_process_one(
+
        &self,
+
        qe: QueuedCiEvent,
+
        adapters: Vec<Adapter>,
+
    ) -> Result<MaybeShutdown, QueueError> {
+
        for adapter in adapters.iter() {
+
            self.run_tx.notify()?;
+
            logger::queueproc_picked_event(qe.id(), &qe, adapter);
+
            match qe.event() {
+
                CiEvent::V1(CiEventV1::Shutdown) => {
+
                    logger::queueproc_action_shutdown();
+
                    return Ok(MaybeShutdown::Shutdown);
+
                }
+
                _ => {
+
                    logger::queueproc_action_run(qe.event());
+

+
                    let trigger = RequestBuilder::default()
+
                        .profile(&self.profile)
+
                        .ci_event(qe.event())
+
                        .build_trigger_from_ci_event()
+
                        .map_err(|e| QueueError::build_trigger(qe.event(), e));
+
                    logger::queueproc_trigger(&trigger);
+
                    let trigger = trigger?;
+

+
                    self.broker
+
                        .execute_ci(adapter, &trigger, &self.run_tx)
+
                        .map_err(QueueError::execute_ci)?;
+
                }
+
            }
+
        }

-
impl AdapterResults {
-
    fn new(rx: Receiver<RepoId>, current: CurrentlyPicked) -> Self {
-
        Self { rx, current }
+
        Ok(MaybeShutdown::Continue)
    }
}

-
impl Worker for AdapterResults {
-
    const NAME: &str = "adapter-result-processor";
-
    type Error = QueueError;
-
    fn work(&mut self) -> Result<(), QueueError> {
-
        logger::queueproc_start_result_receiver();
-
        while let Ok(repoid) = self.rx.recv() {
-
            self.current.remove(repoid);
-
            logger::queueproc_end_result_receiver();
+
#[derive(Default, Clone)]
+
struct CurrentlyPicked {
+
    set: Arc<Mutex<HashSet<RepoId>>>,
+
}
+

+
impl CurrentlyPicked {
+
    fn insert(&mut self, repoid: Option<&RepoId>) {
+
        if let Some(repoid) = repoid {
+
            if let Ok(mut set) = self.set.lock() {
+
                set.insert(*repoid);
+
            }
        }
-
        Ok(())
    }
-
}

-
#[derive(Debug, Clone)]
-
struct Picked {
-
    qe: QueuedCiEvent,
-
}
+
    fn remove(&mut self, repoid: RepoId) {
+
        if let Ok(mut set) = self.set.lock() {
+
            set.remove(&repoid);
+
        }
+
    }

-
impl Picked {
-
    fn new(qe: QueuedCiEvent) -> Self {
-
        Self { qe }
+
    fn list(&self) -> Vec<RepoId> {
+
        if let Ok(set) = self.set.lock() {
+
            set.iter().copied().collect()
+
        } else {
+
            vec![]
+
        }
    }
}

+
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+
pub enum MaybeShutdown {
+
    Shutdown,
+
    Continue,
+
}
+

#[derive(Debug, thiserror::Error)]
pub enum QueueError {
    #[error("failed to load node profile")]
    Profile(#[source] radicle::profile::Error),

+
    #[error("failed to open database")]
+
    OpenDb(#[source] crate::db::DbError),
+

    #[error("programming error: QueueProcessorBuilder field {0} was not set")]
    Missing(&'static str),

@@ -529,9 +445,6 @@ pub enum QueueError {

    #[error("failed to create a new broker instance")]
    NewBroker(#[source] BrokerError),
-

-
    #[error("failure when using a pull queue")]
-
    PullQueue(#[source] PullQueueError),
}

impl QueueError {