Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
This patch adds functionality to the CI broker to set a time limit to
Merged liw opened 1 year ago

the adapter. The limit can be set in the configuration file, for example: max_run_time: 12765h. https://docs.rs/duration-str/0.11.2/duration_str/ shows the supported formats. The default is one hour.

In addition to limiting the run time, this patch changes how the adapter’s stdout and stderr output is captured. This should make the CI broker not get stuck if the adapter produces a lot of output to either stream.

Additionally, there is a limit to how much output the CI broker accepts from the adapter. If there is more output, the adapter process is terminated with extreme prejudice, and the CI run fails. The limit is not currently configurable, and is hard coded to 10 MiB. I can make this, too, configurable, if someone explains why they need it to be configurable.

All the interesting code is in the new module src/timeoutcmd.rs. That is some quite intricate code. It’s intricate, because it’s doing several things concurrently: spawning a sub-process, feeding it input via stdin, capture anything it writes to stdout or stderr, terminating the sub-process if it runs for too long, or produces too much output, or the main thread requests. Further, the output capturing is done in a way that allows the main thread to process the output in real time: this is needed for the CI broker to react to the adapter’s messages as they are received, without waiting until the adapter process ends. The module also does this without getting stuck or crashing, in any of the scenarios I have been able to come up with. The end of the timeoutcmd.rs module has a bunch of tests to verify the code works in those scenarios to make sure the module keeps working.

12 files changed +1155 -83 0f90e97a 55e1e7db
modified Cargo.lock
@@ -191,6 +191,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236"

[[package]]
+
name = "arrayvec"
+
version = "0.7.6"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
+

+
[[package]]
name = "as-slice"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -358,6 +364,15 @@ dependencies = [
]

[[package]]
+
name = "chrono"
+
version = "0.4.38"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
+
dependencies = [
+
 "num-traits",
+
]
+

+
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -651,6 +666,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"

[[package]]
+
name = "duration-str"
+
version = "0.11.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "709d653e7c92498eb29fb86a2a6f0f3502b97530f33aedb32ef848d4d28b31a3"
+
dependencies = [
+
 "chrono",
+
 "rust_decimal",
+
 "serde",
+
 "thiserror",
+
 "time",
+
 "winnow",
+
]
+

