Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
fix: when adapter times out, return correct error variant
Merged liw opened 7 months ago

We need to handle “run failed for an intrinsic reason” differently from “run didn’t finish in time”.

Signed-off-by: Lars Wirzenius liw@liw.fi

fix: expected result in test case

wrong result used to be returned by code, test case was always wrong

Signed-off-by: Lars Wirzenius liw@liw.fi

refactor(src/adapter.rs): reduce code duplication in tests

Signed-off-by: Lars Wirzenius liw@liw.fi

feat: say run timed out on report page if that happened

Signed-off-by: Lars Wirzenius liw@liw.fi

6 files changed +160 -118 ce9def49 845a86c0
modified src/adapter.rs
@@ -149,6 +149,10 @@ impl Adapter {
        run.set_state(RunState::Finished);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

+
        if matches!(x, Err(AdapterError::FailedTimeout)) {
+
            run.set_timed_out();
+
        }
+

        x
    }

@@ -194,11 +198,7 @@ impl Adapter {
            outcome.set_error(err);
        }

-
        let result = if outcome.has_error() {
-
            child.kill()
-
        } else {
-
            child.wait()
-
        };
+
        let result = child.wait();

        match result {
            Ok(finished) => {
@@ -216,16 +216,17 @@ impl Adapter {
                }
            }
            Err(TimeoutError::TimedOut) => {
-
                logger::adapter_did_not_exit_voluntarily();
-
                outcome.set_error(AdapterError::FailedNotExited);
+
                logger::adapter_timed_out();
+
                outcome.set_error(AdapterError::FailedTimeout);
            }
            Err(err) => {
-
                logger::adapter_did_not_exit(err);
-
                outcome.set_error(AdapterError::Signal(9));
+
                logger::adapter_did_not_exit(&err);
+
                outcome.set_error(AdapterError::TimeoutWait(err));
            }
        }

        if let Some(err) = outcome.error() {
+
            run.set_result(RunResult::Failure);
            Err(err)
        } else {
            Ok(())
@@ -334,13 +335,7 @@ struct MaybeResult {

impl MaybeResult {
    fn set_error(&mut self, err: AdapterError) {
-
        if self.error.is_none() {
-
            self.error = Some(err);
-
        }
-
    }
-

-
    fn has_error(&self) -> bool {
-
        self.error.is_some()
+
        self.error = Some(err);
    }

    fn error(self) -> Option<AdapterError> {
@@ -401,14 +396,18 @@ pub enum AdapterError {
    #[error("child process failed with wait status {0:?}")]
    Failed(ExitStatus),

-
    /// Child process failed: didn't exit.
-
    #[error("child process failed without exiting")]
-
    FailedNotExited,
+
    /// Child process was terminated due to taking too long.
+
    #[error("child process was terminated due to taking too long")]
+
    FailedTimeout,

    /// Child process was killed.
    #[error("child process terminated by signal {0}")]
    Signal(i32),

+
    /// Child process did not terminate.
+
    #[error("child process did not terminate")]
+
    TimeoutWait(#[source] crate::timeoutcmd::TimeoutError),
+

    /// First message is not `Response::Triggered`
    #[error("adapter's first message is not 'triggered', but {0:?}")]
    NotTriggered(Response),
@@ -452,9 +451,9 @@ pub enum AdapterError {

#[cfg(test)]
mod test {
-
    use std::{fs::write, io::ErrorKind, time::Duration};
+
    use std::{fs::write, io::ErrorKind, path::PathBuf, time::Duration};

-
    use tempfile::{tempdir, NamedTempFile};
+
    use tempfile::{tempdir, NamedTempFile, TempDir};

    use radicle::git::Oid;
    use radicle::prelude::RepoId;
@@ -490,17 +489,24 @@ mod test {
            .build())
    }

+
    #[allow(clippy::unwrap_used)]
+
    fn adapter(tmp: &TempDir, shell: &'static str) -> PathBuf {
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, shell).unwrap();
+
        bin
+
    }
+

    #[test]
    fn adapter_reports_success() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -514,15 +520,15 @@ echo '{"response":"finished","result":"success"}'

    #[test]
    fn adapter_reports_failure() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"failure"}'
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -541,17 +547,17 @@ echo '{"response":"finished","result":"failure"}'

    #[test]
    fn adapter_exits_nonzero() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"failure"}'
echo woe be me 1>&2
exit 1
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -567,13 +573,33 @@ exit 1

    #[test]
    fn adapter_is_killed_before_any_messages() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
kill -9 $$
-
"#;
+
"#,
+
        );
+

+
        let db = db()?;
+
        let mut run = run()?;
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
+
        eprintln!("{x:#?}");
+
        assert!(matches!(x, Err(AdapterError::Signal(9))));

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_ends_ok_before_first_message() -> TestResult<()> {
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -587,16 +613,16 @@ kill -9 $$
    }

    #[test]
-
    fn adapter_is_killed_after_first_message() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
    fn adapter_is_killed_before_first_message() -> TestResult<()> {
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
kill -9 $$
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -604,21 +630,21 @@ kill -9 $$
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
-
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));
+
        assert!(matches!(x, Err(AdapterError::Signal(9))));

        Ok(())
    }

    #[test]
    fn adapter_ends_ok_before_second_message() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -633,16 +659,16 @@ echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'

    #[test]
    fn adapter_is_killed_after_second_message() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
kill -9 $$
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -661,15 +687,15 @@ kill -9 $$

    #[test]
    fn adapter_produces_as_bad_message() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success","bad":"field"}'
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -687,14 +713,14 @@ echo '{"response":"finished","result":"success","bad":"field"}'

    #[test]
    fn adapter_first_message_isnt_triggered() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
read
echo '{"response":"finished","result":"success"}'
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -714,16 +740,16 @@ echo '{"response":"finished","result":"success"}'

    #[test]
    fn adapter_outputs_too_many_messages() -> TestResult<()> {
-
        const ADAPTER: &str = r#"#!/bin/sh
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
echo '{"response":"finished","result":"success"}'
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
@@ -798,15 +824,15 @@ echo '{"response":"finished","result":"success"}'
        // paths and errors should happen when a binary can't be
        // loaded due to missing dynamic linker or library or such.

-
        const ADAPTER: &str = r#"#!/bin/does-not-exist
+
        let tmp = tempdir()?;
+
        let bin = adapter(
+
            &tmp,
+
            r#"#!/bin/does-not-exist
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
-
"#;
-

-
        let tmp = tempdir()?;
-
        let bin = tmp.path().join("adapter.sh");
-
        mock_adapter(&bin, ADAPTER)?;
+
"#,
+
        );

        let db = db()?;
        let mut run = run()?;
modified src/broker.rs
@@ -269,7 +269,7 @@ exit 1
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
        assert_eq!(run.state(), RunState::Finished);
-
        assert_eq!(run.result(), Some(&RunResult::Success));
+
        assert_eq!(run.result(), Some(&RunResult::Failure));

        Ok(())
    }
modified src/logger.rs
@@ -118,6 +118,7 @@ enum Id {
    AdapterStderrLine,
    AdapterStdoutLine,
    AdapterTempConfig,
+
    AdapterTimedOut,
    AdapterTooManyMessages,

    BrokerDatabase,
@@ -802,7 +803,7 @@ pub fn adapter_did_not_exit_voluntarily() {
    );
}

-
pub fn adapter_did_not_exit(error: TimeoutError) {
+
pub fn adapter_did_not_exit(error: &TimeoutError) {
    warn!(
        msg_id = ?Id::AdapterNoExit,
        kind = %Kind::Debug,
@@ -811,6 +812,14 @@ pub fn adapter_did_not_exit(error: TimeoutError) {
    );
}

+
pub fn adapter_timed_out() {
+
    info!(
+
        msg_id = ?Id::AdapterTimedOut,
+
        kind = %Kind::FinishRun,
+
        "adapter was terminated due to taking too long"
+
    );
+
}
+

pub fn job_failure(
    msg: &'static str,
    repo_id: &RepoId,
modified src/pages.rs
@@ -370,7 +370,20 @@ impl PageData {
        let mut runs = self.runs(rid);
        runs.sort_by_cached_key(|run| run.timestamp());
        runs.reverse();
+

        for run in runs {
+
            let result = if let Some(result) = run.result() {
+
                result.to_string()
+
            } else {
+
                "unknown".into()
+
            };
+
            let mut status = Element::new(Tag::Span)
+
                .with_class(&result)
+
                .with_text(&result);
+
            if run.timed_out() {
+
                status.push_text(" (timed out)");
+
            }
+

            let current = match run.state() {
                RunState::Triggered => Element::new(Tag::Span)
                    .with_attribute("state", "triggered")
@@ -378,16 +391,7 @@ impl PageData {
                RunState::Running => Element::new(Tag::Span)
                    .with_class("running)")
                    .with_text("running"),
-
                RunState::Finished => {
-
                    let result = if let Some(result) = run.result() {
-
                        result.to_string()
-
                    } else {
-
                        "unknown".into()
-
                    };
-
                    Element::new(Tag::Span)
-
                        .with_class(&result)
-
                        .with_text(&result)
-
                }
+
                RunState::Finished => status,
            };

            table.push_child(
modified src/run.rs
@@ -67,6 +67,7 @@ impl RunBuilder {
            whence,
            state: RunState::Triggered,
            result: None,
+
            timed_out: None,
        }
    }
}
@@ -76,7 +77,6 @@ pub struct Run {
    broker_run_id: RunId,
    adapter_run_id: Option<RunId>,
    adapter_info_url: Option<String>,
-
    job_id: Option<JobId>,
    repo_id: RepoId,
    #[serde(alias = "repo_alias")]
    repo_name: String,
@@ -84,6 +84,11 @@ pub struct Run {
    whence: Whence,
    state: RunState,
    result: Option<RunResult>,
+

+
    // These fields need to be options we can de-serialize from old
+
    // runs in the database.
+
    job_id: Option<JobId>,
+
    timed_out: Option<bool>,
}

impl Run {
@@ -167,6 +172,16 @@ impl Run {
    pub fn result(&self) -> Option<&RunResult> {
        self.result.as_ref()
    }
+

+
    /// Mark run as having timed out.
+
    pub fn set_timed_out(&mut self) {
+
        self.timed_out = Some(true);
+
    }
+

+
    /// Was run marked as having timed out?
+
    pub fn timed_out(&self) -> bool {
+
        self.timed_out == Some(true)
+
    }
}

/// State of CI run.
modified src/timeoutcmd.rs
@@ -249,6 +249,11 @@ impl ChildProcess {
    }

    pub fn kill(mut self) -> Result<FinishedProcess, TimeoutError> {
+
        self.kill_helper();
+
        self.wait()
+
    }
+

+
    fn kill_helper(&mut self) {
        // Kill the whole process groups. This includes any child
        // processes of the adapter. We terminate with extreme
        // prejudice, as the adapter had all the time the node
@@ -258,27 +263,9 @@ impl ChildProcess {
            libc::killpg(self.child.id() as i32, libc::SIGKILL);
        }
        self.stdout.finish();
-
        self.wait()
    }

    pub fn wait(mut self) -> Result<FinishedProcess, TimeoutError> {
-
        let result = self.child.try_wait();
-
        match result {
-
            Ok(Some(status)) => {
-
                let stderr = self
-
                    .stderr_thread
-
                    .join()
-
                    .map_err(|_| TimeoutError::Thread)??;
-

-
                return Ok(FinishedProcess {
-
                    exit: status,
-
                    stderr,
-
                });
-
            }
-
            Ok(None) => (),
-
            Err(err) => return Err(TimeoutError::Wait(err)),
-
        }
-

        self.stdout.finish();

        let max_wait = self.deadline - Instant::now();
@@ -288,6 +275,7 @@ impl ChildProcess {
        match self.stdout_rx.recv_timeout(max_wait) {
            Ok(_) | Err(RecvTimeoutError::Disconnected) => {}
            Err(RecvTimeoutError::Timeout) => {
+
                self.kill_helper();
                return Err(TimeoutError::TimedOut);
            }
        }