Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: terminate a CI run
Lars Wirzenius committed 5 months ago
commit beaf6221c00d1ebc7c08da0ef3a3f6f58922e428
parent c537e57
11 files changed +294 -94
modified src/adapter.rs
@@ -13,6 +13,7 @@ use std::{
    os::unix::process::ExitStatusExt,
    path::{Path, PathBuf},
    process::{Command, ExitStatus},
+
    sync::mpsc::Sender,
    time::Duration,
};

@@ -27,6 +28,7 @@ use crate::{
    logger,
    msg::{MessageError, Request, Response, RunResult},
    notif::NotificationSender,
+
    queueproc::ChildInfo,
    run::{Run, RunState},
    sensitive::Sensitive,
    timeoutcmd::{RealtimeLines, TimeoutCommand, TimeoutError},
@@ -141,11 +143,12 @@ impl Adapter {
        db: &Db,
        run_notification: &NotificationSender,
        max_run_time: Duration,
+
        child_info: Sender<ChildInfo>,
    ) -> Result<(), AdapterError> {
        run.set_state(RunState::Triggered);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

-
        let x = self.run_helper(trigger, run, db, run_notification, max_run_time);
+
        let x = self.run_helper(trigger, run, db, run_notification, max_run_time, child_info);

        run.set_state(RunState::Finished);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
@@ -164,6 +167,7 @@ impl Adapter {
        db: &Db,
        run_notification: &NotificationSender,
        max_run_time: Duration,
+
        child_pid: Sender<ChildInfo>,
    ) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

@@ -190,6 +194,8 @@ impl Adapter {
            }
            Err(err) => Err(AdapterError::TimeoutCommand(err))?,
        };
+
        let child_info = ChildInfo::new(run.broker_run_id().clone(), child.id());
+
        child_pid.send(child_info).ok(); // FIXME

        run_notification.notify()?;

@@ -452,7 +458,7 @@ pub enum AdapterError {

#[cfg(test)]
mod test {
-
    use std::{fs::write, io::ErrorKind, path::PathBuf, time::Duration};
+
    use std::{fs::write, io::ErrorKind, path::PathBuf, sync::mpsc::channel, time::Duration};

    use tempfile::{NamedTempFile, TempDir, tempdir};

@@ -511,9 +517,10 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX)?;
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx)?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -533,9 +540,10 @@ echo '{"response":"finished","result":"failure"}'

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);

        match x {
            Ok(_) => (),
@@ -562,9 +570,10 @@ exit 1

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));
@@ -584,9 +593,10 @@ kill -9 $$

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Signal(9))));

@@ -604,9 +614,10 @@ kill -9 $$

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoFirstMessage)));

@@ -627,9 +638,10 @@ kill -9 $$

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Signal(9))));

@@ -649,9 +661,10 @@ echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));

