Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat(src/adapter.rs): run a CI adapter
Lars Wirzenius committed 2 years ago
commit 1c90e35cef8d04745536199ec37d81b7b9597a73
parent b096d9fb6f6322aae92070c0c8d6c6faad325a18
1 file changed +418 -0
added src/adapter.rs
@@ -0,0 +1,418 @@
+
//! Run a Radicle CI adapter.
+
//!
+
//! Given an executable that conforms to the CI adapter API, execute
+
//! it by feeding it the "trigger" message via its stdin and reading
+
//! response messages from its stdout. Return the result of the run,
+
//! or an error if something went badly wrong. The CI run failing due
+
//! to something in the repository under test is expected, and not
+
//! considered as something going badly wrong.
+

+
use std::{
+
    collections::HashMap,
+
    ffi::OsStr,
+
    io::{BufRead, BufReader},
+
    os::unix::process::ExitStatusExt,
+
    path::{Path, PathBuf},
+
    process::{Command, Stdio},
+
};
+

+
use crate::{
+
    msg::{MessageError, Request, Response},
+
    run::{Run, RunState},
+
};
+

+
/// An external executable that runs CI on request.
+
#[derive(Debug, Default, Clone, Eq, PartialEq)]
+
pub struct Adapter {
+
    bin: PathBuf,
+
    env: HashMap<String, String>,
+
}
+

