Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: add a module to run a sub-process with a time limit
Lars Wirzenius committed 1 year ago
commit 68f392dd7806bbf2297635792160f6ac18b540a9
parent 0f90e97a3ebec4ea7011a36167291de7f7226651
2 files changed +798 -0
modified src/lib.rs
@@ -24,4 +24,5 @@ pub mod queueproc;
pub mod run;
#[cfg(test)]
pub mod test;
+
pub mod timeoutcmd;
pub mod util;
added src/timeoutcmd.rs
@@ -0,0 +1,797 @@
+
//! Run a command (an external program) as a sub-process, capturing
+
//! its output in real time, with a maximum duration.
+
//!
+
//! This is meant for the CI broker to run a CI adapter and process
+
//! the single-line messages the adapter writes to its standard
+
//! output, as well as capture stderr output, which the adapter uses
+
//! for logging. If the adapter runs for too long, it gets terminated.
+
//!
+
//! Note that if the [`Command`] that is created to run the command
+
//! invokes a shell, the shell **must** `exec` the command it runs, or
+
//! in some other way make sure the processes the shell launches get
+
//! terminated when the shell process ends. Otherwise the time out
+
//! management here does not work reliably.
+
//!
+
//! The child can be given some data via its stdin.
+
//!
+
//! This module is not entirely generic, as it assumes textual output with
+
//! lines, instead of arbitrary byte strings.
+
//!
+
//! # Example
+
//! ```
+
//! # use std::{process::Command, time::Duration};
+
//! # use radicle_ci_broker::timeoutcmd::{RunningProcess, TimeoutCommand};
+
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+
//! let mut cmd = Command::new("bash");
+
//! cmd.arg("-c").arg("exec cat"); // Note exec!
+
//!
+
//! let mut to = TimeoutCommand::new(Duration::from_secs(10));
+
//! to.feed_stdin(b"hello, world\n");
+
//! let running = to.spawn(cmd)?;
+
//!
+
//! // Capture stdout output. We ignore stderr output.
+
//! let stdout = running.stdout();
+
//! let mut captured = vec![];
+
//! while let Some(line) = stdout.line() {
+
//!     captured.push(line);
+
//! }
+
//!
+
//! // Wait for child process to terminate.
+
//! let tor = running.wait()?;
+
//! assert_eq!(tor.status().code(), Some(0));
+
//! assert_eq!(captured, ["hello, world\n"]);
+
//! # Ok(())
+
//! # }
+
//! ```
+

+
#![allow(unused_imports)]
+

+
use std::{
+
    io::{Read, Write},
+
    process::{Child, Command, ExitStatus, Stdio},
+
    sync::{
+
        mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender, TryRecvError},
+
        Arc, Mutex,
+
    },
+
    thread::{sleep, spawn, JoinHandle},
+
    time::{Duration, Instant},
+
};
+

+
use crate::logger;
+

+
const WAIT_FOR_IDLE_CHILD: Duration = Duration::from_millis(1000);
+
const WAIT_FOR_OUTPUT: Duration = Duration::from_millis(100);
+
const KIB: usize = 1024;
+
const MIB: usize = 1024 * KIB;
+
const MAX_OUTPUT_BYTES: usize = 10 * MIB;
+

+
/// Spawn a child process, with a maximum duration and capture its
+
/// output. See also [`RunningProcess`].
+
pub struct TimeoutCommand {
+
    max_duration: Duration,
+
    stdin_data: Vec<u8>,
+
}
+

+
// This works by using multiple threads.
+
//
+
// * a thread to send data to child's stdin
+
// * a thread to read child's stdout, as bytes
+
// * a thread to read child's stderr, as bytes
+
// * a thread to monitor how long the child runs
+
// * the calling thread waits for the monitor thread to tell it the
+
//   child has ended or needs to be terminate
+
//
+
// Communication between the threads happens via
+
// [`std::sync::mpsc::sync_channel`] channels. In performance tests,
+
// these are quite fast if the channel buffer is sufficiently large:
+
// throughput of over 2 MiB/s have been measured.
+
//
+
// Output from the child is collected into lines, which can be passed
+
// to the caller. For the CI broker we only care about lines.
+

