Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
fix: max_run_time setting in cib
Lars Wirzenius committed 8 months ago
commit 27317200c10b027f6f32d9853976d974e52fd224
parent b3898f6
3 files changed +42 -35
modified Cargo.lock
@@ -1383,9 +1383,9 @@ dependencies = [

[[package]]
name = "libc"
-
version = "0.2.171"
+
version = "0.2.175"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
+
checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543"

[[package]]
name = "libgit2-sys"
@@ -1994,6 +1994,7 @@ dependencies = [
 "culpa",
 "duration-str",
 "html-page",
+
 "libc",
 "nonempty 0.11.0",
 "qcheck",
 "qcheck-macros",
modified Cargo.toml
@@ -15,6 +15,7 @@ anyhow = "1.0.95"
clap = { version = "4.5.11", features = ["derive", "wrap_help"] }
duration-str = "0.12.0"
html-page = "0.4.0"
+
libc = "0.2.175"
nonempty = "0.11.0"
radicle-crypto = "0.12.0"
radicle-git-ext = "0.8.0"
modified src/timeoutcmd.rs
@@ -53,12 +53,15 @@ use std::{
        mpsc::{
            channel, sync_channel, Receiver, RecvTimeoutError, Sender, SyncSender, TryRecvError,
        },
-
        Arc, Condvar, Mutex,
+
        Arc, Condvar, Mutex, MutexGuard,
    },
    thread::{sleep, spawn, JoinHandle},
    time::{Duration, Instant},
};

+
#[cfg(unix)]
+
use std::os::unix::process::CommandExt;
+

use tempfile::tempfile;

use crate::logger;
@@ -154,7 +157,7 @@ impl FinishedProcess {
/// The child gets some data fed to it via its standard input. The
/// child is terminated if it runs for too long.
pub struct ChildProcess {
-
    deadline: Option<Instant>,
+
    deadline: Instant,
    child: Child,
    stdout: RealtimeLines,
    stdout_rx: Receiver<()>,
@@ -189,6 +192,7 @@ impl ChildProcess {
            .stdin(file)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
+
            .process_group(0)
            .spawn()
            .map_err(|err| TimeoutError::Spawn(cmd, err))?;

@@ -230,11 +234,10 @@ impl ChildProcess {
        };

        Ok(Self {
-
            deadline: Some(
-
                Instant::now()
-
                    .checked_add(timeout)
-
                    .ok_or(TimeoutError::Deadline)?,
-
            ),
+
            deadline: Instant::now()
+
                .checked_add(timeout)
+
                .ok_or(TimeoutError::Deadline)?,
+

            child,
            stdout: stdout_lines,
            stdout_rx,
@@ -246,9 +249,15 @@ impl ChildProcess {
    }

    pub fn kill(mut self) -> Result<FinishedProcess, TimeoutError> {
-
        self.child.kill().map_err(|_| TimeoutError::Kill)?;
+
        // Kill the whole process groups. This includes any child
+
        // processes of the adapter. We terminate with extreme
+
        // prejudice, as the adapter had all the time the node
+
        // operator is willing to give it to finish, and it's past
+
        // time to be nice.
+
        unsafe {
+
            libc::killpg(self.child.id() as i32, libc::SIGKILL);
+
        }
        self.stdout.finish();
-
        self.deadline = None;
        self.wait()
    }

@@ -271,18 +280,17 @@ impl ChildProcess {
        }

        self.stdout.finish();
-
        if let Some(deadline) = self.deadline {
-
            let max_wait = deadline - Instant::now();
-

-
            // Wait for to finish, up to the given timeout.
-
            let _guardlock = &*self.arc;
-
            match self.stdout_rx.recv_timeout(max_wait) {
-
                Ok(_) | Err(RecvTimeoutError::Disconnected) => (),
-
                Err(RecvTimeoutError::Timeout) => {
-
                    return Err(TimeoutError::TimedOut);
-
                }
+

+
        let max_wait = self.deadline - Instant::now();
+

+
        // Wait for child stdout to finish, up to the given timeout.
+
        let _guardlock = &*self.arc;
+
        match self.stdout_rx.recv_timeout(max_wait) {
+
            Ok(_) | Err(RecvTimeoutError::Disconnected) => {}
+
            Err(RecvTimeoutError::Timeout) => {
+
                return Err(TimeoutError::TimedOut);
            }
-
        };
+
        }

        self.stdout_thread
            .join()
@@ -431,25 +439,23 @@ impl RealtimeLines {
        // Lock the mutex to get access to unlocked buffer.
        let mut buf = mutex.lock().expect("lock to wait for line");

-
        let started = Instant::now();
        loop {
-
            let timed_out = self.started.elapsed() > self.max_duration;
-
            if timed_out {
-
                eprintln!("timed out");
+
            let remaining = self
+
                .max_duration
+
                .checked_sub(self.started.elapsed())
+
                .unwrap_or_default();
+

+
            if remaining.as_millis() == 0 {
                return None;
            }
-
            let line = buf.line(); // this doesn't block but if there is no output, do we time out?
-
            eprintln!(
-
                "{} trying to read line: {line:?}",
-
                started.elapsed().as_millis()
-
            );
+

+
            let line = buf.line();
            match line {
                // We didn't get a line, but the input stream has
                // finished. Return final partial line, if there is one,
                // or None.
                None if buf.is_finished() => {
                    let line = buf.line();
-
                    eprintln!("return line after finished {line:?}");
                    return line;
                }

@@ -457,13 +463,12 @@ impl RealtimeLines {
                // assume the new input results in either a complete
                // line or the input stream finishing, so we loop.
                None => {
-
                    eprintln!("wait for line");
-
                    buf = var.wait(buf).expect("wait for line"); // var is not notified if we time out
+
                    let result = var.wait_timeout(buf, remaining).expect("wait for line");
+
                    buf = result.0;
                }

                // We got a line: return it.
                Some(line) => {
-
                    eprintln!("return line {line:?}");
                    return Some(line);
                }
            }