Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
refactor: rewrite the timeoutcmd module for simplicity
Lars Wirzenius committed 1 year ago
commit 55023f2e4fcf9d4a5ff021b893dc552197e55c72
parent b73ab44962134073624f51e93e165b6bde4fe4d7
2 files changed +425 -480
modified src/bin/cibtoolcmd/timeout.rs
@@ -48,6 +48,7 @@ impl Leaf for TimeoutCmd {
        cmd.arg("-c").arg(&self.script);

        let mut to = TimeoutCommand::new(Duration::from_secs(self.timeout));
+
        let mut stdout = to.stdout();

        if let Some(bytes) = self.generate {
            let mut stdin: Vec<u8> = vec![];
@@ -66,40 +67,39 @@ impl Leaf for TimeoutCmd {
        println!("spawn child");
        let running = to.spawn(cmd)?;

-
        if let Some(secs) = self.kill_after {
-
            sleep(Duration::from_secs(secs));
-
            running.kill().unwrap();
-
        }
-

        let mut stdout_bytes = 0;
-
        if !self.fill_buffers {
-
            let stdout = running.stdout();
-
            while let Some(line) = stdout.line() {
-
                stdout_bytes += line.as_bytes().len();
-
                if self.verbose {
-
                    println!("stdout: {line:?}");
+
        let tor = if let Some(secs) = self.kill_after {
+
            sleep(Duration::from_secs(secs));
+
            running.kill()?
+
        } else {
+
            if !self.fill_buffers {
+
                while let Some(line) = stdout.line() {
+
                    stdout_bytes += line.as_bytes().len();
+
                    if self.verbose {
+
                        println!("stdout: {line:?}");
+
                    }
                }
+
                println!("finished reading stdout");
            }
-
            println!("finished reading stdout");

-
            let stderr = running.stderr();
-
            while let Some(line) = stderr.line() {
-
                if self.verbose {
-
                    println!("stderr: {line:?}");
-
                }
+
            running.wait()?
+
        };
+

+
        let stderr = tor.stderr();
+
        for line in stderr {
+
            if self.verbose {
+
                println!("stderr: {line:?}");
            }
-
            println!("finished reading stderr");
        }
+
        println!("finished reading stderr");

-
        let tor = running.wait()?;
        let elapsed = started.elapsed();
        let speed = (stdout_bytes as f64) / elapsed.as_secs_f64();

        println!("stdout bytes: {stdout_bytes}");
        println!("duration: {} ms", elapsed.as_millis());
        println!("speed: {:.0} B/s", speed);
-
        println!("exit: {}", tor.status());
-
        println!("timed out? {}", tor.timed_out());
+
        println!("exit: {}", tor.exit_code());

        Ok(())
    }
modified src/timeoutcmd.rs
@@ -20,25 +20,25 @@
//! # Example
//! ```
//! # use std::{process::Command, time::Duration};
-
//! # use radicle_ci_broker::timeoutcmd::{RunningProcess, TimeoutCommand};
+
//! # use radicle_ci_broker::timeoutcmd::{ChildProcess, TimeoutCommand, RealtimeLines};
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut cmd = Command::new("bash");
//! cmd.arg("-c").arg("exec cat"); // Note exec!
//!
//! let mut to = TimeoutCommand::new(Duration::from_secs(10));
//! to.feed_stdin(b"hello, world\n");
+
//! let mut stdout: RealtimeLines = to.stdout();
//! let running = to.spawn(cmd)?;
//!
//! // Capture stdout output. We ignore stderr output.
-
//! let stdout = running.stdout();
//! let mut captured = vec![];
//! while let Some(line) = stdout.line() {
//!     captured.push(line);
//! }
//!
//! // Wait for child process to terminate.
-
//! let tor = running.wait()?;
-
//! assert_eq!(tor.status().code(), Some(0));
+
//! let finished = running.wait()?;
+
//! assert_eq!(finished.exit_code().code(), Some(0));
//! assert_eq!(captured, ["hello, world\n"]);
//! # Ok(())
//! # }
@@ -47,20 +47,24 @@
#![allow(unused_imports)]

use std::{
-
    io::{Read, Write},
+
    io::{Read, Seek, Write},
    process::{Child, Command, ExitStatus, Stdio},
    sync::{
-
        mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender, TryRecvError},
-
        Arc, Mutex,
+
        mpsc::{
+
            channel, sync_channel, Receiver, RecvTimeoutError, Sender, SyncSender, TryRecvError,
+
        },
+
        Arc, Condvar, Mutex,
    },
    thread::{sleep, spawn, JoinHandle},
    time::{Duration, Instant},
};

+
use tempfile::tempfile;
+

use crate::logger;

-
const WAIT_FOR_IDLE_CHILD: Duration = Duration::from_millis(1000);
-
const WAIT_FOR_OUTPUT: Duration = Duration::from_millis(100);
+
// const WAIT_FOR_IDLE_CHILD: Duration = Duration::from_millis(1000);
+
// const WAIT_FOR_OUTPUT: Duration = Duration::from_millis(100);
const KIB: usize = 1024;
const MIB: usize = 1024 * KIB;
const MAX_OUTPUT_BYTES: usize = 10 * MIB;
@@ -70,11 +74,11 @@ const MAX_OUTPUT_BYTES: usize = 10 * MIB;
pub struct TimeoutCommand {
    max_duration: Duration,
    stdin_data: Vec<u8>,
+
    stdout: RealtimeLines,
}

// This works by using multiple threads.
//
-
// * a thread to send data to child's stdin
// * a thread to read child's stdout, as bytes
// * a thread to read child's stderr, as bytes
// * a thread to monitor how long the child runs
@@ -98,654 +102,595 @@ impl TimeoutCommand {
        Self {
            max_duration,
            stdin_data: vec![],
+
            stdout: RealtimeLines::default(),
        }
    }

    /// Feed the sub-process the specified binary data via its
    /// standard input. If this method is not used, the sub-process
-
    /// stdin will be fed no data. The sub-process stdin always comes
-
    /// from a pipe, however, so if this method is not used, the
-
    /// effect is that stdin comes from `/dev/null` or another empty
-
    /// file.
+
    /// stdin will be fed no data. If this method is not used, the
+
    /// effect is that stdin comes from an empty file.
    pub fn feed_stdin(&mut self, data: &[u8]) {
        self.stdin_data = data.to_vec();
    }

+
    pub fn stdout(&self) -> RealtimeLines {
+
        self.stdout.clone()
+
    }
+

    /// Start a new sub-process to execute the specified command.
    ///
    /// The caller should set up the [`std::process::Command`] value.
    /// This method will redirect stdin, stdout, and stderr to use
    /// pipes.
-
    pub fn spawn(&self, mut command: Command) -> Result<RunningProcess, TimeoutError> {
-
        // Set up child stdin/stdout/stderr redirection.
-
        let mut child = command
-
            .stdin(Stdio::piped())
-
            .stdout(Stdio::piped())
-
            .stderr(Stdio::piped())
-
            .spawn()
-
            .map_err(|err| TimeoutError::Spawn(command, err))?;
-

-
        // Set up thread to write data to child stdin.
-
        let stdin = child.stdin.take().ok_or(TimeoutError::TakeStdin)?;
-
        let stdin_data = self.stdin_data.clone();
-
        let stdin_writer = spawn(move || writer(stdin, stdin_data));
-

-
        // Set up thread to capture child stdout.
-
        let stdout = child.stdout.take().ok_or(TimeoutError::TakeStdout)?;
-
        let (stdout_termination_tx, stdout_termination_rx) = sync_channel(1);
-
        let (stdout_lines_tx, stdout_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
-
        let stdout_reader =
-
            spawn(move || NonBlockingReader::new("stdout", stdout, stdout_lines_tx).read_to_end());
-
        let stdout_lines = LineReceiver::new("stdout", stdout_lines_rx, stdout_termination_rx);
-

-
        // Set up thread to capture child stderr.
-
        let stderr = child.stderr.take().ok_or(TimeoutError::TakeStderr)?;
-
        let (stderr_termination_tx, stderr_termination_rx) = sync_channel(1);
-
        let (stderr_lines_tx, stderr_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
-
        let stderr_reader =
-
            spawn(move || NonBlockingReader::new("stderr", stderr, stderr_lines_tx).read_to_end());
-
        let stderr_lines = LineReceiver::new("stderr", stderr_lines_rx, stderr_termination_rx);
-

-
        // Set up thread to monitor child termination or overlong run time.
-
        let (tx, timed_out_rx) = sync_channel(1);
-
        let (kill_tx, kill_rx) = sync_channel(1);
-
        let nanny = Nanny::new(
-
            self.max_duration,
-
            child,
-
            tx,
-
            kill_rx,
-
            vec![stdout_termination_tx, stderr_termination_tx],
-
        );
-
        let monitor = spawn(move || nanny.monitor());
-

-
        Ok(RunningProcess {
-
            child_monitor: Some(monitor),
-
            timed_out_rx,
-
            stdin_writer,
-
            stdout_lines,
-
            stdout_reader,
-
            stderr_lines,
-
            stderr_reader,
-
            kill_tx,
-
        })
+
    pub fn spawn(&self, command: Command) -> Result<ChildProcess, TimeoutError> {
+
        ChildProcess::new(command, &self.stdin_data, self.stdout(), self.max_duration)
    }
}

-
/// Manage a running child process and capture its output.
+
/// Represent a finished process.
///
-
/// This is created by [`TimeoutCommand::spawn`].
-
pub struct RunningProcess {
-
    child_monitor: Option<JoinHandle<Result<(), TimeoutError>>>,
-
    timed_out_rx: NannyReceiver,
-
    stdin_writer: JoinHandle<Result<(), std::io::Error>>,
-
    stdout_lines: LineReceiver,
-
    stdout_reader: JoinHandle<Result<(), TimeoutError>>,
-
    stderr_lines: LineReceiver,
-
    stderr_reader: JoinHandle<Result<(), TimeoutError>>,
-
    kill_tx: KillSender,
+
/// This allows retrieval of the process's standard error output and
+
/// the exit code. The standard output is captured via the
+
/// [`RealtimeLines`] buffer given to [`ChildProcess`].
+
#[derive(Debug)]
+
pub struct FinishedProcess {
+
    exit: ExitStatus,
+
    stderr: Vec<u8>,
}

-
impl RunningProcess {
-
    /// Return a [`LineReceiver`] that returns lines from the
-
    /// sub-process standard output.
-
    pub fn stdout(&self) -> &LineReceiver {
-
        &self.stdout_lines
-
    }
-

-
    /// Return a [`LineReceiver`] that returns lines from the
-
    /// sub-process standard error output.
-
    pub fn stderr(&self) -> &LineReceiver {
-
        &self.stderr_lines
-
    }
-

-
    /// Terminate sub-process with extreme prejudice.
-
    pub fn kill(&self) -> Result<(), TimeoutError> {
-
        let x = self.kill_tx.send(());
-
        logger::timeoutcmd_request_termination(x);
-
        Ok(())
+
impl FinishedProcess {
+
    /// Exit code of the finished process.
+
    pub fn exit_code(&self) -> ExitStatus {
+
        self.exit
    }

-
    /// Wait for child process to terminate. It may terminate because
-
    /// it ends normally, or because it has run for longer than the
-
    /// limit set with [`TimeoutCommand::new`]. The return value of
-
    /// this method will specify why, see
-
    /// [`TimeoutResult::timed_out`].
-
    ///
-
    /// Note that if the sub-process produces a lot of output, you
-
    /// must read it to avoid the process getting stuck; see
-
    /// [`RunningProcess::stdout`] and [`RunningProcess::stderr`]. If
-
    /// you don't read the output, the sub-process will fill its
-
    /// output pipe buffer, or the inter-thread communication channel
-
    /// buffer, and the sub-process will block on output, and not
-
    /// progress. This may be unwanted. The blocking won't affect the
-
    /// sub-process getting terminated due to running for too long.
-
    pub fn wait(mut self) -> Result<TimeoutResult, TimeoutError> {
-
        logger::timeoutcmd_wait_word_from_nanny();
-
        let (mut child, timed_out) = self.timed_out_rx.recv().map_err(TimeoutError::ChildRecv)?;
-
        logger::timeoutcmd_wait_got_word_from_nanny();
-
        if let Some(monitor) = self.child_monitor.take() {
-
            logger::timeoutcmd_wait_on_nanny_to_end();
-
            monitor
-
                .join()
-
                .map_err(|_| TimeoutError::JoinChildMonitor)??;
-
        }
-

-
        logger::timeoutcmd_wait_on_stdin_writer_to_end();
-
        self.stdin_writer.join().ok();
-

-
        logger::timeoutcmd_wait_on_stdout_reader_to_end();
-
        self.stdout_reader.join().ok();
-

-
        logger::timeoutcmd_wait_on_stderr_reader_to_end();
-
        self.stderr_reader.join().ok();
-

-
        logger::timeoutcmd_wait_on_child_to_end();
-
        let status = child.wait().map_err(TimeoutError::Wait)?;
-
        logger::timeoutcmd_wait_status(status);
-
        logger::timeoutcmd_ok();
-
        Ok(TimeoutResult { timed_out, status })
+
    /// The captured output from the process's standard error.
+
    pub fn stderr(&self) -> &[u8] {
+
        &self.stderr
    }
}

-
/// Did the sub-process started with [`TimeoutCommand::spawn`]
-
/// terminate normally, or did it get terminated unilaterally for
-
/// running too long? What was its exit code?
-
#[derive(Debug)]
-
pub struct TimeoutResult {
-
    timed_out: bool,
-
    status: ExitStatus,
+
/// Represent a running child process.
+
///
+
/// The child gets some data fed to it via its standard input. The
+
/// child is terminated if it runs for too long.
+
pub struct ChildProcess {
+
    deadline: Option<Instant>,
+
    child: Child,
+
    stdout: RealtimeLines,
+
    stdout_rx: Receiver<()>,
+
    stdout_thread: JoinHandle<Result<(), TimeoutError>>,
+
    stderr_thread: JoinHandle<Result<Vec<u8>, TimeoutError>>,
+
    arc: Arc<Mutex<RealtimeLines>>,
}

-
impl TimeoutResult {
-
    /// Exit code of the of the sub-process. There is always an exit
-
    /// code: [`RunningProcess::wait`] does not return until the
-
    /// sub-process has exited.
-
    pub fn status(&self) -> ExitStatus {
-
        self.status
+
impl ChildProcess {
+
    /// Create a new [`ChildProcess`].
+
    pub fn new(
+
        mut cmd: Command,
+
        stdin: &[u8],
+
        stdout_lines: RealtimeLines,
+
        timeout: Duration,
+
    ) -> Result<Self, TimeoutError> {
+
        // Write data to be fed to child's stdin to a temporary
+
        // file that automatically gets deleted when we exit this
+
        // function. Using a file instead of a pipe means our
+
        // logic can be simpler as we don't need to worry about
+
        // the pipe buffer filling up.
+
        let mut file = tempfile::tempfile()?;
+
        file.write_all(stdin)?;
+
        file.rewind()?;
+

+
        // Redirect child stdin/stdout/stderr and start the child
+
        // process.
+
        let mut child = cmd
+
            .stdin(file)
+
            .stdout(Stdio::piped())
+
            .stderr(Stdio::piped())
+
            .spawn()
+
            .map_err(|err| TimeoutError::Spawn(cmd, err))?;
+

+
        // Create the mutex around the line buffer we're given and
+
        // a condition variable to signal end of input. The mutex
+
        // is used to wait on the condition variable, not so much
+
        // to protect the line buffer, but mutex is a convenient
+
        // way to transfer the buffer to the thread.
+
        let (stdout_tx, stdout_rx) = channel::<()>();
+
        let arc = Arc::new(Mutex::new(stdout_lines.clone()));
+

+
        // Start thread to read from stdout, using the line buffer
+
        // we just created and hid in `arc`.
+
        let stdout = child
+
            .stdout
+
            .take()
+
            .ok_or(TimeoutError::TakeHandle("stdout"))?;
+
        let stdout_thread = {
+
            let arc = arc.clone();
+
            spawn(move || Self::line_reader(arc, stdout_tx, stdout))
+
        };
+

+
        // Launch a thread that reads everything the child writes
+
        // to its stderr.
+
        let stderr = child
+
            .stderr
+
            .take()
+
            .ok_or(TimeoutError::TakeHandle("stderr"))?;
+
        let stderr_thread = Self::capture(stderr)?;
+

+
        Ok(Self {
+
            deadline: Some(
+
                Instant::now()
+
                    .checked_add(timeout)
+
                    .ok_or(TimeoutError::Deadline)?,
+
            ),
+
            child,
+
            stdout: stdout_lines,
+
            stdout_rx,
+
            stdout_thread,
+
            stderr_thread,
+
            arc,
+
        })
    }

-
    /// Did the sub-process get terminated for running too long?
-
    pub fn timed_out(&self) -> bool {
-
        self.timed_out
+
    #[allow(dead_code)]
+
    pub fn kill(mut self) -> Result<FinishedProcess, TimeoutError> {
+
        self.child.kill().map_err(|_| TimeoutError::Kill)?;
+
        self.stdout.finish();
+
        self.deadline = None;
+
        self.wait()
    }
-
}

-
type NannySender = SyncSender<(Child, bool)>;
-
type NannyReceiver = Receiver<(Child, bool)>;
+
    pub fn wait(mut self) -> Result<FinishedProcess, TimeoutError> {
+
        let result = self.child.try_wait();
+
        match result {
+
            Ok(Some(status)) => {
+
                let stderr = self
+
                    .stderr_thread
+
                    .join()
+
                    .map_err(|_| TimeoutError::Thread)??;
+

+
                return Ok(FinishedProcess {
+
                    exit: status,
+
                    stderr,
+
                });
+
            }
+
            Ok(None) => (),
+
            Err(err) => return Err(TimeoutError::Wait(err)),
+
        }

-
type KillSender = SyncSender<()>;
-
type KillReceiver = Receiver<()>;
+
        self.stdout.finish();
+
        if let Some(deadline) = self.deadline {
+
            let max_wait = deadline - Instant::now();

-
struct Nanny {
-
    max_duration: Duration,
-
    child: Option<Child>,
-
    tx: NannySender,
-
    term_tx: Vec<TerminationSender>,
-
    kill_rx: KillReceiver,
-
}
+
            // Wait for the stdout reader thread to finish, up to the given timeout.
+
            let _guardlock = &*self.arc;
+
            match self.stdout_rx.recv_timeout(max_wait) {
+
                Ok(_) | Err(RecvTimeoutError::Disconnected) => (),
+
                Err(RecvTimeoutError::Timeout) => {
+
                    return Err(TimeoutError::TimedOut);
+
                }
+
            }
+
        };

-
impl Nanny {
-
    fn new(
-
        max_duration: Duration,
-
        child: Child,
-
        tx: NannySender,
-
        kill_rx: KillReceiver,
-
        term_tx: Vec<TerminationSender>,
-
    ) -> Self {
-
        Self {
-
            max_duration,
-
            child: Some(child),
-
            tx,
-
            term_tx,
-
            kill_rx,
+
        self.stdout_thread
+
            .join()
+
            .map_err(|_| TimeoutError::Thread)??;
+

+
        let stderr = self
+
            .stderr_thread
+
            .join()
+
            .map_err(|_| TimeoutError::Thread)?;
+
        let stderr = stderr?;
+

+
        match self.child.wait() {
+
            Ok(exit) => Ok(FinishedProcess { exit, stderr }),
+
            Err(err) => Err(TimeoutError::Wait(err)),
        }
    }

-
    fn monitor(mut self) -> Result<(), TimeoutError> {
-
        let mut child = if let Some(child) = self.child.take() {
-
            child
-
        } else {
-
            panic!("programming error: Nanny does not have a child to monitor");
-
        };
-
        let started = Instant::now();
-
        let mut timed_out = false;
-
        logger::timeoutcmd_nanny_start();
+
    // Read data from a stream, push into a [`RealtimeLines`]
+
    // and notify when done using a condition variable in
+
    // `arc`. This function will be called in a dedicated
+
    // thread, so that we don't need to use non-blocking I/O
+
    // or async.
+
    fn line_reader(
+
        arc: Arc<Mutex<RealtimeLines>>,
+
        tx: Sender<()>,
+
        mut stream: impl Read,
+
    ) -> Result<(), TimeoutError> {
+
        let mutex = &*arc;
        loop {
-
            let elapsed = started.elapsed();
-

-
            if self.kill_rx.try_recv().is_ok() {
-
                let x = child.kill();
-
                logger::timeoutcmd_nanny_terminated_as_requested(x);
+
            let mut bytes = vec![0; 1024];
+
            let n = stream.read(&mut bytes)?;
+
            if n == 0 {
                break;
-
            } else if elapsed > self.max_duration {
-
                let x = child.kill();
-
                logger::timeoutcmd_nanny_too_long(child.id(), elapsed, self.max_duration, x);
-
                timed_out = true;
-
                break;
-
            }
-

-
            if matches!(child.try_wait(), Ok(None)) {
-
                sleep(WAIT_FOR_IDLE_CHILD);
            } else {
-
                logger::timeoutcmd_nanny_child_died();
-
                break;
+
                let mut buf = mutex.lock().map_err(|_| TimeoutError::Lock)?;
+
                buf.push(bytes[..n].to_vec());
            }
        }
+
        let mut buf = mutex.lock().map_err(|_| TimeoutError::Lock)?;
+
        buf.finish();

-
        logger::timeoutcmd_nanny_time_to_end();
-
        self.tx
-
            .send((child, timed_out))
-
            .map_err(TimeoutError::ChildSend)?;
-
        for tx in self.term_tx.iter() {
-
            tx.send(()).map_err(TimeoutError::ChildSendToLine)?;
-
        }
-

-
        logger::timeoutcmd_nanny_ends();
+
        // We've reached the end. Notify the parent
+
        // thread.
+
        tx.send(()).ok();
        Ok(())
    }
-
}

-
fn writer(mut stream: impl Write, data: Vec<u8>) -> Result<(), std::io::Error> {
-
    let mut written = 0;
-
    while written < data.len() {
-
        // We write one byte at a time. This lets us avoid doing
-
        // non-blocking I/O, but is less efficient. by only writing
-
        // one byte at a time, we only block when we can't write to
-
        // the stream. When the stream is a pipe, this happens when
-
        // the pipe buffer fills up. This function should be in its
-
        // own thread, and so it doesn't matter if it blocks, but
-
        // measurements are more useful when they're taken after each
-
        // byte.
-
        //
-
        // When, inevitably, byte-at-a-time becomes too inefficient,
-
        // this will need to be rewritten to use non-blocking I/O.
-
        //
-
        // Or async.
-
        let n = stream.write(&data[written..written + 1])?;
-
        written += n;
-
        stream.flush()?;
+
    // Start thread that captures stderr output into a byte
+
    // vector buffer. We don't need a condition variable for
+
    // this, as we'll just read to the end to capture
+
    // everything.
+
    fn capture(
+
        mut stream: impl Read + Send + 'static,
+
    ) -> Result<JoinHandle<Result<Vec<u8>, TimeoutError>>, TimeoutError> {
+
        let thread = spawn(move || {
+
            let mut buf = vec![];
+
            loop {
+
                let mut chunk = vec![0; MIB];
+
                let n = stream.read(&mut chunk).map_err(TimeoutError::Io)?;
+
                if n == 0 {
+
                    return Ok(buf);
+
                } else {
+
                    buf.append(&mut chunk[..n].to_vec());
+
                    if buf.len() > MAX_OUTPUT_BYTES {
+
                        return Err(TimeoutError::TooMuch);
+
                    }
+
                }
+
            }
+
        });
+
        Ok(thread)
    }
-

-
    Ok(())
}

-
type TerminationSender = SyncSender<()>;
-
type TerminationReceiver = Receiver<()>;
-

-
/// Receive one line of output at time.
+
/// A buffer of lines that can be filled in the background by a
+
/// producer thread and queried while that happens by a consumer
+
/// thread.
+
///
+
/// This is used to capture the child process output to its stdout in
+
/// such a way that it can be processed in real time, without having
+
/// to wait for the child process to terminate.
///
-
/// See the [module description](index.html) for an example.
-
pub struct LineReceiver {
-
    name: &'static str,
-
    child_terminated: TerminationReceiver,
-
    bytes: OutputReader,
+
/// The buffer is filled by pushing byte vectors, and consumed line by
+
/// line until there are no more lines: the buffer is empty and the
+
/// child process has terminated.
+
///
+
/// The buffer can be cloned and each clone is logically the same
+
/// buffer. This allows separate producer and consumer threads to own
+
/// the buffer, but internally it's the same one.
+
#[derive(Default, Clone)]
+
pub struct RealtimeLines {
+
    // We have an unlocked buffer protected by a mutex, and a
+
    // condition variable to signal consumers of new data or the
+
    // producer having finished.
+
    data: Arc<(Mutex<UnlockedBuf>, Condvar)>,
}

-
impl LineReceiver {
-
    fn new(name: &'static str, bytes: OutputReader, child_terminated: TerminationReceiver) -> Self {
-
        Self {
-
            name,
-
            child_terminated,
-
            bytes,
-
        }
+
impl RealtimeLines {
+
    /// Push some binary data to the buffer.
+
    ///
+
    /// The incoming data is a byte vector, as we may not receive
+
    /// complete lines from the child process. We also get a vector,
+
    /// not a slice, to make it easier to append the data to the
+
    /// buffer.
+
    pub fn push(&mut self, more_data: Vec<u8>) {
+
        // Get mutex and condition variable.
+
        let (mutex, var) = &*self.data;
+

+
        // Lock mutex to get unlocked buffer (really the mutex guard,
+
        // but that lets us access the unlocked buffer protected by
+
        // the mutex).
+
        let mut buf = mutex.lock().expect("lock for push");
+

+
        // Push the data to the unlocked buffer.
+
        buf.push(more_data);
+

+
        // Notify consumer of new data.
+
        var.notify_all();
    }

-
    /// Return the next line, if any, or `None` if there will be no
-
    /// more lines. Note that this blocks until there is a line, or
-
    /// the child process terminates.
-
    pub fn line(&self) -> Option<String> {
-
        let mut line = vec![];
+
    /// Signal that the producer has finished and there will be no more
+
    /// incoming data.
+
    pub fn finish(&mut self) {
+
        let (mutex, var) = &*self.data;
+
        let mut buf = mutex.lock().expect("lock for push");
+
        buf.finish();
+
        var.notify_all();
+
    }
+

+
    /// Get next line from the buffer, if any. This will wait for a
+
    /// new complete line to arrive, if there isn't any in the buffer
+
    /// yet, or for the producer to finish. Returns `None` for end of
+
    /// file.
+
    pub fn line(&mut self) -> Option<String> {
+
        let (mutex, var) = &*self.data;
+

+
        // Lock the mutex to get access to unlocked buffer.
+
        let mut buf = mutex.lock().expect("lock to wait for line");

        loop {
-
            // Get a byte if there is one.
-
            logger::timeoutcmd_line_reader_try_byte(self.name);
-
            let y = self.bytes.try_recv();
-
            logger::timeoutcmd_line_reader_tried_byte(self.name, y);
-
            match y {
-
                Ok(byte) => {
-
                    line.push(byte);
-
                    if byte == b'\n' {
-
                        let line = String::from_utf8_lossy(&line).to_string();
-
                        logger::timeoutcmd_line_reader_got_line(self.name, &line);
-
                        return Some(line);
-
                    }
-
                }
-
                Err(TryRecvError::Empty) => {
-
                    sleep(WAIT_FOR_OUTPUT);
-
                }
-
                Err(TryRecvError::Disconnected) => {
-
                    if line.is_empty() {
-
                        // Sender has closed the channel, there will be no more lines.
-
                        logger::timeoutcmd_line_reader_got_disconnected(self.name);
-
                        return None;
-
                    } else {
-
                        let line = String::from_utf8_lossy(&line).to_string();
-
                        logger::timeoutcmd_line_reader_got_line(self.name, &line);
-
                        return Some(line);
-
                    }
+
            match buf.line() {
+
                // We got a line: return it.
+
                Some(line) => {
+
                    return Some(line);
                }
-
            }

-
            logger::timeoutcmd_line_reader_did_child_die(self.name);
-
            let x = self.child_terminated.try_recv();
-
            match x {
-
                Ok(_) => {
-
                    logger::timeoutcmd_line_reader_child_died(self.name);
+
                // We didn't get a line, but the input stream has
+
                // finished. Return final partial line, if there is one,
+
                // or None.
+
                None if buf.is_finished() => {
+
                    let line = buf.line();
+
                    return line;
                }
-
                Err(std::sync::mpsc::TryRecvError::Disconnected) => {
-
                    logger::timeoutcmd_line_reader_child_channel_disconnected(self.name);
+

+
                // Wait for more input, then try again. We can't
+
                // assume the new input results in either a complete
+
                // line or the input stream finishing, so we loop.
+
                None => {
+
                    buf = var.wait(buf).expect("wait for line");
                }
-
                _ => {}
            }
        }
    }
}

-
type OutputSender = SyncSender<u8>;
-
type OutputReader = Receiver<u8>;
-

-
struct NonBlockingReader<R: Read> {
-
    name: &'static str,
-
    stream: R,
-
    tx: OutputSender,
+
// An unlocked buffer from which lines can be extracted.
+
//
+
// All the locking and thread synchronization is handled by [`RealtimeLines`].
+
#[derive(Default, Debug)]
+
struct UnlockedBuf {
+
    data: Vec<u8>,
+
    finished: bool,
}

-
impl<R: Read> NonBlockingReader<R> {
-
    fn new(name: &'static str, stream: R, tx: OutputSender) -> Self {
-
        Self { name, stream, tx }
+
impl UnlockedBuf {
+
    // Mark producer as finished. There will be no more data.
+
    fn finish(&mut self) {
+
        self.finished = true;
    }

-
    fn read_to_end(mut self) -> Result<(), TimeoutError> {
-
        let mut count = 0;
-
        loop {
-
            // We read one byte at a time. This lets us avoid doing
-
            // non-blocking I/O but is less efficient. We want to
-
            // avoid blocking for an arbitrary amount of time, if
-
            // reading one byte at a time. When reading from a pipe,
-
            // the pipe writer end may not get closed until the child
-
            // process writing to the pipe ends, and we may not want
-
            // to wait that long.
-
            //
-
            // If this becomes too inefficient, this needs to be
-
            // rewritten to use non-blocking I/O or async.
-

-
            logger::timeoutcmd_nonblocking_try_byte(self.name, count);
-
            let mut byte = vec![0; 1];
-
            let x = self.stream.read(&mut byte);
-
            logger::timeoutcmd_nonblocking_tried_byte(self.name, &x, &byte);
-
            match x {
-
                Ok(0) => {
-
                    logger::timeoutcmd_nonblocking_eof(self.name);
-
                    break;
-
                }
-
                Ok(1) => {
-
                    count += 1;
-
                    self.tx
-
                        .try_send(byte[0])
-
                        .map_err(|_| TimeoutError::TooMuch(self.name))?;
-
                }
-
                Ok(_) => {
-
                    logger::timeoutcmd_nonblocking_got_too_much(self.name, x, &byte);
-
                    return Err(TimeoutError::ReadMucn);
-
                }
-
                Err(err) => {
-
                    logger::timeoutcmd_nonblocking_read_error(self.name, &err);
-
                    return Err(TimeoutError::Read(err));
-
                }
+
    // Has producer finished? Since this structure does not itself
+
    // read from the input stream, this is how we know we've reached
+
    // the end of the file.
+
    fn is_finished(&self) -> bool {
+
        self.finished
+
    }
+

+
    // Push data into the buffer.
+
    fn push(&mut self, mut more_data: Vec<u8>) {
+
        self.data.append(&mut more_data);
+
    }
+

+
    // // Does the buffer contain at least one line? We need to check
+
    // // this to avoid unnecessary waiting for more data.
+
    // fn has_line(&self) -> bool {
+
    //     self.data.contains(&b'\n')
+
    // }
+

+
    // Get next line, if any. If producer has finished, get final,
+
    // possibly partial line if any. If there isn't at least one line in
+
    // the buffer, return `None`. Note that this does not mean end of
+
    // file.
+
    fn line(&mut self) -> Option<String> {
+
        for (i, byte) in self.data.iter().enumerate() {
+
            if *byte == b'\n' {
+
                let range = 0..i + 1;
+
                let line = String::from_utf8_lossy(&self.data[range.clone()]).to_string();
+
                self.data.drain(range);
+
                return Some(line);
            }
        }

-
        logger::timeoutcmd_nonblocking_ends(self.name);
-
        Ok(())
+
        if self.finished && !self.data.is_empty() {
+
            let line = String::from_utf8_lossy(&self.data).to_string();
+
            self.data.clear();
+
            return Some(line);
+
        }
+

+
        None
    }
}

+
/// All possible errors from the module.
#[derive(Debug, thiserror::Error)]
pub enum TimeoutError {
+
    #[error(transparent)]
+
    Io(#[from] std::io::Error),
+

    /// Couldn't spawn child process.
    #[error("failed to spawn command: {0:?}")]
    Spawn(Command, #[source] std::io::Error),

-
    /// Couldn't get file descriptor of child process stdin.
-
    #[error("failed to extract stdin stream from child")]
-
    TakeStdin,
-

-
    /// Couldn't get file descriptor of child process stdout.
-
    #[error("failed to extract stdout stream from child")]
-
    TakeStdout,
-

-
    /// Couldn't get file descriptor of child process stderr.
-
    #[error("failed to extract stderr stream from child")]
-
    TakeStderr,
+
    #[error("thread join failed")]
+
    Thread,

-
    /// Couldn't check if child process is still running.
-
    #[error("failed to check whether command is still running")]
-
    TryWait(#[source] std::io::Error),
+
    #[error("failed to lock mutex")]
+
    Lock,

-
    /// Reading from child stdout or stderr returned too much data.
-
    #[error("read from command standard output returned more data than requested")]
-
    ReadMucn,
+
    #[error("failed to lock condition variable")]
+
    LockVar,

-
    /// Reading from child stdout or stderr failed.
-
    #[error("problem reading from command output (stdout or stderr)")]
-
    Read(#[source] std::io::Error),
+
    #[error("timed out waiting for child")]
+
    TimedOut,

-
    /// Channel buffer got full, which means child process wrote too much output.
-
    #[error("sub-process produces too much to {0}")]
-
    TooMuch(&'static str),
+
    #[error("failed to kill child process")]
+
    Kill,

-
    /// Couldn't join thread that feeds data to child stdin.
-
    #[error("problem waiting for thread that writes to command standard input")]
-
    JoinStdinFeeder,
-

-
    /// Couldn't write to child stdin.
-
    #[error("problem writing to command standard input")]
-
    FeedStdin(#[source] std::io::Error),
-

-
    #[error("problem waiting for thread that monitors command")]
-
    JoinChildMonitor,
-

-
    /// Couldn't join thread that reads child stdout.
-
    #[error("problem waiting for thread that reads command standard output")]
-
    JoinStdoutReader,
-

-
    /// Couldn't join thread that reads child stderr.
-
    #[error("problem waiting for thread that reads command standard error output")]
-
    JoinStderrReader,
-

-
    /// Couldn't terminate child process.
-
    #[error("problem forcing child process to terminate")]
-
    Kill(#[source] std::io::Error),
-

-
    /// Couldn't wait for child process to terminate.
-
    #[error("problem waiting for child process to terminate")]
+
    #[error("failed waiting for child process to terminate")]
    Wait(#[source] std::io::Error),

-
    /// Mutex lock error.
-
    #[error("failed to lock command output buffer")]
-
    MutexLock,
+
    #[error("child exit code is not known")]
+
    ExitCode,

-
    #[error("failed to receive notification from child monitor")]
-
    ChildRecv(#[source] std::sync::mpsc::RecvError),
+
    #[error("failed to take child {0} file handle")]
+
    TakeHandle(&'static str),

-
    #[error("failed to send notification from child monitor")]
-
    ChildSend(#[source] std::sync::mpsc::SendError<(Child, bool)>),
+
    #[error("programming error: failed to get thread to wait on")]
+
    TakeThread,

-
    #[error("failed to send notification from child monitor to line receiver")]
-
    ChildSendToLine(#[source] std::sync::mpsc::SendError<()>),
+
    #[error("failed to compute deadline")]
+
    Deadline,
+

+
    #[error("child process produced too much output")]
+
    TooMuch,
}

#[cfg(test)]
+
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

-
    const LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES: Duration = Duration::from_secs(100);
+
    const LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES: Duration = Duration::from_secs(10);
    const SHORT_TIMEOUT: Duration = Duration::from_secs(3);

    fn setup(
        script: &str,
        timeout: Duration,
        stdin: Option<&'static str>,
-
    ) -> Result<RunningProcess, Box<dyn std::error::Error>> {
+
    ) -> Result<(ChildProcess, RealtimeLines), TimeoutError> {
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg(script);
        let mut to = TimeoutCommand::new(timeout);
        if let Some(stdin) = stdin {
            to.feed_stdin(stdin.as_bytes());
        }
-
        Ok(to.spawn(cmd)?)
+
        Ok((to.spawn(cmd)?, to.stdout()))
    }

    #[test]
    fn bin_true() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup("exec true", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
+
        let (running, _) = setup("exec true", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
+
        let finished = running.wait()?;
+
        assert_eq!(finished.exit_code().code(), Some(0));
        Ok(())
    }

    #[test]
    fn bin_false() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup("exec false", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(1));
-
        assert!(!tor.timed_out());
+
        let (running, _) = setup("exec false", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
+
        let result = running.wait();
+
        assert!(matches!(result, Ok(FinishedProcess { exit, .. }) if exit.code() == Some(1)));
        Ok(())
    }

    #[test]
    fn sleep_1() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, _) = setup(
            "exec sleep 1",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
+
        let result = running.wait();
+
        assert!(matches!(result, Ok(FinishedProcess { exit, .. }) if exit.code() == Some(0)));
        Ok(())
    }

    #[test]
    fn sleep_for_too_long() -> Result<(), Box<dyn std::error::Error>> {
-
        let started = Instant::now();
-
        let running = setup("exec sleep 1000", SHORT_TIMEOUT, None)?;
-
        let tor = running.wait()?;
-
        eprintln!("duration: {} ms", started.elapsed().as_millis());
-
        assert_eq!(tor.status().code(), None);
-
        assert!(tor.timed_out());
+
        let (running, _) = setup("exec sleep 1000", SHORT_TIMEOUT, None)?;
+
        let result = running.wait();
+
        assert!(matches!(result, Err(TimeoutError::TimedOut)));
        Ok(())
    }

    #[test]
    fn hello_world() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, mut stdout) = setup(
            "exec echo hello, world",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        let stdout = running.stdout();
-
        let stderr = running.stderr();
-

        assert_eq!(stdout.line(), Some("hello, world\n".into()));
        assert_eq!(stdout.line(), None);

-
        assert_eq!(stderr.line(), None);
+
        let result = running.wait();
+
        eprintln!("result={result:#?}");
+
        let finished = result.unwrap();
+
        assert_eq!(finished.exit_code().code(), Some(0));
+
        assert_eq!(finished.stderr(), b"");

-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
        Ok(())
    }

    #[test]
    fn hello_world_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, mut stdout) = setup(
            "exec echo hello, world 1>&2",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        let stdout = running.stdout();
-
        let stderr = running.stderr();
-

        assert_eq!(stdout.line(), None);

-
        assert_eq!(stderr.line(), Some("hello, world\n".into()));
-
        assert_eq!(stderr.line(), None);
+
        let finished = running.wait().unwrap();
+
        assert_eq!(finished.exit_code().code(), Some(0));
+
        assert_eq!(finished.stderr(), b"hello, world\n");

-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
        Ok(())
    }

    #[test]
    fn pipe_through_cat() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, mut stdout) = setup(
            "exec cat",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            Some("hello, world"),
        )?;
-
        let stdout = running.stdout();
-
        let stderr = running.stderr();

        assert_eq!(stdout.line(), Some("hello, world".into()));
        assert_eq!(stdout.line(), None);

-
        assert_eq!(stderr.line(), None);
+
        let finished = running.wait().unwrap();
+

+
        assert_eq!(finished.exit_code().code(), Some(0));
+
        assert_eq!(finished.stderr(), b"");

-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), Some(0));
-
        assert!(!tor.timed_out());
        Ok(())
    }

    #[test]
    fn yes_to_stdout() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup("exec yes", SHORT_TIMEOUT, None)?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), None);
-
        assert!(tor.timed_out());
+
        let (running, _) = setup("exec yes", SHORT_TIMEOUT, None)?;
+
        assert!(matches!(running.wait(), Err(TimeoutError::TimedOut)));
        Ok(())
    }

    #[test]
    fn yes_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup("exec yes 1>&2", SHORT_TIMEOUT, None)?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), None);
-
        assert!(tor.timed_out());
+
        let (running, _) = setup("exec yes 1>&2", SHORT_TIMEOUT, None)?;
+
        assert!(matches!(
+
            running.wait(),
+
            Err(TimeoutError::TimedOut) | Err(TimeoutError::TooMuch)
+
        ));
        Ok(())
    }

    #[test]
    fn kill() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, _) = setup(
            "exec sleep 1000",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        sleep(Duration::from_millis(5000));
-
        running.kill()?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), None);
-
        assert!(!tor.timed_out());
+
        sleep(Duration::from_millis(100));
+
        let finished = running.kill()?;
+
        assert_eq!(finished.exit_code().code(), None);
+

        Ok(())
    }

    #[test]
    fn kill_stderr() -> Result<(), Box<dyn std::error::Error>> {
-
        let running = setup(
+
        let (running, _) = setup(
            "exec sleep 1000 1>&2",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
-
        sleep(Duration::from_millis(5000));
-
        running.kill()?;
-
        let tor = running.wait()?;
-
        assert_eq!(tor.status().code(), None);
-
        assert!(!tor.timed_out());
+
        sleep(Duration::from_millis(100));
+
        let finished = running.kill()?;
+
        assert_eq!(finished.exit_code().code(), None);
+

        Ok(())
    }
}