+
impl TimeoutCommand {
+
    /// Create a new time-limited command. The sub-process will run at
+
    /// most as long as the argument specifies. See
+
    /// [`TimeoutCommand::spawn`] for actually creating the
+
    /// sub-process.
+
    pub fn new(max_duration: Duration) -> Self {
+
        Self {
+
            max_duration,
+
            stdin_data: vec![],
+
        }
+
    }
+

+
    /// Feed the sub-process the specified binary data via its
+
    /// standard input. If this method is not used, the sub-process
+
    /// stdin will be fed no data. The sub-process stdin always comes
+
    /// from a pipe, however, so if this method is not used, the
+
    /// effect is that stdin comes from `/dev/null` or another empty
+
    /// file.
+
    pub fn feed_stdin(&mut self, data: &[u8]) {
+
        self.stdin_data = data.to_vec();
+
    }
+

+
    /// Start a new sub-process to execute the specified command.
+
    ///
+
    /// The caller should set up the [`std::process::Command`] value.
+
    /// This method will redirect stdin, stdout, and stderr to use
+
    /// pipes.
+
    pub fn spawn(&self, mut command: Command) -> Result<RunningProcess, TimeoutError> {
+
        // Set up child stdin/stdout/stderr redirection.
+
        let mut child = command
+
            .stdin(Stdio::piped())
+
            .stdout(Stdio::piped())
+
            .stderr(Stdio::piped())
+
            .spawn()
+
            .map_err(|err| TimeoutError::Spawn(command, err))?;
+

+
        // Set up thread to write data to child stdin.
+
        let stdin = child.stdin.take().ok_or(TimeoutError::TakeStdin)?;
+
        let stdin_data = self.stdin_data.clone();
+
        let stdin_writer = spawn(move || writer(stdin, stdin_data));
+

+
        // Set up thread to capture child stdout.
+
        let stdout = child.stdout.take().ok_or(TimeoutError::TakeStdout)?;
+
        let (stdout_termination_tx, stdout_termination_rx) = sync_channel(1);
+
        let (stdout_lines_tx, stdout_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
+
        let stdout_reader =
+
            spawn(move || NonBlockingReader::new("stdout", stdout, stdout_lines_tx).read_to_end());
+
        let stdout_lines = LineReceiver::new("stdout", stdout_lines_rx, stdout_termination_rx);
+

+
        // Set up thread to capture child stdout.
+
        let stderr = child.stderr.take().ok_or(TimeoutError::TakeStderr)?;
+
        let (stderr_termination_tx, stderr_termination_rx) = sync_channel(1);
+
        let (stderr_lines_tx, stderr_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
+
        let stderr_reader =
+
            spawn(move || NonBlockingReader::new("stderr", stderr, stderr_lines_tx).read_to_end());
+
        let stderr_lines = LineReceiver::new("stderr", stderr_lines_rx, stderr_termination_rx);
+

+
        // Set up thread to monitor child termination or overlong run time.
+
        let (tx, timed_out_rx) = sync_channel(1);
+
        let (kill_tx, kill_rx) = sync_channel(1);
+
        let nanny = Nanny::new(
+
            self.max_duration,
+
            child,
+
            tx,
+
            kill_rx,
+
            vec![stdout_termination_tx, stderr_termination_tx],
+
        );
+
        let monitor = spawn(move || nanny.monitor());
+

+
        Ok(RunningProcess {
+
            child_monitor: Some(monitor),
+
            timed_out_rx,
+
            stdin_writer,
+
            stdout_lines,
+
            stdout_reader,
+
            stderr_lines,
+
            stderr_reader,
+
            kill_tx,
+
        })
+
    }
+
}
+

+
/// Manage a running child process and capture its output.
+
///
+
/// This is created by [`TimeoutCommand::spawn`].
+
pub struct RunningProcess {
+
    child_monitor: Option<JoinHandle<Result<(), TimeoutError>>>,
+
    timed_out_rx: NannyReceiver,
+
    stdin_writer: JoinHandle<Result<(), std::io::Error>>,
+
    stdout_lines: LineReceiver,
+
    stdout_reader: JoinHandle<Result<(), TimeoutError>>,
+
    stderr_lines: LineReceiver,
+
    stderr_reader: JoinHandle<Result<(), TimeoutError>>,
+
    kill_tx: KillSender,
+
}
+

+
impl RunningProcess {
+
    /// Return a [`LineReceiver`] that returns lines from the
+
    /// sub-process standard output.
+
    pub fn stdout(&self) -> &LineReceiver {
+
        &self.stdout_lines
+
    }
+

+
    /// Return a [`LineReceiver`] that returns lines from the
+
    /// sub-process standard error output.
+
    pub fn stderr(&self) -> &LineReceiver {
+
        &self.stderr_lines
+
    }
+

+
    /// Terminate sub-process with extreme prejudice.
+
    pub fn kill(&self) -> Result<(), TimeoutError> {
+
        let x = self.kill_tx.send(());
+
        logger::debug2(format!("request termination of child process: {x:?}"));
+
        Ok(())
+
    }
+

+
    /// Wait for child process to terminate. It may terminate because
+
    /// it ends normally, or because it has run for longer than the
+
    /// limit set with [`TimeoutCommand::new`]. The return value of
+
    /// this method will specify why, see
+
    /// [`TimeoutResult::timed_out`].
+
    ///
+
    /// Note that if the sub-process produces a lot of output, you
+
    /// must read it to avoid the process getting stuck; see
+
    /// [`RunningProcess::stdout`] and [`RunningProcess::stderr`]. If
+
    /// you don't read the output, the sub-process will fill its
+
    /// output pipe buffer, or the inter-thread communication channel
+
    /// buffer, and the sub-process will block on output, and not
+
    /// progress. This may be unwanted. The blocking won't affect the
+
    /// sub-process getting terminated due to running for too long.
+
    pub fn wait(mut self) -> Result<TimeoutResult, TimeoutError> {
+
        logger::debug("wait: wait for word from nanny");
+
        let (mut child, timed_out) = self.timed_out_rx.recv().map_err(TimeoutError::ChildRecv)?;
+
        logger::debug("got word from nanny");
+
        if let Some(monitor) = self.child_monitor.take() {
+
            logger::debug("wait: wait on nanny thread to end");
+
            monitor
+
                .join()
+
                .map_err(|_| TimeoutError::JoinChildMonitor)??;
+
        }
+

+
        logger::debug("wait: wait for stdin writer to terminate");
+
        self.stdin_writer.join().ok();
+

+
        logger::debug("wait: wait for stdout reader to terminate");
+
        self.stdout_reader.join().ok();
+

+
        logger::debug("wait: wait for stderr reader to terminate");
+
        self.stderr_reader.join().ok();
+

+
        logger::debug("wait: wait for child to terminate");
+
        let status = child.wait().map_err(TimeoutError::Wait)?;
+
        logger::debug2(format!("wait: wait status: {status:?}"));
+
        logger::debug("wait: return Ok result");
+
        Ok(TimeoutResult { timed_out, status })
+
    }
+
}
+

+
/// Did the sub-process started with [`TimeoutCommand::spawn`]
+
/// terminate normally, or did it get terminated unilaterally for
+
/// running too long? What was its exit code?
+
#[derive(Debug)]
+
pub struct TimeoutResult {
+
    timed_out: bool,
+
    status: ExitStatus,
+
}
+

+
impl TimeoutResult {
+
    /// Exit code of the of the sub-process. There is always an exit
+
    /// code: [`RunningProcess::wait`] does not return until the
+
    /// sub-process has exited.
+
    pub fn status(&self) -> ExitStatus {
+
        self.status
+
    }
+

+
    /// Did the sub-process get terminated for running too long?
+
    pub fn timed_out(&self) -> bool {
+
        self.timed_out
+
    }
+
}
+

+
type NannySender = SyncSender<(Child, bool)>;
+
type NannyReceiver = Receiver<(Child, bool)>;
+

+
type KillSender = SyncSender<()>;
+
type KillReceiver = Receiver<()>;
+

+
struct Nanny {
+
    max_duration: Duration,
+
    child: Option<Child>,
+
    tx: NannySender,
+
    term_tx: Vec<TerminationSender>,
+
    kill_rx: KillReceiver,
+
}
+

+
impl Nanny {
+
    fn new(
+
        max_duration: Duration,
+
        child: Child,
+
        tx: NannySender,
+
        kill_rx: KillReceiver,
+
        term_tx: Vec<TerminationSender>,
+
    ) -> Self {
+
        Self {
+
            max_duration,
+
            child: Some(child),
+
            tx,
+
            term_tx,
+
            kill_rx,
+
        }
+
    }
+

+
    fn monitor(mut self) -> Result<(), TimeoutError> {
+
        let mut child = if let Some(child) = self.child.take() {
+
            child
+
        } else {
+
            panic!("programming error: Nanny does not have a child to monitor");
+
        };
+
        let started = Instant::now();
+
        let mut timed_out = false;
+
        logger::debug("nanny: start monitoring child");
+
        loop {
+
            let elapsed = started.elapsed();
+

+
            if self.kill_rx.try_recv().is_ok() {
+
                let x = child.kill();
+
                logger::debug2(format!("nanny: terminated child by request: {x:?}"));
+
                break;
+
            } else if elapsed > self.max_duration {
+
                logger::debug2(format!(
+
                    "nanny: child has run for too long ({} ms > {} ms)",
+
                    elapsed.as_millis(),
+
                    self.max_duration.as_millis(),
+
                ));
+
                logger::debug2(format!(
+
                    "nanny: terminate child process {} with extreme prejudice",
+
                    child.id(),
+
                ));
+
                let x = child.kill();
+
                timed_out = true;
+
                logger::debug2(format!("nanny: termination result: {x:?}"));
+
                break;
+
            }
+

+
            if matches!(child.try_wait(), Ok(None)) {
+
                logger::debug2(format!(
+
                    "nanny: child is still running, that's fine ({} ms <= {} ms)",
+
                    elapsed.as_millis(),
+
                    self.max_duration.as_millis(),
+
                ));
+
                sleep(WAIT_FOR_IDLE_CHILD);
+
            } else {
+
                logger::debug("nanny: child has terminated");
+
                break;
+
            }
+
        }
+

+
        logger::debug("nanny: tell RunningProcess it's time");
+
        self.tx
+
            .send((child, timed_out))
+
            .map_err(TimeoutError::ChildSend)?;
+
        for tx in self.term_tx.iter() {
+
            logger::debug("nanny: tell line receiver it's time");
+
            tx.send(()).map_err(TimeoutError::ChildSendToLine)?;
+
        }
+

+
        logger::debug("nanny ends");
+
        Ok(())
+
    }
+
}
+

+
fn writer(mut stream: impl Write, data: Vec<u8>) -> Result<(), std::io::Error> {
+
    let mut written = 0;
+
    while written < data.len() {
+
        // We write one byte at a time. This lets us avoid doing
+
        // non-blocking I/O, but is less efficient. by only writing
+
        // one byte at a time, we only block when we can't write to
+
        // the stream. When the stream is a pipe, this happens when
+
        // the pipe buffer fills up. This function should be in its
+
        // own thread, and so it doesn't matter if it blocks, but
+
        // measurements are more useful when they're taken after each
+
        // byte.
+
        //
+
        // When, inevitably, byte-at-a-time becomes too inefficient,
+
        // this will need to be rewritten to use non-blocking I/O.
+
        //
+
        // Or async.
+
        let n = stream.write(&data[written..written + 1])?;
+
        written += n;
+
        logger::debug("writer wrote {n:?}");
+
        stream.flush()?;
+
    }
+

+
    Ok(())
+
}
+

+
type TerminationSender = SyncSender<()>;
+
type TerminationReceiver = Receiver<()>;
+

+
/// Receive one line of output at time.
+
///
+
/// See the [module description](index.html) for an example.
+
pub struct LineReceiver {
+
    name: &'static str,
+
    child_terminated: TerminationReceiver,
+
    bytes: OutputReader,
+
}
+

+
impl LineReceiver {
+
    fn new(name: &'static str, bytes: OutputReader, child_terminated: TerminationReceiver) -> Self {
+
        Self {
+
            name,
+
            child_terminated,
+
            bytes,
+
        }
+
    }
+

+
    /// Return the next line, if any, or `None` if there will be no
+
    /// more lines. Note that this blocks until there is a line, or
+
    /// the child process terminates.
+
    pub fn line(&self) -> Option<String> {
+
        let mut line = vec![];
+

+
        loop {
+
            // Get a byte if there is one.
+
            logger::debug2(format!(
+
                "line receiver {}: try to receive next byte",
+
                self.name
+
            ));
+
            let y = self.bytes.try_recv();
+
            logger::debug2(format!(
+
                "line receiver {}: tried to read line: {y:?}",
+
                self.name
+
            ));
+
            match y {
+
                Ok(byte) => {
+
                    line.push(byte);
+
                    if byte == b'\n' {
+
                        let line = String::from_utf8_lossy(&line).to_string();
+
                        logger::debug2(format!(
+
                            "line-receiver {}: received line={line:?}",
+
                            self.name
+
                        ));
+
                        return Some(line);
+
                    }
+
                }
+
                Err(TryRecvError::Empty) => {
+
                    sleep(WAIT_FOR_OUTPUT);
+
                }
+
                Err(TryRecvError::Disconnected) => {
+
                    if line.is_empty() {
+
                        // Sender has closed the channel, there will be no more lines.
+
                        logger::debug2(format!("line-receiver {}: disconnected", self.name));
+
                        return None;
+
                    } else {
+
                        let line = String::from_utf8_lossy(&line).to_string();
+
                        logger::debug2(format!(
+
                            "line-receiver {}: received line={line:?}",
+
                            self.name
+
                        ));
+
                        return Some(line);
+
                    }
+
                }
+
            }
+

+
            logger::debug("line-receiver: has child terminated?");
+
            let x = self.child_terminated.try_recv();
+
            // logger::debug("line receiver {}: x={x:?}", self.name);
+
            match x {
+
                Ok(_) => {
+
                    logger::debug2(format!(
+
                        "line receiver {}: OK: child has terminated, not returning line",
+
                        self.name,
+
                    ));
+
                }
+
                Err(std::sync::mpsc::TryRecvError::Disconnected) => {
+
                    logger::debug2(format!(
+
                        "line receiver {}: Disconnected: child has terminated, not returning line",
+
                        self.name,
+
                    ));
+
                }
+
                _ => {}
+
            }
+
        }
+
    }
+
}
+

+
type OutputSender = SyncSender<u8>;
+
type OutputReader = Receiver<u8>;
+

+
struct NonBlockingReader<R: Read> {
+
    name: &'static str,
+
    stream: R,
+
    tx: OutputSender,
+
}
+

+
impl<R: Read> NonBlockingReader<R> {
+
    fn new(name: &'static str, stream: R, tx: OutputSender) -> Self {
+
        Self { name, stream, tx }
+
    }
+

+
    fn read_to_end(mut self) -> Result<(), TimeoutError> {
+
        let mut count = 0;
+
        loop {
+
            // We read one byte at a time. This lets us avoid doing
+
            // non-blocking I/O but is less efficient. We want to
+
            // avoid blocking for an arbitrary amount of time, if
+
            // reading one byte at a time. When reading from a pipe,
+
            // the pipe writer end may not get closed until the child
+
            // process writing to the pipe ends, and we may not want
+
            // to wait that long.
+
            //
+
            // If this becomes too inefficient, this needs to be
+
            // rewritten to use non-blocking I/O or async.
+

+
            logger::debug2(format!(
+
                "read_to_end {}: try to receive next byte ({count} so far)",
+
                self.name,
+
            ));
+
            let mut byte = vec![0; 1];
+
            let x = self.stream.read(&mut byte);
+
            logger::debug2(format!("read_to_end {}: x={x:?} byte={byte:?}", self.name));
+
            match x {
+
                Ok(0) => {
+
                    logger::debug2(format!("read_to_end {}: end of file", self.name));
+
                    break;
+
                }
+
                Ok(1) => {
+
                    count += 1;
+
                    self.tx
+
                        .try_send(byte[0])
+
                        .map_err(|_| TimeoutError::TooMuch(self.name))?;
+
                }
+
                Ok(_) => {
+
                    logger::debug2(format!("read_to_end {}: read too much", self.name));
+
                    return Err(TimeoutError::ReadMucn);
+
                }
+
                Err(err) => {
+
                    logger::debug2(format!("read_to_end {}: read error: {err}", self.name));
+
                    return Err(TimeoutError::Read(err));
+
                }
+
            }
+
        }
+

+
        logger::debug2(format!("read_to_end {}: terminate", self.name));
+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum TimeoutError {
+
    /// Couldn't spawn child process.
+
    #[error("failed to spawn command: {0:?}")]
+
    Spawn(Command, #[source] std::io::Error),
+

+
    /// Couldn't get file descriptor of child process stdin.
+
    #[error("failed to extract stdin stream from child")]
+
    TakeStdin,
+

+
    /// Couldn't get file descriptor of child process stdout.
+
    #[error("failed to extract stdout stream from child")]
+
    TakeStdout,
+

+
    /// Couldn't get file descriptor of child process stderr.
+
    #[error("failed to extract stderr stream from child")]
+
    TakeStderr,
+

+
    /// Couldn't check if child process is still running.
+
    #[error("failed to check whether command is still running")]
+
    TryWait(#[source] std::io::Error),
+

+
    /// Reading from child stdout or stderr returned too much data.
+
    #[error("read from command standard output returned more data than requested")]
+
    ReadMucn,
+

+
    /// Reading from child stdout or stderr failed.
+
    #[error("problem reading from command output (stdout or stderr)")]
+
    Read(#[source] std::io::Error),
+

+
    /// Channel buffer got full, which means child process wrote too much output.
+
    #[error("sub-process produces too much to {0}")]
+
    TooMuch(&'static str),
+

+
    /// Couldn't join thread that feeds data to child stdin.
+
    #[error("problem waiting for thread that writes to command standard input")]
+
    JoinStdinFeeder,
+

+
    /// Couldn't write to child stdin.
+
    #[error("problem writing to command standard input")]
+
    FeedStdin(#[source] std::io::Error),
+

+
    #[error("problem waiting for thread that monitors command")]
+
    JoinChildMonitor,
+

+
    /// Couldn't join thread that reads child stdout.
+
    #[error("problem waiting for thread that reads command standard output")]
+
    JoinStdoutReader,
+

+
    /// Couldn't join thread that reads child stderr.
+
    #[error("problem waiting for thread that reads command standard error output")]
+
    JoinStderrReader,
+

+
    /// Couldn't terminate child process.
+
    #[error("problem forcing child process to terminate")]
+
    Kill(#[source] std::io::Error),
+

+
    /// Couldn't wait for child process to terminate.
+
    #[error("problem waiting for child process to terminate")]
+
    Wait(#[source] std::io::Error),
+

+
    /// Mutex lock error.
+
    #[error("failed to lock command output buffer")]
+
    MutexLock,
+

+
    #[error("failed to receive notification from child monitor")]
+
    ChildRecv(#[source] std::sync::mpsc::RecvError),
+

+
    #[error("failed to send notification from child monitor")]
+
    ChildSend(#[source] std::sync::mpsc::SendError<(Child, bool)>),
+

+
    #[error("failed to send notification from child monitor to line receiver")]
+
    ChildSendToLine(#[source] std::sync::mpsc::SendError<()>),
+
}
+

+
#[cfg(test)]
+
mod tests {
+
    use super::*;
+

+
    const LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES: Duration = Duration::from_secs(100);
+
    const SHORT_TIMEOUT: Duration = Duration::from_secs(3);
+

+
    fn setup(
+
        script: &str,
+
        timeout: Duration,
+
        stdin: Option<&'static str>,
+
    ) -> Result<RunningProcess, Box<dyn std::error::Error>> {
+
        let mut cmd = Command::new("bash");
+
        cmd.arg("-c").arg(script);
+
        let mut to = TimeoutCommand::new(timeout);
+
        if let Some(stdin) = stdin {
+
            to.feed_stdin(stdin.as_bytes());
+
        }
+
        Ok(to.spawn(cmd)?)
+
    }
+

+
    #[test]
+
    fn bin_true() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec /bin/true",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn bin_false() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec /bin/false",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(1));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn sleep_1() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec sleep 1",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn sleep_for_too_long() -> Result<(), Box<dyn std::error::Error>> {
+
        let started = Instant::now();
+
        let running = setup("exec sleep 1000", SHORT_TIMEOUT, None)?;
+
        let tor = running.wait()?;
+
        eprintln!("duration: {} ms", started.elapsed().as_millis());
+
        assert_eq!(tor.status().code(), None);
+
        assert!(tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn hello_world() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec echo hello, world",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let stdout = running.stdout();
+
        let stderr = running.stderr();
+

+
        assert_eq!(stdout.line(), Some("hello, world\n".into()));
+
        assert_eq!(stdout.line(), None);
+

+
        assert_eq!(stderr.line(), None);
+

+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn hello_world_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec echo hello, world 1>&2",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        let stdout = running.stdout();
+
        let stderr = running.stderr();
+

+
        assert_eq!(stdout.line(), None);
+

+
        assert_eq!(stderr.line(), Some("hello, world\n".into()));
+
        assert_eq!(stderr.line(), None);
+

+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn pipe_through_cat() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec cat",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            Some("hello, world"),
+
        )?;
+
        let stdout = running.stdout();
+
        let stderr = running.stderr();
+

+
        assert_eq!(stdout.line(), Some("hello, world".into()));
+
        assert_eq!(stdout.line(), None);
+

+
        assert_eq!(stderr.line(), None);
+

+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), Some(0));
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn yes_to_stdout() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup("exec yes", SHORT_TIMEOUT, None)?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), None);
+
        assert!(tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn yes_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup("exec yes 1>&2", SHORT_TIMEOUT, None)?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), None);
+
        assert!(tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn kill() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec sleep 1000",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        sleep(Duration::from_millis(5000));
+
        running.kill()?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), None);
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn kill_stderr() -> Result<(), Box<dyn std::error::Error>> {
+
        let running = setup(
+
            "exec sleep 1000 1>&2",
+
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
+
            None,
+
        )?;
+
        sleep(Duration::from_millis(5000));
+
        running.kill()?;
+
        let tor = running.wait()?;
+
        assert_eq!(tor.status().code(), None);
+
        assert!(!tor.timed_out());
+
        Ok(())
+
    }
+
}