Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
radicle-ci-broker src timeoutcmd.rs
//! Run a command (an external program) as a sub-process, capturing
//! its output in real time, with a maximum duration.
//!
//! This is meant for the CI broker to run a CI adapter and process
//! the single-line messages the adapter writes to its standard
//! output, as well as capture stderr output, which the adapter uses
//! for logging. If the adapter runs for too long, it gets terminated.
//!
//! Note that if the [`Command`] that is created to run the command
//! invokes a shell, the shell **must** `exec` the command it runs, or
//! in some other way make sure the processes the shell launches get
//! terminated when the shell process ends. Otherwise the time out
//! management here does not work reliably.
//!
//! The child can be given some data via its stdin.
//!
//! This module is not entirely generic, as it assumes textual output with
//! lines, instead of arbitrary byte strings.
//!
//! # Example
//! ```
//! # use std::{process::Command, time::Duration};
//! # use radicle_ci_broker::timeoutcmd::{ChildProcess, TimeoutCommand, RealtimeLines};
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut cmd = Command::new("bash");
//! cmd.arg("-c").arg("exec cat"); // Note exec!
//!
//! let mut to = TimeoutCommand::new(Duration::from_secs(10));
//! to.feed_stdin(b"hello, world\n");
//! let mut stdout: RealtimeLines = to.stdout();
//! let running = to.spawn(cmd)?;
//!
//! // Capture stdout output. We ignore stderr output.
//! let mut captured = vec![];
//! while let Some(line) = stdout.line() {
//!     captured.push(line);
//! }
//!
//! // Wait for child process to terminate.
//! let finished = running.wait()?;
//! assert_eq!(finished.exit_code().code(), Some(0));
//! assert_eq!(captured, ["hello, world\n"]);
//! # Ok(())
//! # }
//! ```

#![allow(unused_imports)]

use std::{
    io::{Read, Seek, Write},
    process::{Child, Command, ExitStatus, Stdio},
    sync::{
        Arc, Condvar, Mutex, MutexGuard,
        mpsc::{
            Receiver, RecvTimeoutError, Sender, SyncSender, TryRecvError, channel, sync_channel,
        },
    },
    thread::{JoinHandle, sleep, spawn},
    time::{Duration, Instant},
};

#[cfg(unix)]
use std::os::unix::process::CommandExt;

use tempfile::tempfile;

use crate::logger;

const KIB: usize = 1024;
const MIB: usize = 1024 * KIB;
const MAX_OUTPUT_BYTES: usize = 10 * MIB;

/// Spawn a child process, with a maximum duration and capture its
/// output. See also [`RunningProcess`].
pub struct TimeoutCommand {
    max_duration: Duration,
    stdin_data: Vec<u8>,
    stdout: RealtimeLines,
}

// This works by using multiple threads.
//
// * a thread to read child's stdout, as bytes
// * a thread to read child's stderr, as bytes
// * a thread to monitor how long the child runs
// * the calling thread waits for the monitor thread to tell it the
//   child has ended or needs to be terminate
//
// Communication between the threads happens via
// [`std::sync::mpsc::sync_channel`] channels. In performance tests,
// these are quite fast if the channel buffer is sufficiently large:
// throughput of over 2 MiB/s have been measured.
//
// Output from the child is collected into lines, which can be passed
// to the caller. For the CI broker we only care about lines.

impl TimeoutCommand {
    /// Create a new time-limited command. The sub-process will run at
    /// most as long as the argument specifies. See
    /// [`TimeoutCommand::spawn`] for actually creating the
    /// sub-process.
    pub fn new(max_duration: Duration) -> Self {
        Self {
            max_duration,
            stdin_data: vec![],
            stdout: RealtimeLines::new(max_duration),
        }
    }

    /// Feed the sub-process the specified binary data via its
    /// standard input. If this method is not used, the sub-process
    /// stdin will be fed no data. If this method is not used, the
    /// effect is that stdin comes from an empty file.
    pub fn feed_stdin(&mut self, data: &[u8]) {
        self.stdin_data = data.to_vec();
    }

    pub fn stdout(&self) -> RealtimeLines {
        self.stdout.clone()
    }

    /// Start a new sub-process to execute the specified command.
    ///
    /// The caller should set up the [`std::process::Command`] value.
    /// This method will redirect stdin, stdout, and stderr to use
    /// pipes.
    pub fn spawn(&self, command: Command) -> Result<ChildProcess, TimeoutError> {
        ChildProcess::new(command, &self.stdin_data, self.stdout(), self.max_duration)
    }
}

