Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: implement concurrently running adapters
Lars Wirzenius committed 1 year ago
commit 0cf160d3d1aee6e7942c61ca42d89322e77421b1
parent c16e9b0af76db10569a2bb04eaf17cc9b5ed7360
2 files changed +370 -258
modified src/adapter.rs
@@ -10,8 +10,9 @@
use std::{
    collections::HashMap,
    ffi::OsStr,
+
    os::unix::process::ExitStatusExt,
    path::{Path, PathBuf},
-
    process::Command,
+
    process::{Command, ExitStatus},
    time::Duration,
};

@@ -25,11 +26,9 @@ use crate::{
    notif::NotificationSender,
    run::{Run, RunState},
    sensitive::Sensitive,
-
    timeoutcmd::{LineReceiver, TimeoutCommand, TimeoutError},
+
    timeoutcmd::{RealtimeLines, TimeoutCommand, TimeoutError},
};

-
const NOT_EXITED: i32 = 999;
-

/// The set of all configured adapters.
#[derive(Clone)]
pub struct Adapters {
@@ -137,41 +136,54 @@ impl Adapter {
        let mut cmd = Command::new(&self.bin);
        cmd.envs(self.envs());
        let mut child = TimeoutCommand::new(max_run_time);
+
        let stdout = child.stdout();
        child.feed_stdin(trigger.to_string().as_bytes());
-
        let child = child.spawn(cmd).map_err(|err| match err {
-
            TimeoutError::Spawn(_, err) => AdapterError::SpawnAdapter(self.bin.clone(), err),
-
            _ => AdapterError::TimeoutCommand(err),
-
        })?;
+
        let child = match child.spawn(cmd) {
+
            Ok(child) => child,
+
            Err(TimeoutError::Spawn(_, err)) => {
+
                Err(AdapterError::SpawnAdapter(self.bin.clone(), err))?
+
            }
+
            Err(err) => Err(AdapterError::TimeoutCommand(err))?,
+
        };

        run_notification.notify()?;

-
        let stdout = child.stdout();
-
        let stderr = child.stderr();
-

        let mut outcome = MaybeResult::default();

        if let Err(err) = self.read_stdout(run, db, run_notification, stdout) {
            outcome.set_error(err);
        }

-
        self.read_stderr(stderr);
-

-
        if outcome.has_error() {
-
            child.kill().ok();
-
        }
-

-
        let wait_result = child.wait().expect("FIXME");
-
        if wait_result.timed_out() {
-
            logger::adapter_did_not_exit_voluntarily();
-
            outcome.set_error(AdapterError::Failed(NOT_EXITED));
-
        } else if let Some(exit) = wait_result.status().code() {
-
            logger::adapter_result(exit);
-
            if exit != 0 {
-
                outcome.set_error(AdapterError::Failed(exit));
-
            }
+
        let result = if outcome.has_error() {
+
            child.kill()
        } else {
-
            logger::adapter_did_not_exit();
-
            outcome.set_error(AdapterError::Signal);
+
            child.wait()
+
        };
+

+
        match result {
+
            Ok(finished) => {
+
                let stderr = finished.stderr();
+
                self.log_stderr(stderr);
+

+
                let exit = finished.exit_code();
+
                logger::adapter_result(exit);
+
                if !exit.success() {
+
                    if let Some(signal) = exit.signal() {
+
                        outcome.set_error(AdapterError::Signal(signal));
+
                    } else {
+
                        outcome.set_error(AdapterError::Failed(exit));
+
                    }
+
                }
+
            }
+
            Err(TimeoutError::TimedOut) => {
+
                logger::adapter_did_not_exit_voluntarily();
+
                outcome.set_error(AdapterError::FailedNotExited);
+
            }
+
            Err(_err) => {
+
                // FIXME this log message doesn't make sense any more; should log error
+
                logger::adapter_did_not_exit();
+
                outcome.set_error(AdapterError::Signal(9));
+
            }
        }

        if let Some(err) = outcome.error() {
@@ -186,7 +198,7 @@ impl Adapter {
        run: &mut Run,
        db: &Db,
        run_notification: &NotificationSender,
-
        stdout: &LineReceiver,
+
        mut stdout: RealtimeLines,
    ) -> Result<(), AdapterError> {
        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
@@ -235,9 +247,9 @@ impl Adapter {
        Ok(())
    }

-
    fn read_stderr(&self, stderr: &LineReceiver) {
-
        while let Some(line) = stderr.line() {
-
            logger::adapter_stderr_line(&line);
+
    fn log_stderr(&self, stderr: &[u8]) {
+
        for line in String::from_utf8_lossy(stderr).lines() {
+
            logger::adapter_stderr_line(line);
        }
    }
}
@@ -309,12 +321,16 @@ pub enum AdapterError {
    Wait(#[source] std::io::Error),

    /// Child process failed.
-
    #[error("child process failed with wait status {0}")]
-
    Failed(i32),
+
    #[error("child process failed with wait status {0:?}")]
+
    Failed(ExitStatus),
+

+
    /// Child process failed: didn't exit.
+
    #[error("child process failed without exiting")]
+
    FailedNotExited,

    /// Child process was killed.
-
    #[error("child process terminated by signal")]
-
    Signal,
+
    #[error("child process terminated by signal {0}")]
+
    Signal(i32),

    /// First message is not `Response::Triggered`
    #[error("adapter's first message is not 'triggered', but {0:?}")]
@@ -544,8 +560,12 @@ kill -9 $$
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
-
        eprintln!("{x:#?}");
-
        assert!(matches!(x, Err(AdapterError::Signal)));
+
        eprintln!("Adapter::run result: {x:#?}");
+
        if let Err(AdapterError::Failed(x)) = x {
+
            use std::os::unix::process::ExitStatusExt;
+
            eprintln!("Adapter::run result: signal={:?}", x.signal());
+
        }
+
        assert!(matches!(x, Err(AdapterError::Signal(9))));

        Ok(())
    }
@@ -704,7 +724,7 @@ echo '{"response":"finished","result":"success"}'
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
-
        eprintln!("{x:#?}");
+
        eprintln!("result from run: {x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
                assert_eq!(bin, filename);
modified src/queueproc.rs
@@ -3,22 +3,27 @@
#![allow(clippy::result_large_err)]

use std::{
-
    sync::mpsc::RecvTimeoutError,
+
    collections::HashSet,
+
    sync::{
+
        mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender},
+
        Arc, Mutex,
+
    },
    thread::{spawn, JoinHandle},
    time::{Duration, Instant},
};

-
use radicle::Profile;
+
use radicle::{identity::RepoId, Profile};

use crate::{
    adapter::{Adapter, Adapters},
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
-
    db::{Db, DbError, QueueId, QueuedCiEvent},
+
    db::{Db, DbError, QueuedCiEvent},
    filter::{EventFilter, Trigger},
    logger,
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
+
    pull_queue::{PullQueue, PullQueueError},
};

#[derive(Default)]
@@ -31,25 +36,56 @@ pub struct QueueProcessorBuilder {
    events_rx: Option<NotificationReceiver>,
    run_tx: Option<NotificationSender>,
    queue_len_interval: Option<Duration>,
+
    concurrent_adapters: Option<usize>,
}

const DEFAULT_QUEUE_LEN_DURATION: Duration = Duration::from_secs(10);

impl QueueProcessorBuilder {
    pub fn build(self) -> Result<QueueProcessor, QueueError> {
+
        let profile = Profile::load().map_err(QueueError::Profile)?;
+
        let broker = self.broker.ok_or(QueueError::Missing("broker"))?;
+
        let filters = self.filters.ok_or(QueueError::Missing("filters"))?;
+
        let triggers = self.triggers.ok_or(QueueError::Missing("triggers"))?;
+
        let adapters = self.adapters.ok_or(QueueError::Missing("adapters"))?;
+
        let run_tx = self.run_tx.ok_or(QueueError::Missing("run_tx"))?;
+
        let concurrent_adapters = self
+
            .concurrent_adapters
+
            .ok_or(QueueError::Missing("concurrent_adapters"))?;
+
        let (procssed_tx, processed_rx) = sync_channel(1);
+

+
        let picked_events = PullQueue::new();
+

+
        let mut descs = vec![];
+
        for _ in 0..concurrent_adapters {
+
            let broker = Broker::new(broker.db.filename(), broker.max_run_time)
+
                .map_err(QueueError::NewBroker)?;
+
            let event_procssor = EventProcessor::new(
+
                profile.clone(),
+
                filters.clone(),
+
                triggers.clone(),
+
                adapters.clone(),
+
                broker,
+
                run_tx.clone(),
+
                procssed_tx.clone(),
+
            );
+
            descs.push(ProcessorDescription::new(
+
                event_procssor,
+
                picked_events.clone(),
+
            ));
+
        }
+

        Ok(QueueProcessor {
            db: self.db.ok_or(QueueError::Missing("db"))?,
-
            profile: Profile::load().map_err(QueueError::Profile)?,
-
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
-
            filters: self.filters.ok_or(QueueError::Missing("filters"))?,
-
            triggers: self.triggers.ok_or(QueueError::Missing("triggers"))?,
-
            adapters: self.adapters.ok_or(QueueError::Missing("adapters"))?,
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
-
            run_tx: self.run_tx.ok_or(QueueError::Missing("run_tx"))?,
            queue_len_interval: self
                .queue_len_interval
                .unwrap_or(DEFAULT_QUEUE_LEN_DURATION),
            prev_queue_len: Instant::now(),
+
            processors: descs,
+
            processed_rx: Some(processed_rx),
+
            picked_events,
+
            current: CurrentlyPicked::default(),
        })
    }

@@ -73,6 +109,11 @@ impl QueueProcessorBuilder {
        self
    }

+
    pub fn concurrent_adapters(mut self, n: usize) -> Self {
+
        self.concurrent_adapters = Some(n);
+
        self
+
    }
+

    pub fn broker(mut self, broker: Broker) -> Self {
        self.broker = Some(broker);
        self
@@ -96,65 +137,81 @@ impl QueueProcessorBuilder {

pub struct QueueProcessor {
    db: Db,
-
    profile: Profile,
-
    filters: Vec<EventFilter>,
-
    triggers: Vec<Trigger>,
-
    broker: Broker,
-
    adapters: Adapters,
    events_rx: NotificationReceiver,
-
    run_tx: NotificationSender,
    queue_len_interval: Duration,
    prev_queue_len: Instant,
+
    picked_events: PullQueue<Picked>,
+
    processors: Vec<ProcessorDescription>,
+
    processed_rx: Option<Receiver<RepoId>>,
+
    current: CurrentlyPicked,
}

impl QueueProcessor {
    pub fn process_in_thread(mut self) -> JoinHandle<Result<(), QueueError>> {
        spawn(move || {
-
            logger::queueproc_start();
+
            logger::queueproc_start(self.processors.len());
+

+
            // Spawn a thread to process results from running adapters.
+
            #[allow(clippy::unwrap_used)]
+
            let rx = self.processed_rx.take().unwrap();
+
            let mut current = self.current.clone();
+
            let results = spawn(move || {
+
                logger::queueproc_start_result_receiver();
+
                while let Ok(repoid) = rx.recv() {
+
                    current.remove(repoid);
+
                    logger::queueproc_end_result_receiver();
+
                }
+
            });
+

+
            // Pick events from queue, send to worker threads that run
+
            // adapters. Results are processed by thread above.
            let result = self.process_until_shutdown();
+

            logger::queueproc_end(&result);
-
            result
+
            self.picked_events.end().map_err(QueueError::PullQueue)?;
+

+
            // Wait for worker threads to terminate. This closes all
+
            // sender ends for results channel.
+
            for proc in self.processors {
+
                proc.join().ok();
+
            }
+

+
            // Wait for results processing thread to terminate.
+
            results.join().ok();
+

+
            Ok(())
        })
    }

+
    #[allow(clippy::unwrap_used)]
    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
        let mut done = false;
        while !done {
-
            let mut first_error = None;
-
            while let Some(qe) = self.pick_event()? {
-
                // We always drop the picked event at once so that if
-
                // anything goes wrong, we don't try to process the
-
                // same event again. It's better for the failure to be
-
                // logged and let the humans deal with whatever
-
                // complicated failure situation is happening.
-
                self.drop_event(qe.id())?;
-

-
                match self.pick_adapters(qe.event()) {
-
                    Err(err) => {
-
                        if first_error.is_none() {
-
                            first_error = Some(err);
-
                        }
-
                    }
-
                    Ok(None) => (), // We already removed event from queue.
-
                    Ok(Some(adapters)) => {
-
                        for adapter in adapters {
-
                            match self.run_adapter(&qe, &adapter) {
-
                                Err(err) => {
-
                                    if first_error.is_none() {
-
                                        first_error = Some(err)
-
                                    }
-
                                }
-
                                Ok(finished) => done = finished,
-
                            }
-
                        }
-
                    }
+
            // Process all events currently in the event queue, if a
+
            // filter allows it.
+
            loop {
+
                if let Some(qe) = self.pick_event()? {
+
                    let picked = Picked::new(qe.clone());
+
                    self.current.insert(picked.qe.event().repository());
+
                    self.picked_events
+
                        .push(picked)
+
                        .map_err(QueueError::PullQueue)?;
+

+
                    // We always drop the picked event as soon as it has
+
                    // been pushed to a processing thread, so that if
+
                    // anything goes wrong, we're less likely try to
+
                    // process the same event again. It's better for the
+
                    // failure to be logged and let the humans deal with
+
                    // whatever complicated failure situation is
+
                    // happening.
+
                    self.drop_event(&qe)?;
+
                } else if self.current.is_empty() {
+
                    break;
                }
            }

-
            if let Some(err) = first_error {
-
                return Err(err);
-
            }
-

+
            // Wait for a notification of new events in the queue.
+
            // This prevents the loop from being a busy loop.
            match self.events_rx.wait_for_notification() {
                Ok(_) => {}
                Err(RecvTimeoutError::Timeout) => {}
@@ -168,22 +225,6 @@ impl QueueProcessor {
        Ok(())
    }

-
    fn run_adapter(&mut self, qe: &QueuedCiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
-
        let mut done = false;
-

-
        self.run_tx.notify()?;
-
        logger::queueproc_picked_event(qe.id(), qe, adapter);
-
        let res = self.process_event(qe.event(), adapter);
-
        logger::queueproc_processed_event(&res);
-
        match res {
-
            Ok(shut_down) => done = shut_down,
-
            Err(QueueError::BuildTrigger(_, _)) => done = false,
-
            Err(err) => Err(err)?,
-
        }
-
        self.run_tx.notify()?;
-
        Ok(done)
-
    }
-

    fn pick_event(&mut self) -> Result<Option<QueuedCiEvent>, QueueError> {
        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;

@@ -201,6 +242,21 @@ impl QueueProcessor {
        }
        queue.sort_by_cached_key(|qe| qe.timestamp().to_string());

+
        // If we can access the set of repositories for which CI is
+
        // currently running, remove those repositories from the list.
+
        let ids = self.current.list();
+
        queue = queue
+
            .iter()
+
            .filter(|qe| {
+
                if let Some(repoid) = qe.event().repository() {
+
                    !ids.contains(repoid)
+
                } else {
+
                    true
+
                }
+
            })
+
            .cloned()
+
            .collect();
+

        if let Some(qe) = queue.first() {
            Ok(Some(qe.clone()))
        } else {
@@ -208,7 +264,127 @@ impl QueueProcessor {
        }
    }

-
    fn pick_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
+
    fn drop_event(&mut self, qe: &QueuedCiEvent) -> Result<(), QueueError> {
+
        logger::queueproc_remove_event(qe);
+
        self.db
+
            .remove_queued_ci_event(qe.id())
+
            .map_err(QueueError::db)?;
+
        Ok(())
+
    }
+
}
+

+
#[derive(Default, Clone)]
+
struct CurrentlyPicked {
+
    set: Arc<Mutex<HashSet<RepoId>>>,
+
}
+

+
impl CurrentlyPicked {
+
    fn insert(&mut self, repoid: Option<&RepoId>) {
+
        if let Some(repoid) = repoid {
+
            if let Ok(mut set) = self.set.lock() {
+
                set.insert(*repoid);
+
            }
+
        }
+
    }
+

+
    fn remove(&mut self, repoid: RepoId) {
+
        if let Ok(mut set) = self.set.lock() {
+
            set.remove(&repoid);
+
        }
+
    }
+

+
    fn list(&self) -> Vec<RepoId> {
+
        if let Ok(set) = self.set.lock() {
+
            set.iter().copied().collect()
+
        } else {
+
            vec![]
+
        }
+
    }
+

+
    fn is_empty(&self) -> bool {
+
        if let Ok(set) = self.set.lock() {
+
            set.is_empty()
+
        } else {
+
            false
+
        }
+
    }
+
}
+

+
struct ProcessorDescription {
+
    processing_thread: JoinHandle<Result<bool, QueueError>>,
+
}
+

+
impl ProcessorDescription {
+
    fn new(processor: EventProcessor, mut picked_events: PullQueue<Picked>) -> Self {
+
        Self {
+
            processing_thread: spawn(move || {
+
                while let Ok(Some(picked)) = picked_events.pop() {
+
                    processor.process_picked_event(picked);
+
                }
+
                Ok(false)
+
            }),
+
        }
+
    }
+

+
    fn join(self) -> Result<bool, QueueError> {
+
        self.processing_thread
+
            .join()
+
            .map_err(|_| QueueError::JoinAdapterThread)?
+
    }
+
}
+

+
struct EventProcessor {
+
    profile: Profile,
+
    filters: Vec<EventFilter>,
+
    triggers: Vec<Trigger>,
+
    adapters: Adapters,
+
    broker: Broker,
+
    run_tx: NotificationSender,
+
    processed_tx: SyncSender<RepoId>,
+
}
+

+
impl EventProcessor {
+
    #[allow(clippy::too_many_arguments)]
+
    fn new(
+
        profile: Profile,
+
        filters: Vec<EventFilter>,
+
        triggers: Vec<Trigger>,
+
        adapters: Adapters,
+
        broker: Broker,
+
        run_tx: NotificationSender,
+
        processed_tx: SyncSender<RepoId>,
+
    ) -> Self {
+
        Self {
+
            profile,
+
            filters,
+
            triggers,
+
            adapters,
+
            broker,
+
            run_tx,
+
            processed_tx,
+
        }
+
    }
+

+
    fn process_picked_event(&self, picked: Picked) {
+
        match self.matching_adapters(picked.qe.event()) {
+
            Err(_) => (),
+
            Ok(None) => (), // We already removed event from queue.
+
            Ok(Some(adapters)) => {
+
                for adapter in adapters {
+
                    self.run_adapter(&picked.qe, &adapter).ok();
+
                }
+
            }
+
        }
+

+
        match picked.qe.event().repository() {
+
            Some(repoid) => {
+
                self.processed_tx.send(*repoid).ok();
+
            }
+
            None => unimplemented!(),
+
        }
+
    }
+

+
    fn matching_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
        let mut adapters = vec![];

        if self.filters.iter().any(|filter| filter.allows(e)) {
@@ -237,162 +413,54 @@ impl QueueProcessor {
        }
    }

-
    fn process_event(&mut self, event: &CiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
-
        logger::queueproc_processing_event(event);
-
        match event {
-
            CiEvent::V1(CiEventV1::Shutdown) => {
-
                logger::queueproc_action_shutdown();
-
                Ok(true)
-
            }
-
            CiEvent::V1(CiEventV1::BranchCreated {
-
                from_node: _,
-
                repo,
-
                branch: _,
-
                tip,
-
            }) => {
-
                logger::queueproc_action_run(repo, tip, "branch created");
-
                let trigger = RequestBuilder::default()
-
                    .profile(&self.profile)
-
                    .ci_event(event)
-
                    .build_trigger_from_ci_event()
-
                    .map_err(|e| QueueError::build_trigger(event, e))?;
-
                self.broker
-
                    .execute_ci(adapter, &trigger, &self.run_tx)
-
                    .map_err(QueueError::execute_ci)?;
-
                Ok(false)
-
            }
-
            CiEvent::V1(CiEventV1::BranchUpdated {
-
                from_node: _,
-
                repo,
-
                branch: _,
-
                tip,
-
                old_tip: _,
-
            }) => {
-
                logger::queueproc_action_run(repo, tip, "branch updated");
-
                let trigger = RequestBuilder::default()
-
                    .profile(&self.profile)
-
                    .ci_event(event)
-
                    .build_trigger_from_ci_event()
-
                    .map_err(|e| QueueError::build_trigger(event, e))?;
-
                self.broker
-
                    .execute_ci(adapter, &trigger, &self.run_tx)
-
                    .map_err(QueueError::execute_ci)?;
-
                Ok(false)
-
            }
-
            CiEvent::V1(CiEventV1::BranchDeleted {
-
                from_node: _,
-
                repo,
-
                branch: _,
-
                tip,
-
            }) => {
-
                logger::queueproc_action_run(repo, tip, "branch deleted");
-
                let trigger = RequestBuilder::default()
-
                    .profile(&self.profile)
-
                    .ci_event(event)
-
                    .build_trigger_from_ci_event()
-
                    .map_err(|e| QueueError::build_trigger(event, e))?;
-
                self.broker
-
                    .execute_ci(adapter, &trigger, &self.run_tx)
-
                    .map_err(QueueError::execute_ci)?;
-
                Ok(false)
-
            }
-
            CiEvent::V1(CiEventV1::TagCreated {
-
                from_node: _,
-
                repo,
-
                tag: _,
-
                tip,
-
            }) => {
-
                logger::queueproc_action_run(repo, tip, "tag created");
-
                let result = RequestBuilder::default()
-
                    .profile(&self.profile)
-
                    .ci_event(event)
-
                    .build_trigger_from_ci_event()
-
                    .map_err(|e| QueueError::build_trigger(event, e));
-
                logger::queueproc_trigger(&result);
-
                let trigger = result?;
-
                self.broker
-
                    .execute_ci(adapter, &trigger, &self.run_tx)
-
                    .map_err(QueueError::execute_ci)?;
-
                Ok(false)
-
            }
-
            CiEvent::V1(CiEventV1::TagUpdated {
-
                from_node: _,
-
                repo,
-
                tag: _,
-
                tip,
-
                old_tip: _,
-
            }) => {
-
                logger::queueproc_action_run(repo, tip, "tag updated");
-
                let trigger = RequestBuilder::default()
-
                    .profile(&self.profile)
-
                    .ci_event(event)
-
                    .build_trigger_from_ci_event()
-
                    .map_err(|e| QueueError::build_trigger(event, e))?;
-
                self.broker
-
                    .execute_ci(adapter, &trigger, &self.run_tx)
-
                    .map_err(QueueError::execute_ci)?;
-
                Ok(false)
-
            }
-
            CiEvent::V1(CiEventV1::TagDeleted {
-
                from_node: _,
-
                repo,
-
                tag: _,
-
                tip,
-
            }) => {
-
                logger::queueproc_action_run(repo, tip, "tag deleted");
-
                let trigger = RequestBuilder::default()
-
                    .profile(&self.profile)
-
                    .ci_event(event)
-
                    .build_trigger_from_ci_event()
-
                    .map_err(|e| QueueError::build_trigger(event, e))?;
-
                self.broker
-
                    .execute_ci(adapter, &trigger, &self.run_tx)
-
                    .map_err(QueueError::execute_ci)?;
-
                Ok(false)
-
            }
-
            CiEvent::V1(CiEventV1::PatchCreated {
-
                from_node: _,
-
                repo,
-
                patch: _,
-
                new_tip,
-
            }) => {
-
                logger::queueproc_action_run(repo, new_tip, "patch created");
-
                let trigger = RequestBuilder::default()
-
                    .profile(&self.profile)
-
                    .ci_event(event)
-
                    .build_trigger_from_ci_event()
-
                    .map_err(|e| QueueError::build_trigger(event, e))?;
-
                self.broker
-
                    .execute_ci(adapter, &trigger, &self.run_tx)
-
                    .map_err(QueueError::execute_ci)?;
-
                Ok(false)
-
            }
-
            CiEvent::V1(CiEventV1::PatchUpdated {
-
                from_node: _,
-
                repo,
-
                patch: _,
-
                new_tip,
-
            }) => {
-
                logger::queueproc_action_run(repo, new_tip, "patch updated");
-
                let trigger = RequestBuilder::default()
-
                    .profile(&self.profile)
-
                    .ci_event(event)
-
                    .build_trigger_from_ci_event()
-
                    .map_err(|e| QueueError::build_trigger(event, e));
-
                logger::queueproc_trigger(&trigger);
-
                let trigger = trigger?;
-
                self.broker
-
                    .execute_ci(adapter, &trigger, &self.run_tx)
-
                    .map_err(QueueError::execute_ci)?;
-
                Ok(false)
-
            }
+
    fn run_adapter(&self, qe: &QueuedCiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
+
        let mut done = false;
+

+
        self.run_tx.notify()?;
+
        logger::queueproc_picked_event(qe.id(), qe, adapter);
+
        let res = self.process_event(qe.event(), adapter);
+
        logger::queueproc_processed_event(&res);
+
        match res {
+
            Ok(shut_down) => done = shut_down,
+
            Err(QueueError::BuildTrigger(_, _)) => done = false,
+
            Err(err) => Err(err)?,
        }
+
        self.run_tx.notify()?;
+
        Ok(done)
    }

-
    fn drop_event(&mut self, id: &QueueId) -> Result<(), QueueError> {
-
        logger::queueproc_remove_event(id);
-
        self.db.remove_queued_ci_event(id).map_err(QueueError::db)?;
-
        Ok(())
+
    fn process_event(&self, event: &CiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
+
        if matches!(event, CiEvent::V1(CiEventV1::Shutdown)) {
+
            logger::queueproc_action_shutdown();
+
            Ok(true)
+
        } else {
+
            logger::queueproc_action_run(event);
+

+
            let trigger = RequestBuilder::default()
+
                .profile(&self.profile)
+
                .ci_event(event)
+
                .build_trigger_from_ci_event()
+
                .map_err(|e| QueueError::build_trigger(event, e));
+
            logger::queueproc_trigger(&trigger);
+
            let trigger = trigger?;
+

+
            self.broker
+
                .execute_ci(adapter, &trigger, &self.run_tx)
+
                .map_err(QueueError::execute_ci)?;
+

+
            Ok(false)
+
        }
+
    }
+
}
+

+
#[derive(Debug, Clone)]
+
struct Picked {
+
    qe: QueuedCiEvent,
+
}
+

+
impl Picked {
+
    fn new(qe: QueuedCiEvent) -> Self {
+
        Self { qe }
    }
}

@@ -421,6 +489,30 @@ pub enum QueueError {

    #[error("no default adapter specified in configuration")]
    NoDefaultAdapter,
+

+
    #[error("failed to send to channel for picked events")]
+
    SendPicked,
+

+
    #[error("failed to receive from channel for picked events")]
+
    RecvPicked,
+

+
    #[error("failed to send to channel for results of processed events")]
+
    SendProcessResult,
+

+
    #[error("failed to receive from channel for results of processed events")]
+
    RecvProcessResult,
+

+
    #[error("failed to wait for thread to run adapters to finish")]
+
    JoinAdapterThread,
+

+
    #[error("failed to wait for thread to process results from adapters to finish")]
+
    JoinResultThread,
+

+
    #[error("failed to create a new broker instance")]
+
    NewBroker(#[source] BrokerError),
+

+
    #[error("failure when using a pull queue")]
+
    PullQueue(#[source] PullQueueError),
}

impl QueueError {