@@ -673,9 +686,10 @@ kill -9 $$

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("Adapter::run result: {x:#?}");
        if let Err(AdapterError::Failed(x)) = x {
            use std::os::unix::process::ExitStatusExt;
@@ -700,9 +714,10 @@ echo '{"response":"finished","result":"success","bad":"field"}'

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
@@ -725,9 +740,10 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -754,9 +770,10 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -775,9 +792,10 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -804,9 +822,10 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -837,9 +856,10 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
+
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
        eprintln!("result from run: {x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
modified src/bin/cibtool.rs
@@ -178,6 +178,7 @@ impl Subcommand for EventCmd {
        match &self.cmd {
            EventSubCmd::Add(x) => x.run(args)?,
            EventSubCmd::Shutdown(x) => x.run(args)?,
+
            EventSubCmd::Terminate(x) => x.run(args)?,
            EventSubCmd::List(x) => x.run(args)?,
            EventSubCmd::Count(x) => x.run(args)?,
            EventSubCmd::Show(x) => x.run(args)?,
@@ -198,6 +199,7 @@ enum EventSubCmd {
    Count(cibtoolcmd::CountEvents),
    Add(cibtoolcmd::AddEvent),
    Shutdown(cibtoolcmd::Shutdown),
+
    Terminate(cibtoolcmd::Terminate),
    Show(cibtoolcmd::ShowEvent),
    Remove(cibtoolcmd::RemoveEvent),
    Record(cibtoolcmd::RecordEvents),
modified src/bin/cibtoolcmd/event.rs
@@ -335,6 +335,23 @@ impl Leaf for Shutdown {
    }
}

+
/// Add a run termination event to the queue.
+
///
+
/// The termination event causes the CI broker to terminate a specific run.
+
#[derive(Parser)]
+
pub struct Terminate {
+
    /// Terminate a run given its broker run id.
+
    run_id: RunId,
+
}
+

+
impl Leaf for Terminate {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        db.push_queued_ci_event(CiEvent::V1(CiEventV1::Terminate(self.run_id.clone())))?;
+
        Ok(())
+
    }
+
}
+

/// Record node events from the node.
///
/// The events are written to the standard output or to the specified
modified src/broker.rs
@@ -5,6 +5,7 @@

use std::{
    path::{Path, PathBuf},
+
    sync::mpsc::Sender,
    time::Duration,
};

@@ -20,6 +21,7 @@ use crate::{
    logger,
    msg::{PatchEvent, PushEvent, Request, RunId},
    notif::NotificationSender,
+
    queueproc::ChildInfo,
    run::{Run, RunBuilder, Whence},
};

@@ -57,11 +59,19 @@ impl Broker {
        adapter: &Adapter,
        trigger: &Request,
        run_notification: &NotificationSender,
+
        child_info: Sender<ChildInfo>,
    ) -> Result<Run, BrokerError> {
        let broker_run_id = RunId::default();
        let span = span!(Level::TRACE, "execute_ci_run", %broker_run_id,).entered();
-
        let run = span
-
            .in_scope(|| self.execute_helper(adapter, broker_run_id, trigger, run_notification))?;
+
        let run = span.in_scope(|| {
+
            self.execute_helper(
+
                adapter,
+
                broker_run_id,
+
                trigger,
+
                run_notification,
+
                child_info,
+
            )
+
        })?;
        Ok(run)
    }

@@ -71,6 +81,7 @@ impl Broker {
        broker_run_id: RunId,
        trigger: &Request,
        run_notification: &NotificationSender,
+
        child_info: Sender<ChildInfo>,
    ) -> Result<Run, BrokerError> {
        logger::broker_start_run(trigger);
        let (common, whence, oid) = match &trigger {
@@ -138,6 +149,7 @@ impl Broker {
            &self.db,
            run_notification,
            self.max_run_time,
+
            child_info,
        ) {
            logger::error("failed to run adapter or it failed to run CI", &e);
        }
@@ -197,7 +209,7 @@ pub enum BrokerError {

#[cfg(test)]
mod test {
-
    use std::{path::Path, time::Duration};
+
    use std::{path::Path, sync::mpsc::channel, time::Duration};
    use tempfile::tempdir;

    use super::Broker;
@@ -230,9 +242,11 @@ echo '{"response":"finished","result":"success"}'

        let trigger = trigger_request()?;

+
        let (pid_tx, _) = channel();
+

        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = broker.execute_ci(&adapter, &trigger, &sender);
+
        let x = broker.execute_ci(&adapter, &trigger, &sender, pid_tx);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
@@ -262,9 +276,11 @@ exit 1

        let trigger = trigger_request()?;

+
        let (pid_tx, _) = channel();
+

        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = broker.execute_ci(&adapter, &trigger, &sender);
+
        let x = broker.execute_ci(&adapter, &trigger, &sender, pid_tx);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
modified src/ci_event.rs
@@ -12,7 +12,10 @@ use radicle::{
    storage::RefUpdate,
};

-
use crate::refs::{GenericRefName, TagName, ref_string};
+
use crate::{
+
    msg::RunId,
+
    refs::{GenericRefName, TagName, ref_string},
+
};

#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
#[non_exhaustive]
@@ -24,6 +27,7 @@ pub enum CiEvent {
#[non_exhaustive]
pub enum CiEventV1 {
    Shutdown,
+
    Terminate(RunId),
    BranchCreated {
        from_node: NodeId,
        repo: RepoId,
@@ -86,6 +90,7 @@ impl CiEvent {
    pub fn from_node(&self) -> Option<&NodeId> {
        match self {
            Self::V1(CiEventV1::Shutdown) => None,
+
            Self::V1(CiEventV1::Terminate(_)) => None,
            Self::V1(CiEventV1::BranchCreated { from_node, .. }) => Some(from_node),
            Self::V1(CiEventV1::BranchUpdated { from_node, .. }) => Some(from_node),
            Self::V1(CiEventV1::BranchDeleted { from_node, .. }) => Some(from_node),
@@ -101,6 +106,7 @@ impl CiEvent {
    pub fn repository(&self) -> Option<&RepoId> {
        match self {
            Self::V1(CiEventV1::Shutdown) => None,
+
            Self::V1(CiEventV1::Terminate(_)) => None,
            Self::V1(CiEventV1::BranchCreated { repo, .. }) => Some(repo),
            Self::V1(CiEventV1::BranchUpdated { repo, .. }) => Some(repo),
            Self::V1(CiEventV1::BranchDeleted { repo, .. }) => Some(repo),
@@ -144,6 +150,7 @@ impl CiEvent {
    pub fn tip(&self) -> Option<&Oid> {
        match self {
            Self::V1(CiEventV1::Shutdown) => None,
+
            Self::V1(CiEventV1::Terminate(_)) => None,
            Self::V1(CiEventV1::BranchCreated { tip, .. }) => Some(tip),
            Self::V1(CiEventV1::BranchUpdated { tip, .. }) => Some(tip),
            Self::V1(CiEventV1::BranchDeleted { tip, .. }) => Some(tip),
modified src/logger.rs
@@ -18,7 +18,7 @@ use crate::{
    config::Config,
    db::{QueueId, QueuedCiEvent},
    filter::EventFilter,
-
    msg::Request,
+
    msg::{Request, RunId},
    node_event_source::NodeEventSource,
    pages::PageError,
    queueadd::AdderError,
@@ -162,6 +162,7 @@ enum Id {

    QueueProcActionRun,
    QueueProcActionShutdown,
+
    QueueProcActionTerminate,
    QueueProcDisconnected,
    QueueProcEnd,
    QueueProcFilterDecision,
@@ -180,6 +181,9 @@ enum Id {
    TriggerCreate,

    FilterDecision,
+

+
    WorkerStart,
+
    WorkerEnd,
}

#[derive(Debug, thiserror::Error)]
@@ -594,6 +598,15 @@ pub fn queueproc_action_shutdown() {
    );
}

+
pub fn queueproc_action_terminate(run_id: &RunId) {
+
    info!(
+
        msg_id = ?Id::QueueProcActionTerminate,
+
        kind = %Kind::Debug,
+
        ?run_id,
+
        "Action: terminate CI run"
+
    );
+
}
+

pub fn queueadd_start() {
    info!(
        msg_id = ?Id::QueueAddStart,
@@ -921,3 +934,20 @@ pub fn error(msg: &str, e: &impl std::error::Error) {
        e = source.source();
    }
}
+

+
pub fn worker_start(name: &str) {
+
    info!(
+
        msg_id = ?Id::WorkerStart,
+
        name,
+
        "worker starts"
+
    );
+
}
+

+
pub fn worker_end(name: &str, result: &Result<(), impl std::error::Error>) {
+
    info!(
+
        msg_id = ?Id::WorkerEnd,
+
        name,
+
        ?result,
+
        "worker ends"
+
    );
+
}
modified src/pages.rs
@@ -230,6 +230,9 @@ impl PageData {

            let event_element = match event.event() {
                CiEvent::V1(CiEventV1::Shutdown) => Element::new(Tag::Span).with_text("shutdown"),
+
                CiEvent::V1(CiEventV1::Terminate(_)) => {
+
                    Element::new(Tag::Span).with_text("terminate")
+
                }
                CiEvent::V1(CiEventV1::BranchCreated {
                    from_node: _,
                    repo,
modified src/queueproc.rs
@@ -3,22 +3,26 @@
#![allow(clippy::result_large_err)]

use std::{
-
    collections::HashSet,
-
    sync::{Arc, Mutex, mpsc::RecvTimeoutError},
+
    collections::{HashMap, HashSet},
+
    sync::{
+
        Arc, Mutex,
+
        mpsc::{Receiver, RecvTimeoutError, Sender, channel},
+
    },
    thread::{sleep, spawn},
    time::{Duration, Instant},
};

+
use libc::{SIGKILL, kill};
use radicle::{Profile, identity::RepoId};

use crate::{
    adapter::{Adapter, Adapters},
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
-
    db::{Db, DbError, QueueId, QueuedCiEvent},
+
    db::{Db, DbError, QueuedCiEvent},
    filter::{EventFilter, Trigger},
    logger,
-
    msg::{MessageError, RequestBuilder},
+
    msg::{MessageError, RequestBuilder, RunId},
    notif::{NotificationReceiver, NotificationSender},
    worker::Worker,
};
@@ -51,6 +55,7 @@ impl QueueProcessorBuilder {
        let concurrent_adapters = self
            .concurrent_adapters
            .ok_or(QueueError::Missing("concurrent_adapters"))?;
+
        let (child_pid_tx, child_pid_rx) = channel();

        Ok(QueueProcessor {
            profile,
@@ -67,6 +72,8 @@ impl QueueProcessorBuilder {
            concurrent_adapters,
            run_tx,
            current: CurrentlyPicked::default(),
+
            child_pid_tx,
+
            child_pid_rx,
        })
    }

@@ -137,44 +144,72 @@ pub struct QueueProcessor {
    prev_queue_len: Instant,
    run_tx: NotificationSender,
    current: CurrentlyPicked,
+
    child_pid_tx: Sender<ChildInfo>,
+
    child_pid_rx: Receiver<ChildInfo>,
}

impl QueueProcessor {
    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
        let mut expecting_new_events = true;
        let mut handles = vec![];
+
        let mut children: HashMap<RunId, u32> = HashMap::new();

        loop {
-
            let mut queue = self.db.queued_ci_events().map_err(QueueError::db)?;
+
            let mut queue = Queue::load(&self.db)?;
+

+
            // Process special events like `Shutdown` and `Terminate`.
+
            while let Some(qe) = self.pick_special_event(&queue) {
+
                queue.remove(&qe);
+
                self.drop_event(&qe)?;
+
                match qe.event() {
+
                    CiEvent::V1(CiEventV1::Shutdown) => {
+
                        logger::queueproc_action_shutdown();
+
                        expecting_new_events = false;
+
                    }
+
                    CiEvent::V1(CiEventV1::Terminate(run_id)) => {
+
                        if let Some(pid) = children.get(run_id) {
+
                            if let Ok(pid) = i32::try_from(*pid) {
+
                                logger::queueproc_action_terminate(run_id);
+
                                unsafe {
+
                                    kill(-pid, SIGKILL);
+
                                }
+
                            }
+
                        }
+
                    }
+
                    _ => (),
+
                };
+
            }

            // If we may spawn another adapter, pick an event from the queue
            // and run the adapters.
            if handles.len() < self.concurrent_adapters {
-
                match self.pick_event(&mut queue) {
-
                    Ok(Some(qe)) => {
-
                        // Remove picked event from queue so we don't re-process it. If we don't
-
                        // do this, and we crash when processing the event, we'll re-process it
-
                        // again and again. It seems better to discard an event, and skip running
-
                        // CI, rather then getting stuck on a specific event.
-
                        self.drop_event(&qe)?;
-

-
                        match self.matching_adapters(qe.event()) {
-
                            Ok(Some(adapters)) => {
-
                                let p = self.processor()?;
-
                                let repoid = qe.event().repository().copied();
-
                                self.current.insert(qe.event().repository());
-
                                let h = spawn(move || p.pick_and_process_one(qe, adapters));
-
                                handles.push((repoid, h));
-
                            }
-
                            Ok(None) => {}
-
                            Err(_) => {}
+
                if let Some(qe) = self.pick_event(&queue) {
+
                    // Remove picked event from queue so we don't re-process it. If we don't
+
                    // do this, and we crash when processing the event, we'll re-process it
+
                    // again and again. It seems better to discard an event, and skip running
+
                    // CI, rather than getting stuck on a specific event.
+
                    queue.remove(&qe);
+
                    self.drop_event(&qe)?;
+

+
                    match self.matching_adapters(qe.event()) {
+
                        Ok(Some(adapters)) => {
+
                            let p = self.processor()?;
+
                            let repoid = qe.event().repository().copied();
+
                            self.current.insert(qe.event().repository());
+
                            let h = spawn(move || p.pick_and_process_one(qe, adapters));
+
                            handles.push((repoid, h));
                        }
+
                        Ok(None) => {}
+
                        Err(_) => {}
                    }
-
                    Ok(None) => {}
-
                    Err(_) => {}
                }
            }

+
            // Receive adapter process IDs.
+
            while let Ok(child_info) = self.child_pid_rx.try_recv() {
+
                children.insert(child_info.run_id().clone(), child_info.pid());
+
            }
+

            // Wait for any threads processing events that have finished. Remove
            // them from the list of currently running threads.
            let mut h2 = vec![];
@@ -184,13 +219,8 @@ impl QueueProcessor {
                    if let Some(repoid) = repoid {
                        self.current.remove(repoid);
                    }
-
                    match h.join() {
-
                        Err(_) => logger::queueproc_thread_join(),
-
                        Ok(Err(_)) => (),
-
                        Ok(Ok(MaybeShutdown::Shutdown)) => {
-
                            expecting_new_events = false;
-
                        }
-
                        Ok(Ok(MaybeShutdown::Continue)) => (),
+
                    if h.join().is_err() {
+
                        logger::queueproc_thread_join();
                    }
                } else {
                    h2.push((repoid, h));
@@ -230,32 +260,35 @@ impl QueueProcessor {
            broker: Broker::new(self.db.filename(), self.broker.max_run_time())
                .map_err(QueueError::NewBroker)?,
            run_tx: self.run_tx.clone(),
+
            child_pid_tx: self.child_pid_tx.clone(),
        })
    }

-
    fn pick_event(&mut self, ids: &mut Vec<QueueId>) -> Result<Option<QueuedCiEvent>, QueueError> {
+
    fn pick_special_event(&mut self, queue: &Queue) -> Option<QueuedCiEvent> {
+
        for qe in queue.iter() {
+
            match qe.event() {
+
                CiEvent::V1(CiEventV1::Shutdown) | CiEvent::V1(CiEventV1::Terminate(_)) => {
+
                    return Some(qe.clone());
+
                }
+
                _ => (),
+
            }
+
        }
+
        None
+
    }
+

+
    fn pick_event(&mut self, queue: &Queue) -> Option<QueuedCiEvent> {
        let elapsed = self.prev_queue_len.elapsed();
        if elapsed > self.queue_len_interval {
-
            logger::queueproc_queue_length(ids.len());
+
            logger::queueproc_queue_length(queue.len());
            self.prev_queue_len = Instant::now();
        }

-
        let mut queue = vec![];
-
        for id in ids.iter() {
-
            if let Some(qe) = self.db.get_queued_ci_event(id).map_err(QueueError::db)? {
-
                #[allow(clippy::single_match)]
-
                match qe.event() {
-
                    CiEvent::V1(CiEventV1::Shutdown) => return Ok(Some(qe.clone())),
-
                    _ => (),
-
                }
-
                queue.push(qe);
-
            }
-
        }
-
        queue.sort_by_cached_key(|qe| qe.timestamp().to_string());
+
        let mut q: Vec<&QueuedCiEvent> = queue.iter().collect();
+
        q.sort_by_cached_key(|qe| qe.timestamp().to_string());

        // Remove the repositories for which CI is currently running.
        let current_repos = self.current.list();
-
        queue = queue
+
        q = q
            .iter()
            .filter(|qe| {
                if let Some(repoid) = qe.event().repository() {
@@ -267,20 +300,11 @@ impl QueueProcessor {
            .cloned()
            .collect();

-
        if let Some(qe) = queue.first() {
+
        if let Some(qe) = q.first() {
            logger::queueproc_picked_event(qe.id(), qe);
-

-
            // Remove picked event from queue.
-
            for i in 0..current_repos.len() {
-
                if ids.get(i) == Some(qe.id()) {
-
                    ids.remove(i);
-
                    break;
-
                }
-
            }
-

-
            Ok(Some(qe.clone()))
+
            Some((*qe).clone())
        } else {
-
            Ok(None)
+
            None
        }
    }

@@ -335,10 +359,49 @@ impl Worker for QueueProcessor {
    }
}

+
struct Queue {
+
    queue: Vec<QueuedCiEvent>,
+
}
+

+
impl Queue {
+
    fn load(db: &Db) -> Result<Self, QueueError> {
+
        let ids = db.queued_ci_events().map_err(QueueError::db)?;
+
        let mut queue = vec![];
+
        for id in ids {
+
            if let Some(qe) = db.get_queued_ci_event(&id).map_err(QueueError::db)? {
+
                queue.push(qe);
+
            }
+
        }
+
        Ok(Self { queue })
+
    }
+

+
    fn is_empty(&self) -> bool {
+
        self.queue.is_empty()
+
    }
+

+
    fn len(&self) -> usize {
+
        self.queue.len()
+
    }
+

+
    fn remove(&mut self, unwanted: &QueuedCiEvent) {
+
        for (i, qe) in self.queue.iter().enumerate() {
+
            if qe.id() == unwanted.id() {
+
                self.queue.remove(i);
+
                return;
+
            }
+
        }
+
    }
+

+
    fn iter(&self) -> impl Iterator<Item = &QueuedCiEvent> {
+
        self.queue.iter()
+
    }
+
}
+

struct Processor {
    profile: Profile,
    broker: Broker,
    run_tx: NotificationSender,
+
    child_pid_tx: Sender<ChildInfo>,
}

impl Processor {
@@ -351,10 +414,8 @@ impl Processor {
            self.run_tx.notify()?;
            logger::queueproc_processing_event(qe.event(), adapter);
            match qe.event() {
-
                CiEvent::V1(CiEventV1::Shutdown) => {
-
                    logger::queueproc_action_shutdown();
-
                    return Ok(MaybeShutdown::Shutdown);
-
                }
+
                CiEvent::V1(CiEventV1::Shutdown) => (),
+
                CiEvent::V1(CiEventV1::Terminate(_)) => (),
                _ => {
                    logger::queueproc_action_run(qe.event());

@@ -367,7 +428,7 @@ impl Processor {
                    let trigger = trigger?;

                    self.broker
-
                        .execute_ci(adapter, &trigger, &self.run_tx)
+
                        .execute_ci(adapter, &trigger, &self.run_tx, self.child_pid_tx.clone())
                        .map_err(QueueError::execute_ci)?;
                }
            }
@@ -406,12 +467,52 @@ impl CurrentlyPicked {
    }
}

-
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum MaybeShutdown {
    Shutdown,
+
    Terminate(RunId),
    Continue,
}

+
pub struct AdapterProcess {
+
    run_id: RunId,
+
    pid: u32,
+
}
+

+
impl AdapterProcess {
+
    pub fn new(run_id: RunId, pid: u32) -> Self {
+
        Self { run_id, pid }
+
    }
+

+
    pub fn run_id(&self) -> &RunId {
+
        &self.run_id
+
    }
+

+
    pub fn pid(&self) -> u32 {
+
        self.pid
+
    }
+
}
+

+
#[derive(Debug)]
+
pub struct ChildInfo {
+
    run_id: RunId,
+
    pid: u32,
+
}
+

+
impl ChildInfo {
+
    pub fn new(run_id: RunId, pid: u32) -> Self {
+
        Self { run_id, pid }
+
    }
+

+
    pub fn run_id(&self) -> &RunId {
+
        &self.run_id
+
    }
+

+
    pub fn pid(&self) -> u32 {
+
        self.pid
+
    }
+
}
+

#[derive(Debug, thiserror::Error)]
pub enum QueueError {
    #[error("failed to load node profile")]
modified src/refs.rs
@@ -140,9 +140,7 @@ pub fn branch_from_namespaced(ns: &Namespaced) -> Result<BranchName, RefError> {

/// Create a [`PatchId`] from a string slice.
pub fn patch_from_str(s: &str) -> Result<PatchId, RefError> {
-
    let res = PatchId::from_str(s).map_err(|_| RefError::PatchIdFromStr(s.into()));
-
    eprintln!("{res:#?}");
-
    res
+
    PatchId::from_str(s).map_err(|_| RefError::PatchIdFromStr(s.into()))
}

/// All errors from Git reference manipulation.
modified src/timeoutcmd.rs
@@ -248,6 +248,10 @@ impl ChildProcess {
        })
    }

+
    pub fn id(&self) -> u32 {
+
        self.child.id()
+
    }
+

    pub fn kill(mut self) -> Result<FinishedProcess, TimeoutError> {
        self.kill_helper();
        self.wait()
modified src/worker.rs
@@ -6,14 +6,16 @@

use std::thread::{JoinHandle, spawn};

+
use crate::logger;
+

/// Start a new thread. The caller must keep the returned thread handle
/// and join it to wait for the thread to end.
pub fn start_thread<W: Worker>(mut o: W) -> JoinHandle<Result<(), W::Error>> {
    let name = o.name();
    spawn(move || {
-
        eprintln!("start worker {name}");
+
        logger::worker_start(&name);
        let result = o.work();
-
        eprintln!("end worker {name}: result={result:?}");
+
        logger::worker_end(&name, &result);
        result
    })
}
@@ -24,7 +26,7 @@ pub trait Worker: Send + 'static {
    const NAME: &str;

    /// Type of error from this worker.
-
    type Error: std::fmt::Debug + Send;
+
    type Error: std::error::Error + Send;

    /// Do the work the thread is supposed to do.
    fn work(&mut self) -> Result<(), Self::Error>;