/// Represent a finished process.
///
/// This allows retrieval of the process's standard error output and
/// the exit code. The standard output is captured via the
/// [`RealtimeLines`] buffer given to [`ChildProcess`].
#[derive(Debug)]
pub struct FinishedProcess {
    exit: ExitStatus,
    stderr: Vec<u8>,
}

impl FinishedProcess {
    /// Exit code of the finished process.
    pub fn exit_code(&self) -> ExitStatus {
        self.exit
    }

    /// The captured output from the process's standard error.
    pub fn stderr(&self) -> &[u8] {
        &self.stderr
    }
}

/// Represent a running child process.
///
/// The child gets some data fed to it via its standard input. The
/// child is terminated if it runs for too long.
pub struct ChildProcess {
    deadline: Instant,
    child: Child,
    stdout: RealtimeLines,
    stdout_rx: Receiver<()>,
    stdout_thread: JoinHandle<Result<(), TimeoutError>>,
    stderr_thread: JoinHandle<Result<Vec<u8>, TimeoutError>>,
    arc: Arc<Mutex<RealtimeLines>>,

    #[allow(dead_code)]
    timeout_thread: JoinHandle<()>,
}

impl ChildProcess {
    /// Create a new [`ChildProcess`].
    pub fn new(
        mut cmd: Command,
        stdin: &[u8],
        stdout_lines: RealtimeLines,
        timeout: Duration,
    ) -> Result<Self, TimeoutError> {
        // Write data to be fed to child's stdin to a temporary
        // file that automatically gets deleted when we exit this
        // function. Using a file instead of a pipe means our
        // logic can be simpler as we don't need to worry about
        // the pipe buffer filling up.
        let mut file = tempfile::tempfile()?;
        file.write_all(stdin)?;
        file.rewind()?;

        // Redirect child stdin/stdout/stderr and start the child
        // process.
        let mut child = cmd
            .stdin(file)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .process_group(0)
            .spawn()
            .map_err(|err| TimeoutError::Spawn(cmd, err))?;

        // Create the mutex around the line buffer we're given and
        // a condition variable to signal end of input. The mutex
        // is used to wait on the condition variable, not so much
        // to protect the line buffer, but mutex is a convenient
        // way to transfer the buffer to the thread.
        let (stdout_tx, stdout_rx) = channel::<()>();
        let arc = Arc::new(Mutex::new(stdout_lines.clone()));

        // Start thread to read from stdout, using the line buffer
        // we just created and hid in `arc`.
        let stdout = child
            .stdout
            .take()
            .ok_or(TimeoutError::TakeHandle("stdout"))?;
        let stdout_thread = {
            let arc = arc.clone();
            spawn(move || Self::line_reader(arc, stdout_tx, stdout))
        };

        // Launch a thread that reads everything the child writes
        // to its stderr.
        let stderr = child
            .stderr
            .take()
            .ok_or(TimeoutError::TakeHandle("stderr"))?;
        let stderr_thread = Self::capture(stderr);

        // Launch a thread that notifies the `CondVar` when the child
        // has been running too long.
        let timeout_thread = {
            let mut lines = stdout_lines.clone();
            spawn(move || {
                sleep(timeout);
                lines.finish();
            })
        };

        Ok(Self {
            deadline: Instant::now()
                .checked_add(timeout)
                .ok_or(TimeoutError::Deadline)?,

            child,
            stdout: stdout_lines,
            stdout_rx,
            stdout_thread,
            stderr_thread,
            arc,
            timeout_thread,
        })
    }

    pub fn id(&self) -> u32 {
        self.child.id()
    }

    pub fn kill(mut self) -> Result<FinishedProcess, TimeoutError> {
        self.kill_helper();
        self.wait()
    }

    fn kill_helper(&mut self) {
        // Kill the whole process groups. This includes any child
        // processes of the adapter. We terminate with extreme
        // prejudice, as the adapter had all the time the node
        // operator is willing to give it to finish, and it's past
        // time to be nice.
        unsafe {
            libc::killpg(self.child.id() as i32, libc::SIGKILL);
        }
        self.stdout.finish();
    }

