| + |
//! Run a command (an external program) as a sub-process, capturing
|
| + |
//! its output in real time, with a maximum duration.
|
| + |
//!
|
| + |
//! This is meant for the CI broker to run a CI adapter and process
|
| + |
//! the single-line messages the adapter writes to its standard
|
| + |
//! output, as well as capture stderr output, which the adapter uses
|
| + |
//! for logging. If the adapter runs for too long, it gets terminated.
|
| + |
//!
|
| + |
//! Note that if the [`Command`] that is created to run the command
|
| + |
//! invokes a shell, the shell **must** `exec` the command it runs, or
|
| + |
//! in some other way make sure the processes the shell launches get
|
| + |
//! terminated when the shell process ends. Otherwise the time out
|
| + |
//! management here does not work reliably.
|
| + |
//!
|
| + |
//! The child can be given some data via its stdin.
|
| + |
//!
|
| + |
//! This module is not entirely generic, as it assumes textual output with
|
| + |
//! lines, instead of arbitrary byte strings.
|
| + |
//!
|
| + |
//! # Example
|
| + |
//! ```
|
| + |
//! # use std::{process::Command, time::Duration};
|
| + |
//! # use radicle_ci_broker::timeoutcmd::{RunningProcess, TimeoutCommand};
|
| + |
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
//! let mut cmd = Command::new("bash");
|
| + |
//! cmd.arg("-c").arg("exec cat"); // Note exec!
|
| + |
//!
|
| + |
//! let mut to = TimeoutCommand::new(Duration::from_secs(10));
|
| + |
//! to.feed_stdin(b"hello, world\n");
|
| + |
//! let running = to.spawn(cmd)?;
|
| + |
//!
|
| + |
//! // Capture stdout output. We ignore stderr output.
|
| + |
//! let stdout = running.stdout();
|
| + |
//! let mut captured = vec![];
|
| + |
//! while let Some(line) = stdout.line() {
|
| + |
//! captured.push(line);
|
| + |
//! }
|
| + |
//!
|
| + |
//! // Wait for child process to terminate.
|
| + |
//! let tor = running.wait()?;
|
| + |
//! assert_eq!(tor.status().code(), Some(0));
|
| + |
//! assert_eq!(captured, ["hello, world\n"]);
|
| + |
//! # Ok(())
|
| + |
//! # }
|
| + |
//! ```
|
| + |
|
| + |
#![allow(unused_imports)]
|
| + |
|
| + |
use std::{
|
| + |
io::{Read, Write},
|
| + |
process::{Child, Command, ExitStatus, Stdio},
|
| + |
sync::{
|
| + |
mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender, TryRecvError},
|
| + |
Arc, Mutex,
|
| + |
},
|
| + |
thread::{sleep, spawn, JoinHandle},
|
| + |
time::{Duration, Instant},
|
| + |
};
|
| + |
|
| + |
use crate::logger;
|
| + |
|
| + |
const WAIT_FOR_IDLE_CHILD: Duration = Duration::from_millis(1000);
|
| + |
const WAIT_FOR_OUTPUT: Duration = Duration::from_millis(100);
|
| + |
const KIB: usize = 1024;
|
| + |
const MIB: usize = 1024 * KIB;
|
| + |
const MAX_OUTPUT_BYTES: usize = 10 * MIB;
|
| + |
|
| + |
/// Spawn a child process, with a maximum duration and capture its
|
| + |
/// output. See also [`RunningProcess`].
|
| + |
pub struct TimeoutCommand {
|
| + |
max_duration: Duration,
|
| + |
stdin_data: Vec<u8>,
|
| + |
}
|
| + |
|
| + |
// This works by using multiple threads.
|
| + |
//
|
| + |
// * a thread to send data to child's stdin
|
| + |
// * a thread to read child's stdout, as bytes
|
| + |
// * a thread to read child's stderr, as bytes
|
| + |
// * a thread to monitor how long the child runs
|
| + |
// * the calling thread waits for the monitor thread to tell it the
|
| + |
// child has ended or needs to be terminate
|
| + |
//
|
| + |
// Communication between the threads happens via
|
| + |
// [`std::sync::mpsc::sync_channel`] channels. In performance tests,
|
| + |
// these are quite fast if the channel buffer is sufficiently large:
|
| + |
// throughput of over 2 MiB/s have been measured.
|
| + |
//
|
| + |
// Output from the child is collected into lines, which can be passed
|
| + |
// to the caller. For the CI broker we only care about lines.
|
| + |
|
| + |
impl TimeoutCommand {
|
| + |
/// Create a new time-limited command. The sub-process will run at
|
| + |
/// most as long as the argument specifies. See
|
| + |
/// [`TimeoutCommand::spawn`] for actually creating the
|
| + |
/// sub-process.
|
| + |
pub fn new(max_duration: Duration) -> Self {
|
| + |
Self {
|
| + |
max_duration,
|
| + |
stdin_data: vec![],
|
| + |
}
|
| + |
}
|
| + |
|
| + |
/// Feed the sub-process the specified binary data via its
|
| + |
/// standard input. If this method is not used, the sub-process
|
| + |
/// stdin will be fed no data. The sub-process stdin always comes
|
| + |
/// from a pipe, however, so if this method is not used, the
|
| + |
/// effect is that stdin comes from `/dev/null` or another empty
|
| + |
/// file.
|
| + |
pub fn feed_stdin(&mut self, data: &[u8]) {
|
| + |
self.stdin_data = data.to_vec();
|
| + |
}
|
| + |
|
| + |
/// Start a new sub-process to execute the specified command.
|
| + |
///
|
| + |
/// The caller should set up the [`std::process::Command`] value.
|
| + |
/// This method will redirect stdin, stdout, and stderr to use
|
| + |
/// pipes.
|
| + |
pub fn spawn(&self, mut command: Command) -> Result<RunningProcess, TimeoutError> {
|
| + |
// Set up child stdin/stdout/stderr redirection.
|
| + |
let mut child = command
|
| + |
.stdin(Stdio::piped())
|
| + |
.stdout(Stdio::piped())
|
| + |
.stderr(Stdio::piped())
|
| + |
.spawn()
|
| + |
.map_err(|err| TimeoutError::Spawn(command, err))?;
|
| + |
|
| + |
// Set up thread to write data to child stdin.
|
| + |
let stdin = child.stdin.take().ok_or(TimeoutError::TakeStdin)?;
|
| + |
let stdin_data = self.stdin_data.clone();
|
| + |
let stdin_writer = spawn(move || writer(stdin, stdin_data));
|
| + |
|
| + |
// Set up thread to capture child stdout.
|
| + |
let stdout = child.stdout.take().ok_or(TimeoutError::TakeStdout)?;
|
| + |
let (stdout_termination_tx, stdout_termination_rx) = sync_channel(1);
|
| + |
let (stdout_lines_tx, stdout_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
|
| + |
let stdout_reader =
|
| + |
spawn(move || NonBlockingReader::new("stdout", stdout, stdout_lines_tx).read_to_end());
|
| + |
let stdout_lines = LineReceiver::new("stdout", stdout_lines_rx, stdout_termination_rx);
|
| + |
|
| + |
// Set up thread to capture child stdout.
|
| + |
let stderr = child.stderr.take().ok_or(TimeoutError::TakeStderr)?;
|
| + |
let (stderr_termination_tx, stderr_termination_rx) = sync_channel(1);
|
| + |
let (stderr_lines_tx, stderr_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);
|
| + |
let stderr_reader =
|
| + |
spawn(move || NonBlockingReader::new("stderr", stderr, stderr_lines_tx).read_to_end());
|
| + |
let stderr_lines = LineReceiver::new("stderr", stderr_lines_rx, stderr_termination_rx);
|
| + |
|
| + |
// Set up thread to monitor child termination or overlong run time.
|
| + |
let (tx, timed_out_rx) = sync_channel(1);
|
| + |
let (kill_tx, kill_rx) = sync_channel(1);
|
| + |
let nanny = Nanny::new(
|
| + |
self.max_duration,
|
| + |
child,
|
| + |
tx,
|
| + |
kill_rx,
|
| + |
vec![stdout_termination_tx, stderr_termination_tx],
|
| + |
);
|
| + |
let monitor = spawn(move || nanny.monitor());
|
| + |
|
| + |
Ok(RunningProcess {
|
| + |
child_monitor: Some(monitor),
|
| + |
timed_out_rx,
|
| + |
stdin_writer,
|
| + |
stdout_lines,
|
| + |
stdout_reader,
|
| + |
stderr_lines,
|
| + |
stderr_reader,
|
| + |
kill_tx,
|
| + |
})
|
| + |
}
|
| + |
}
|
| + |
|
| + |
/// Manage a running child process and capture its output.
|
| + |
///
|
| + |
/// This is created by [`TimeoutCommand::spawn`].
|
| + |
pub struct RunningProcess {
|
| + |
child_monitor: Option<JoinHandle<Result<(), TimeoutError>>>,
|
| + |
timed_out_rx: NannyReceiver,
|
| + |
stdin_writer: JoinHandle<Result<(), std::io::Error>>,
|
| + |
stdout_lines: LineReceiver,
|
| + |
stdout_reader: JoinHandle<Result<(), TimeoutError>>,
|
| + |
stderr_lines: LineReceiver,
|
| + |
stderr_reader: JoinHandle<Result<(), TimeoutError>>,
|
| + |
kill_tx: KillSender,
|
| + |
}
|
| + |
|
| + |
impl RunningProcess {
|
| + |
/// Return a [`LineReceiver`] that returns lines from the
|
| + |
/// sub-process standard output.
|
| + |
pub fn stdout(&self) -> &LineReceiver {
|
| + |
&self.stdout_lines
|
| + |
}
|
| + |
|
| + |
/// Return a [`LineReceiver`] that returns lines from the
|
| + |
/// sub-process standard error output.
|
| + |
pub fn stderr(&self) -> &LineReceiver {
|
| + |
&self.stderr_lines
|
| + |
}
|
| + |
|
| + |
/// Terminate sub-process with extreme prejudice.
|
| + |
pub fn kill(&self) -> Result<(), TimeoutError> {
|
| + |
let x = self.kill_tx.send(());
|
| + |
logger::debug2(format!("request termination of child process: {x:?}"));
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
/// Wait for child process to terminate. It may terminate because
|
| + |
/// it ends normally, or because it has run for longer than the
|
| + |
/// limit set with [`TimeoutCommand::new`]. The return value of
|
| + |
/// this method will specify why, see
|
| + |
/// [`TimeoutResult::timed_out`].
|
| + |
///
|
| + |
/// Note that if the sub-process produces a lot of output, you
|
| + |
/// must read it to avoid the process getting stuck; see
|
| + |
/// [`RunningProcess::stdout`] and [`RunningProcess::stderr`]. If
|
| + |
/// you don't read the output, the sub-process will fill its
|
| + |
/// output pipe buffer, or the inter-thread communication channel
|
| + |
/// buffer, and the sub-process will block on output, and not
|
| + |
/// progress. This may be unwanted. The blocking won't affect the
|
| + |
/// sub-process getting terminated due to running for too long.
|
| + |
pub fn wait(mut self) -> Result<TimeoutResult, TimeoutError> {
|
| + |
logger::debug("wait: wait for word from nanny");
|
| + |
let (mut child, timed_out) = self.timed_out_rx.recv().map_err(TimeoutError::ChildRecv)?;
|
| + |
logger::debug("got word from nanny");
|
| + |
if let Some(monitor) = self.child_monitor.take() {
|
| + |
logger::debug("wait: wait on nanny thread to end");
|
| + |
monitor
|
| + |
.join()
|
| + |
.map_err(|_| TimeoutError::JoinChildMonitor)??;
|
| + |
}
|
| + |
|
| + |
logger::debug("wait: wait for stdin writer to terminate");
|
| + |
self.stdin_writer.join().ok();
|
| + |
|
| + |
logger::debug("wait: wait for stdout reader to terminate");
|
| + |
self.stdout_reader.join().ok();
|
| + |
|
| + |
logger::debug("wait: wait for stderr reader to terminate");
|
| + |
self.stderr_reader.join().ok();
|
| + |
|
| + |
logger::debug("wait: wait for child to terminate");
|
| + |
let status = child.wait().map_err(TimeoutError::Wait)?;
|
| + |
logger::debug2(format!("wait: wait status: {status:?}"));
|
| + |
logger::debug("wait: return Ok result");
|
| + |
Ok(TimeoutResult { timed_out, status })
|
| + |
}
|
| + |
}
|
| + |
|
| + |
/// Did the sub-process started with [`TimeoutCommand::spawn`]
|
| + |
/// terminate normally, or did it get terminated unilaterally for
|
| + |
/// running too long? What was its exit code?
|
| + |
#[derive(Debug)]
|
| + |
pub struct TimeoutResult {
|
| + |
timed_out: bool,
|
| + |
status: ExitStatus,
|
| + |
}
|
| + |
|
| + |
impl TimeoutResult {
|
| + |
/// Exit code of the of the sub-process. There is always an exit
|
| + |
/// code: [`RunningProcess::wait`] does not return until the
|
| + |
/// sub-process has exited.
|
| + |
pub fn status(&self) -> ExitStatus {
|
| + |
self.status
|
| + |
}
|
| + |
|
| + |
/// Did the sub-process get terminated for running too long?
|
| + |
pub fn timed_out(&self) -> bool {
|
| + |
self.timed_out
|
| + |
}
|
| + |
}
|
| + |
|
| + |
type NannySender = SyncSender<(Child, bool)>;
|
| + |
type NannyReceiver = Receiver<(Child, bool)>;
|
| + |
|
| + |
type KillSender = SyncSender<()>;
|
| + |
type KillReceiver = Receiver<()>;
|
| + |
|
| + |
struct Nanny {
|
| + |
max_duration: Duration,
|
| + |
child: Option<Child>,
|
| + |
tx: NannySender,
|
| + |
term_tx: Vec<TerminationSender>,
|
| + |
kill_rx: KillReceiver,
|
| + |
}
|
| + |
|
| + |
impl Nanny {
|
| + |
fn new(
|
| + |
max_duration: Duration,
|
| + |
child: Child,
|
| + |
tx: NannySender,
|
| + |
kill_rx: KillReceiver,
|
| + |
term_tx: Vec<TerminationSender>,
|
| + |
) -> Self {
|
| + |
Self {
|
| + |
max_duration,
|
| + |
child: Some(child),
|
| + |
tx,
|
| + |
term_tx,
|
| + |
kill_rx,
|
| + |
}
|
| + |
}
|
| + |
|
| + |
fn monitor(mut self) -> Result<(), TimeoutError> {
|
| + |
let mut child = if let Some(child) = self.child.take() {
|
| + |
child
|
| + |
} else {
|
| + |
panic!("programming error: Nanny does not have a child to monitor");
|
| + |
};
|
| + |
let started = Instant::now();
|
| + |
let mut timed_out = false;
|
| + |
logger::debug("nanny: start monitoring child");
|
| + |
loop {
|
| + |
let elapsed = started.elapsed();
|
| + |
|
| + |
if self.kill_rx.try_recv().is_ok() {
|
| + |
let x = child.kill();
|
| + |
logger::debug2(format!("nanny: terminated child by request: {x:?}"));
|
| + |
break;
|
| + |
} else if elapsed > self.max_duration {
|
| + |
logger::debug2(format!(
|
| + |
"nanny: child has run for too long ({} ms > {} ms)",
|
| + |
elapsed.as_millis(),
|
| + |
self.max_duration.as_millis(),
|
| + |
));
|
| + |
logger::debug2(format!(
|
| + |
"nanny: terminate child process {} with extreme prejudice",
|
| + |
child.id(),
|
| + |
));
|
| + |
let x = child.kill();
|
| + |
timed_out = true;
|
| + |
logger::debug2(format!("nanny: termination result: {x:?}"));
|
| + |
break;
|
| + |
}
|
| + |
|
| + |
if matches!(child.try_wait(), Ok(None)) {
|
| + |
logger::debug2(format!(
|
| + |
"nanny: child is still running, that's fine ({} ms <= {} ms)",
|
| + |
elapsed.as_millis(),
|
| + |
self.max_duration.as_millis(),
|
| + |
));
|
| + |
sleep(WAIT_FOR_IDLE_CHILD);
|
| + |
} else {
|
| + |
logger::debug("nanny: child has terminated");
|
| + |
break;
|
| + |
}
|
| + |
}
|
| + |
|
| + |
logger::debug("nanny: tell RunningProcess it's time");
|
| + |
self.tx
|
| + |
.send((child, timed_out))
|
| + |
.map_err(TimeoutError::ChildSend)?;
|
| + |
for tx in self.term_tx.iter() {
|
| + |
logger::debug("nanny: tell line receiver it's time");
|
| + |
tx.send(()).map_err(TimeoutError::ChildSendToLine)?;
|
| + |
}
|
| + |
|
| + |
logger::debug("nanny ends");
|
| + |
Ok(())
|
| + |
}
|
| + |
}
|
| + |
|
| + |
fn writer(mut stream: impl Write, data: Vec<u8>) -> Result<(), std::io::Error> {
|
| + |
let mut written = 0;
|
| + |
while written < data.len() {
|
| + |
// We write one byte at a time. This lets us avoid doing
|
| + |
// non-blocking I/O, but is less efficient. by only writing
|
| + |
// one byte at a time, we only block when we can't write to
|
| + |
// the stream. When the stream is a pipe, this happens when
|
| + |
// the pipe buffer fills up. This function should be in its
|
| + |
// own thread, and so it doesn't matter if it blocks, but
|
| + |
// measurements are more useful when they're taken after each
|
| + |
// byte.
|
| + |
//
|
| + |
// When, inevitably, byte-at-a-time becomes too inefficient,
|
| + |
// this will need to be rewritten to use non-blocking I/O.
|
| + |
//
|
| + |
// Or async.
|
| + |
let n = stream.write(&data[written..written + 1])?;
|
| + |
written += n;
|
| + |
logger::debug("writer wrote {n:?}");
|
| + |
stream.flush()?;
|
| + |
}
|
| + |
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
type TerminationSender = SyncSender<()>;
|
| + |
type TerminationReceiver = Receiver<()>;
|
| + |
|
| + |
/// Receive one line of output at time.
|
| + |
///
|
| + |
/// See the [module description](index.html) for an example.
|
| + |
pub struct LineReceiver {
|
| + |
name: &'static str,
|
| + |
child_terminated: TerminationReceiver,
|
| + |
bytes: OutputReader,
|
| + |
}
|
| + |
|
| + |
impl LineReceiver {
|
| + |
fn new(name: &'static str, bytes: OutputReader, child_terminated: TerminationReceiver) -> Self {
|
| + |
Self {
|
| + |
name,
|
| + |
child_terminated,
|
| + |
bytes,
|
| + |
}
|
| + |
}
|
| + |
|
| + |
/// Return the next line, if any, or `None` if there will be no
|
| + |
/// more lines. Note that this blocks until there is a line, or
|
| + |
/// the child process terminates.
|
| + |
pub fn line(&self) -> Option<String> {
|
| + |
let mut line = vec![];
|
| + |
|
| + |
loop {
|
| + |
// Get a byte if there is one.
|
| + |
logger::debug2(format!(
|
| + |
"line receiver {}: try to receive next byte",
|
| + |
self.name
|
| + |
));
|
| + |
let y = self.bytes.try_recv();
|
| + |
logger::debug2(format!(
|
| + |
"line receiver {}: tried to read line: {y:?}",
|
| + |
self.name
|
| + |
));
|
| + |
match y {
|
| + |
Ok(byte) => {
|
| + |
line.push(byte);
|
| + |
if byte == b'\n' {
|
| + |
let line = String::from_utf8_lossy(&line).to_string();
|
| + |
logger::debug2(format!(
|
| + |
"line-receiver {}: received line={line:?}",
|
| + |
self.name
|
| + |
));
|
| + |
return Some(line);
|
| + |
}
|
| + |
}
|
| + |
Err(TryRecvError::Empty) => {
|
| + |
sleep(WAIT_FOR_OUTPUT);
|
| + |
}
|
| + |
Err(TryRecvError::Disconnected) => {
|
| + |
if line.is_empty() {
|
| + |
// Sender has closed the channel, there will be no more lines.
|
| + |
logger::debug2(format!("line-receiver {}: disconnected", self.name));
|
| + |
return None;
|
| + |
} else {
|
| + |
let line = String::from_utf8_lossy(&line).to_string();
|
| + |
logger::debug2(format!(
|
| + |
"line-receiver {}: received line={line:?}",
|
| + |
self.name
|
| + |
));
|
| + |
return Some(line);
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
|
| + |
logger::debug("line-receiver: has child terminated?");
|
| + |
let x = self.child_terminated.try_recv();
|
| + |
// logger::debug("line receiver {}: x={x:?}", self.name);
|
| + |
match x {
|
| + |
Ok(_) => {
|
| + |
logger::debug2(format!(
|
| + |
"line receiver {}: OK: child has terminated, not returning line",
|
| + |
self.name,
|
| + |
));
|
| + |
}
|
| + |
Err(std::sync::mpsc::TryRecvError::Disconnected) => {
|
| + |
logger::debug2(format!(
|
| + |
"line receiver {}: Disconnected: child has terminated, not returning line",
|
| + |
self.name,
|
| + |
));
|
| + |
}
|
| + |
_ => {}
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
|
| + |
type OutputSender = SyncSender<u8>;
|
| + |
type OutputReader = Receiver<u8>;
|
| + |
|
| + |
struct NonBlockingReader<R: Read> {
|
| + |
name: &'static str,
|
| + |
stream: R,
|
| + |
tx: OutputSender,
|
| + |
}
|
| + |
|
| + |
impl<R: Read> NonBlockingReader<R> {
|
| + |
fn new(name: &'static str, stream: R, tx: OutputSender) -> Self {
|
| + |
Self { name, stream, tx }
|
| + |
}
|
| + |
|
| + |
fn read_to_end(mut self) -> Result<(), TimeoutError> {
|
| + |
let mut count = 0;
|
| + |
loop {
|
| + |
// We read one byte at a time. This lets us avoid doing
|
| + |
// non-blocking I/O but is less efficient. We want to
|
| + |
// avoid blocking for an arbitrary amount of time, if
|
| + |
// reading one byte at a time. When reading from a pipe,
|
| + |
// the pipe writer end may not get closed until the child
|
| + |
// process writing to the pipe ends, and we may not want
|
| + |
// to wait that long.
|
| + |
//
|
| + |
// If this becomes too inefficient, this needs to be
|
| + |
// rewritten to use non-blocking I/O or async.
|
| + |
|
| + |
logger::debug2(format!(
|
| + |
"read_to_end {}: try to receive next byte ({count} so far)",
|
| + |
self.name,
|
| + |
));
|
| + |
let mut byte = vec![0; 1];
|
| + |
let x = self.stream.read(&mut byte);
|
| + |
logger::debug2(format!("read_to_end {}: x={x:?} byte={byte:?}", self.name));
|
| + |
match x {
|
| + |
Ok(0) => {
|
| + |
logger::debug2(format!("read_to_end {}: end of file", self.name));
|
| + |
break;
|
| + |
}
|
| + |
Ok(1) => {
|
| + |
count += 1;
|
| + |
self.tx
|
| + |
.try_send(byte[0])
|
| + |
.map_err(|_| TimeoutError::TooMuch(self.name))?;
|
| + |
}
|
| + |
Ok(_) => {
|
| + |
logger::debug2(format!("read_to_end {}: read too much", self.name));
|
| + |
return Err(TimeoutError::ReadMucn);
|
| + |
}
|
| + |
Err(err) => {
|
| + |
logger::debug2(format!("read_to_end {}: read error: {err}", self.name));
|
| + |
return Err(TimeoutError::Read(err));
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
|
| + |
logger::debug2(format!("read_to_end {}: terminate", self.name));
|
| + |
Ok(())
|
| + |
}
|
| + |
}
|
| + |
|
| + |
#[derive(Debug, thiserror::Error)]
|
| + |
pub enum TimeoutError {
|
| + |
/// Couldn't spawn child process.
|
| + |
#[error("failed to spawn command: {0:?}")]
|
| + |
Spawn(Command, #[source] std::io::Error),
|
| + |
|
| + |
/// Couldn't get file descriptor of child process stdin.
|
| + |
#[error("failed to extract stdin stream from child")]
|
| + |
TakeStdin,
|
| + |
|
| + |
/// Couldn't get file descriptor of child process stdout.
|
| + |
#[error("failed to extract stdout stream from child")]
|
| + |
TakeStdout,
|
| + |
|
| + |
/// Couldn't get file descriptor of child process stderr.
|
| + |
#[error("failed to extract stderr stream from child")]
|
| + |
TakeStderr,
|
| + |
|
| + |
/// Couldn't check if child process is still running.
|
| + |
#[error("failed to check whether command is still running")]
|
| + |
TryWait(#[source] std::io::Error),
|
| + |
|
| + |
/// Reading from child stdout or stderr returned too much data.
|
| + |
#[error("read from command standard output returned more data than requested")]
|
| + |
ReadMucn,
|
| + |
|
| + |
/// Reading from child stdout or stderr failed.
|
| + |
#[error("problem reading from command output (stdout or stderr)")]
|
| + |
Read(#[source] std::io::Error),
|
| + |
|
| + |
/// Channel buffer got full, which means child process wrote too much output.
|
| + |
#[error("sub-process produces too much to {0}")]
|
| + |
TooMuch(&'static str),
|
| + |
|
| + |
/// Couldn't join thread that feeds data to child stdin.
|
| + |
#[error("problem waiting for thread that writes to command standard input")]
|
| + |
JoinStdinFeeder,
|
| + |
|
| + |
/// Couldn't write to child stdin.
|
| + |
#[error("problem writing to command standard input")]
|
| + |
FeedStdin(#[source] std::io::Error),
|
| + |
|
| + |
#[error("problem waiting for thread that monitors command")]
|
| + |
JoinChildMonitor,
|
| + |
|
| + |
/// Couldn't join thread that reads child stdout.
|
| + |
#[error("problem waiting for thread that reads command standard output")]
|
| + |
JoinStdoutReader,
|
| + |
|
| + |
/// Couldn't join thread that reads child stderr.
|
| + |
#[error("problem waiting for thread that reads command standard error output")]
|
| + |
JoinStderrReader,
|
| + |
|
| + |
/// Couldn't terminate child process.
|
| + |
#[error("problem forcing child process to terminate")]
|
| + |
Kill(#[source] std::io::Error),
|
| + |
|
| + |
/// Couldn't wait for child process to terminate.
|
| + |
#[error("problem waiting for child process to terminate")]
|
| + |
Wait(#[source] std::io::Error),
|
| + |
|
| + |
/// Mutex lock error.
|
| + |
#[error("failed to lock command output buffer")]
|
| + |
MutexLock,
|
| + |
|
| + |
#[error("failed to receive notification from child monitor")]
|
| + |
ChildRecv(#[source] std::sync::mpsc::RecvError),
|
| + |
|
| + |
#[error("failed to send notification from child monitor")]
|
| + |
ChildSend(#[source] std::sync::mpsc::SendError<(Child, bool)>),
|
| + |
|
| + |
#[error("failed to send notification from child monitor to line receiver")]
|
| + |
ChildSendToLine(#[source] std::sync::mpsc::SendError<()>),
|
| + |
}
|
| + |
|
| + |
#[cfg(test)]
|
| + |
mod tests {
|
| + |
use super::*;
|
| + |
|
| + |
const LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES: Duration = Duration::from_secs(100);
|
| + |
const SHORT_TIMEOUT: Duration = Duration::from_secs(3);
|
| + |
|
| + |
fn setup(
|
| + |
script: &str,
|
| + |
timeout: Duration,
|
| + |
stdin: Option<&'static str>,
|
| + |
) -> Result<RunningProcess, Box<dyn std::error::Error>> {
|
| + |
let mut cmd = Command::new("bash");
|
| + |
cmd.arg("-c").arg(script);
|
| + |
let mut to = TimeoutCommand::new(timeout);
|
| + |
if let Some(stdin) = stdin {
|
| + |
to.feed_stdin(stdin.as_bytes());
|
| + |
}
|
| + |
Ok(to.spawn(cmd)?)
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn bin_true() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup(
|
| + |
"exec /bin/true",
|
| + |
LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
|
| + |
None,
|
| + |
)?;
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), Some(0));
|
| + |
assert!(!tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn bin_false() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup(
|
| + |
"exec /bin/false",
|
| + |
LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
|
| + |
None,
|
| + |
)?;
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), Some(1));
|
| + |
assert!(!tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn sleep_1() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup(
|
| + |
"exec sleep 1",
|
| + |
LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
|
| + |
None,
|
| + |
)?;
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), Some(0));
|
| + |
assert!(!tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn sleep_for_too_long() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let started = Instant::now();
|
| + |
let running = setup("exec sleep 1000", SHORT_TIMEOUT, None)?;
|
| + |
let tor = running.wait()?;
|
| + |
eprintln!("duration: {} ms", started.elapsed().as_millis());
|
| + |
assert_eq!(tor.status().code(), None);
|
| + |
assert!(tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn hello_world() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup(
|
| + |
"exec echo hello, world",
|
| + |
LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
|
| + |
None,
|
| + |
)?;
|
| + |
let stdout = running.stdout();
|
| + |
let stderr = running.stderr();
|
| + |
|
| + |
assert_eq!(stdout.line(), Some("hello, world\n".into()));
|
| + |
assert_eq!(stdout.line(), None);
|
| + |
|
| + |
assert_eq!(stderr.line(), None);
|
| + |
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), Some(0));
|
| + |
assert!(!tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn hello_world_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup(
|
| + |
"exec echo hello, world 1>&2",
|
| + |
LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
|
| + |
None,
|
| + |
)?;
|
| + |
let stdout = running.stdout();
|
| + |
let stderr = running.stderr();
|
| + |
|
| + |
assert_eq!(stdout.line(), None);
|
| + |
|
| + |
assert_eq!(stderr.line(), Some("hello, world\n".into()));
|
| + |
assert_eq!(stderr.line(), None);
|
| + |
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), Some(0));
|
| + |
assert!(!tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn pipe_through_cat() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup(
|
| + |
"exec cat",
|
| + |
LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
|
| + |
Some("hello, world"),
|
| + |
)?;
|
| + |
let stdout = running.stdout();
|
| + |
let stderr = running.stderr();
|
| + |
|
| + |
assert_eq!(stdout.line(), Some("hello, world".into()));
|
| + |
assert_eq!(stdout.line(), None);
|
| + |
|
| + |
assert_eq!(stderr.line(), None);
|
| + |
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), Some(0));
|
| + |
assert!(!tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn yes_to_stdout() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup("exec yes", SHORT_TIMEOUT, None)?;
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), None);
|
| + |
assert!(tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn yes_to_stderr() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup("exec yes 1>&2", SHORT_TIMEOUT, None)?;
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), None);
|
| + |
assert!(tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn kill() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup(
|
| + |
"exec sleep 1000",
|
| + |
LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
|
| + |
None,
|
| + |
)?;
|
| + |
sleep(Duration::from_millis(5000));
|
| + |
running.kill()?;
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), None);
|
| + |
assert!(!tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn kill_stderr() -> Result<(), Box<dyn std::error::Error>> {
|
| + |
let running = setup(
|
| + |
"exec sleep 1000 1>&2",
|
| + |
LONG_ENOUGH_THAT_SCRIPT_SURELY_FINISHES,
|
| + |
None,
|
| + |
)?;
|
| + |
sleep(Duration::from_millis(5000));
|
| + |
running.kill()?;
|
| + |
let tor = running.wait()?;
|
| + |
assert_eq!(tor.status().code(), None);
|
| + |
assert!(!tor.timed_out());
|
| + |
Ok(())
|
| + |
}
|
| + |
}
|