add concurrent adapters
Merged liw opened 11 months ago

One adapter at a time per repository.

Not all commits in this patch build or pass tests separately.
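
For reviewers, a minimal sketch of how the new limit is wired up, mirroring the `src/bin/cib.rs` hunks below (the other builder inputs are elided; this is a sketch, not a complete program):

```rust
// Sketch only: the concurrency limit flows from the configuration into
// the queue processor builder, which spawns that many worker threads
// (see QueueProcessorBuilder::build in src/queueproc.rs below).
let processor = QueueProcessorBuilder::default()
    // ...db, broker, filters, triggers, adapters, events_rx, run_tx as before...
    .concurrent_adapters(config.concurrent_adapters())
    .build()
    .map_err(CibError::process_queue)?;
let thread = processor.process_in_thread();
```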

13 files changed +997 -755 d90b609b 4577682f
modified doc/userguide.md
@@ -201,6 +201,7 @@ The configuration fields are:
| field | summary |
|:------|:--------|
| `adapters` | list of CI adapters |
+| `concurrent_adapters` | max number of adapters to run at the same time |
| `db` | database |
| `default_adapter` |  this will become deprecated, use `triggers` instead |
| `filters` | this will become deprecated, use `triggers` instead |
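
As an illustration, a hypothetical configuration snippet for the new field. The config file format is an assumption, not shown in this patch; note that `NonzeroCount` in `src/config.rs` below wraps an optional `count`, so the value nests one level, and a missing or zero count falls back to 1:

```yaml
# Hypothetical sketch, not from this patch: allow up to four concurrent
# adapter runs. NonzeroCount deserializes as a nested `count` field.
concurrent_adapters:
  count: 4
```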
modified src/adapter.rs
@@ -10,8 +10,9 @@
use std::{
    collections::HashMap,
    ffi::OsStr,
+    os::unix::process::ExitStatusExt,
    path::{Path, PathBuf},
-    process::Command,
+    process::{Command, ExitStatus},
    time::Duration,
};

@@ -25,11 +26,9 @@ use crate::{
    notif::NotificationSender,
    run::{Run, RunState},
    sensitive::Sensitive,
-    timeoutcmd::{LineReceiver, TimeoutCommand, TimeoutError},
+    timeoutcmd::{RealtimeLines, TimeoutCommand, TimeoutError},
};

-const NOT_EXITED: i32 = 999;
-

/// The set of all configured adapters.
#[derive(Clone)]
pub struct Adapters {
@@ -137,41 +136,53 @@ impl Adapter {
        let mut cmd = Command::new(&self.bin);
        cmd.envs(self.envs());
        let mut child = TimeoutCommand::new(max_run_time);
+        let stdout = child.stdout();
        child.feed_stdin(trigger.to_string().as_bytes());
-        let child = child.spawn(cmd).map_err(|err| match err {
-            TimeoutError::Spawn(_, err) => AdapterError::SpawnAdapter(self.bin.clone(), err),
-            _ => AdapterError::TimeoutCommand(err),
-        })?;
+        let child = match child.spawn(cmd) {
+            Ok(child) => child,
+            Err(TimeoutError::Spawn(_, err)) => {
+                Err(AdapterError::SpawnAdapter(self.bin.clone(), err))?
+            }
+            Err(err) => Err(AdapterError::TimeoutCommand(err))?,
+        };

        run_notification.notify()?;

-        let stdout = child.stdout();
-        let stderr = child.stderr();
-

        let mut outcome = MaybeResult::default();

        if let Err(err) = self.read_stdout(run, db, run_notification, stdout) {
            outcome.set_error(err);
        }

-        self.read_stderr(stderr);
-
-        if outcome.has_error() {
-            child.kill().ok();
-        }
-
-        let wait_result = child.wait().expect("FIXME");
-        if wait_result.timed_out() {
-            logger::adapter_did_not_exit_voluntarily();
-            outcome.set_error(AdapterError::Failed(NOT_EXITED));
-        } else if let Some(exit) = wait_result.status().code() {
-            logger::adapter_result(exit);
-            if exit != 0 {
-                outcome.set_error(AdapterError::Failed(exit));
-            }
+        let result = if outcome.has_error() {
+            child.kill()
        } else {
-            logger::adapter_did_not_exit();
-            outcome.set_error(AdapterError::Signal);
+            child.wait()
+        };
+
+        match result {
+            Ok(finished) => {
+                let stderr = finished.stderr();
+                self.log_stderr(stderr);
+
+                let exit = finished.exit_code();
+                logger::adapter_result(exit);
+                if !exit.success() {
+                    if let Some(signal) = exit.signal() {
+                        outcome.set_error(AdapterError::Signal(signal));
+                    } else {
+                        outcome.set_error(AdapterError::Failed(exit));
+                    }
+                }
+            }
+            Err(TimeoutError::TimedOut) => {
+                logger::adapter_did_not_exit_voluntarily();
+                outcome.set_error(AdapterError::FailedNotExited);
+            }
+            Err(err) => {
+                logger::adapter_did_not_exit(err);
+                outcome.set_error(AdapterError::Signal(9));
+            }
        }

        if let Some(err) = outcome.error() {
@@ -186,7 +197,7 @@ impl Adapter {
        run: &mut Run,
        db: &Db,
        run_notification: &NotificationSender,
-        stdout: &LineReceiver,
+        mut stdout: RealtimeLines,
    ) -> Result<(), AdapterError> {
        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
@@ -235,9 +246,9 @@ impl Adapter {
        Ok(())
    }

-    fn read_stderr(&self, stderr: &LineReceiver) {
-        while let Some(line) = stderr.line() {
-            logger::adapter_stderr_line(&line);
+    fn log_stderr(&self, stderr: &[u8]) {
+        for line in String::from_utf8_lossy(stderr).lines() {
+            logger::adapter_stderr_line(line);
        }
    }
}
@@ -309,12 +320,16 @@ pub enum AdapterError {
    Wait(#[source] std::io::Error),

    /// Child process failed.
-    #[error("child process failed with wait status {0}")]
-    Failed(i32),
+    #[error("child process failed with wait status {0:?}")]
+    Failed(ExitStatus),
+
+    /// Child process failed: didn't exit.
+    #[error("child process failed without exiting")]
+    FailedNotExited,

    /// Child process was killed.
-    #[error("child process terminated by signal")]
-    Signal,
+    #[error("child process terminated by signal {0}")]
+    Signal(i32),

    /// First message is not `Response::Triggered`
    #[error("adapter's first message is not 'triggered', but {0:?}")]
@@ -544,8 +559,12 @@ kill -9 $$
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
-        eprintln!("{x:#?}");
-        assert!(matches!(x, Err(AdapterError::Signal)));
+        eprintln!("Adapter::run result: {x:#?}");
+        if let Err(AdapterError::Failed(x)) = x {
+            use std::os::unix::process::ExitStatusExt;
+            eprintln!("Adapter::run result: signal={:?}", x.signal());
+        }
+        assert!(matches!(x, Err(AdapterError::Signal(9))));

        Ok(())
    }
@@ -704,7 +723,7 @@ echo '{"response":"finished","result":"success"}'
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
-        eprintln!("{x:#?}");
+        eprintln!("result from run: {x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
                assert_eq!(bin, filename);
modified src/bin/cib.rs
@@ -192,6 +192,7 @@ impl QueuedCmd {
            .filters(config.filters())
            .triggers(&config.to_triggers())
            .adapters(&adapters)
+            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;
        let thread = processor.process_in_thread();
@@ -271,6 +272,7 @@ impl ProcessEventsCmd {
            .filters(config.filters())
            .triggers(&config.to_triggers())
            .adapters(&adapters)
+            .concurrent_adapters(config.concurrent_adapters())
            .build()
            .map_err(CibError::process_queue)?;
        let processor = processor.process_in_thread();
modified src/bin/cibtoolcmd/event.rs
@@ -4,7 +4,6 @@ use clap::ValueEnum;

use radicle::patch::PatchId;
use radicle_ci_broker::refs::branch_ref;
-#[allow(unused_imports)] // FIXME
use radicle_ci_broker::{
    filter::EventFilter, node_event_source::NodeEventSource, refs::ref_string,
    util::read_file_as_objectid,
modified src/bin/cibtoolcmd/timeout.rs
@@ -48,6 +48,7 @@ impl Leaf for TimeoutCmd {
        cmd.arg("-c").arg(&self.script);

        let mut to = TimeoutCommand::new(Duration::from_secs(self.timeout));
+        let mut stdout = to.stdout();

        if let Some(bytes) = self.generate {
            let mut stdin: Vec<u8> = vec![];
@@ -66,40 +67,39 @@ impl Leaf for TimeoutCmd {
        println!("spawn child");
        let running = to.spawn(cmd)?;

-        if let Some(secs) = self.kill_after {
-            sleep(Duration::from_secs(secs));
-            running.kill().unwrap();
-        }
-
        let mut stdout_bytes = 0;
-        if !self.fill_buffers {
-            let stdout = running.stdout();
-            while let Some(line) = stdout.line() {
-                stdout_bytes += line.as_bytes().len();
-                if self.verbose {
-                    println!("stdout: {line:?}");
+        let tor = if let Some(secs) = self.kill_after {
+            sleep(Duration::from_secs(secs));
+            running.kill()?
+        } else {
+            if !self.fill_buffers {
+                while let Some(line) = stdout.line() {
+                    stdout_bytes += line.as_bytes().len();
+                    if self.verbose {
+                        println!("stdout: {line:?}");
+                    }
                }
+                println!("finished reading stdout");
            }
-            println!("finished reading stdout");

-            let stderr = running.stderr();
-            while let Some(line) = stderr.line() {
-                if self.verbose {
-                    println!("stderr: {line:?}");
-                }
+            running.wait()?
+        };
+
+        let stderr = tor.stderr();
+        for line in String::from_utf8_lossy(stderr).lines() {
+            if self.verbose {
+                println!("stderr: {line:?}");
            }
-            println!("finished reading stderr");
        }
+        println!("finished reading stderr");

-        let tor = running.wait()?;
        let elapsed = started.elapsed();
        let speed = (stdout_bytes as f64) / elapsed.as_secs_f64();

        println!("stdout bytes: {stdout_bytes}");
        println!("duration: {} ms", elapsed.as_millis());
        println!("speed: {:.0} B/s", speed);
-        println!("exit: {}", tor.status());
-        println!("timed out? {}", tor.timed_out());
+        println!("exit: {}", tor.exit_code());

        Ok(())
    }
modified src/broker.rs
@@ -42,9 +42,17 @@ impl Broker {
        })
    }

+    pub fn max_run_time(&self) -> Duration {
+        self.max_run_time
+    }
+
+    pub fn db(&self) -> &Db {
+        &self.db
+    }
+
    #[allow(clippy::result_large_err)]
    pub fn execute_ci(
-        &mut self,
+        &self,
        adapter: &Adapter,
        trigger: &Request,
        run_notification: &NotificationSender,
@@ -204,7 +212,7 @@ echo '{"response":"finished","result":"success"}'

        let tmp = tempdir()?;
        let db = tmp.path().join("db.db");
-        let mut broker = broker(&db)?;
+        let broker = broker(&db)?;

        let trigger = trigger_request()?;

@@ -236,7 +244,7 @@ exit 1

        let tmp = tempdir()?;
        let db = tmp.path().join("db.db");
-        let mut broker = broker(&db)?;
+        let broker = broker(&db)?;

        let trigger = trigger_request()?;

modified src/config.rs
@@ -19,6 +19,20 @@ const DEFAULT_MAX_RUN_TIME: Duration = Duration::from_secs(3600);
const DEFAULT_QUEUE_LEN_INTERVAL: Duration = Duration::from_secs(3600);
const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;

+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
+struct NonzeroCount {
+    count: Option<usize>,
+}
+
+impl NonzeroCount {
+    fn count(&self) -> usize {
+        match self.count {
+            None | Some(0) => 1,
+            Some(n) => n,
+        }
+    }
+}
+
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
@@ -37,6 +51,9 @@ pub struct Config {
    #[serde(deserialize_with = "deserialize_duration")]
    #[serde(default = "default_queue_len_interval")]
    queue_len_interval: Duration,
+
+    #[serde(default)]
+    concurrent_adapters: NonzeroCount,
}

fn default_max_run_time() -> Duration {
@@ -58,6 +75,10 @@ impl Config {
        self.report_dir.as_deref()
    }

+    pub fn concurrent_adapters(&self) -> usize {
+        self.concurrent_adapters.count()
+    }
+
    pub fn to_adapters(&self) -> Result<Adapters, AdapterError> {
        Adapters::new(&self.adapters, self.default_adapter.as_deref())
    }
modified src/lib.rs
@@ -19,6 +19,7 @@ pub mod msg;
pub mod node_event_source;
pub mod notif;
pub mod pages;
+pub mod pull_queue;
pub mod queueadd;
pub mod queueproc;
pub mod refs;
modified src/logger.rs
@@ -3,7 +3,7 @@
use std::{path::Path, process::ExitStatus, time::Duration};

use clap::ValueEnum;
-use radicle::{git::Oid, identity::RepoId, node::Event, patch::PatchId};
+use radicle::{identity::RepoId, node::Event, patch::PatchId};
use serde_json::Value;
use tracing::{debug, error, info, trace, warn, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
@@ -21,6 +21,7 @@ use crate::{
    queueadd::AdderError,
    queueproc::QueueError,
    run::Run,
+    timeoutcmd::TimeoutError,
};

#[derive(Debug, thiserror::Error)]
@@ -401,10 +402,11 @@ pub fn adapter_config(config: &Config) {
    );
}

-pub fn queueproc_start() {
+pub fn queueproc_start(concurrent_adapters: usize) {
    info!(
        msg_id = ?Id::QueueProcStart,
        kind = %Kind::Startup,
+        concurrent_adapters,
        "start thread to process events until a shutdown event"
    );
}
@@ -427,6 +429,22 @@ pub fn queueproc_end(result: &Result<(), QueueError>) {
    }
}

+pub fn queueproc_start_result_receiver() {
+    info!(
+        msg_id = ?Id::QueueProcStart,
+        kind = %Kind::Startup,
+        "start thread to process results of CI runs"
+    );
+}
+
+pub fn queueproc_end_result_receiver() {
+    info!(
+        msg_id = ?Id::QueueProcStart,
+        kind = %Kind::Startup,
+        "end thread to process results of CI runs"
+    );
+}
+
pub fn queueproc_channel_disconnect() {
    info!(
        msg_id = ?Id::QueueProcDisconnected,
@@ -482,7 +500,7 @@ pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent, adapter: &Ada
        ?id,
        ?event,
        ?adapter,
-        "picked event from queue"
+        "running adapter on event picked from queue"
    );
}

@@ -513,22 +531,20 @@ pub fn queueproc_processed_event(result: &Result<bool, QueueError>) {
    );
}

-pub fn queueproc_remove_event(id: &QueueId) {
+pub fn queueproc_remove_event(event: &QueuedCiEvent) {
    info!(
        msg_id = ?Id::QueueProcRemoveEvent,
        kind = %Kind::Debug,
-        ?id,
+        ?event,
        "remove event from queue"
    );
}

-pub fn queueproc_action_run(rid: &RepoId, oid: &Oid, msg: &str) {
+pub fn queueproc_action_run(event: &CiEvent) {
    info!(
        msg_id = ?Id::QueueProcActionRun,
        kind = %Kind::Debug,
-        ?rid,
-        ?oid,
-        msg = msg,
+        ?event,
        "Action: run"
    );
}
@@ -663,7 +679,7 @@ pub fn event_end() {
}

pub fn broker_db(filename: &Path) {
-    info!(
+    debug!(
        msg_id = ?Id::BrokerDatabase,
        kind = %Kind::Startup,
        filename = %filename.display(),
@@ -737,11 +753,11 @@ pub fn adapter_stderr_line(line: &str) {
    );
}

-pub fn adapter_result(exit: i32) {
+pub fn adapter_result(exit: ExitStatus) {
    debug!(
        msg_id = ?Id::AdapterExitCode,
        kind = %Kind::Debug,
-        exit_code = exit,
+        ?exit,
        "adapter exit code"
    );
}
@@ -754,10 +770,11 @@ pub fn adapter_did_not_exit_voluntarily() {
    );
}

-pub fn adapter_did_not_exit() {
+pub fn adapter_did_not_exit(error: TimeoutError) {
    warn!(
        msg_id = ?Id::AdapterNoExit,
        kind = %Kind::Debug,
+        ?error,
        "adapter did not exit: probably killed by signal"
    );
}
modified src/notif.rs
@@ -12,6 +12,7 @@ const EVENT_RECV_TIMEOUT: Duration = Duration::from_secs(1);
const UPDATE_IMTERVAL: Duration = Duration::from_secs(1);

/// Channel endpoint for sending notifications.
+#[derive(Clone)]
pub struct NotificationSender {
    sender: Sender<()>,
}
added src/pull_queue.rs
@@ -0,0 +1,148 @@
+use std::sync::{Arc, Condvar, Mutex};
+
+/// A pull queue.
+///
+/// A producer produces an item when a consumer is ready for it, not
+/// before. There can be any number of consumers.
+///
+/// This type has interior mutability. Each thread should own
+/// its own clone of this, and all clones refer to the same queue.
+///
+/// # Example
+/// ```
+/// # use std::thread::spawn;
+/// # use radicle_ci_broker::pull_queue::PullQueue;
+/// let mut queue = PullQueue::new();
+/// let mut clone = queue.clone();
+/// let producer = spawn(move || {
+///     for i in 0..10 {
+///         queue.push(i);
+///     }
+/// });
+/// let consumer = spawn(move ||
+///     while let Ok(Some(i)) = clone.pop() {
+///         println!("{i}");
+///     }
+/// );
+/// producer.join().unwrap();
+/// consumer.join().unwrap();
+/// ```
+pub struct PullQueue<T> {
+    q: Arc<(Mutex<Unlocked<T>>, Condvar)>,
+}
+
+impl<T> Clone for PullQueue<T> {
+    fn clone(&self) -> Self {
+        Self { q: self.q.clone() }
+    }
+}
+
+impl<T: Clone> Default for PullQueue<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> Drop for PullQueue<T> {
+    fn drop(&mut self) {
+        let (mutex, var) = &*self.q;
+        let mut locked = mutex
+            .lock()
+            .map_err(|_| PullQueueError::Mutex)
+            .expect("FATAL: mutex was poisoned");
+        locked.end();
+        var.notify_all();
+    }
+}
+
+impl<T: Clone> PullQueue<T> {
+    /// Create a new [`PullQueue`].
+    pub fn new() -> Self {
+        Self {
+            q: Arc::new((Mutex::new(Unlocked::new()), Condvar::new())),
+        }
+    }
+
+    /// Push a new item to the queue. This blocks until there is a
+    /// consumer who wants the item.
+    pub fn push(&mut self, i: T) -> Result<(), PullQueueError> {
+        let (mutex, var) = &*self.q;
+        let mut locked = mutex.lock().map_err(|_| PullQueueError::Mutex)?;
+        loop {
+            if locked.try_set(i.clone()) {
+                var.notify_all();
+                break;
+            } else {
+                // Wait for the previous item to be consumed.
+                locked = var.wait(locked).map_err(|_| PullQueueError::Mutex)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Consume an item, if one is available. This will block until an
+    /// item is available or no more items will ever be produced.
+    /// Return `None` when no more items will ever be produced.
+    pub fn pop(&mut self) -> Result<Option<T>, PullQueueError> {
+        let (mutex, var) = &*self.q;
+        let mut locked = mutex.lock().map_err(|_| PullQueueError::Mutex)?;
+
+        loop {
+            if locked.is_ended() {
+                // Queue is empty and won't get anything more. We're
+                // done.
+                var.notify_all();
+                return Ok(None);
+            } else if let Some(i) = locked.get() {
+                // There was something in the queue, return it.
+                var.notify_all();
+                return Ok(Some(i));
+            } else {
+                locked = var.wait(locked).map_err(|_| PullQueueError::Mutex)?;
+            }
+        }
+    }
+}
+
+struct Unlocked<T> {
+    ended: bool,
+    item: Option<T>,
+}
+
+impl<T> Unlocked<T> {
+    fn new() -> Self {
+        Self {
+            ended: false,
+            item: None,
+        }
+    }
+
+    fn end(&mut self) {
+        self.ended = true;
+    }
+
+    fn is_ended(&self) -> bool {
+        self.ended && self.item.is_none()
+    }
+
+    fn try_set(&mut self, item: T) -> bool {
+        if self.item.is_none() {
+            self.item = Some(item);
+            true
+        } else {
+            false
+        }
+    }
+
+    fn get(&mut self) -> Option<T> {
+        self.item.take()
+    }
+}
+
+/// All possible errors from [`PullQueue`].
+#[derive(Debug, thiserror::Error)]
+pub enum PullQueueError {
+    /// Mutex locking failed.
+    #[error("failed to lock mutex for pull queue")]
+    Mutex,
+}
modified src/queueproc.rs
@@ -3,22 +3,27 @@
#![allow(clippy::result_large_err)]

use std::{
-    sync::mpsc::RecvTimeoutError,
+    collections::HashSet,
+    sync::{
+        mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender},
+        Arc, Mutex,
+    },
    thread::{spawn, JoinHandle},
    time::{Duration, Instant},
};

-use radicle::Profile;
+use radicle::{identity::RepoId, Profile};

use crate::{
    adapter::{Adapter, Adapters},
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
-    db::{Db, DbError, QueueId, QueuedCiEvent},
+    db::{Db, DbError, QueuedCiEvent},
    filter::{EventFilter, Trigger},
    logger,
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
+    pull_queue::{PullQueue, PullQueueError},
};

#[derive(Default)]
@@ -31,25 +36,56 @@ pub struct QueueProcessorBuilder {
    events_rx: Option<NotificationReceiver>,
    run_tx: Option<NotificationSender>,
    queue_len_interval: Option<Duration>,
+    concurrent_adapters: Option<usize>,
}

const DEFAULT_QUEUE_LEN_DURATION: Duration = Duration::from_secs(10);

impl QueueProcessorBuilder {
    pub fn build(self) -> Result<QueueProcessor, QueueError> {
+        let profile = Profile::load().map_err(QueueError::Profile)?;
+        let broker = self.broker.ok_or(QueueError::Missing("broker"))?;
+        let filters = self.filters.ok_or(QueueError::Missing("filters"))?;
+        let triggers = self.triggers.ok_or(QueueError::Missing("triggers"))?;
+        let adapters = self.adapters.ok_or(QueueError::Missing("adapters"))?;
+        let run_tx = self.run_tx.ok_or(QueueError::Missing("run_tx"))?;
+        let concurrent_adapters = self
+            .concurrent_adapters
+            .ok_or(QueueError::Missing("concurrent_adapters"))?;
+        let (processed_tx, processed_rx) = sync_channel(1);
+
+        let picked_events = PullQueue::new();
+
+        let mut descs = vec![];
+        for _ in 0..concurrent_adapters {
+            let broker = Broker::new(broker.db().filename(), broker.max_run_time())
+                .map_err(QueueError::NewBroker)?;
+            let event_processor = EventProcessor::new(
+                profile.clone(),
+                filters.clone(),
+                triggers.clone(),
+                adapters.clone(),
+                broker,
+                run_tx.clone(),
+                processed_tx.clone(),
+            );
+            descs.push(ProcessorDescription::new(
+                event_processor,
+                picked_events.clone(),
+            ));
+        }
+
        Ok(QueueProcessor {
            db: self.db.ok_or(QueueError::Missing("db"))?,
-            profile: Profile::load().map_err(QueueError::Profile)?,
-            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
-            filters: self.filters.ok_or(QueueError::Missing("filters"))?,
-            triggers: self.triggers.ok_or(QueueError::Missing("triggers"))?,
-            adapters: self.adapters.ok_or(QueueError::Missing("adapters"))?,
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
-            run_tx: self.run_tx.ok_or(QueueError::Missing("run_tx"))?,
            queue_len_interval: self
                .queue_len_interval
                .unwrap_or(DEFAULT_QUEUE_LEN_DURATION),
            prev_queue_len: Instant::now(),
+            processors: descs,
+            processed_rx: Some(processed_rx),
+            picked_events: Some(picked_events),
+            current: CurrentlyPicked::default(),
        })
    }

@@ -73,6 +109,11 @@ impl QueueProcessorBuilder {
        self
    }

+    pub fn concurrent_adapters(mut self, n: usize) -> Self {
+        self.concurrent_adapters = Some(n);
+        self
+    }
+
    pub fn broker(mut self, broker: Broker) -> Self {
        self.broker = Some(broker);
        self
@@ -96,65 +137,79 @@ impl QueueProcessorBuilder {

pub struct QueueProcessor {
    db: Db,
-    profile: Profile,
-    filters: Vec<EventFilter>,
-    triggers: Vec<Trigger>,
-    broker: Broker,
-    adapters: Adapters,
    events_rx: NotificationReceiver,
-    run_tx: NotificationSender,
    queue_len_interval: Duration,
    prev_queue_len: Instant,
+    picked_events: Option<PullQueue<Picked>>,
+    processors: Vec<ProcessorDescription>,
+    processed_rx: Option<Receiver<RepoId>>,
+    current: CurrentlyPicked,
}

impl QueueProcessor {
    pub fn process_in_thread(mut self) -> JoinHandle<Result<(), QueueError>> {
        spawn(move || {
-            logger::queueproc_start();
+            logger::queueproc_start(self.processors.len());
+
+            // Spawn a thread to process results from running adapters.
+            #[allow(clippy::unwrap_used)]
+            let rx = self.processed_rx.take().unwrap();
+            let mut current = self.current.clone();
+            let results = spawn(move || {
+                logger::queueproc_start_result_receiver();
+                while let Ok(repoid) = rx.recv() {
+                    current.remove(repoid);
+                }
+                logger::queueproc_end_result_receiver();
+            });
+
+            // Pick events from the queue and send them to the worker
+            // threads that run adapters. Results are processed by the
+            // thread above.
            let result = self.process_until_shutdown();
+
            logger::queueproc_end(&result);
-            result
+
+            // Wait for worker threads to terminate. This closes all
+            // sender ends of the results channel.
+            for proc in self.processors {
+                proc.join().ok();
+            }
+
+            // Wait for the results processing thread to terminate.
+            results.join().ok();
+
+            Ok(())
        })
    }

+    #[allow(clippy::unwrap_used)]
    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
        let mut done = false;
+        let mut picked_events = self.picked_events.take().unwrap();
        while !done {
-            let mut first_error = None;
-            while let Some(qe) = self.pick_event()? {
-                // We always drop the picked event at once so that if
-                // anything goes wrong, we don't try to process the
-                // same event again. It's better for the failure to be
-                // logged and let the humans deal with whatever
-                // complicated failure situation is happening.
-                self.drop_event(qe.id())?;
-
-                match self.pick_adapters(qe.event()) {
-                    Err(err) => {
-                        if first_error.is_none() {
-                            first_error = Some(err);
-                        }
-                    }
-                    Ok(None) => (), // We already removed event from queue.
-                    Ok(Some(adapters)) => {
-                        for adapter in adapters {
-                            match self.run_adapter(&qe, &adapter) {
-                                Err(err) => {
-                                    if first_error.is_none() {
-                                        first_error = Some(err)
-                                    }
-                                }
-                                Ok(finished) => done = finished,
-                            }
-                        }
-                    }
+            // Process all events currently in the event queue, if a
+            // filter allows it.
+            loop {
+                if let Some(qe) = self.pick_event()? {
+                    let picked = Picked::new(qe.clone());
+                    self.current.insert(picked.qe.event().repository());
+                    picked_events.push(picked).map_err(QueueError::PullQueue)?;
+
+                    // We always drop the picked event as soon as it has
+                    // been pushed to a processing thread, so that if
+                    // anything goes wrong, we're less likely to try to
+                    // process the same event again. It's better for the
+                    // failure to be logged and let the humans deal with
+                    // whatever complicated failure situation is
+                    // happening.
+                    self.drop_event(&qe)?;
+                } else if self.current.is_empty() {
+                    break;
                }
            }

-            if let Some(err) = first_error {
-                return Err(err);
-            }
-
+            // Wait for a notification of new events in the queue.
+            // This prevents the loop from being a busy loop.
            match self.events_rx.wait_for_notification() {
                Ok(_) => {}
                Err(RecvTimeoutError::Timeout) => {}
@@ -168,22 +223,6 @@ impl QueueProcessor {
        Ok(())
    }

-    fn run_adapter(&mut self, qe: &QueuedCiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
-        let mut done = false;
-
-        self.run_tx.notify()?;
-        logger::queueproc_picked_event(qe.id(), qe, adapter);
-        let res = self.process_event(qe.event(), adapter);
-        logger::queueproc_processed_event(&res);
-        match res {
-            Ok(shut_down) => done = shut_down,
-            Err(QueueError::BuildTrigger(_, _)) => done = false,
-            Err(err) => Err(err)?,
-        }
-        self.run_tx.notify()?;
-        Ok(done)
-    }
-
    fn pick_event(&mut self) -> Result<Option<QueuedCiEvent>, QueueError> {
        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;

@@ -201,6 +240,21 @@ impl QueueProcessor {
        }
        queue.sort_by_cached_key(|qe| qe.timestamp().to_string());

+        // If we can access the set of repositories for which CI is
+        // currently running, remove those repositories from the list.
+        let ids = self.current.list();
+        queue = queue
+            .iter()
+            .filter(|qe| {
+                if let Some(repoid) = qe.event().repository() {
+                    !ids.contains(repoid)
+                } else {
+                    true
+                }
+            })
+            .cloned()
+            .collect();
+
        if let Some(qe) = queue.first() {
            Ok(Some(qe.clone()))
        } else {
@@ -208,7 +262,120 @@ impl QueueProcessor {
        }
    }

-    fn pick_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
+    fn drop_event(&mut self, qe: &QueuedCiEvent) -> Result<(), QueueError> {
+        logger::queueproc_remove_event(qe);
+        self.db
+            .remove_queued_ci_event(qe.id())
+            .map_err(QueueError::db)?;
+        Ok(())
+    }
+}
+
+#[derive(Default, Clone)]
+struct CurrentlyPicked {
+    set: Arc<Mutex<HashSet<RepoId>>>,
+}
+
+impl CurrentlyPicked {
+    fn insert(&mut self, repoid: Option<&RepoId>) {
+        if let Some(repoid) = repoid {
+            if let Ok(mut set) = self.set.lock() {
+                set.insert(*repoid);
+            }
+        }
+    }
+
+    fn remove(&mut self, repoid: RepoId) {
+        if let Ok(mut set) = self.set.lock() {
+            set.remove(&repoid);
+        }
+    }
+
+    fn list(&self) -> Vec<RepoId> {
+        if let Ok(set) = self.set.lock() {
+            set.iter().copied().collect()
+        } else {
+            vec![]
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        if let Ok(set) = self.set.lock() {
+            set.is_empty()
+        } else {
+            false
+        }
+    }
+}
+
+struct ProcessorDescription {
+    processing_thread: JoinHandle<Result<bool, QueueError>>,
+}
+
+impl ProcessorDescription {
+    fn new(processor: EventProcessor, mut picked_events: PullQueue<Picked>) -> Self {
+        Self {
+            processing_thread: spawn(move || {
+                while let Ok(Some(picked)) = picked_events.pop() {
+                    processor.process_picked_event(picked);
+                }
+                Ok(false)
+            }),
+        }
+    }
+
+    fn join(self) -> Result<bool, QueueError> {
+        self.processing_thread
+            .join()
+            .map_err(|_| QueueError::JoinAdapterThread)?
+    }
+}
+
+struct EventProcessor {
+    profile: Profile,
+    filters: Vec<EventFilter>,
+    triggers: Vec<Trigger>,
+    adapters: Adapters,
+    broker: Broker,
+    run_tx: NotificationSender,
+    processed_tx: SyncSender<RepoId>,
+}
+
+impl EventProcessor {
+    #[allow(clippy::too_many_arguments)]
+    fn new(
+        profile: Profile,
+        filters: Vec<EventFilter>,
+        triggers: Vec<Trigger>,
+        adapters: Adapters,
+        broker: Broker,
+        run_tx: NotificationSender,
+        processed_tx: SyncSender<RepoId>,
+    ) -> Self {
+        Self {
+            profile,
+            filters,
+            triggers,
+            adapters,
+            broker,
+            run_tx,
+            processed_tx,
+        }
+    }
+
+    fn process_picked_event(&self, picked: Picked) {
+        if let Some(adapters) = self.matching_adapters(picked.qe.event()).ok().flatten() {
+            for adapter in adapters {
+                self.run_adapter(&picked.qe, &adapter).ok();
+            }
+        }
+
+        if let Some(repoid) = picked.qe.event().repository() {
+            self.processed_tx.send(*repoid).ok();
+        }
+    }
+
+    fn matching_adapters(&self, e: &CiEvent) -> Result<Option<Vec<Adapter>>, QueueError> {
        let mut adapters = vec![];

        if self.filters.iter().any(|filter| filter.allows(e)) {
@@ -237,162 +404,54 @@ impl QueueProcessor {
        }
    }

-    fn process_event(&mut self, event: &CiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
-        logger::queueproc_processing_event(event);
-        match event {
-            CiEvent::V1(CiEventV1::Shutdown) => {
-                logger::queueproc_action_shutdown();
-                Ok(true)
-            }
-            CiEvent::V1(CiEventV1::BranchCreated {
-                from_node: _,
-                repo,
-                branch: _,
-                tip,
-            }) => {
-                logger::queueproc_action_run(repo, tip, "branch created");
-                let trigger = RequestBuilder::default()
-                    .profile(&self.profile)
-                    .ci_event(event)
-                    .build_trigger_from_ci_event()
-                    .map_err(|e| QueueError::build_trigger(event, e))?;
-                self.broker
-                    .execute_ci(adapter, &trigger, &self.run_tx)
-                    .map_err(QueueError::execute_ci)?;
-                Ok(false)
-            }
-            CiEvent::V1(CiEventV1::BranchUpdated {
-                from_node: _,
-                repo,
-                branch: _,
-                tip,
-                old_tip: _,
-            }) => {
-                logger::queueproc_action_run(repo, tip, "branch updated");
-                let trigger = RequestBuilder::default()
-                    .profile(&self.profile)
-                    .ci_event(event)
-                    .build_trigger_from_ci_event()
-                    .map_err(|e| QueueError::build_trigger(event, e))?;
-                self.broker
-                    .execute_ci(adapter, &trigger, &self.run_tx)
-                    .map_err(QueueError::execute_ci)?;
-                Ok(false)
-            }
-            CiEvent::V1(CiEventV1::BranchDeleted {
-                from_node: _,
-                repo,
-                branch: _,
-                tip,
-            }) => {
-                logger::queueproc_action_run(repo, tip, "branch deleted");
-                let trigger = RequestBuilder::default()
-                    .profile(&self.profile)
-                    .ci_event(event)
-                    .build_trigger_from_ci_event()
-                    .map_err(|e| QueueError::build_trigger(event, e))?;
-                self.broker
-                    .execute_ci(adapter, &trigger, &self.run_tx)
-                    .map_err(QueueError::execute_ci)?;
-                Ok(false)
-            }
-            CiEvent::V1(CiEventV1::TagCreated {
-                from_node: _,
-                repo,
-                tag: _,
-                tip,
-            }) => {
-                logger::queueproc_action_run(repo, tip, "tag created");
-                let result = RequestBuilder::default()
-                    .profile(&self.profile)
-                    .ci_event(event)
-                    .build_trigger_from_ci_event()
-                    .map_err(|e| QueueError::build_trigger(event, e));
-                logger::queueproc_trigger(&result);
-                let trigger = result?;
-                self.broker
-                    .execute_ci(adapter, &trigger, &self.run_tx)
-                    .map_err(QueueError::execute_ci)?;
-                Ok(false)
-            }
-            CiEvent::V1(CiEventV1::TagUpdated {
-                from_node: _,
-                repo,
-                tag: _,
-                tip,
-                old_tip: _,
-            }) => {
-                logger::queueproc_action_run(repo, tip, "tag updated");
-                let trigger = RequestBuilder::default()
-                    .profile(&self.profile)
-                    .ci_event(event)
-                    .build_trigger_from_ci_event()
-                    .map_err(|e| QueueError::build_trigger(event, e))?;
-                self.broker
-                    .execute_ci(adapter, &trigger, &self.run_tx)
-                    .map_err(QueueError::execute_ci)?;
-                Ok(false)
-            }
-            CiEvent::V1(CiEventV1::TagDeleted {
-                from_node: _,
-                repo,
-                tag: _,
-                tip,
-            }) => {
-                logger::queueproc_action_run(repo, tip, "tag deleted");
-                let trigger = RequestBuilder::default()
-                    .profile(&self.profile)
-                    .ci_event(event)
-                    .build_trigger_from_ci_event()
-                    .map_err(|e| QueueError::build_trigger(event, e))?;
-                self.broker
-                    .execute_ci(adapter, &trigger, &self.run_tx)
-                    .map_err(QueueError::execute_ci)?;
-                Ok(false)
-            }
-            CiEvent::V1(CiEventV1::PatchCreated {
-                from_node: _,
-                repo,
-                patch: _,
-                new_tip,
-            }) => {
-                logger::queueproc_action_run(repo, new_tip, "patch created");
-                let trigger = RequestBuilder::default()
-                    .profile(&self.profile)
-                    .ci_event(event)
-                    .build_trigger_from_ci_event()
-                    .map_err(|e| QueueError::build_trigger(event, e))?;
-                self.broker
-                    .execute_ci(adapter, &trigger, &self.run_tx)
-                    .map_err(QueueError::execute_ci)?;
-                Ok(false)
-            }
-            CiEvent::V1(CiEventV1::PatchUpdated {
-                from_node: _,
-                repo,
-                patch: _,
-                new_tip,
-            }) => {
-                logger::queueproc_action_run(repo, new_tip, "patch updated");
-                let trigger = RequestBuilder::default()
-                    .profile(&self.profile)
-                    .ci_event(event)
-                    .build_trigger_from_ci_event()
-                    .map_err(|e| QueueError::build_trigger(event, e));
-                logger::queueproc_trigger(&trigger);
-                let trigger = trigger?;
-                self.broker
-                    .execute_ci(adapter, &trigger, &self.run_tx)
-                    .map_err(QueueError::execute_ci)?;
-                Ok(false)
-            }
+    fn run_adapter(&self, qe: &QueuedCiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
+        let mut done = false;
+
+        self.run_tx.notify()?;
+        logger::queueproc_picked_event(qe.id(), qe, adapter);
+        let res = self.process_event(qe.event(), adapter);
+        logger::queueproc_processed_event(&res);
+        match res {
+            Ok(shut_down) => done = shut_down,
+            Err(QueueError::BuildTrigger(_, _)) => done = false,
+            Err(err) => Err(err)?,
        }
+        self.run_tx.notify()?;
+        Ok(done)
    }

-    fn drop_event(&mut self, id: &QueueId) -> Result<(), QueueError> {
-        logger::queueproc_remove_event(id);
-        self.db.remove_queued_ci_event(id).map_err(QueueError::db)?;
-        Ok(())
+    fn process_event(&self, event: &CiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
+        if matches!(event, CiEvent::V1(CiEventV1::Shutdown)) {
+            logger::queueproc_action_shutdown();
+            Ok(true)
+        } else {
+            logger::queueproc_action_run(event);
+
+            let trigger = RequestBuilder::default()
+                .profile(&self.profile)
+                .ci_event(event)
+                .build_trigger_from_ci_event()
+                .map_err(|e| QueueError::build_trigger(event, e));
+            logger::queueproc_trigger(&trigger);
+            let trigger = trigger?;
+
+            self.broker
+                .execute_ci(adapter, &trigger, &self.run_tx)
+                .map_err(QueueError::execute_ci)?;
+
+            Ok(false)
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+struct Picked {
+    qe: QueuedCiEvent,
+}
+
+impl Picked {
+    fn new(qe: QueuedCiEvent) -> Self {
+        Self { qe }
    }
}

@@ -421,6 +480,30 @@ pub enum QueueError {

    #[error("no default adapter specified in configuration")]
    NoDefaultAdapter,
+
+    #[error("failed to send to channel for picked events")]
+    SendPicked,
+
+    #[error("failed to receive from channel for picked events")]
+    RecvPicked,
+
+    #[error("failed to send to channel for results of processed events")]
+    SendProcessResult,
+
+    #[error("failed to receive from channel for results of processed events")]
+    RecvProcessResult,
+
+    #[error("failed to wait for thread to run adapters to finish")]
+    JoinAdapterThread,
+
+    #[error("failed to wait for thread to process results from adapters to finish")]
+    JoinResultThread,
+
+    #[error("failed to create a new broker instance")]
+    NewBroker(#[source] BrokerError),
+
+    #[error("failure when using a pull queue")]
+    PullQueue(#[source] PullQueueError),
}

impl QueueError {
modified src/timeoutcmd.rs
@@ -20,25 +20,25 @@
//! # Example
//! ```
//! # use std::{process::Command, time::Duration};
-//! # use radicle_ci_broker::timeoutcmd::{RunningProcess, TimeoutCommand};
+//! # use radicle_ci_broker::timeoutcmd::{ChildProcess, TimeoutCommand, RealtimeLines};
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut cmd = Command::new("bash");
//! cmd.arg("-c").arg("exec cat"); // Note exec!
//!
//! let mut to = TimeoutCommand::new(Duration::from_secs(10));
//! to.feed_stdin(b"hello, world\n");
+//! let mut stdout: RealtimeLines = to.stdout();
//! let running = to.spawn(cmd)?;
//!
//! // Capture stdout output. We ignore stderr output.
-//! let stdout = running.stdout();
//! let mut captured = vec![];
//! while let Some(line) = stdout.line() {
//!     captured.push(line);
//! }
//!
//! // Wait for child process to terminate.
-//! let tor = running.wait()?;
-//! assert_eq!(tor.status().code(), Some(0));
+//! let finished = running.wait()?;
+//! assert_eq!(finished.exit_code().code(), Some(0));
//! assert_eq!(captured, ["hello, world\n"]);
//! # Ok(())
//! # }
@@ -47,20 +47,22 @@
#![allow(unused_imports)]

use std::{
-    io::{Read, Write},
+    io::{Read, Seek, Write},
    process::{Child, Command, ExitStatus, Stdio},
    sync::{
-        mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender, TryRecvError},
-        Arc, Mutex,
+        mpsc::{
+            channel, sync_channel, Receiver, RecvTimeoutError, Sender, SyncSender, TryRecvError,
+        },
+        Arc, Condvar, Mutex,
    },
    thread::{sleep, spawn, JoinHandle},
    time::{Duration, Instant},
};

+use tempfile::tempfile;
+
use crate::logger;

-const WAIT_FOR_IDLE_CHILD: Duration = Duration::from_millis(1000);
-const WAIT_FOR_OUTPUT: Duration = Duration::from_millis(100);
const KIB: usize = 1024;
const MIB: usize = 1024 * KIB;
const MAX_OUTPUT_BYTES: usize = 10 * MIB;
@@ -70,11 +72,11 @@ const MAX_OUTPUT_BYTES: usize = 10 * MIB;
pub struct TimeoutCommand {
    max_duration: Duration,
    stdin_data: Vec<u8>,
+    stdout: RealtimeLines,
}

// This works by using multiple threads.
//
-// * a thread to send data to child's stdin
// * a thread to read child's stdout, as bytes
// * a thread to read child's stderr, as bytes
// * a thread to monitor how long the child runs
@@ -98,654 +100,594 @@ impl TimeoutCommand {
        Self {
            max_duration,
            stdin_data: vec![],
+            stdout: RealtimeLines::default(),
        }
    }

    /// Feed the sub-process the specified binary data via its
    /// standard input. If this method is not used, the sub-process
-    /// stdin will be fed no data. The sub-process stdin always comes
-    /// from a pipe, however, so if this method is not used, the
-    /// effect is that stdin comes from `/dev/null` or another empty
-    /// file.
+    /// stdin will be fed no data. If this method is not used, the
+    /// effect is that stdin comes from an empty file.
    pub fn feed_stdin(&mut self, data: &[u8]) {
        self.stdin_data = data.to_vec();
    }

+    pub fn stdout(&self) -> RealtimeLines {
+        self.stdout.clone()
+    }
+
    /// Start a new sub-process to execute the specified command.
    ///
    /// The caller should set up the [`std::process::Command`] value.
    /// This method will redirect stdin, stdout, and stderr to use
    /// pipes.
-    pub fn spawn(&self, mut command: Command) -> Result<RunningProcess, TimeoutError> {
-        // Set up child stdin/stdout/stderr redirection.
-        let mut child = command
-            .stdin(Stdio::piped())
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .spawn()
-            .map_err(|err| TimeoutError::Spawn(command, err))?;
-
-        // Set up thread to write data to child stdin.
-        let stdin = child.stdin.take().ok_or(TimeoutError::TakeStdin)?;
-        let stdin_data = self.stdin_data.clone();
-        let stdin_writer = spawn(move || writer(stdin, stdin_data));
-
-        // Set up thread to capture child stdout.
-        let stdout = child.stdout.take().ok_or(TimeoutError::TakeStdout)?;
-        let (stdout_termination_tx, stdout_termination_rx) = sync_channel(1);
-        let (stdout_lines_tx, stdout_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
-        let stdout_reader =
-            spawn(move || NonBlockingReader::new("stdout", stdout, stdout_lines_tx).read_to_end());
-        let stdout_lines = LineReceiver::new("stdout", stdout_lines_rx, stdout_termination_rx);
-
-        // Set up thread to capture child stderr.
-        let stderr = child.stderr.take().ok_or(TimeoutError::TakeStderr)?;
-        let (stderr_termination_tx, stderr_termination_rx) = sync_channel(1);
-        let (stderr_lines_tx, stderr_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
-        let stderr_reader =
-            spawn(move || NonBlockingReader::new("stderr", stderr, stderr_lines_tx).read_to_end());
-        let stderr_lines = LineReceiver::new("stderr", stderr_lines_rx, stderr_termination_rx);
-
-        // Set up thread to monitor child termination or overlong run time.
-        let (tx, timed_out_rx) = sync_channel(1);
-        let (kill_tx, kill_rx) = sync_channel(1);
-        let nanny = Nanny::new(
-            self.max_duration,
-            child,
-            tx,
-            kill_rx,
-            vec![stdout_termination_tx, stderr_termination_tx],
-        );
-        let monitor = spawn(move || nanny.monitor());
-
-        Ok(RunningProcess {
-            child_monitor: Some(monitor),
-            timed_out_rx,
-            stdin_writer,
-            stdout_lines,
-            stdout_reader,
-            stderr_lines,
-            stderr_reader,
-            kill_tx,
-        })
+    pub fn spawn(&self, command: Command) -> Result<ChildProcess, TimeoutError> {
+        ChildProcess::new(command, &self.stdin_data, self.stdout(), self.max_duration)
    }
}

-/// Manage a running child process and capture its output.
+/// Represent a finished process.
///
-/// This is created by [`TimeoutCommand::spawn`].
-pub struct RunningProcess {
-    child_monitor: Option<JoinHandle<Result<(), TimeoutError>>>,
-    timed_out_rx: NannyReceiver,
-    stdin_writer: JoinHandle<Result<(), std::io::Error>>,
-    stdout_lines: LineReceiver,
-    stdout_reader: JoinHandle<Result<(), TimeoutError>>,
-    stderr_lines: LineReceiver,
-    stderr_reader: JoinHandle<Result<(), TimeoutError>>,
-    kill_tx: KillSender,
+/// This allows retrieval of the process's standard error output and
+/// the exit code. The standard output is captured via the
+/// [`RealtimeLines`] buffer given to [`ChildProcess`].
+#[derive(Debug)]
+pub struct FinishedProcess {
+    exit: ExitStatus,
+    stderr: Vec<u8>,
}

-impl RunningProcess {
-    /// Return a [`LineReceiver`] that returns lines from the
-    /// sub-process standard output.
-    pub fn stdout(&self) -> &LineReceiver {
-        &self.stdout_lines
-    }
-
-    /// Return a [`LineReceiver`] that returns lines from the
-    /// sub-process standard error output.
-    pub fn stderr(&self) -> &LineReceiver {
-        &self.stderr_lines
-    }
-
-    /// Terminate sub-process with extreme prejudice.
-    pub fn kill(&self) -> Result<(), TimeoutError> {
-        let x = self.kill_tx.send(());
-        logger::timeoutcmd_request_termination(x);
-        Ok(())
+impl FinishedProcess {
+    /// Exit code of the finished process.
+    pub fn exit_code(&self) -> ExitStatus {
+        self.exit
    }

-    /// Wait for child process to terminate. It may terminate because
-    /// it ends normally, or because it has run for longer than the
-    /// limit set with [`TimeoutCommand::new`]. The return value of
-    /// this method will specify why, see
-    /// [`TimeoutResult::timed_out`].
-    ///
-    /// Note that if the sub-process produces a lot of output, you
-    /// must read it to avoid the process getting stuck; see
-    /// [`RunningProcess::stdout`] and [`RunningProcess::stderr`]. If
-    /// you don't read the output, the sub-process will fill its
-    /// output pipe buffer, or the inter-thread communication channel
-    /// buffer, and the sub-process will block on output, and not
-    /// progress. This may be unwanted. The blocking won't affect the
-    /// sub-process getting terminated due to running for too long.
-    pub fn wait(mut self) -> Result<TimeoutResult, TimeoutError> {
-        logger::timeoutcmd_wait_word_from_nanny();
-        let (mut child, timed_out) = self.timed_out_rx.recv().map_err(TimeoutError::ChildRecv)?;
-        logger::timeoutcmd_wait_got_word_from_nanny();
-        if let Some(monitor) = self.child_monitor.take() {
-            logger::timeoutcmd_wait_on_nanny_to_end();
-            monitor
-                .join()
-                .map_err(|_| TimeoutError::JoinChildMonitor)??;
-        }
-
-        logger::timeoutcmd_wait_on_stdin_writer_to_end();
-        self.stdin_writer.join().ok();
-
-        logger::timeoutcmd_wait_on_stdout_reader_to_end();
-        self.stdout_reader.join().ok();
-
-        logger::timeoutcmd_wait_on_stderr_reader_to_end();
-        self.stderr_reader.join().ok();
-
-        logger::timeoutcmd_wait_on_child_to_end();
-        let status = child.wait().map_err(TimeoutError::Wait)?;
-        logger::timeoutcmd_wait_status(status);
-        logger::timeoutcmd_ok();
-        Ok(TimeoutResult { timed_out, status })
+    /// The captured output from the process's standard error.
+    pub fn stderr(&self) -> &[u8] {
+        &self.stderr
    }
}

-/// Did the sub-process started with [`TimeoutCommand::spawn`]
-/// terminate normally, or did it get terminated unilaterally for
-/// running too long? What was its exit code?
-#[derive(Debug)]
-pub struct TimeoutResult {
-    timed_out: bool,
-    status: ExitStatus,
+/// Represent a running child process.
+///
+/// The child gets some data fed to it via its standard input. The
+/// child is terminated if it runs for too long.
+pub struct ChildProcess {
+    deadline: Option<Instant>,
+    child: Child,
+    stdout: RealtimeLines,
+    stdout_rx: Receiver<()>,
+    stdout_thread: JoinHandle<Result<(), TimeoutError>>,
+    stderr_thread: JoinHandle<Result<Vec<u8>, TimeoutError>>,
+    arc: Arc<Mutex<RealtimeLines>>,
}

-impl TimeoutResult {
-    /// Exit code of the sub-process. There is always an exit
-    /// code: [`RunningProcess::wait`] does not return until the
-    /// sub-process has exited.
-    pub fn status(&self) -> ExitStatus {
-        self.status
+impl ChildProcess {
+    /// Create a new [`ChildProcess`].
+    pub fn new(
+        mut cmd: Command,
+        stdin: &[u8],
+        stdout_lines: RealtimeLines,
+        timeout: Duration,
+    ) -> Result<Self, TimeoutError> {
+        // Write the data to be fed to the child's stdin to a temporary
+        // file that gets deleted automatically once it is no longer
+        // used. Using a file instead of a pipe means our logic can be
+        // simpler, as we don't need to worry about the pipe buffer
+        // filling up.
+        let mut file = tempfile::tempfile()?;
+        file.write_all(stdin)?;
+        file.rewind()?;
+
+        // Redirect child stdin/stdout/stderr and start the child
+        // process.
+        let mut child = cmd
+            .stdin(file)
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped())
+            .spawn()
+            .map_err(|err| TimeoutError::Spawn(cmd, err))?;
+
+        // Create the mutex around the line buffer we're given and a
+        // channel to signal end of output. The mutex exists not so
+        // much to protect the line buffer, but because it is a
+        // convenient way to transfer the buffer to the reader thread.
+        let (stdout_tx, stdout_rx) = channel::<()>();
+        let arc = Arc::new(Mutex::new(stdout_lines.clone()));
+
+        // Start a thread to read from stdout, using the line buffer
+        // we just created and hid in `arc`.
+        let stdout = child
+            .stdout
+            .take()
+            .ok_or(TimeoutError::TakeHandle("stdout"))?;
+        let stdout_thread = {
+            let arc = arc.clone();
+            spawn(move || Self::line_reader(arc, stdout_tx, stdout))
+        };
+
+        // Launch a thread that reads everything the child writes
+        // to its stderr.
+        let stderr = child
+            .stderr
+            .take()
+            .ok_or(TimeoutError::TakeHandle("stderr"))?;
+        let stderr_thread = Self::capture(stderr)?;
+
+        Ok(Self {
+            deadline: Some(
+                Instant::now()
+                    .checked_add(timeout)
+                    .ok_or(TimeoutError::Deadline)?,
+            ),
+            child,
+            stdout: stdout_lines,
+            stdout_rx,
+            stdout_thread,
+            stderr_thread,
+            arc,
+        })
    }

-
    /// Did the sub-process get terminated for running too long?
-
    pub fn timed_out(&self) -> bool {
-
        self.timed_out
+
    pub fn kill(mut self) -> Result<FinishedProcess, TimeoutError> {
+
        self.child.kill().map_err(|_| TimeoutError::Kill)?;
+
        self.stdout.finish();
+
        self.deadline = None;
+
        self.wait()
    }
-
}

-
type NannySender = SyncSender<(Child, bool)>;
-
type NannyReceiver = Receiver<(Child, bool)>;
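+
    /// Wait for the child process to terminate, up to the deadline
+
    /// set when it was spawned, and collect its exit status and
+
    /// captured stderr.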
+
    pub fn wait(mut self) -> Result<FinishedProcess, TimeoutError> {
+
        let result = self.child.try_wait();
+
        match result {
+
            Ok(Some(status)) => {
+
                let stderr = self
+
                    .stderr_thread
+
                    .join()
+
                    .map_err(|_| TimeoutError::Thread)??;
+

+
                return Ok(FinishedProcess {
+
                    exit: status,
+
                    stderr,
+
                });
+
            }
+
            Ok(None) => (),
+
            Err(err) => return Err(TimeoutError::Wait(err)),
+
        }

-
type KillSender = SyncSender<()>;
-
type KillReceiver = Receiver<()>;
+
        self.stdout.finish();
+
        if let Some(deadline) = self.deadline {
+
            let max_wait = deadline - Instant::now();

-
struct Nanny {
-
    max_duration: Duration,
-
    child: Option<Child>,
-
    tx: NannySender,
-
    term_tx: Vec<TerminationSender>,
-
    kill_rx: KillReceiver,
-
}
+
            // Wait for the stdout reader to finish, up to the deadline.
+
            let _guardlock = &*self.arc;
+
            match self.stdout_rx.recv_timeout(max_wait) {
+
                Ok(_) | Err(RecvTimeoutError::Disconnected) => (),
+
                Err(RecvTimeoutError::Timeout) => {
+
                    return Err(TimeoutError::TimedOut);
+
                }
+
            }
+
        };

-
impl Nanny {
-
    fn new(
-
        max_duration: Duration,
-
        child: Child,
-
        tx: NannySender,
-
        kill_rx: KillReceiver,
-
        term_tx: Vec<TerminationSender>,
-
    ) -> Self {
-
        Self {
-
            max_duration,
-
            child: Some(child),
-
            tx,
-
            term_tx,
-
            kill_rx,
+
        self.stdout_thread
+
            .join()
+
            .map_err(|_| TimeoutError::Thread)??;
+

+
        let stderr = self
+
            .stderr_thread
+
            .join()
+
            .map_err(|_| TimeoutError::Thread)?;
+
        let stderr = stderr?;
+

+
        match self.child.wait() {
+
            Ok(exit) => Ok(FinishedProcess { exit, stderr }),
+
            Err(err) => Err(TimeoutError::Wait(err)),
        }
    }

-
    fn monitor(mut self) -> Result<(), TimeoutError> {
-
        let mut child = if let Some(child) = self.child.take() {
-
            child
-
        } else {
-
            panic!("programming error: Nanny does not have a child to monitor");
-
        };
-
        let started = Instant::now();
-
        let mut timed_out = false;
-
        logger::timeoutcmd_nanny_start();
+
    // Read data from a stream, push it into the [`RealtimeLines`]
+
    // behind `arc`, and send on `tx` when done. This function is
+
    // called in a dedicated thread, so we don't need to use
+
    // non-blocking I/O or async.
+
    fn line_reader(
+
        arc: Arc<Mutex<RealtimeLines>>,
+
        tx: Sender<()>,
+
        mut stream: impl Read,
+
    ) -> Result<(), TimeoutError> {
+
        let mutex = &*arc;
        loop {
-
            let elapsed = started.elapsed();
-

-
            if self.kill_rx.try_recv().is_ok() {
-
                let x = child.kill();
-
                logger::timeoutcmd_nanny_terminated_as_requested(x);
+
            let mut bytes = vec![0; 1024];
+
            let n = stream.read(&mut bytes)?;
+
            if n == 0 {
                break;
-
            } else if elapsed > self.max_duration {
-
                let x = child.kill();
-
                logger::timeoutcmd_nanny_too_long(child.id(), elapsed, self.max_duration, x);
-
                timed_out = true;
-
                break;
-
            }
-

-
            if matches!(child.try_wait(), Ok(None)) {
-
                sleep(WAIT_FOR_IDLE_CHILD);
            } else {
-
                logger::timeoutcmd_nanny_child_died();
-
                break;
+
                let mut buf = mutex.lock().map_err(|_| TimeoutError::Lock)?;
+
                buf.push(bytes[..n].to_vec());
            }
        }
+
        let mut buf = mutex.lock().map_err(|_| TimeoutError::Lock)?;
+
        buf.finish();

-
        logger::timeoutcmd_nanny_time_to_end();
-
        self.tx
-
            .send((child, timed_out))
-
            .map_err(TimeoutError::ChildSend)?;
-
        for tx in self.term_tx.iter() {
-
            tx.send(()).map_err(TimeoutError::ChildSendToLine)?;
-
        }
-

-
        logger::timeoutcmd_nanny_ends();
+
        // We've reached the end. Notify the parent thread.
+
        tx.send(()).ok();
        Ok(())
    }
-
}

-
fn writer(mut stream: impl Write, data: Vec<u8>) -> Result<(), std::io::Error> {
-
    let mut written = 0;
-
    while written < data.len() {
-
        // We write one byte at a time. This lets us avoid doing
-
        // non-blocking I/O, but is less efficient. by only writing
-
        // one byte at a time, we only block when we can't write to
-
        // the stream. When the stream is a pipe, this happens when
-
        // the pipe buffer fills up. This function should be in its
-
        // own thread, and so it doesn't matter if it blocks, but
-
        // measurements are more useful when they're taken after each
-
        // byte.
-
        //
-
        // When, inevitably, byte-at-a-time becomes too inefficient,
-
        // this will need to be rewritten to use non-blocking I/O.
-
        //
-
        // Or async.
-
        let n = stream.write(&data[written..written + 1])?;
-
        written += n;
-
        stream.flush()?;
+
    // Start thread that captures stderr output into a byte
+
    // vector buffer. We don't need a condition variable for
+
    // this, as we'll just read to the end to capture
+
    // everything.
+
    fn capture(
+
        mut stream: impl Read + Send + 'static,
+
    ) -> Result<JoinHandle<Result<Vec<u8>, TimeoutError>>, TimeoutError> {
+
        let thread = spawn(move || {
+
            let mut buf = vec![];
+
            loop {
+
                let mut chunk = vec![0; MIB];
+
                let n = stream.read(&mut chunk).map_err(TimeoutError::Io)?;
+
                if n == 0 {
+
                    return Ok(buf);
+
                } else {
+
                    buf.append(&mut chunk[..n].to_vec());
+
                    if buf.len() > MAX_OUTPUT_BYTES {
+
                        return Err(TimeoutError::TooMuch);
+
                    }
+
                }
+
            }
+
        });
+
        Ok(thread)
    }
-

-
    Ok(())
}

-
type TerminationSender = SyncSender<()>;
-
type TerminationReceiver = Receiver<()>;
-

-
/// Receive one line of output at time.
+
/// A buffer of lines that can be filled in the background by a
+
/// producer thread and queried while that happens by a consumer
+
/// thread.
+
///
+
/// This is used to capture the child process output to its stdout in
+
/// such a way that it can be processed in real time, without having
+
/// to wait for the child process to terminate.
///
-
/// See the [module description](index.html) for an example.
-
pub struct LineReceiver {
-
    name: &'static str,
-
    child_terminated: TerminationReceiver,
-
    bytes: OutputReader,
+
/// The buffer is filled by pushing byte vectors, and consumed line by
+
/// line until there are no more lines: the buffer is empty and the
+
/// child process has terminated.
+
///
+
/// The buffer can be cloned and each clone is logically the same
+
/// buffer. This allows separate producer and consumer threads to own
+
/// the buffer, but internally it's the same one.
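+
///
+
/// A sketch of the intended producer/consumer use (illustrative
+
/// only, hence `ignore`):
+
///
+
/// ```ignore
+
/// let mut consumer = RealtimeLines::default();
+
/// let mut producer = consumer.clone();
+
///
+
/// std::thread::spawn(move || {
+
///     producer.push(b"hello, ".to_vec());
+
///     producer.push(b"world\n".to_vec());
+
///     producer.finish();
+
/// });
+
///
+
/// while let Some(line) = consumer.line() {
+
///     print!("{line}");
+
/// }
+
/// ```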
+
#[derive(Default, Clone)]
+
pub struct RealtimeLines {
+
    // We have an unlocked buffer protected by a mutex, and a
+
    // condition variable to signal consumers of new data or the
+
    // producer having finished.
+
    data: Arc<(Mutex<UnlockedBuf>, Condvar)>,
}

-
impl LineReceiver {
-
    fn new(name: &'static str, bytes: OutputReader, child_terminated: TerminationReceiver) -> Self {
-
        Self {
-
            name,
-
            child_terminated,
-
            bytes,
-
        }
+
impl RealtimeLines {
+
    /// Push some binary data to the buffer.
+
    ///
+
    /// The incoming data is a byte vector, as we may not receive
+
    /// complete lines from the child process. We also get a vector,
+
    /// not a slice, to make it easier to append the data to the
+
    /// buffer.
+
    pub fn push(&mut self, more_data: Vec<u8>) {
+
        // Get mutex and condition variable.
+
        let (mutex, var) = &*self.data;
+

+
        // Lock mutex to get unlocked buffer (really the mutex guard,
+
        // but that lets us access the unlocked buffer protected by
+
        // the mutex).
+
        let mut buf = mutex.lock().expect("lock for push");
+

+
        // Push the data to the unlocked buffer.
+
        buf.push(more_data);
+

+
        // Notify consumer of new data.
+
        var.notify_all();
    }

-
    /// Return the next line, if any, or `None` if there will be no
-
    /// more lines. Note that this blocks until there is a line, or
-
    /// the child process terminates.
-
    pub fn line(&self) -> Option<String> {
-
        let mut line = vec![];
+
    /// Signal that the producer has finished and there will be no more
+
    /// incoming data.
+
    pub fn finish(&mut self) {
+
        let (mutex, var) = &*self.data;
+
        let mut buf = mutex.lock().expect("lock for push");
+
        buf.finish();
+
        var.notify_all();
+
    }
+

+
    /// Get next line from the buffer, if any. This will wait for a
+
    /// new complete line to arrive, if there isn't any in the buffer
+
    /// yet, or for the producer to finish. Returns `None` for end of
+
    /// file.
+
    pub fn line(&mut self) -> Option<String> {
+
        let (mutex, var) = &*self.data;
+

+
        // Lock the mutex to get access to the unlocked buffer.
+
        let mut buf = mutex.lock().expect("lock to wait for line");

        loop {
-
            // Get a byte if there is one.
-
            logger::timeoutcmd_line_reader_try_byte(self.name);
-
            let y = self.bytes.try_recv();
-
            logger::timeoutcmd_line_reader_tried_byte(self.name, y);
-
            match y {
-
                Ok(byte) => {
-
                    line.push(byte);
-
                    if byte == b'\n' {
-
                        let line = String::from_utf8_lossy(&line).to_string();
-
                        logger::timeoutcmd_line_reader_got_line(self.name, &line);
-
                        return Some(line);
-
                    }
-
                }
-
                Err(TryRecvError::Empty) => {
-
                    sleep(WAIT_FOR_OUTPUT);
-
                }
-
                Err(TryRecvError::Disconnected) => {
-
                    if line.is_empty() {
-
                        // Sender has closed the channel, there will be no more lines.
-
                        logger::timeoutcmd_line_reader_got_disconnected(self.name);
-
                        return None;
-
                    } else {
-
                        let line = String::from_utf8_lossy(&line).to_string();
-
                        logger::timeoutcmd_line_reader_got_line(self.name, &line);
-
                        return Some(line);
-
                    }
+
            match buf.line() {
+
                // We got a line: return it.
+
                Some(line) => {
+
                    return Some(line);
                }
-
            }

-
            logger::timeoutcmd_line_reader_did_child_die(self.name);
-
            let x = self.child_terminated.try_recv();
-
            match x {
-
                Ok(_) => {
-
                    logger::timeoutcmd_line_reader_child_died(self.name);
+
                // We didn't get a line, but the input stream has
+
                // finished. Return final partial line, if there is one,
+
                // or None.
+
                None if buf.is_finished() => {
+
                    let line = buf.line();
+
                    return line;
                }
-
                Err(std::sync::mpsc::TryRecvError::Disconnected) => {
-
                    logger::timeoutcmd_line_reader_child_channel_disconnected(self.name);
+

+
                // Wait for more input, then try again. We can't
+
                // assume the new input results in either a complete
+
                // line or the input stream finishing, so we loop.
+
                None => {
+
                    buf = var.wait(buf).expect("wait for line");
                }
-
                _ => {}
            }
        }
    }
}

-
type OutputSender = SyncSender<u8>;
-
type OutputReader = Receiver<u8>;
-

-
struct NonBlockingReader<R: Read> {
-
    name: &'static str,
-
    stream: R,
-
    tx: OutputSender,
+
// An unlocked buffer from which lines can be extracted.
+
//
+
// All the locking and thread synchronization is handled by [`RealtimeLines`].
+
#[derive(Default, Debug)]
+
struct UnlockedBuf {
+
    data: Vec<u8>,
+
    finished: bool,
}

-
impl<R: Read> NonBlockingReader<R> {
-
    fn new(name: &'static str, stream: R, tx: OutputSender) -> Self {
-
        Self { name, stream, tx }
+
impl UnlockedBuf {
+
    // Mark producer as finished. There will be no more data.
+
    fn finish(&mut self) {
+
        self.finished = true;
    }

-
    fn read_to_end(mut self) -> Result<(), TimeoutError> {
-
        let mut count = 0;
-
        loop {
-
            // We read one byte at a time. This lets us avoid doing
-
            // non-blocking I/O but is less efficient. We want to
-
            // avoid blocking for an arbitrary amount of time, if
-
            // reading one byte at a time. When reading from a pipe,
-
            // the pipe writer end may not get closed until the child
-
            // process writing to the pipe ends, and we may not want
-
            // to wait that long.
-
            //
-
            // If this becomes too inefficient, this needs to be
-
            // rewritten to use non-blocking I/O or async.
-

-
            logger::timeoutcmd_nonblocking_try_byte(self.name, count);
-
            let mut byte = vec![0; 1];
-
            let x = self.stream.read(&mut byte);
-
            logger::timeoutcmd_nonblocking_tried_byte(self.name, &x, &byte);
-
            match x {
-
                Ok(0) => {
-
                    logger::timeoutcmd_nonblocking_eof(self.name);
-
                    break;
-
                }
-
                Ok(1) => {
-
                    count += 1;
-
                    self.tx
-
                        .try_send(byte[0])
-
                        .map_err(|_| TimeoutError::TooMuch(self.name))?;
-
                }
-
                Ok(_) => {
-
                    logger::timeoutcmd_nonblocking_got_too_much(self.name, x, &byte);
-
                    return Err(TimeoutError::ReadMucn);
-
                }
-
                Err(err) => {
-
                    logger::timeoutcmd_nonblocking_read_error(self.name, &err);
-
                    return Err(TimeoutError::Read(err));
-
                }
+
    // Has producer finished? Since this structure does not itself
+
    // read from the input stream, this is how we know we've reached
+
    // the end of the file.
+
    fn is_finished(&self) -> bool {
+
        self.finished
+
    }
+

+
    // Push data into the buffer.
+
    fn push(&mut self, mut more_data: Vec<u8>) {
+
        self.data.append(&mut more_data);
+
    }
+

+
    // Get next line, if any. If producer has finished, get final,
+
    // possibly partial line, if any. If there isn't at least one line in
+
    // the buffer, return `None`. Note that this does not mean end of
+
    // file.
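+
    //
+
    // For example, with `b"ab\ncd"` buffered: the first call returns
+
    // `Some("ab\n")`; further calls return `None` until `finish()`
+
    // has been called, after which `line` returns `Some("cd")` and
+
    // then `None`.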
+
    fn line(&mut self) -> Option<String> {
+
        for (i, byte) in self.data.iter().enumerate() {
+
            if *byte == b'\n' {
+
                let range = 0..i + 1;
+
                let line = String::from_utf8_lossy(&self.data[range.clone()]).to_string();
+
                self.data.drain(range);
+
                return Some(line);
            }
        }

-
        logger::timeoutcmd_nonblocking_ends(self.name);
-
        Ok(())
+
        if self.finished && !self.data.is_empty() {
+
            let line = String::from_utf8_lossy(&self.data).to_string();
+
            self.data.clear();
+
            return Some(line);
+
        }
+

+
        None
    }
}

+
/// All possible errors from the module.
#[derive(Debug, thiserror::Error)]
pub enum TimeoutError {
+
    /// An underlying I/O error.
+
    #[error(transparent)]
+
    Io(#[from] std::io::Error),
+

    /// Couldn't spawn child process.
    #[error("failed to spawn command: {0:?}")]
    Spawn(Command, #[source] std::io::Error),

-
    /// Couldn't get file descriptor of child process stdin.
-
    #[error("failed to extract stdin stream from child")]
-
    TakeStdin,
-

-
    /// Couldn't get file descriptor of child process stdout.
-
    #[error("failed to extract stdout stream from child")]
-
    TakeStdout,
-

-
    /// Couldn't get file descriptor of child process stderr.
-
    #[error("failed to extract stderr stream from child")]
-
    TakeStderr,
+
    #[error("thread join failed")]
+
    Thread,

-
    /// Couldn't check if child process is still running.
-
    #[error("failed to check whether command is still running")]
-
    TryWait(#[source] std::io::Error),
+
    #[error("failed to lock mutex")]
+
    Lock,

-
    /// Reading from child stdout or stderr returned too much data.
-
    #[error("read from command standard output returned more data than requested")]
-
    ReadMucn,
+
    #[error("failed to lock condition variable")]
+
    LockVar,

-
    /// Reading from child stdout or stderr failed.
-
    #[error("problem reading from command output (stdout or stderr)")]
-
    Read(#[source] std::io::Error),
+
    #[error("timed out waiting for child")]
+
    TimedOut,

-
    /// Channel buffer got full, which means child process wrote too much output.
-
    #[error("sub-process produces too much to {0}")]
-
    TooMuch(&'static str),
+
    #[error("failed to kill child process")]
+
    Kill,

-
    /// Couldn't join thread that feeds data to child stdin.
-
    #[error("problem waiting for thread that writes to command standard input")]
-
    JoinStdinFeeder,
-

-
    /// Couldn't write to child stdin.
-
    #[error("problem writing to command standard input")]
-
    FeedStdin(#[source] std::io::Error),
-

-
    #[error("problem waiting for thread that monitors command")]
-
    JoinChildMonitor,
-

-
    /// Couldn't join thread that reads child stdout.
-
    #[error("problem waiting for thread that reads command standard output")]
-
    JoinStdoutReader,
-

-
    /// Couldn't join thread that reads child stderr.
-
    #[error("problem waiting for thread that reads command standard error output")]
-
    JoinStderrReader,
-

-
    /// Couldn't terminate child process.
-
    #[error("problem forcing child process to terminate")]
-
    Kill(#[source] std::io::Error),
-

-
    /// Couldn't wait for child process to terminate.
-
    #[error("problem waiting for child process to terminate")]
+
    #[error("failed waiting for child process to terminate")]
    Wait(#[source] std::io::Error),

-
    /// Mutex lock error.
-
    #[error("failed to lock command output buffer")]
-
    MutexLock,
+
    #[error("child exit code is not known")]
+
    ExitCode,

-
    #[error("failed to receive notification from child monitor")]
-
    ChildRecv(#[source] std::sync::mpsc::RecvError),
+
    #[error("failed to take child {0} file handle")]
+
    TakeHandle(&'static str),

-
    #[error("failed to send notification from child monitor")]
-
    ChildSend(#[source] std::sync::mpsc::SendError<(Child, bool)>),
+
    #[error("programming error: failed to get thread to wait on")]
+
    TakeThread,

-
    #[error("failed to send notification from child monitor to line receiver")]
-
    ChildSendToLine(#[source] std::sync::mpsc::SendError<()>),
+
    #[error("failed to compute deadline")]
+
    Deadline,
+

+
    #[error("child process produced too much output")]
+
    TooMuch,
}

#[cfg(test)]
+
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

-
    const LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES: Duration = Duration::from_secs(100);
+
    const LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES: Duration = Duration::from_secs(10);
    const SHORT_TIMEOUT: Duration = Duration::from_secs(3);

    fn setup(
        script: &str,
        timeout: Duration,
        stdin: Option<&'static str>,
-
    ) -> Result<RunningProcess, Box<dyn std::error::Error>> {
+
    ) -> Result<(ChildProcess, RealtimeLines), TimeoutError> {
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg(script);
        let mut to = TimeoutCommand::new(timeout);
        if let Some(stdin) = stdin {
            to.feed_stdin(stdin.as_bytes());
        }
-
        Ok(to.spawn(cmd)?)
+
        Ok((to.spawn(cmd)?, to.stdout()))
    }

    #[test]
    fn bin_true() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup("exec true", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
+
        let (running, _) = setup("exec true", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
+
        let finished = running.wait()?;
+
        assert_eq!(finished.exit_code().code(), Some(0));
        Ok(())
    }

    #[test]
    fn bin_false() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup("exec false", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(1));
-
        assert!(!tor.timed_out());
+
        let (running, _) = setup("exec false", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
+
        let result = running.wait();
+
        assert!(matches!(result, Ok(FinishedProcess { exit, .. }) if exit.code() == Some(1)));
        Ok(())
    }

    #[test]
    fn sleep_1() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, _) = setup(
            "exec sleep 1",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
+
        let result = running.wait();
+
        assert!(matches!(result, Ok(FinishedProcess { exit, .. }) if exit.code() == Some(0)));
        Ok(())
    }

    #[test]
    fn sleep_for_too_long() -> Result<(), Box<dyn std::error::Error>> {
-
        let started = Instant::now();
-
        let running = setup("exec sleep 1000", SHORT_TIMEOUT, None)?;
-
        let tor = running.wait()?;
-
        eprintln!("duration: {} ms", started.elapsed().as_millis());
-
        assert_eq!(tor.status().code(), None);
-
        assert!(tor.timed_out());
+
        let (running, _) = setup("exec sleep 1000", SHORT_TIMEOUT, None)?;
+
        let result = running.wait();
+
        assert!(matches!(result, Err(TimeoutError::TimedOut)));
        Ok(())
    }

    #[test]
    fn hello_world() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, mut stdout) = setup(
            "exec echo hello, world",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        let stdout = running.stdout();
-
        let stderr = running.stderr();
-

        assert_eq!(stdout.line(), Some("hello, world\n".into()));
        assert_eq!(stdout.line(), None);

-
        assert_eq!(stderr.line(), None);
+
        let result = running.wait();
+
        eprintln!("result={result:#?}");
+
        let finished = result.unwrap();
+
        assert_eq!(finished.exit_code().code(), Some(0));
+
        assert_eq!(finished.stderr(), b"");

-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
        Ok(())
    }

    #[test]
    fn hello_world_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, mut stdout) = setup(
            "exec echo hello, world 1>&2",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        let stdout = running.stdout();
-
        let stderr = running.stderr();
-

        assert_eq!(stdout.line(), None);

-
        assert_eq!(stderr.line(), Some("hello, world\n".into()));
-
        assert_eq!(stderr.line(), None);
+
        let finished = running.wait().unwrap();
+
        assert_eq!(finished.exit_code().code(), Some(0));
+
        assert_eq!(finished.stderr(), b"hello, world\n");

-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
        Ok(())
    }

    #[test]
    fn pipe_through_cat() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, mut stdout) = setup(
            "exec cat",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            Some("hello, world"),
        )?;
-
        let stdout = running.stdout();
-
        let stderr = running.stderr();

        assert_eq!(stdout.line(), Some("hello, world".into()));
        assert_eq!(stdout.line(), None);

-
        assert_eq!(stderr.line(), None);
+
        let finished = running.wait().unwrap();
+

+
        assert_eq!(finished.exit_code().code(), Some(0));
+
        assert_eq!(finished.stderr(), b"");

-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
        Ok(())
    }

    #[test]
    fn yes_to_stdout() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup("exec yes", SHORT_TIMEOUT, None)?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), None);
-
        assert!(tor.timed_out());
+
        let (running, _) = setup("exec yes", SHORT_TIMEOUT, None)?;
+
        assert!(matches!(running.wait(), Err(TimeoutError::TimedOut)));
        Ok(())
    }

    #[test]
    fn yes_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup("exec yes 1>&2", SHORT_TIMEOUT, None)?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), None);
-
        assert!(tor.timed_out());
+
        let (running, _) = setup("exec yes 1>&2", SHORT_TIMEOUT, None)?;
+
        assert!(matches!(
+
            running.wait(),
+
            Err(TimeoutError::TimedOut) | Err(TimeoutError::TooMuch)
+
        ));
        Ok(())
    }

    #[test]
    fn kill() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, _) = setup(
            "exec sleep 1000",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        sleep(Duration::from_millis(5000));
-
        running.kill()?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), None);
-
        assert!(!tor.timed_out());
+
        sleep(Duration::from_millis(100));
+
        let finished = running.kill()?;
+
        assert_eq!(finished.exit_code().code(), None);
+

        Ok(())
    }

    #[test]
    fn kill_stderr() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, _) = setup(
            "exec sleep 1000 1>&2",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        sleep(Duration::from_millis(5000));
-
        running.kill()?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), None);
-
        assert!(!tor.timed_out());
+
        sleep(Duration::from_millis(100));
+
        let finished = running.kill()?;
+
        assert_eq!(finished.exit_code().code(), None);
+

        Ok(())
    }
}