    pub fn wait(mut self) -> Result<FinishedProcess, TimeoutError> {
        self.stdout.finish();

        let max_wait = self.deadline - Instant::now();

        // Wait for child stdout to finish, up to the given timeout.
        let _guardlock = &*self.arc;
        match self.stdout_rx.recv_timeout(max_wait) {
            Ok(_) | Err(RecvTimeoutError::Disconnected) => {}
            Err(RecvTimeoutError::Timeout) => {
                self.kill_helper();
                return Err(TimeoutError::TimedOut);
            }
        }

        self.stdout_thread
            .join()
            .map_err(|_| TimeoutError::Thread)??;

        let stderr = self
            .stderr_thread
            .join()
            .map_err(|_| TimeoutError::Thread)?;
        let stderr = stderr?;

        match self.child.wait() {
            Ok(exit) => Ok(FinishedProcess { exit, stderr }),
            Err(err) => Err(TimeoutError::Wait(err)),
        }
    }

    // Read data from a stream, push into a [`RealtimeLines`]
    // and notify when done using a condition variable in
    // `arc`. This function will be called in a dedicated
    // thread, so that we don't need to use non-blocking I/O
    // or async.
    fn line_reader(
        arc: Arc<Mutex<RealtimeLines>>,
        tx: Sender<()>,
        mut stream: impl Read,
    ) -> Result<(), TimeoutError> {
        let mutex = &*arc;
        loop {
            let mut bytes = vec![0; 1024];
            let n = stream.read(&mut bytes)?;
            if n == 0 {
                break;
            } else {
                let mut buf = mutex.lock().map_err(|_| TimeoutError::Lock)?;
                buf.push(bytes[..n].to_vec());
            }
        }
        let mut buf = mutex.lock().map_err(|_| TimeoutError::Lock)?;
        buf.finish();

        // We've reached the end. Notify parent the nanny
        // thread.
        tx.send(()).ok();
        Ok(())
    }

    // Start thread that captures stderr output into a byte
    // vector buffer. We don't need a condition variable for
    // this, as we'll just read to the end to capture
    // everything.
    fn capture(
        mut stream: impl Read + Send + 'static,
    ) -> JoinHandle<Result<Vec<u8>, TimeoutError>> {
        spawn(move || {
            let mut buf = vec![];
            loop {
                let mut chunk = vec![0; MIB];
                let n = stream.read(&mut chunk).map_err(TimeoutError::Io)?;
                if n == 0 {
                    return Ok(buf);
                } else {
                    buf.append(&mut chunk[..n].to_vec());
                    if buf.len() > MAX_OUTPUT_BYTES {
                        return Err(TimeoutError::TooMuch);
                    }
                }
            }
        })
    }
}

/// A buffer of lines that can be filled in the background by a
/// producer thread and queried while that happens by a consumer
/// thread.
///
/// This is used to capture the child process output to its stdout in
/// such a way that it can be processed in real time, without having
/// to wait for the child process to terminate.
///
/// The buffer is filled by pushing byte vectors, and consumed line by
/// line until there are no more lines: the buffer is empty and the
/// child process has terminated.
///
/// The buffer can be cloned and each clone is logically the same
/// buffer. This allows separate producer and consumer threads to own
/// the buffer, but internally it's the same one.
#[derive(Clone)]
pub struct RealtimeLines {
    // We have an unlocked buffer protected by a mutex, and a
    // condition variable to signal consumers of new data or the
    // producer having finished.
    data: Arc<(Mutex<UnlockedBuf>, Condvar)>,
    started: Instant,
    max_duration: Duration,
}

impl RealtimeLines {
    fn new(max_duration: Duration) -> Self {
        Self {
            data: Arc::new((Mutex::new(UnlockedBuf::default()), Condvar::default())),
            started: Instant::now(),
            max_duration,
        }
    }

    /// Push some binary data to the buffer.
    ///
    /// The incoming data is a byte vector, as we may not receive
    /// complete lines from the child process. We also get a vector,
    /// not a slice, to make it easier to append the date to the
    /// buffer.
    pub fn push(&mut self, more_data: Vec<u8>) {
        // Get mutex and condition variable.
        let (mutex, var) = &*self.data;

        // Lock mutex to get unlocked buffer (really the mutex guard,
        // but that lets us access the unlocked buffer protected by
        // the mutex).
        let mut buf = mutex.lock().expect("lock for push");

        // Push the data to the unhlocked buffer.
        buf.push(more_data);

        // Notify consumer of new data.
        var.notify_all();
    }

    /// Signal that the producer has finished and there will be no more
    /// incoming data.
    pub fn finish(&mut self) {
        let (mutex, var) = &*self.data;
        let mut buf = mutex.lock().expect("lock for push");
        buf.finish();
        var.notify_all();
    }