+
impl Adapter {
+
    pub fn new(bin: &Path) -> Self {
+
        Self {
+
            bin: bin.into(),
+
            env: HashMap::new(),
+
        }
+
    }
+

+
    pub fn with_environment(mut self, env: &HashMap<String, String>) -> Self {
+
        for (key, value) in env.iter() {
+
            self.env.insert(key.into(), value.into());
+
        }
+
        self
+
    }
+

+
    fn envs(&self) -> impl Iterator<Item = (&OsStr, &OsStr)> {
+
        self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
+
    }
+

+
    pub fn run(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
+
        run.set_state(RunState::Running);
+
        let x = self.run_helper(trigger, run);
+
        run.set_state(RunState::Finished);
+
        x
+
    }
+

+
    fn run_helper(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
+
        assert!(matches!(trigger, Request::Trigger { .. }));
+

+
        // Spawn the adapter sub-process.
+
        let mut child = Command::new(&self.bin)
+
            .stdin(Stdio::piped())
+
            .stdout(Stdio::piped())
+
            .envs(self.envs())
+
            .spawn()
+
            .map_err(|e| AdapterError::SpawnAdapter(self.bin.clone(), e))?;
+

+
        // Write the request to trigger a run to the child's stdin.
+
        // Then close the pipe to prevent the child from trying to
+
        // read another message that will never be sent.
+
        {
+
            let stdin = child.stdin.take().ok_or(AdapterError::StdinHandle)?;
+
            trigger.to_writer(stdin)?;
+
        }
+

+
        // Get the child's stdout into a BufReader so that we can loop
+
        // over lines.
+
        let stdout = child.stdout.take().ok_or(AdapterError::StdoutHandle)?;
+
        let stdout = BufReader::new(stdout);
+
        let mut lines = stdout.lines();
+

+
        if let Some(line) = lines.next() {
+
            let line = line.map_err(AdapterError::ReadLine)?;
+
            let resp = Response::from_str(&line)?;
+
            match resp {
+
                Response::Triggered { run_id } => {
+
                    run.set_adapter_run_id(run_id);
+
                }
+
                _ => return Err(AdapterError::NotTriggered(resp)),
+
            }
+
        }
+

+
        if let Some(line) = lines.next() {
+
            let line = line.map_err(AdapterError::ReadLine)?;
+
            let resp = Response::from_str(&line)?;
+
            match resp {
+
                Response::Finished { result } => {
+
                    run.set_result(result);
+
                }
+
                _ => return Err(AdapterError::NotTriggered(resp)),
+
            }
+
        }
+

+
        if let Some(line) = lines.next() {
+
            let line = line.map_err(AdapterError::ReadLine)?;
+
            let resp = Response::from_str(&line)?;
+
            return Err(AdapterError::TooMany(resp));
+
        }
+

+
        let wait = child.wait().map_err(AdapterError::Wait)?;
+
        let wait = wait.into_raw();
+
        if wait != 0 {
+
            return Err(AdapterError::Failed(wait));
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum AdapterError {
+
    /// A message related error.
+
    #[error(transparent)]
+
    Message(#[from] MessageError),
+

+
    /// Error from spawning a sub-process.
+
    #[error("failed to spawn a CI adapter sub-process: {0}")]
+
    SpawnAdapter(PathBuf, #[source] std::io::Error),
+

+
    /// Error getting the file handle for the adapter's stdin.
+
    #[error("failed to get handle for adapter's stdin")]
+
    StdinHandle,
+

+
    /// Error getting the file handle for the adapter's stdout.
+
    #[error("failed to get handle for adapter's stdout")]
+
    StdoutHandle,
+

+
    #[error("failed to read from adapter stdout")]
+
    ReadLine(#[source] std::io::Error),
+

+
    /// Waiting for child process failed.
+
    #[error("failed to wait for child process to exit")]
+
    Wait(#[source] std::io::Error),
+

+
    /// Child process failed.
+
    #[error("child process failed with wait status {0}")]
+
    Failed(i32),
+

+
    /// First message is not `Response::Triggered`
+
    #[error("adapter's first message is not 'triggered', but {0:?}")]
+
    NotTriggered(Response),
+

+
    /// Too many messages from adapter.
+
    #[error("adapter sent too many messages: first extra is {0:#?}")]
+
    TooMany(Response),
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use std::{fs::write, io::ErrorKind};
+

+
    use tempfile::tempdir;
+

+
    use super::{Adapter, Run};
+
    use crate::{
+
        adapter::AdapterError,
+
        msg::{MessageError, Response, RunResult},
+
        test::{mock_adapter, trigger_request, TestResult},
+
    };
+

+
    #[test]
+
    fn adapter_reports_success() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
echo '{"response":"finished","result":"success"}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+
        assert_eq!(run.result(), Some(&RunResult::Success));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_reports_failure() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
echo '{"response":"finished","result":"failure"}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+
        assert_eq!(run.result(), Some(&RunResult::Failure));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_returns_error() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
echo '{"response":"finished","result":{"error":"error message\nsecond line"}}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+
        assert_eq!(
+
            run.result(),
+
            Some(&RunResult::Error("error message\nsecond line".into()))
+
        );
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_is_killed_before_any_messages() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
kill -9 $BASHPID
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        assert!(matches!(x, Err(AdapterError::Failed(_))));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_is_killed_after_first_message() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
kill -9 $BASHPID
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        assert!(matches!(x, Err(AdapterError::Failed(_))));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_is_killed_after_second_message() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
echo '{"response":"finished","result":"success"}'
+
kill -9 $BASHPID
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        assert!(matches!(x, Err(AdapterError::Failed(_))));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_produces_as_bad_message() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
echo '{"response":"finished","result":"success","bad":"field"}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        assert!(matches!(
+
            x,
+
            Err(AdapterError::Message(MessageError::DeserializeResponse(_)))
+
        ));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_first_message_isnt_triggered() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
echo '{"response":"finished","result":"success"}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        assert!(matches!(
+
            x,
+
            Err(AdapterError::NotTriggered(Response::Finished {
+
                result: RunResult::Success
+
            }))
+
        ));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_outputs_too_many_messages() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
echo '{"response":"finished","result":"success"}'
+
echo '{"response":"finished","result":"success"}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        assert!(matches!(
+
            x,
+
            Err(AdapterError::TooMany(Response::Finished {
+
                result: RunResult::Success
+
            }))
+
        ));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_does_not_exist() -> TestResult<()> {
+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+

+
        let mut run = Run::default();
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        match x {
+
            Err(AdapterError::SpawnAdapter(filename, e)) => {
+
                assert_eq!(bin, filename);
+
                assert_eq!(e.kind(), ErrorKind::NotFound);
+
            }
+
            _ => panic!("expected a specific error"),
+
        }
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_is_not_executable() -> TestResult<()> {
+
        const ADAPTER: &str = r#"#!/bin/bash
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
echo '{"response":"finished","result":"success"}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        write(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        match x {
+
            Err(AdapterError::SpawnAdapter(filename, e)) => {
+
                assert_eq!(bin, filename);
+
                assert_eq!(e.kind(), ErrorKind::PermissionDenied);
+
            }
+
            _ => panic!("expected a specific error"),
+
        }
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn adapter_has_bad_interpreter() -> TestResult<()> {
+
        // We test this with a shebang. However, the same kind of code
+
        // paths and errors should happen when a binary can't be
+
        // loaded due to missing dynamic linker or library or such.
+

+
        const ADAPTER: &str = r#"#!/bin/does-not-exist
+
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
+
echo '{"response":"finished","result":"success"}'
+
"#;
+

+
        let tmp = tempdir()?;
+
        let bin = tmp.path().join("adapter.sh");
+
        mock_adapter(&bin, ADAPTER)?;
+

+
        let mut run = Run::default();
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        match x {
+
            Err(AdapterError::SpawnAdapter(filename, e)) => {
+
                assert_eq!(bin, filename);
+
                assert_eq!(e.kind(), ErrorKind::NotFound);
+
            }
+
            _ => panic!("expected a specific error"),
+
        }
+

+
        Ok(())
+
    }
+
}