Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
fix: max_run_time setting in cib
Merged liw opened 8 months ago

The cib configuration setting max_run_time didn’t always work, because its implementation only killed the adapter, not any child processes of the adapter. We fix this by running the adapter in its own process group, and killing the whole group if the time runs out.

In addition, the timeoutcmd.rs module (and the ChildProcess type) is simplified, by making the deadline mandatory. It is no longer possible to run an adapter without a timeout. This makes sense, as it’d not be useful to allow an adapter to run indefinitely. The old behavior can be simulated by setting a very long timeout, such as a year.

Signed-off-by: Lars Wirzenius liw@liw.fi

3 files changed +42 -35 b3898f65 27317200
modified Cargo.lock
@@ -1383,9 +1383,9 @@ dependencies = [

[[package]]
name = "libc"
-
version = "0.2.171"
+
version = "0.2.175"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
+
checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543"

[[package]]
name = "libgit2-sys"
@@ -1994,6 +1994,7 @@ dependencies = [
 "culpa",
 "duration-str",
 "html-page",
+
 "libc",
 "nonempty 0.11.0",
 "qcheck",
 "qcheck-macros",
modified Cargo.toml
@@ -15,6 +15,7 @@ anyhow = "1.0.95"
clap = { version = "4.5.11", features = ["derive", "wrap_help"] }
duration-str = "0.12.0"
html-page = "0.4.0"
+
libc = "0.2.175"
nonempty = "0.11.0"
radicle-crypto = "0.12.0"
radicle-git-ext = "0.8.0"
modified src/timeoutcmd.rs
@@ -53,12 +53,15 @@ use std::{
        mpsc::{
            channel, sync_channel, Receiver, RecvTimeoutError, Sender, SyncSender, TryRecvError,
        },
-
        Arc, Condvar, Mutex,
+
        Arc, Condvar, Mutex, MutexGuard,
    },
    thread::{sleep, spawn, JoinHandle},
    time::{Duration, Instant},
};

+
#[cfg(unix)]
+
use std::os::unix::process::CommandExt;
+

use tempfile::tempfile;

use crate::logger;
@@ -154,7 +157,7 @@ impl FinishedProcess {
/// The child gets some data fed to it via its standard input. The
/// child is terminated if it runs for too long.
pub struct ChildProcess {
-
    deadline: Option<Instant>,
+
    deadline: Instant,
    child: Child,
    stdout: RealtimeLines,
    stdout_rx: Receiver<()>,
@@ -189,6 +192,7 @@ impl ChildProcess {
            .stdin(file)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
+
            .process_group(0)
            .spawn()
            .map_err(|err| TimeoutError::Spawn(cmd, err))?;

@@ -230,11 +234,10 @@ impl ChildProcess {
        };

        Ok(Self {
-
            deadline: Some(
-
                Instant::now()
-
                    .checked_add(timeout)
-
                    .ok_or(TimeoutError::Deadline)?,
-
            ),
+
            deadline: Instant::now()
+
                .checked_add(timeout)
+
                .ok_or(TimeoutError::Deadline)?,
+

            child,
            stdout: stdout_lines,
            stdout_rx,
@@ -246,9 +249,15 @@ impl ChildProcess {
    }

    pub fn kill(mut self) -> Result<FinishedProcess, TimeoutError> {
-
        self.child.kill().map_err(|_| TimeoutError::Kill)?;
+
        // Kill the whole process groups. This includes any child
+
        // processes of the adapter. We terminate with extreme
+
        // prejudice, as the adapter had all the time the node
+
        // operator is willing to give it to finish, and it's past
+
        // time to be nice.
+
        unsafe {
+
            libc::killpg(self.child.id() as i32, libc::SIGKILL);
+
        }
        self.stdout.finish();
-
        self.deadline = None;
        self.wait()
    }

@@ -271,18 +280,17 @@ impl ChildProcess {
        }

        self.stdout.finish();
-
        if let Some(deadline) = self.deadline {
-
            let max_wait = deadline - Instant::now();
-

-
            // Wait for to finish, up to the given timeout.
-
            let _guardlock = &*self.arc;
-
            match self.stdout_rx.recv_timeout(max_wait) {
-
                Ok(_) | Err(RecvTimeoutError::Disconnected) => (),
-
                Err(RecvTimeoutError::Timeout) => {
-
                    return Err(TimeoutError::TimedOut);
-
                }
+

+
        let max_wait = self.deadline - Instant::now();
+

+
        // Wait for child stdout to finish, up to the given timeout.
+
        let _guardlock = &*self.arc;
+
        match self.stdout_rx.recv_timeout(max_wait) {
+
            Ok(_) | Err(RecvTimeoutError::Disconnected) => {}
+
            Err(RecvTimeoutError::Timeout) => {
+
                return Err(TimeoutError::TimedOut);
            }
-
        };
+
        }

        self.stdout_thread
            .join()
@@ -431,25 +439,23 @@ impl RealtimeLines {
        // Lock the mutex to get access to unlocked buffer.
        let mut buf = mutex.lock().expect("lock to wait for line");

-
        let started = Instant::now();
        loop {
-
            let timed_out = self.started.elapsed() > self.max_duration;
-
            if timed_out {
-
                eprintln!("timed out");
+
            let remaining = self
+
                .max_duration
+
                .checked_sub(self.started.elapsed())
+
                .unwrap_or_default();
+

+
            if remaining.as_millis() == 0 {
                return None;
            }
-
            let line = buf.line(); // this doesn't block but if there is no output, do we time out?
-
            eprintln!(
-
                "{} trying to read line: {line:?}",
-
                started.elapsed().as_millis()
-
            );
+

+
            let line = buf.line();
            match line {
                // We didn't get a line, but the input stream has
                // finished. Return final partial line, if there is one,
                // or None.
                None if buf.is_finished() => {
                    let line = buf.line();
-
                    eprintln!("return line after finished {line:?}");
                    return line;
                }

@@ -457,13 +463,12 @@ impl RealtimeLines {
                // assume the new input results in either a complete
                // line or the input stream finishing, so we loop.
                None => {
-
                    eprintln!("wait for line");
-
                    buf = var.wait(buf).expect("wait for line"); // var is not notified if we time out
+
                    let result = var.wait_timeout(buf, remaining).expect("wait for line");
+
                    buf = result.0;
                }

                // We got a line: return it.
                Some(line) => {
-
                    eprintln!("return line {line:?}");
                    return Some(line);
                }
            }