    /// Get next line from the buffer, if any. This will wait for a
    /// new complete line to arrive, if there isn't any in the buffer
    /// yet, or for the producer to finish. Returns `None` for end of
    /// file.
    pub fn line(&mut self) -> Option<String> {
        let (mutex, var) = &*self.data;

        // Lock the mutex to get access to unlocked buffer.
        let mut buf = mutex.lock().expect("lock to wait for line");

        loop {
            let remaining = self
                .max_duration
                .checked_sub(self.started.elapsed())
                .unwrap_or_default();

            if remaining.as_millis() == 0 {
                return None;
            }

            let line = buf.line();
            match line {
                // We didn't get a line, but the input stream has
                // finished. Return final partial line, if there is one,
                // or None.
                None if buf.is_finished() => {
                    let line = buf.line();
                    return line;
                }

                // Wait for more input, then try again. We can't
                // assume the new input results in either a complete
                // line or the input stream finishing, so we loop.
                None => {
                    let result = var.wait_timeout(buf, remaining).expect("wait for line");
                    buf = result.0;
                }

                // We got a line: return it.
                Some(line) => {
                    return Some(line);
                }
            }
        }
    }
}

// An unlocked buffer from which lines can be extracted.
//
// All the locking and thread synchronization is handled by [`RealtimeLines`].
#[derive(Default, Debug)]
struct UnlockedBuf {
    data: Vec<u8>,
    finished: bool,
}

impl UnlockedBuf {
    // Mark producer as finished. There will be no more data.
    fn finish(&mut self) {
        self.finished = true;
    }

    // Has producer finished? Since this structure does not itself
    // read from the input stream, this is how we know we've reached
    // the end of the file.
    fn is_finished(&self) -> bool {
        self.finished
    }

    // Push data into the buffer.
    fn push(&mut self, mut more_data: Vec<u8>) {
        self.data.append(&mut more_data);
    }

    // // Does the buffer contain at least one line? We need to check
    // // this to avoid unnecessary waiting or more data.
    // fn has_line(&self) -> bool {
    //     self.data.contains(&b'\n')
    // }

    // Get next line, if any. If producer has finished, get final,
    // possibly partial line if any. If the isn't at least one line in
    // the buffer, return `None`. Note that this does not mean end of
    // file.
    fn line(&mut self) -> Option<String> {
        for (i, byte) in self.data.iter().enumerate() {
            if *byte == b'\n' {
                let range = 0..i + 1;
                let line = String::from_utf8_lossy(&self.data[range.clone()]).to_string();
                self.data.drain(range);
                return Some(line);
            }
        }

        if self.finished && !self.data.is_empty() {
            let line = String::from_utf8_lossy(&self.data).to_string();
            self.data.clear();
            return Some(line);
        }

        None
    }
}