+
[[package]]
name = "ec25519"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1712,6 +1741,7 @@ dependencies = [
 "clap",
 "ctor",
 "culpa",
+
 "duration-str",
 "html-page",
 "radicle",
 "radicle-git-ext",
@@ -1978,6 +2008,16 @@ dependencies = [
]

[[package]]
+
name = "rust_decimal"
+
version = "1.36.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b082d80e3e3cc52b2ed634388d436fe1f4de6af5786cc2de9ba9737527bdf555"
+
dependencies = [
+
 "arrayvec",
+
 "num-traits",
+
]
+

+
[[package]]
name = "rustix"
version = "0.38.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3083,6 +3123,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"

[[package]]
+
name = "winnow"
+
version = "0.6.20"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b"
+
dependencies = [
+
 "memchr",
+
]
+

+
[[package]]
name = "xattr"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified Cargo.toml
@@ -13,6 +13,7 @@ categories = ["development-tools::build-utils"]
[dependencies]
anyhow = "1.0.86"
clap = { version = "4.5.11", features = ["derive", "wrap_help"] }
+
duration-str = "0.11.2"
html-page = "0.4.0"
radicle-git-ext = "0.8.0"
radicle-surf = { version = "0.22.0", default-features = false, features = ["serde"] }
modified src/adapter.rs
@@ -10,9 +10,9 @@
use std::{
    collections::HashMap,
    ffi::OsStr,
-
    io::{BufRead, BufReader, Read},
    path::{Path, PathBuf},
-
    process::{Command, Stdio},
+
    process::Command,
+
    time::Duration,
};

use crate::{
@@ -21,6 +21,7 @@ use crate::{
    msg::{MessageError, Request, Response},
    notif::NotificationSender,
    run::{Run, RunState},
+
    timeoutcmd::{TimeoutCommand, TimeoutError},
};

const NOT_EXITED: i32 = 999;
@@ -57,11 +58,12 @@ impl Adapter {
        run: &mut Run,
        db: &Db,
        run_notification: &NotificationSender,
+
        max_run_time: Duration,
    ) -> Result<(), AdapterError> {
        run.set_state(RunState::Triggered);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

-
        let x = self.run_helper(trigger, run, db, run_notification);
+
        let x = self.run_helper(trigger, run, db, run_notification, max_run_time);

        run.set_state(RunState::Finished);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
@@ -75,38 +77,25 @@ impl Adapter {
        run: &mut Run,
        db: &Db,
        run_notification: &NotificationSender,
+
        max_run_time: Duration,
    ) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
-
        let mut child = Command::new(&self.bin)
-
            .stdin(Stdio::piped())
-
            .stdout(Stdio::piped())
-
            .stderr(Stdio::piped())
-
            .envs(self.envs())
-
            .spawn()
-
            .map_err(|e| AdapterError::SpawnAdapter(self.bin.clone(), e))?;
+
        let mut cmd = Command::new(&self.bin);
+
        cmd.envs(self.envs());
+
        let mut child = TimeoutCommand::new(max_run_time);
+
        child.feed_stdin(trigger.to_string().as_bytes());
+
        let child = child.spawn(cmd).map_err(|err| match err {
+
            TimeoutError::Spawn(_, err) => AdapterError::SpawnAdapter(self.bin.clone(), err),
+
            _ => AdapterError::TimeoutCommand(err),
+
        })?;

        run_notification.notify()?;

-
        // Write the request to trigger a run to the child's stdin.
-
        // Then close the pipe to prevent the child from trying to
-
        // read another message that will never be sent.
-
        {
-
            let stdin = child.stdin.take().ok_or(AdapterError::StdinHandle)?;
-
            trigger
-
                .to_writer(stdin)
-
                .map_err(AdapterError::RequestWrite)?;
-
        }
-

-
        // Get the child's stdout into a BufReader so that we can loop
-
        // over lines.
-
        let stdout = child.stdout.take().ok_or(AdapterError::StdoutHandle)?;
-
        let stdout = BufReader::new(stdout);
-
        let mut lines = stdout.lines();
+
        let stdout = child.stdout();

-
        if let Some(line) = lines.next() {
-
            let line = line.map_err(AdapterError::ReadLine)?;
+
        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            run_notification.notify()?;
            match resp {
@@ -118,14 +107,18 @@ impl Adapter {
                    }
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
-
                _ => return Err(AdapterError::NotTriggered(resp)),
+
                _ => {
+
                    child.kill().ok();
+
                    return Err(AdapterError::NotTriggered(resp));
+
                }
            }
        } else {
            logger::adapter_no_first_response();
+
            child.kill().ok();
+
            return Err(AdapterError::NoFirstMessage);
        }

-
        if let Some(line) = lines.next() {
-
            let line = line.map_err(AdapterError::ReadLine)?;
+
        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            run_notification.notify()?;
            match resp {
@@ -133,36 +126,47 @@ impl Adapter {
                    run.set_result(result);
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
-
                _ => return Err(AdapterError::NotFinished(resp)),
+
                _ => {
+
                    child.kill().ok();
+
                    return Err(AdapterError::NotFinished(resp));
+
                }
            }
        } else {
            logger::adapter_no_second_response();
+
            child.kill().ok();
+
            return Err(AdapterError::NoSecondMessage);
        }

-
        if let Some(line) = lines.next() {
-
            let line = line.map_err(AdapterError::ReadLine)?;
+
        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            logger::adapter_too_many_responses();
+
            child.kill().ok();
            return Err(AdapterError::TooMany(resp));
        }

-
        let wait = child.wait().map_err(AdapterError::Wait)?;
+
        let stderr = child.stderr();
+
        while let Some(line) = stderr.line() {
+
            logger::adapter_stderr_line(&line);
+
        }
+

+
        let result = child.wait().expect("FIXME");

-
        let mut stderr = child.stderr.take().ok_or(AdapterError::StderrHandle)?;
-
        let mut buf = vec![];
-
        stderr
-
            .read_to_end(&mut buf)
-
            .map_err(AdapterError::ReadStderr)?;
-
        let stderr = String::from_utf8_lossy(&buf);
-
        logger::adapter_result(wait.code(), &stderr);
+
        logger::debug2(format!(
+
            "wait result? {result:?} status.code: {:?}",
+
            result.status().code()
+
        ));

-
        if let Some(exit) = wait.code() {
+
        if result.timed_out() {
+
            logger::adapter_did_not_exit_voluntarily();
+
            return Err(AdapterError::Failed(NOT_EXITED));
+
        } else if let Some(exit) = result.status().code() {
+
            logger::adapter_result(exit);
            if exit != 0 {
                return Err(AdapterError::Failed(exit));
            }
        } else {
-
            logger::adapter_did_not_exit_voluntarily();
-
            return Err(AdapterError::Failed(NOT_EXITED));
+
            logger::adapter_did_not_exit();
+
            return Err(AdapterError::Signal);
        }

        Ok(())
@@ -171,6 +175,14 @@ impl Adapter {

#[derive(Debug, thiserror::Error)]
pub enum AdapterError {
+
    /// Error from [`TimeoutCommand`] or [`RunningProcess`].
+
    #[error(transparent)]
+
    TimeoutCommand(#[from] crate::timeoutcmd::TimeoutError),
+

+
    /// Error from spawning a sub-process.
+
    #[error("failed to spawn a CI adapter sub-process: {0}")]
+
    SpawnAdapter(PathBuf, #[source] std::io::Error),
+

    /// Error creating Response from a string.
    #[error("failed to create a Response message from adapter output")]
    ParseResponse(#[source] MessageError),
@@ -179,10 +191,6 @@ pub enum AdapterError {
    #[error("failed to write request to adapter stdin")]
    RequestWrite(#[source] MessageError),

-
    /// Error from spawning a sub-process.
-
    #[error("failed to spawn a CI adapter sub-process: {0}")]
-
    SpawnAdapter(PathBuf, #[source] std::io::Error),
-

    /// Error getting the file handle for the adapter's stdin.
    #[error("failed to get handle for adapter's stdin")]
    StdinHandle,
@@ -210,10 +218,22 @@ pub enum AdapterError {
    #[error("child process failed with wait status {0}")]
    Failed(i32),

+
    /// Child process was killed.
+
    #[error("child process terminated by signal")]
+
    Signal,
+

    /// First message is not `Response::Triggered`
    #[error("adapter's first message is not 'triggered', but {0:?}")]
    NotTriggered(Response),

+
    /// There was no first response from adapter.
+
    #[error("adapter did not sent its first message")]
+
    NoFirstMessage,
+

+
    /// There was no second response from adapter.
+
    #[error("adapter did not sent its second message")]
+
    NoSecondMessage,
+

    /// Second message is not `Response::Finished`
    #[error("adapter's second message is not 'finished', but {0:?}")]
    NotFinished(Response),
@@ -233,7 +253,7 @@ pub enum AdapterError {

#[cfg(test)]
mod test {
-
    use std::{fs::write, io::ErrorKind};
+
    use std::{fs::write, io::ErrorKind, time::Duration};

    use tempfile::{tempdir, NamedTempFile};

@@ -249,6 +269,8 @@ mod test {
        test::{mock_adapter, trigger_request, TestResult},
    };

+
    const MAX: Duration = Duration::from_secs(10);
+

    fn db() -> anyhow::Result<Db> {
        let tmp = NamedTempFile::new()?;
        let db = Db::new(tmp.path())?;
@@ -284,7 +306,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender)?;
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX)?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -306,7 +328,7 @@ echo '{"response":"finished","result":"failure"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);

        match x {
            Ok(_) => (),
@@ -335,7 +357,7 @@ exit 1
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));
@@ -357,12 +379,9 @@ kill -9 $BASHPID
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
-
        assert!(matches!(
-
            x,
-
            Err(AdapterError::Failed(_)) | Err(AdapterError::RequestWrite(_))
-
        ));
+
        assert!(matches!(x, Err(AdapterError::NoFirstMessage)));

        Ok(())
    }
@@ -383,9 +402,31 @@ kill -9 $BASHPID
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        eprintln!("{x:#?}");
+
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_ends_ok_before_second_message() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
read
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let db = db()?;
+
        let mut run = run()?;
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
-
        assert!(matches!(x, Err(AdapterError::Failed(_))));
+
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));

        Ok(())
    }
@@ -407,9 +448,9 @@ kill -9 $BASHPID
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
-
        assert!(matches!(x, Err(AdapterError::Failed(_))));
+
        assert!(matches!(x, Err(AdapterError::Signal)));

        Ok(())
    }
@@ -430,7 +471,7 @@ echo '{"response":"finished","result":"success","bad":"field"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
@@ -455,7 +496,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -484,7 +525,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -505,7 +546,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -534,7 +575,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -567,7 +608,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
modified src/bin/cib.rs
@@ -148,7 +148,8 @@ impl QueuedCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let profile = Profile::load().map_err(CibError::profile)?;

-
        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
+
        let mut broker =
+
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;
        let spec =
            config
                .adapter(&config.default_adapter)
@@ -235,7 +236,8 @@ impl ProcessEventsCmd {
            false,
        );

-
        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
+
        let mut broker =
+
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;
        let spec =
            config
                .adapter(&config.default_adapter)
modified src/bin/cibtool.rs
@@ -120,6 +120,8 @@ enum Cmd {
    Run(RunCmd),
    Report(cibtoolcmd::ReportCmd),
    Trigger(cibtoolcmd::TriggerCmd),
+
    #[clap(hide = true)]
+
    Timeout(cibtoolcmd::TimeoutCmd),
}

impl Subcommand for Cmd {
@@ -130,6 +132,7 @@ impl Subcommand for Cmd {
            Self::Run(x) => x.run(args),
            Self::Report(x) => x.run(args),
            Self::Trigger(x) => x.run(args),
+
            Self::Timeout(x) => x.run(args),
        }
    }
}
@@ -351,4 +354,7 @@ enum CibToolError {

    #[error("programming error: failed to set up inter-thread notification channel")]
    Notification(#[source] NotificationError),
+

+
    #[error(transparent)]
+
    Timeout(#[from] radicle_ci_broker::timeoutcmd::TimeoutError),
}
modified src/bin/cibtoolcmd/mod.rs
@@ -20,3 +20,6 @@ pub use run::*;

mod trigger;
pub use trigger::*;
+

+
mod timeout;
+
pub use timeout::*;
added src/bin/cibtoolcmd/timeout.rs
@@ -0,0 +1,106 @@
+
use std::{
+
    process::Command,
+
    thread::sleep,
+
    time::{Duration, Instant},
+
};
+

+
use radicle_ci_broker::timeoutcmd::TimeoutCommand;
+

+
use super::*;
+

+
/// Trigger a CI run.
+
///
+
/// This is meant for developer experimentation.
+
#[derive(Parser)]
+
pub struct TimeoutCmd {
+
    /// A Bash script to run. Should start with "exec", or time out won't work.
+
    #[clap(long)]
+
    script: String,
+

+
    /// Text to be fed to script via stdin.
+
    #[clap(long, default_value = "")]
+
    stdin: String,
+

+
    /// Generate at least this much data to feed to script via stdin.
+
    #[clap(long)]
+
    generate: Option<usize>,
+

+
    /// Terminate script after this many seconds.
+
    #[clap(long)]
+
    timeout: u64,
+

+
    /// Verbose output: show stdout and stderr output lines.
+
    #[clap(short, long)]
+
    verbose: bool,
+

+
    /// Don't empty stdout and stderr buffers, let them fill up.
+
    #[clap(long)]
+
    fill_buffers: bool,
+

+
    /// Kill script after this many seconds, unconditionally.
+
    #[clap(long)]
+
    kill_after: Option<u64>,
+
}
+

+
impl Leaf for TimeoutCmd {
+
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
+
        let mut cmd = Command::new("bash");
+
        cmd.arg("-c").arg(&self.script);
+

+
        let mut to = TimeoutCommand::new(Duration::from_secs(self.timeout));
+

+
        if let Some(bytes) = self.generate {
+
            let mut stdin: Vec<u8> = vec![];
+
            while stdin.len() < bytes {
+
                for byte in b"hello, world\n" {
+
                    stdin.push(*byte);
+
                }
+
            }
+
            to.feed_stdin(stdin.as_slice());
+
            println!("generated stdin has {} bytes", stdin.len());
+
        } else {
+
            to.feed_stdin(self.stdin.as_bytes());
+
        }
+

+
        let started = Instant::now();
+
        println!("spawn child");
+
        let running = to.spawn(cmd)?;
+

+
        if let Some(secs) = self.kill_after {
+
            sleep(Duration::from_secs(secs));
+
            running.kill().unwrap();
+
        }
+

+
        let mut stdout_bytes = 0;
+
        if !self.fill_buffers {
+
            let stdout = running.stdout();
+
            while let Some(line) = stdout.line() {
+
                stdout_bytes += line.as_bytes().len();
+
                if self.verbose {
+
                    println!("stdout: {line:?}");
+
                }
+
            }
+
            println!("finished reading stdout");
+

+
            let stderr = running.stderr();
+
            while let Some(line) = stderr.line() {
+
                if self.verbose {
+
                    println!("stderr: {line:?}");
+
                }
+
            }
+
            println!("finished reading stderr");
+
        }
+

+
        let tor = running.wait()?;
+
        let elapsed = started.elapsed();
+
        let speed = (stdout_bytes as f64) / elapsed.as_secs_f64();
+

+
        println!("stdout bytes: {stdout_bytes}");
+
        println!("duration: {} ms", elapsed.as_millis());
+
        println!("speed: {:.0} B/s", speed);
+
        println!("exit: {}", tor.status());
+
        println!("timed out? {}", tor.timed_out());
+

+
        Ok(())
+
    }
+
}
modified src/broker.rs
@@ -6,6 +6,7 @@
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
+
    time::Duration,
};

use time::{macros::format_description, OffsetDateTime};
@@ -29,16 +30,18 @@ use crate::{
pub struct Broker {
    default_adapter: Option<Adapter>,
    adapters: HashMap<RepoId, Adapter>,
+
    max_run_time: Duration,
    db: Db,
}

impl Broker {
    #[allow(clippy::result_large_err)]
-
    pub fn new(db_filename: &Path) -> Result<Self, BrokerError> {
+
    pub fn new(db_filename: &Path, max_run_time: Duration) -> Result<Self, BrokerError> {
        logger::broker_db(db_filename);
        Ok(Self {
            default_adapter: None,
            adapters: HashMap::new(),
+
            max_run_time,
            db: Db::new(db_filename)?,
        })
    }
@@ -107,7 +110,13 @@ impl Broker {
                    // We run the adapter, but if that fails, we just
                    // log the error. The `Run` value records the
                    // result of the run.
-
                    if let Err(e) = adapter.run(trigger, &mut run, &self.db, run_notification) {
+
                    if let Err(e) = adapter.run(
+
                        trigger,
+
                        &mut run,
+
                        &self.db,
+
                        run_notification,
+
                        self.max_run_time,
+
                    ) {
                        logger::error("failed to run adapter or it failed to run CI", &e);
                    }

@@ -172,7 +181,7 @@ pub enum BrokerError {

#[cfg(test)]
mod test {
-
    use std::path::Path;
+
    use std::{path::Path, time::Duration};
    use tempfile::tempdir;

    use super::{Adapter, Broker, RepoId};
@@ -184,7 +193,7 @@ mod test {
    };

    fn broker(filename: &Path) -> anyhow::Result<Broker> {
-
        Ok(Broker::new(filename)?)
+
        Ok(Broker::new(filename, Duration::from_secs(1))?)
    }

    fn rid() -> anyhow::Result<RepoId> {
modified src/config.rs
@@ -4,24 +4,34 @@ use std::{
    collections::HashMap,
    fmt,
    path::{Path, PathBuf},
+
    time::Duration,
};

+
use duration_str::deserialize_duration;
use serde::{Deserialize, Serialize};

use crate::filter::EventFilter;

+
const DEFAULT_MAX_RUN_TIME: Duration = Duration::from_secs(3600);
const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;

#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
    pub default_adapter: String,
    pub adapters: HashMap<String, Adapter>,
+
    #[serde(deserialize_with = "deserialize_duration")]
+
    #[serde(default = "default_max_run_time")]
+
    pub max_run_time: Duration,
    pub filters: Vec<EventFilter>,
    pub report_dir: Option<PathBuf>,
    pub status_update_interval_seconds: Option<u64>,
    pub db: PathBuf,
}

+
fn default_max_run_time() -> Duration {
+
    DEFAULT_MAX_RUN_TIME
+
}
+

impl Config {
    pub fn load(filename: &Path) -> Result<Self, ConfigError> {
        let config =
@@ -38,6 +48,10 @@ impl Config {
            .unwrap_or(DEFAULT_STATUS_PAGE_UPDATE_INTERVAL)
    }

+
    pub fn max_run_time(&self) -> Duration {
+
        self.max_run_time
+
    }
+

    pub fn db(&self) -> &Path {
        &self.db
    }
@@ -93,3 +107,39 @@ pub enum ConfigError {
    #[error("failed to convert configuration into JSON")]
    ToJson(#[source] serde_json::Error),
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+

+
    #[test]
+
    #[allow(clippy::unwrap_used)]
+
    fn parse_config_yaml() {
+
        const YAML: &str = r#"---
+
default_adapter: foo
+
adapters: {}
+
filters: []
+
db: "foo.db"
+
max_run_time: 1min
+
...
+
"#;
+

+
        let cfg: Config = serde_yml::from_str(YAML).unwrap();
+
        assert_eq!(cfg.max_run_time(), Duration::from_secs(60));
+
    }
+

+
    #[test]
+
    #[allow(clippy::unwrap_used)]
+
    fn parse_config_yaml_without_max_run_time() {
+
        const YAML: &str = r#"---
+
default_adapter: foo
+
adapters: {}
+
filters: []
+
db: "foo.db"
+
...
+
"#;
+

+
        let cfg: Config = serde_yml::from_str(YAML).unwrap();
+
        assert_eq!(cfg.max_run_time(), DEFAULT_MAX_RUN_TIME);
+
    }
+
}
modified src/lib.rs
@@ -24,4 +24,5 @@ pub mod queueproc;
pub mod run;
#[cfg(test)]
pub mod test;
+
pub mod timeoutcmd;
pub mod util;
modified src/logger.rs
@@ -429,19 +429,26 @@ pub fn adapter_too_many_responses() {
    error!(slog_scope::logger(), "too many response messages");
}

-
pub fn adapter_result(exit: Option<i32>, stderr: &str) {
-
    if let Some(exit) = exit {
-
        debug!(slog_scope::logger(), "adapter exit code"; "exit_code" => exit);
-
    } else {
-
        debug!(slog_scope::logger(), "adapter was terminated by signal");
-
    }
-
    for line in stderr.lines() {
-
        debug!(slog_scope::logger(), "adapter stderr"; "stderr" => line);
-
    }
+
pub fn adapter_stderr_line(line: &str) {
+
    debug!(slog_scope::logger(), "adapter stderr"; "stderr" => line);
+
}
+

+
pub fn adapter_result(exit: i32) {
+
    debug!(slog_scope::logger(), "adapter exit code"; "exit_code" => exit);
}

pub fn adapter_did_not_exit_voluntarily() {
-
    warn!(slog_scope::logger(), "adapter did not exit voluntarily");
+
    warn!(
+
        slog_scope::logger(),
+
        "adapter did not exit voluntarily: terminated for taking too long"
+
    );
+
}
+

+
pub fn adapter_did_not_exit() {
+
    warn!(
+
        slog_scope::logger(),
+
        "adapter did not exit: probably killed by signal"
+
    );
}

pub fn debug(msg: &str) {
added src/timeoutcmd.rs
@@ -0,0 +1,797 @@
+
//! Run a command (an external program) as a sub-process, capturing
+
//! its output in real time, with a maximum duration.
+
//!
+
//! This is meant for the CI broker to run a CI adapter and process
+
//! the single-line messages the adapter writes to its standard
+
//! output, as well as capture stderr output, which the adapter uses
+
//! for logging. If the adapter runs for too long, it gets terminated.
+
//!
+
//! Note that if the [`Command`] that is created to run the command
+
//! invokes a shell, the shell **must** `exec` the command it runs, or
+
//! in some other way make sure the processes the shell launches get
+
//! terminated when the shell process ends. Otherwise the time out
+
//! management here does not work reliably.
+
//!
+
//! The child can be given some data via its stdin.
+
//!
+
//! This module is not entirely generic, as it assumes textual output with
+
//! lines, instead of arbitrary byte strings.
+
//!
+
//! # Example
+
//! ```
+
//! # use std::{process::Command, time::Duration};
+
//! # use radicle_ci_broker::timeoutcmd::{RunningProcess, TimeoutCommand};
+
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+
//! let mut cmd = Command::new("bash");
+
//! cmd.arg("-c").arg("exec cat"); // Note exec!
+
//!
+
//! let mut to = TimeoutCommand::new(Duration::from_secs(10));
+
//! to.feed_stdin(b"hello, world\n");
+
//! let running = to.spawn(cmd)?;
+
//!
+
//! // Capture stdout output. We ignore stderr output.
+
//! let stdout = running.stdout();
+
//! let mut captured = vec![];
+
//! while let Some(line) = stdout.line() {
+
//!     captured.push(line);
+
//! }
+
//!
+
//! // Wait for child process to terminate.
+
//! let tor = running.wait()?;
+
//! assert_eq!(tor.status().code(), Some(0));
+
//! assert_eq!(captured, ["hello, world\n"]);
+
//! # Ok(())
+
//! # }
+
//! ```
+

+
#![allow(unused_imports)]
+

+
use std::{
+
    io::{Read, Write},
+
    process::{Child, Command, ExitStatus, Stdio},
+
    sync::{
+
        mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender, TryRecvError},
+
        Arc, Mutex,
+
    },
+
    thread::{sleep, spawn, JoinHandle},
+
    time::{Duration, Instant},
+
};
+

+
use crate::logger;
+

+
const WAIT_FOR_IDLE_CHILD: Duration = Duration::from_millis(1000);
+
const WAIT_FOR_OUTPUT: Duration = Duration::from_millis(100);
+
const KIB: usize = 1024;
+
const MIB: usize = 1024 * KIB;
+
const MAX_OUTPUT_BYTES: usize = 10 * MIB;
+

+
/// Spawn a child process, with a maximum duration and capture its
+
/// output. See also [`RunningProcess`].
+
pub struct TimeoutCommand {
+
    max_duration: Duration,
+
    stdin_data: Vec<u8>,
+
}
+

+
// This works by using multiple threads.
+
//
+
// * a thread to send data to child's stdin
+
// * a thread to read child's stdout, as bytes
+
// * a thread to read child's stderr, as bytes
+
// * a thread to monitor how long the child runs
+
// * the calling thread waits for the monitor thread to tell it the
+
//   child has ended or needs to be terminate
+
//
+
// Communication between the threads happens via
+
// [`std::sync::mpsc::sync_channel`] channels. In performance tests,
+
// these are quite fast if the channel buffer is sufficiently large:
+
// throughput of over 2 MiB/s have been measured.
+
//
+
// Output from the child is collected into lines, which can be passed
+
// to the caller. For the CI broker we only care about lines.
+

+
impl TimeoutCommand {
+
    /// Create a new time-limited command. The sub-process will run at
+
    /// most as long as the argument specifies. See
+
    /// [`TimeoutCommand::spawn`] for actually creating the
+
    /// sub-process.
+
    pub fn new(max_duration: Duration) -> Self {
+
        Self {
+
            max_duration,
+
            stdin_data: vec![],
+
        }
+
    }
+

+
    /// Feed the sub-process the specified binary data via its
+
    /// standard input. If this method is not used, the sub-process
+
    /// stdin will be fed no data. The sub-process stdin always comes
+
    /// from a pipe, however, so if this method is not used, the
+
    /// effect is that stdin comes from `/dev/null` or another empty
+
    /// file.
+
    pub fn feed_stdin(&mut self, data: &[u8]) {
+
        self.stdin_data = data.to_vec();
+
    }
+

+
    /// Start a new sub-process to execute the specified command.
+
    ///
+
    /// The caller should set up the [`std::process::Command`] value.
+
    /// This method will redirect stdin, stdout, and stderr to use
+
    /// pipes.
+
    pub fn spawn(&self, mut command: Command) -> Result<RunningProcess, TimeoutError> {
+
        // Set up child stdin/stdout/stderr redirection.
+
        let mut child = command
+
            .stdin(Stdio::piped())
+
            .stdout(Stdio::piped())
+
            .stderr(Stdio::piped())
+
            .spawn()
+
            .map_err(|err| TimeoutError::Spawn(command, err))?;
+

+
        // Set up thread to write data to child stdin.
+
        let stdin = child.stdin.take().ok_or(TimeoutError::TakeStdin)?;
+
        let stdin_data = self.stdin_data.clone();
+
        let stdin_writer = spawn(move || writer(stdin, stdin_data));
+

+
        // Set up thread to capture child stdout.
+
        let stdout = child.stdout.take().ok_or(TimeoutError::TakeStdout)?;
+
        let (stdout_termination_tx, stdout_termination_rx) = sync_channel(1);
+
        let (stdout_lines_tx, stdout_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
+
        let stdout_reader =
+
            spawn(move || NonBlockingReader::new("stdout", stdout, stdout_lines_tx).read_to_end());
+
        let stdout_lines = LineReceiver::new("stdout", stdout_lines_rx, stdout_termination_rx);
+

+
        // Set up thread to capture child stdout.
+
        let stderr = child.stderr.take().ok_or(TimeoutError::TakeStderr)?;
+
        let (stderr_termination_tx, stderr_termination_rx) = sync_channel(1);
+
        let (stderr_lines_tx, stderr_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
+
        let stderr_reader =
+
            spawn(move || NonBlockingReader::new("stderr", stderr, stderr_lines_tx).read_to_end());
+
        let stderr_lines = LineReceiver::new("stderr", stderr_lines_rx, stderr_termination_rx);
+

+
        // Set up thread to monitor child termination or overlong run time.
+
        let (tx, timed_out_rx) = sync_channel(1);
+
        let (kill_tx, kill_rx) = sync_channel(1);
+
        let nanny = Nanny::new(
+
            self.max_duration,
+
            child,
+
            tx,
+
            kill_rx,
+
            vec![stdout_termination_tx, stderr_termination_tx],
+
        );
+
        let monitor = spawn(move || nanny.monitor());
+

+
        Ok(RunningProcess {
+
            child_monitor: Some(monitor),
+
            timed_out_rx,
+
            stdin_writer,
+
            stdout_lines,
+
            stdout_reader,
+
            stderr_lines,
+
            stderr_reader,
+
            kill_tx,
+
        })
+
    }
+
}
+

+
/// Manage a running child process and capture its output.
+
///
+
/// This is created by [`TimeoutCommand::spawn`].
+
pub struct RunningProcess {
+
    child_monitor: Option<JoinHandle<Result<(), TimeoutError>>>,
+
    timed_out_rx: NannyReceiver,
+
    stdin_writer: JoinHandle<Result<(), std::io::Error>>,
+
    stdout_lines: LineReceiver,
+
    stdout_reader: JoinHandle<Result<(), TimeoutError>>,
+
    stderr_lines: LineReceiver,
+
    stderr_reader: JoinHandle<Result<(), TimeoutError>>,
+
    kill_tx: KillSender,
+
}
+

+
impl RunningProcess {
+
    /// Return a [`LineReceiver`] that returns lines from the
+
    /// sub-process standard output.
+
    pub fn stdout(&self) -> &LineReceiver {
+
        &self.stdout_lines
+
    }
+

+
    /// Return a [`LineReceiver`] that returns lines from the
+
    /// sub-process standard error output.
+
    pub fn stderr(&self) -> &LineReceiver {
+
        &self.stderr_lines
+
    }
+

+
    /// Terminate sub-process with extreme prejudice.
+
    pub fn kill(&self) -> Result<(), TimeoutError> {
+
        let x = self.kill_tx.send(());
+
        logger::debug2(format!("request termination of child process: {x:?}"));
+
        Ok(())
+
    }
+

+
    /// Wait for child process to terminate. It may terminate because
+
    /// it ends normally, or because it has run for longer than the
+
    /// limit set with [`TimeoutCommand::new`]. The return value of
+
    /// this method will specify why, see
+
    /// [`TimeoutResult::timed_out`].
+
    ///
+
    /// Note that if the sub-process produces a lot of output, you
+
    /// must read it to avoid the process getting stuck; see
+
    /// [`RunningProcess::stdout`] and [`RunningProcess::stderr`]. If
+
    /// you don't read the output, the sub-process will fill its
+
    /// output pipe buffer, or the inter-thread communication channel
+
    /// buffer, and the sub-process will block on output, and not
+
    /// progress. This may be unwanted. The blocking won't affect the
+
    /// sub-process getting terminated due to running for too long.
+
    pub fn wait(mut self) -> Result<TimeoutResult, TimeoutError> {
+
        logger::debug("wait: wait for word from nanny");
+
        let (mut child, timed_out) = self.timed_out_rx.recv().map_err(TimeoutError::ChildRecv)?;
+
        logger::debug("got word from nanny");
+
        if let Some(monitor) = self.child_monitor.take() {
+
            logger::debug("wait: wait on nanny thread to end");
+
            monitor
+
                .join()
+
                .map_err(|_| TimeoutError::JoinChildMonitor)??;
+
        }
+

+
        logger::debug("wait: wait for stdin writer to terminate");
+
        self.stdin_writer.join().ok();
+

+
        logger::debug("wait: wait for stdout reader to terminate");
+
        self.stdout_reader.join().ok();
+

+
        logger::debug("wait: wait for stderr reader to terminate");
+
        self.stderr_reader.join().ok();
+

+
        logger::debug("wait: wait for child to terminate");
+
        let status = child.wait().map_err(TimeoutError::Wait)?;
+
        logger::debug2(format!("wait: wait status: {status:?}"));
+
        logger::debug("wait: return Ok result");
+
        Ok(TimeoutResult { timed_out, status })
+
    }
+
}
+

+
/// Did the sub-process started with [`TimeoutCommand::spawn`]
+
/// terminate normally, or did it get terminated unilaterally for
+
/// running too long? What was its exit code?
+
#[derive(Debug)]
+
pub struct TimeoutResult {
+
    timed_out: bool,
+
    status: ExitStatus,
+
}
+

+
impl TimeoutResult {
+
    /// Exit code of the of the sub-process. There is always an exit
+
    /// code: [`RunningProcess::wait`] does not return until the
+
    /// sub-process has exited.
+
    pub fn status(&self) -> ExitStatus {
+
        self.status
+
    }
+

+
    /// Did the sub-process get terminated for running too long?
+
    pub fn timed_out(&self) -> bool {
+
        self.timed_out
+
    }
+
}
+

+
type NannySender = SyncSender<(Child, bool)>;
+
type NannyReceiver = Receiver<(Child, bool)>;
+

+
type KillSender = SyncSender<()>;
+
type KillReceiver = Receiver<()>;
+

+
struct Nanny {
+
    max_duration: Duration,
+
    child: Option<Child>,
+
    tx: NannySender,
+
    term_tx: Vec<TerminationSender>,
+
    kill_rx: KillReceiver,
+
}
+

+
impl Nanny {
+
    fn new(
+
        max_duration: Duration,
+
        child: Child,
+
        tx: NannySender,
+
        kill_rx: KillReceiver,
+
        term_tx: Vec<TerminationSender>,
+
    ) -> Self {
+
        Self {
+
            max_duration,
+
            child: Some(child),
+
            tx,
+
            term_tx,
+
            kill_rx,
+
        }
+
    }
+

+
    fn monitor(mut self) -> Result<(), TimeoutError> {
+
        let mut child = if let Some(child) = self.child.take() {
+
            child
+
        } else {
+
            panic!("programming error: Nanny does not have a child to monitor");
+
        };
+
        let started = Instant::now();
+
        let mut timed_out = false;
+
        logger::debug("nanny: start monitoring child");
+
        loop {
+
            let elapsed = started.elapsed();
+

+
            if self.kill_rx.try_recv().is_ok() {
+
                let x = child.kill();
+
                logger::debug2(format!("nanny: terminated child by request: {x:?}"));
+
                break;
+
            } else if elapsed > self.max_duration {
+
                logger::debug2(format!(
+
                    "nanny: child has run for too long ({} ms > {} ms)",
+
                    elapsed.as_millis(),
+
                    self.max_duration.as_millis(),
+
                ));
+
                logger::debug2(format!(
+
                    "nanny: terminate child process {} with extreme prejudice",
+
                    child.id(),
+
                ));
+
                let x = child.kill();
+
                timed_out = true;
+
                logger::debug2(format!("nanny: termination result: {x:?}"));
+
                break;
+
            }
+

+
            if matches!(child.try_wait(), Ok(None)) {
+
                logger::debug2(format!(
+
                    "nanny: child is still running, that's fine ({} ms <= {} ms)",
+
                    elapsed.as_millis(),
+
                    self.max_duration.as_millis(),
+
                ));
+
                sleep(WAIT_FOR_IDLE_CHILD);
+
            } else {
+
                logger::debug("nanny: child has terminated");
+
                break;
+
            }
+
        }
+

+
        logger::debug("nanny: tell RunningProcess it's time");
+
        self.tx
+
            .send((child, timed_out))
+
            .map_err(TimeoutError::ChildSend)?;
+
        for tx in self.term_tx.iter() {
+
            logger::debug("nanny: tell line receiver it's time");
+
            tx.send(()).map_err(TimeoutError::ChildSendToLine)?;
+
        }
+

+
        logger::debug("nanny ends");
+
        Ok(())
+
    }
+
}
+

+
fn writer(mut stream: impl Write, data: Vec<u8>) -> Result<(), std::io::Error> {
+
    let mut written = 0;
+
    while written < data.len() {
+
        // We write one byte at a time. This lets us avoid doing
+
        // non-blocking I/O, but is less efficient. by only writing
+
        // one byte at a time, we only block when we can't write to
+
        // the stream. When the stream is a pipe, this happens when
+
        // the pipe buffer fills up. This function should be in its
+
        // own thread, and so it doesn't matter if it blocks, but
+
        // measurements are more useful when they're taken after each
+
        // byte.
+
        //
+
        // When, inevitably, byte-at-a-time becomes too inefficient,
+
        // this will need to be rewritten to use non-blocking I/O.
+
        //
+
        // Or async.
+
        let n = stream.write(&data[written..written + 1])?;
+
        written += n;
+
        logger::debug("writer wrote {n:?}");
+
        stream.flush()?;
+
    }
+

+
    Ok(())
+
}
+

+
type TerminationSender = SyncSender<()>;
+
type TerminationReceiver = Receiver<()>;
+

+
/// Receive one line of output at time.
+
///
+
/// See the [module description](index.html) for an example.
+
pub struct LineReceiver {
+
    name: &'static str,
+
    child_terminated: TerminationReceiver,
+
    bytes: OutputReader,
+
}
+

+
impl LineReceiver {
+
    fn new(name: &'static str, bytes: OutputReader, child_terminated: TerminationReceiver) -> Self {
+
        Self {
+
            name,
+
            child_terminated,
+
            bytes,
+
        }
+
    }
+

+
    /// Return the next line, if any, or `None` if there will be no
+
    /// more lines. Note that this blocks until there is a line, or
+
    /// the child process terminates.
+
    pub fn line(&self) -> Option<String> {
+
        let mut line = vec![];
+

+
        loop {
+
            // Get a byte if there is one.
+
            logger::debug2(format!(
+
                "line receiver {}: try to receive next byte",
+
                self.name
+
            ));
+
            let y = self.bytes.try_recv();
+
            logger::debug2(format!(
+
                "line receiver {}: tried to read line: {y:?}",
+
                self.name
+
            ));
+
            match y {
+
                Ok(byte) => {
+
                    line.push(byte);
+
                    if byte == b'\n' {
+
                        let line = String::from_utf8_lossy(&line).to_string();
+
                        logger::debug2(format!(
+
                            "line-receiver {}: received line={line:?}",
+
                            self.name
+
                        ));
+
                        return Some(line);
+
                    }
+
                }
+
                Err(TryRecvError::Empty) => {
+
                    sleep(WAIT_FOR_OUTPUT);
+
                }
+
                Err(TryRecvError::Disconnected) => {
+
                    if line.is_empty() {
+
                        // Sender has closed the channel, there will be no more lines.
+
                        logger::debug2(format!("line-receiver {}: disconnected", self.name));
+
                        return None;
+
                    } else {
+
                        let line = String::from_utf8_lossy(&line).to_string();
+
                        logger::debug2(format!(
+
                            "line-receiver {}: received line={line:?}",
+
                            self.name
+
                        ));
+
                        return Some(line);
+
                    }
+
                }
+
            }
+

+
            logger::debug("line-receiver: has child terminated?");
+
            let x = self.child_terminated.try_recv();
+
            // logger::debug("line receiver {}: x={x:?}", self.name);
+
            match x {
+
                Ok(_) => {
+
                    logger::debug2(format!(
+
                        "line receiver {}: OK: child has terminated, not returning line",
+
                        self.name,
+
                    ));
+
                }
+
                Err(std::sync::mpsc::TryRecvError::Disconnected) => {
+
                    logger::debug2(format!(
+
                        "line receiver {}: Disconnected: child has terminated, not returning line",
+
                        self.name,
+
                    ));
+
                }
+
                _ => {}
+
            }
+
        }
+
    }
+
}
+

+
type OutputSender = SyncSender<u8>;
+
type OutputReader = Receiver<u8>;
+

+
struct NonBlockingReader<R: Read> {
+
    name: &'static str,
+
    stream: R,
+
    tx: OutputSender,
+
}
+

+
impl<R: Read> NonBlockingReader<R> {
+
    fn new(name: &'static str, stream: R, tx: OutputSender) -> Self {
+
        Self { name, stream, tx }
+
    }
+

+
    fn read_to_end(mut self) -> Result<(), TimeoutError> {
+
        let mut count = 0;
+
        loop {
+
            // We read one byte at a time. This lets us avoid doing
+
            // non-blocking I/O but is less efficient. We want to
+
            // avoid blocking for an arbitrary amount of time, if
+
            // reading one byte at a time. When reading from a pipe,
+
            // the pipe writer end may not get closed until the child
+
            // process writing to the pipe ends, and we may not want
+
            // to wait that long.
+
            //
+
            // If this becomes too inefficient, this needs to be
+
            // rewritten to use non-blocking I/O or async.
+

+
            logger::debug2(format!(
+
                "read_to_end {}: try to receive next byte ({count} so far)",
+
                self.name,
+
            ));
+
            let mut byte = vec![0; 1];
+
            let x = self.stream.read(&mut byte);
+
            logger::debug2(format!("read_to_end {}: x={x:?} byte={byte:?}", self.name));
+
            match x {
+
                Ok(0) => {
+
                    logger::debug2(format!("read_to_end {}: end of file", self.name));
+
                    break;
+
                }
+
                Ok(1) => {
+
                    count += 1;
+
                    self.tx
+
                        .try_send(byte[0])
+
                        .map_err(|_| TimeoutError::TooMuch(self.name))?;
+
                }
+
                Ok(_) => {
+
                    logger::debug2(format!("read_to_end {}: read too much", self.name));
+
                    return Err(TimeoutError::ReadMucn);
+
                }
+
                Err(err) => {
+
                    logger::debug2(format!("read_to_end {}: read error: {err}", self.name));
+
                    return Err(TimeoutError::Read(err));
+
                }
+
            }
+
        }
+

+
        logger::debug2(format!("read_to_end {}: terminate", self.name));
+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum TimeoutError {
+
    /// Couldn't spawn child process.
+
    #[error("failed to spawn command: {0:?}")]
+
    Spawn(Command, #[source] std::io::Error),
+

+
    /// Couldn't get file descriptor of child process stdin.
+
    #[error("failed to extract stdin stream from child")]
+
    TakeStdin,
+

+
    /// Couldn't get file descriptor of child process stdout.
+
    #[error("failed to extract stdout stream from child")]
+
    TakeStdout,
+

+
    /// Couldn't get file descriptor of child process stderr.
+
    #[error("failed to extract stderr stream from child")]
+
    TakeStderr,
+

+
    /// Couldn't check if child process is still running.
+
    #[error("failed to check whether command is still running")]
+
    TryWait(#[source] std::io::Error),
+

+
    /// Reading from child stdout or stderr returned too much data.
+
    #[error("read from command standard output returned more data than requested")]
+
    ReadMucn,
+

+
    /// Reading from child stdout or stderr failed.
+
    #[error("problem reading from command output (stdout or stderr)")]
+
    Read(#[source] std::io::Error),
+

+
    /// Channel buffer got full, which means child process wrote too much output.
+
    #[error("sub-process produces too much to {0}")]
+
    TooMuch(&'static str),
+

+
    /// Couldn't join thread that feeds data to child stdin.
+
    #[error("problem waiting for thread that writes to command standard input")]
+
    JoinStdinFeeder,
+

+
    /// Couldn't write to child stdin.
+
    #[error("problem writing to command standard input")]
+
    FeedStdin(#[source] std::io::Error),
+

+
    #[error("problem waiting for thread that monitors command")]
+
    JoinChildMonitor,
+

+
    /// Couldn't join thread that reads child stdout.
+
    #[error("problem waiting for thread that reads command standard output")]
+
    JoinStdoutReader,
+

+
    /// Couldn't join thread that reads child stderr.
+
    #[error("problem waiting for thread that reads command standard error output")]
+
    JoinStderrReader,
+

+
    /// Couldn't terminate child process.
+
    #[error("problem forcing child process to terminate")]
+
    Kill(#[source] std::io::Error),
+

+
    /// Couldn't wait for child process to terminate.
+
    #[error("problem waiting for child process to terminate")]
+
    Wait(#[source] std::io::Error),
+

+
    /// Mutex lock error.
+
    #[error("failed to lock command output buffer")]
+
    MutexLock,
+

+
    #[error("failed to receive notification from child monitor")]
+
    ChildRecv(#[source] std::sync::mpsc::RecvError),
+

+
    #[error("failed to send notification from child monitor")]
+
    ChildSend(#[source] std::sync::mpsc::SendError<(Child, bool)>),
+

+
    #[error("failed to send notification from child monitor to line receiver")]
+
    ChildSendToLine(#[source] std::sync::mpsc::SendError<()>),
+
}
+

+
#[cfg(test)]
+
mod tests {
+
    use super::*;
+

+
    const LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES: Duration = Duration::from_secs(100);
+
    const SHORT_TIMEOUT: Duration = Duration::from_secs(3);
+

+
    fn setup(
+
        script: &str,
+
        timeout: Duration,
+
        stdin: Option<&'static str>,
+
    ) -> Result<RunningProcess, Box<dyn std::error::Error>> {
+
        let mut cmd = Command::new("bash");
+
        cmd.arg("-c").arg(script);
+
        let mut to = TimeoutCommand::new(timeout);
+
        if let Some(stdin) = stdin {
+
            to.feed_stdin(stdin.as_bytes());
+
        }
+
        Ok(to.spawn(cmd)?)
+
    }
+

+
    #[test]
+
    fn bin_true() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec /bin/true",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn bin_false() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec /bin/false",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(1));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn sleep_1() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec sleep 1",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn sleep_for_too_long() -> Result<(), Box<dyn std::error::Error>> {
+
        let started = Instant::now();
+
        let running = setup("exec sleep 1000", SHORT_TIMEOUT, None)?;
+
        let tor = running.wait()?;
+
        eprintln!("duration: {} ms", started.elapsed().as_millis());
+
        assert_eq!(tor.status().code(), None);
+
        assert!(tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn hello_world() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec echo hello, world",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let stdout = running.stdout();
+
        let stderr = running.stderr();
+

+
        assert_eq!(stdout.line(), Some("hello, world\n".into()));
+
        assert_eq!(stdout.line(), None);
+

+
        assert_eq!(stderr.line(), None);
+

+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn hello_world_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec echo hello, world 1>&2",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let stdout = running.stdout();
+
        let stderr = running.stderr();
+

+
        assert_eq!(stdout.line(), None);
+

+
        assert_eq!(stderr.line(), Some("hello, world\n".into()));
+
        assert_eq!(stderr.line(), None);
+

+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn pipe_through_cat() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec cat",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            Some("hello, world"),
+
        )?;
+
        let stdout = running.stdout();
+
        let stderr = running.stderr();
+

+
        assert_eq!(stdout.line(), Some("hello, world".into()));
+
        assert_eq!(stdout.line(), None);
+

+
        assert_eq!(stderr.line(), None);
+

+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn yes_to_stdout() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup("exec yes", SHORT_TIMEOUT, None)?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), None);
+
        assert!(tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn yes_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup("exec yes 1>&2", SHORT_TIMEOUT, None)?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), None);
+
        assert!(tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn kill() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec sleep 1000",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        sleep(Duration::from_millis(5000));
+
        running.kill()?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), None);
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn kill_stderr() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec sleep 1000 1>&2",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        sleep(Duration::from_millis(5000));
+
        running.kill()?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), None);
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+
}