Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: use new timeout-command module to execute adapter
Lars Wirzenius committed 1 year ago
commit 30435e200124dd04bc20877e432fd4b486cd7753
parent 0e78c6a8d86b7fc93a4557fb725b12b0124693af
2 files changed +107 -63
modified src/adapter.rs
@@ -10,9 +10,9 @@
use std::{
    collections::HashMap,
    ffi::OsStr,
-
    io::{BufRead, BufReader, Read},
    path::{Path, PathBuf},
-
    process::{Command, Stdio},
+
    process::Command,
+
    time::Duration,
};

use crate::{
@@ -21,6 +21,7 @@ use crate::{
    msg::{MessageError, Request, Response},
    notif::NotificationSender,
    run::{Run, RunState},
+
    timeoutcmd::{TimeoutCommand, TimeoutError},
};

const NOT_EXITED: i32 = 999;
@@ -79,34 +80,20 @@ impl Adapter {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
-
        let mut child = Command::new(&self.bin)
-
            .stdin(Stdio::piped())
-
            .stdout(Stdio::piped())
-
            .stderr(Stdio::piped())
-
            .envs(self.envs())
-
            .spawn()
-
            .map_err(|e| AdapterError::SpawnAdapter(self.bin.clone(), e))?;
+
        let mut cmd = Command::new(&self.bin);
+
        cmd.envs(self.envs());
+
        let mut child = TimeoutCommand::new(Duration::from_secs(1000));
+
        child.feed_stdin(trigger.to_string().as_bytes());
+
        let child = child.spawn(cmd).map_err(|err| match err {
+
            TimeoutError::Spawn(_, err) => AdapterError::SpawnAdapter(self.bin.clone(), err),
+
            _ => AdapterError::TimeoutCommand(err),
+
        })?;

        run_notification.notify()?;

-
        // Write the request to trigger a run to the child's stdin.
-
        // Then close the pipe to prevent the child from trying to
-
        // read another message that will never be sent.
-
        {
-
            let stdin = child.stdin.take().ok_or(AdapterError::StdinHandle)?;
-
            trigger
-
                .to_writer(stdin)
-
                .map_err(AdapterError::RequestWrite)?;
-
        }
-

-
        // Get the child's stdout into a BufReader so that we can loop
-
        // over lines.
-
        let stdout = child.stdout.take().ok_or(AdapterError::StdoutHandle)?;
-
        let stdout = BufReader::new(stdout);
-
        let mut lines = stdout.lines();
+
        let stdout = child.stdout();

-
        if let Some(line) = lines.next() {
-
            let line = line.map_err(AdapterError::ReadLine)?;
+
        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            run_notification.notify()?;
            match resp {
@@ -118,14 +105,18 @@ impl Adapter {
                    }
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
-
                _ => return Err(AdapterError::NotTriggered(resp)),
+
                _ => {
+
                    child.kill().ok();
+
                    return Err(AdapterError::NotTriggered(resp));
+
                }
            }
        } else {
            logger::adapter_no_first_response();
+
            child.kill().ok();
+
            return Err(AdapterError::NoFirstMessage);
        }

-
        if let Some(line) = lines.next() {
-
            let line = line.map_err(AdapterError::ReadLine)?;
+
        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            run_notification.notify()?;
            match resp {
@@ -133,36 +124,47 @@ impl Adapter {
                    run.set_result(result);
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
-
                _ => return Err(AdapterError::NotFinished(resp)),
+
                _ => {
+
                    child.kill().ok();
+
                    return Err(AdapterError::NotFinished(resp));
+
                }
            }
        } else {
            logger::adapter_no_second_response();
+
            child.kill().ok();
+
            return Err(AdapterError::NoSecondMessage);
        }

-
        if let Some(line) = lines.next() {
-
            let line = line.map_err(AdapterError::ReadLine)?;
+
        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            logger::adapter_too_many_responses();
+
            child.kill().ok();
            return Err(AdapterError::TooMany(resp));
        }

-
        let wait = child.wait().map_err(AdapterError::Wait)?;
+
        let stderr = child.stderr();
+
        while let Some(line) = stderr.line() {
+
            logger::adapter_stderr_line(&line);
+
        }
+

+
        let result = child.wait().expect("FIXME");

-
        let mut stderr = child.stderr.take().ok_or(AdapterError::StderrHandle)?;
-
        let mut buf = vec![];
-
        stderr
-
            .read_to_end(&mut buf)
-
            .map_err(AdapterError::ReadStderr)?;
-
        let stderr = String::from_utf8_lossy(&buf);
-
        logger::adapter_result(wait.code(), &stderr);
+
        logger::debug2(format!(
+
            "wait result? {result:?} status.code: {:?}",
+
            result.status().code()
+
        ));

-
        if let Some(exit) = wait.code() {
+
        if result.timed_out() {
+
            logger::adapter_did_not_exit_voluntarily();
+
            return Err(AdapterError::Failed(NOT_EXITED));
+
        } else if let Some(exit) = result.status().code() {
+
            logger::adapter_result(exit);
            if exit != 0 {
                return Err(AdapterError::Failed(exit));
            }
        } else {
-
            logger::adapter_did_not_exit_voluntarily();
-
            return Err(AdapterError::Failed(NOT_EXITED));
+
            logger::adapter_did_not_exit();
+
            return Err(AdapterError::Signal);
        }

        Ok(())
@@ -171,6 +173,14 @@ impl Adapter {

#[derive(Debug, thiserror::Error)]
pub enum AdapterError {
+
    /// Error from [`TimeoutCommand`] or [`RunningProcess`].
+
    #[error(transparent)]
+
    TimeoutCommand(#[from] crate::timeoutcmd::TimeoutError),
+

+
    /// Error from spawning a sub-process.
+
    #[error("failed to spawn a CI adapter sub-process: {0}")]
+
    SpawnAdapter(PathBuf, #[source] std::io::Error),
+

    /// Error creating Response from a string.
    #[error("failed to create a Response message from adapter output")]
    ParseResponse(#[source] MessageError),
@@ -179,10 +189,6 @@ pub enum AdapterError {
    #[error("failed to write request to adapter stdin")]
    RequestWrite(#[source] MessageError),

-
    /// Error from spawning a sub-process.
-
    #[error("failed to spawn a CI adapter sub-process: {0}")]
-
    SpawnAdapter(PathBuf, #[source] std::io::Error),
-

    /// Error getting the file handle for the adapter's stdin.
    #[error("failed to get handle for adapter's stdin")]
    StdinHandle,
@@ -210,10 +216,22 @@ pub enum AdapterError {
    #[error("child process failed with wait status {0}")]
    Failed(i32),

+
    /// Child process was killed.
+
    #[error("child process terminated by signal")]
+
    Signal,
+

    /// First message is not `Response::Triggered`
    #[error("adapter's first message is not 'triggered', but {0:?}")]
    NotTriggered(Response),

+
    /// There was no first response from adapter.
+
    #[error("adapter did not sent its first message")]
+
    NoFirstMessage,
+

+
    /// There was no second response from adapter.
+
    #[error("adapter did not sent its second message")]
+
    NoSecondMessage,
+

    /// Second message is not `Response::Finished`
    #[error("adapter's second message is not 'finished', but {0:?}")]
    NotFinished(Response),
@@ -359,10 +377,7 @@ kill -9 $BASHPID
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
-
        assert!(matches!(
-
            x,
-
            Err(AdapterError::Failed(_)) | Err(AdapterError::RequestWrite(_))
-
        ));
+
        assert!(matches!(x, Err(AdapterError::NoFirstMessage)));

        Ok(())
    }
@@ -385,7 +400,29 @@ kill -9 $BASHPID
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
-
        assert!(matches!(x, Err(AdapterError::Failed(_))));
+
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_ends_ok_before_second_message() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
read
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let db = db()?;
+
        let mut run = run()?;
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        eprintln!("{x:#?}");
+
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));

        Ok(())
    }
@@ -409,7 +446,7 @@ kill -9 $BASHPID
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
-
        assert!(matches!(x, Err(AdapterError::Failed(_))));
+
        assert!(matches!(x, Err(AdapterError::Signal)));

        Ok(())
    }
modified src/logger.rs
@@ -429,19 +429,26 @@ pub fn adapter_too_many_responses() {
    error!(slog_scope::logger(), "too many response messages");
}

-
pub fn adapter_result(exit: Option<i32>, stderr: &str) {
-
    if let Some(exit) = exit {
-
        debug!(slog_scope::logger(), "adapter exit code"; "exit_code" => exit);
-
    } else {
-
        debug!(slog_scope::logger(), "adapter was terminated by signal");
-
    }
-
    for line in stderr.lines() {
-
        debug!(slog_scope::logger(), "adapter stderr"; "stderr" => line);
-
    }
+
pub fn adapter_stderr_line(line: &str) {
+
    debug!(slog_scope::logger(), "adapter stderr"; "stderr" => line);
+
}
+

+
pub fn adapter_result(exit: i32) {
+
    debug!(slog_scope::logger(), "adapter exit code"; "exit_code" => exit);
}

pub fn adapter_did_not_exit_voluntarily() {
-
    warn!(slog_scope::logger(), "adapter did not exit voluntarily");
+
    warn!(
+
        slog_scope::logger(),
+
        "adapter did not exit voluntarily: terminated for taking too long"
+
    );
+
}
+

+
pub fn adapter_did_not_exit() {
+
    warn!(
+
        slog_scope::logger(),
+
        "adapter did not exit: probably killed by signal"
+
    );
}

pub fn debug(msg: &str) {