/// All possible errors from the module.
#[derive(Debug, thiserror::Error)]
pub enum TimeoutError {
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// Couldn't spawn child process.
    #[error("failed to spawn command: {0:?}")]
    Spawn(Command, #[source] std::io::Error),

    #[error("thread join failed")]
    Thread,

    #[error("failed to lock mutex")]
    Lock,

    #[error("failed to lock condition variable")]
    LockVar,

    #[error("timed out waiting for child")]
    TimedOut,

    #[error("failed to kill child process")]
    Kill,

    #[error("failed waiting for child process to terminate")]
    Wait(#[source] std::io::Error),

    #[error("child exit code is not known")]
    ExitCode,

    #[error("failed to take child {0} file handle")]
    TakeHandle(&'static str),

    #[error("programming error: failed to get thread to wait on")]
    TakeThread,

    #[error("failed to compute deadline")]
    Deadline,

    #[error("child process produced too much output")]
    TooMuch,
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    const LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES: Duration = Duration::from_secs(10);
    const SHORT_TIMEOUT: Duration = Duration::from_secs(3);

    fn setup(
        script: &str,
        timeout: Duration,
        stdin: Option<&'static str>,
    ) -> Result<(ChildProcess, RealtimeLines), TimeoutError> {
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg(script);
        let mut to = TimeoutCommand::new(timeout);
        if let Some(stdin) = stdin {
            to.feed_stdin(stdin.as_bytes());
        }
        Ok((to.spawn(cmd)?, to.stdout()))
    }

    #[test]
    fn bin_true() -> Result<(), Box<dyn std::error::Error>> {
        let (running, _) = setup("exec true", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
        let finished = running.wait()?;
        assert_eq!(finished.exit_code().code(), Some(0));
        Ok(())
    }

    #[test]
    fn bin_false() -> Result<(), Box<dyn std::error::Error>> {
        let (running, _) = setup("exec false", LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES, None)?;
        let result = running.wait();
        assert!(matches!(result, Ok(FinishedProcess { exit, .. }) if exit.code() == Some(1)));
        Ok(())
    }

    #[test]
    fn sleep_1() -> Result<(), Box<dyn std::error::Error>> {
        let (running, _) = setup(
            "exec sleep 1",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
        let result = running.wait();
        assert!(matches!(result, Ok(FinishedProcess { exit, .. }) if exit.code() == Some(0)));
        Ok(())
    }

    #[test]
    fn sleep_for_too_long() -> Result<(), Box<dyn std::error::Error>> {
        let (running, _) = setup("exec sleep 1000", SHORT_TIMEOUT, None)?;
        let result = running.wait();
        assert!(matches!(result, Err(TimeoutError::TimedOut)));
        Ok(())
    }

    #[test]
    fn hello_world() -> Result<(), Box<dyn std::error::Error>> {
        let (running, mut stdout) = setup(
            "exec echo hello, world",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
        assert_eq!(stdout.line(), Some("hello, world\n".into()));
        assert_eq!(stdout.line(), None);

        let result = running.wait();
        eprintln!("result={result:#?}");
        let finished = result.unwrap();
        assert_eq!(finished.exit_code().code(), Some(0));
        assert_eq!(finished.stderr(), b"");

        Ok(())
    }

    #[test]
    fn hello_world_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
        let (running, mut stdout) = setup(
            "exec echo hello, world 1>&2",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
        assert_eq!(stdout.line(), None);

        let finished = running.wait().unwrap();
        assert_eq!(finished.exit_code().code(), Some(0));
        assert_eq!(finished.stderr(), b"hello, world\n");

        Ok(())
    }

    #[test]
    fn pipe_through_cat() -> Result<(), Box<dyn std::error::Error>> {
        let (running, mut stdout) = setup(
            "exec cat",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            Some("hello, world"),
        )?;

        assert_eq!(stdout.line(), Some("hello, world".into()));
        assert_eq!(stdout.line(), None);

        let finished = running.wait().unwrap();

        assert_eq!(finished.exit_code().code(), Some(0));
        assert_eq!(finished.stderr(), b"");

        Ok(())
    }

    #[test]
    fn yes_to_stdout() -> Result<(), Box<dyn std::error::Error>> {
        let (running, _) = setup("exec yes", SHORT_TIMEOUT, None)?;
        assert!(matches!(running.wait(), Err(TimeoutError::TimedOut)));
        Ok(())
    }

    #[test]
    fn yes_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
        let (running, _) = setup("exec yes 1>&2", SHORT_TIMEOUT, None)?;
        assert!(matches!(
            running.wait(),
            Err(TimeoutError::TimedOut) | Err(TimeoutError::TooMuch)
        ));
        Ok(())
    }

    #[test]
    fn yes_to_stdout_while_reading_with_realtimelines() -> Result<(), Box<dyn std::error::Error>> {
        let (running, mut stdout) = setup("exec yes", SHORT_TIMEOUT, None)?;
        while stdout.line().is_some() {}
        let result = running.wait();
        eprintln!("result: {result:#?}");
        assert!(matches!(result, Err(TimeoutError::TimedOut)));
        Ok(())
    }

    #[test]
    fn sleep_for_too_long_while_reading_with_realtimelines()
    -> Result<(), Box<dyn std::error::Error>> {
        let (running, mut stdout) = setup("exec sleep 1000", SHORT_TIMEOUT, None)?;
        while stdout.line().is_some() {}
        let result = running.wait();
        eprintln!("result: {result:#?}");
        assert!(matches!(result, Err(TimeoutError::TimedOut)));
        Ok(())
    }

    #[test]
    fn kill() -> Result<(), Box<dyn std::error::Error>> {
        let (running, _) = setup(
            "exec sleep 1000",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
        sleep(Duration::from_millis(100));
        let finished = running.kill()?;
        assert_eq!(finished.exit_code().code(), None);

        Ok(())
    }

    #[test]
    fn kill_stderr() -> Result<(), Box<dyn std::error::Error>> {
        let (running, _) = setup(
            "exec sleep 1000 1>&2",
            LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
            None,
        )?;
        sleep(Duration::from_millis(100));
        let finished = running.kill()?;
        assert_eq!(finished.exit_code().code(), None);

        Ok(())
    }
}