Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
fix: when adapter times out, return correct error variant
Lars Wirzenius committed 7 months ago
commit d49fd6cb2dd8d3147e804401309a42ec45b5debb
parent ce9def4
3 files changed +52 -40
modified src/adapter.rs
@@ -194,11 +194,7 @@ impl Adapter {
            outcome.set_error(err);
        }

-
        let result = if outcome.has_error() {
-
            child.kill()
-
        } else {
-
            child.wait()
-
        };
+
        let result = child.wait();

        match result {
            Ok(finished) => {
@@ -216,16 +212,17 @@ impl Adapter {
                }
            }
            Err(TimeoutError::TimedOut) => {
-
                logger::adapter_did_not_exit_voluntarily();
-
                outcome.set_error(AdapterError::FailedNotExited);
+
                logger::adapter_timed_out();
+
                outcome.set_error(AdapterError::FailedTimeout);
            }
            Err(err) => {
-
                logger::adapter_did_not_exit(err);
-
                outcome.set_error(AdapterError::Signal(9));
+
                logger::adapter_did_not_exit(&err);
+
                outcome.set_error(AdapterError::TimeoutWait(err));
            }
        }

        if let Some(err) = outcome.error() {
+
            run.set_result(RunResult::Failure);
            Err(err)
        } else {
            Ok(())
@@ -334,13 +331,7 @@ struct MaybeResult {

impl MaybeResult {
    fn set_error(&mut self, err: AdapterError) {
-
        if self.error.is_none() {
-
            self.error = Some(err);
-
        }
-
    }
-

-
    fn has_error(&self) -> bool {
-
        self.error.is_some()
+
        self.error = Some(err);
    }

    fn error(self) -> Option<AdapterError> {
@@ -401,14 +392,18 @@ pub enum AdapterError {
    #[error("child process failed with wait status {0:?}")]
    Failed(ExitStatus),

-
    /// Child process failed: didn't exit.
-
    #[error("child process failed without exiting")]
-
    FailedNotExited,
+
    /// Child process was terminated due to taking too long.
+
    #[error("child process was terminated due to taking too long")]
+
    FailedTimeout,

    /// Child process was killed.
    #[error("child process terminated by signal {0}")]
    Signal(i32),

+
    /// Child process did not terminate.
+
    #[error("child process did not terminate")]
+
    TimeoutWait(#[source] crate::timeoutcmd::TimeoutError),
+

    /// First message is not `Response::Triggered`
    #[error("adapter's first message is not 'triggered', but {0:?}")]
    NotTriggered(Response),
@@ -581,13 +576,33 @@ kill -9 $$
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
+
        assert!(matches!(x, Err(AdapterError::Signal(9))));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_ends_ok_before_first_message() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/sh
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let db = db()?;
+
        let mut run = run()?;
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoFirstMessage)));

        Ok(())
    }

    #[test]
-
    fn adapter_is_killed_after_first_message() -> TestResult<()> {
+
    fn adapter_is_killed_before_first_message() -> TestResult<()> {
        const ADAPTER: &str = r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
@@ -604,7 +619,7 @@ kill -9 $$
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
-
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));
+
        assert!(matches!(x, Err(AdapterError::Signal(9))));

        Ok(())
    }
modified src/logger.rs
@@ -118,6 +118,7 @@ enum Id {
    AdapterStderrLine,
    AdapterStdoutLine,
    AdapterTempConfig,
+
    AdapterTimedOut,
    AdapterTooManyMessages,

    BrokerDatabase,
@@ -802,7 +803,7 @@ pub fn adapter_did_not_exit_voluntarily() {
    );
}

-
pub fn adapter_did_not_exit(error: TimeoutError) {
+
pub fn adapter_did_not_exit(error: &TimeoutError) {
    warn!(
        msg_id = ?Id::AdapterNoExit,
        kind = %Kind::Debug,
@@ -811,6 +812,14 @@ pub fn adapter_did_not_exit(error: TimeoutError) {
    );
}

+
pub fn adapter_timed_out() {
+
    info!(
+
        msg_id = ?Id::AdapterTimedOut,
+
        kind = %Kind::FinishRun,
+
        "adapter was terminated due to taking too long"
+
    );
+
}
+

pub fn job_failure(
    msg: &'static str,
    repo_id: &RepoId,
modified src/timeoutcmd.rs
@@ -249,6 +249,11 @@ impl ChildProcess {
    }

    pub fn kill(mut self) -> Result<FinishedProcess, TimeoutError> {
+
        self.kill_helper();
+
        self.wait()
+
    }
+

+
    fn kill_helper(&mut self) {
        // Kill the whole process groups. This includes any child
        // processes of the adapter. We terminate with extreme
        // prejudice, as the adapter had all the time the node
@@ -258,27 +263,9 @@ impl ChildProcess {
            libc::killpg(self.child.id() as i32, libc::SIGKILL);
        }
        self.stdout.finish();
-
        self.wait()
    }

    pub fn wait(mut self) -> Result<FinishedProcess, TimeoutError> {
-
        let result = self.child.try_wait();
-
        match result {
-
            Ok(Some(status)) => {
-
                let stderr = self
-
                    .stderr_thread
-
                    .join()
-
                    .map_err(|_| TimeoutError::Thread)??;
-

-
                return Ok(FinishedProcess {
-
                    exit: status,
-
                    stderr,
-
                });
-
            }
-
            Ok(None) => (),
-
            Err(err) => return Err(TimeoutError::Wait(err)),
-
        }
-

        self.stdout.finish();

        let max_wait = self.deadline - Instant::now();
@@ -288,6 +275,7 @@ impl ChildProcess {
        match self.stdout_rx.recv_timeout(max_wait) {
            Ok(_) | Err(RecvTimeoutError::Disconnected) => {}
            Err(RecvTimeoutError::Timeout) => {
+
                self.kill_helper();
                return Err(TimeoutError::TimedOut);
            }
        }