Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
radicle-ci-broker src adapter.rs
//! Run a Radicle CI adapter.
//!
//! Given an executable that conforms to the CI adapter API, execute
//! it by feeding it the "trigger" message via its stdin and reading
//! response messages from its stdout. Return the result of the run,
//! or an error if something went badly wrong. The CI run failing due
//! to something in the repository under test is expected, and not
//! considered as something going badly wrong.

use std::{
    collections::HashMap,
    ffi::OsStr,
    os::unix::process::ExitStatusExt,
    path::{Path, PathBuf},
    process::{Command, ExitStatus},
    sync::{Arc, Mutex, mpsc::Sender},
    time::Duration,
};

use serde::Serialize;
use tempfile::{TempDir, tempdir};
use url::Url;

use crate::{
    cob::{KnownJobCobs, failed, succeeded},
    config::AdapterSpec,
    db::{Db, DbError},
    logger,
    msg::{MessageError, Request, Response, RunResult},
    notif::NotificationSender,
    queueproc::ChildInfo,
    run::{Run, RunState},
    sensitive::Sensitive,
    timeoutcmd::{RealtimeLines, TimeoutCommand, TimeoutError},
};

/// The set of all configured adapters.
#[derive(Clone, Serialize)]
pub struct Adapters {
    adapters: HashMap<String, Adapter>,
    default_adapter: Option<String>,
}

impl Adapters {
    pub fn new(
        adapters: &HashMap<String, AdapterSpec>,
        default_adapter: Option<&str>,
    ) -> Result<Self, AdapterError> {
        if let Some(default) = default_adapter
            && !adapters.contains_key(default)
        {
            return Err(AdapterError::NoDefaultAdapter);
        }

        Ok(Self {
            adapters: HashMap::from_iter(
                adapters
                    .iter()
                    .map(|(k, v)| (k.to_string(), Adapter::from(v))),
            ),
            default_adapter: default_adapter.map(|s| s.into()),
        })
    }

    pub fn default_adapter(&self) -> Option<&Adapter> {
        if let Some(default) = &self.default_adapter {
            self.adapters.get(default)
        } else {
            None
        }
    }

    pub fn get(&self, name: &str) -> Option<&Adapter> {
        self.adapters.get(name)
    }

    pub fn to_json(&self) -> Result<String, AdapterError> {
        serde_json::to_string_pretty(self).map_err(AdapterError::adapters_to_json)
    }
}

/// An external executable that runs CI on request.
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize)]
pub struct Adapter {
    bin: PathBuf,
    env: HashMap<String, String>,
    config: HashMap<String, serde_norway::Value>,
    config_env: Option<String>,
}

impl Adapter {
    pub fn new(bin: &Path) -> Self {
        Self {
            bin: bin.into(),
            env: HashMap::new(),
            ..Default::default()
        }
    }

    pub fn with_environment(mut self, env: &HashMap<String, String>) -> Self {
        for (key, value) in env.iter() {
            self.env.insert(key.into(), value.into());
        }
        self
    }

    pub fn with_sensitive_environment(mut self, env: &HashMap<String, Sensitive>) -> Self {
        for (key, value) in env.iter() {
            self.env.insert(key.into(), value.as_str().into());
        }
        self
    }

    pub fn with_config(mut self, config: HashMap<String, serde_norway::Value>) -> Self {
        self.config = config;
        self
    }

    pub fn with_config_env(mut self, config_env: &str) -> Self {
        self.config_env = Some(config_env.to_string());
        self
    }

    fn write_adapter_config(&self, tmpdir: &TempDir) -> Result<PathBuf, AdapterError> {
        let filename = tmpdir.path().join("adapter.yaml");

        let yaml =
            serde_norway::to_string(&self.config).map_err(AdapterError::adapter_config_to_yaml)?;
        logger::adapter_temp_config(&filename, &yaml);
        std::fs::write(&filename, yaml.as_bytes()).map_err(AdapterError::AdapterConfigWrite)?;

        Ok(filename)
    }

    fn envs(&self) -> impl Iterator<Item = (&OsStr, &OsStr)> {
        self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
    }

    #[allow(clippy::too_many_arguments)]
    pub fn run(
        &self,
        trigger: &Request,
        run: &mut Run,
        db: &Db,
        run_notification: &NotificationSender,
        max_run_time: Duration,
        child_info: Sender<ChildInfo>,
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<(), AdapterError> {
        run.set_state(RunState::Triggered);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

        let x = self.run_helper(
            trigger,
            run,
            db,
            run_notification,
            max_run_time,
            child_info,
            known_job_cobs,
        );

        run.set_state(RunState::Finished);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

        if matches!(x, Err(AdapterError::FailedTimeout)) {
            run.set_timed_out();
        }

        x
    }

    #[allow(clippy::too_many_arguments)]
    fn run_helper(
        &self,
        trigger: &Request,
        run: &mut Run,
        db: &Db,
        run_notification: &NotificationSender,
        max_run_time: Duration,
        child_pid: Sender<ChildInfo>,
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

        let tmp = tempdir().map_err(AdapterError::AdapterConfigDir)?;

        // Start setting up the command to run the adapter sub-process.
        let mut cmd = Command::new(&self.bin);
        cmd.envs(self.envs());

        // Write adapter config file, if requested.
        if let Some(config_env) = &self.config_env {
            let filename = self.write_adapter_config(&tmp)?;
            cmd.env(config_env, filename);
        }

        // Spawn the adapter sub-process.
        let mut child = TimeoutCommand::new(max_run_time);
        let stdout = child.stdout();
        child.feed_stdin(trigger.to_string().as_bytes());
        let child = match child.spawn(cmd) {
            Ok(child) => child,
            Err(TimeoutError::Spawn(_, err)) => Err(AdapterError::spawn_adapter(&self.bin, err))?,
            Err(err) => Err(AdapterError::TimeoutCommand(err))?,
        };
        let child_info = ChildInfo::new(run.broker_run_id().clone(), child.id());
        child_pid.send(child_info).ok(); // FIXME

        run_notification.notify()?;

        let mut outcome = MaybeResult::default();

        if let Err(err) = self.read_stdout(run, db, run_notification, stdout, known_job_cobs) {
            outcome.set_error(err);
        }

        let result = child.wait();

        match result {
            Ok(finished) => {
                let stderr = finished.stderr();
                self.log_stderr(stderr);

                let exit = finished.exit_code();
                logger::adapter_result(exit);
                if !exit.success() {
                    if let Some(signal) = exit.signal() {
                        outcome.set_error(AdapterError::Signal(signal));
                    } else {
                        outcome.set_error(AdapterError::Failed(exit));
                    }
                }
            }
            Err(TimeoutError::TimedOut) => {
                logger::adapter_timed_out();
                outcome.set_error(AdapterError::FailedTimeout);
            }
            Err(err) => {
                logger::adapter_did_not_exit(&err);
                outcome.set_error(AdapterError::TimeoutWait(err));
            }
        }

        if let Some(err) = outcome.error() {
            run.set_result(RunResult::Failure);
            Err(err)
        } else {
            Ok(())
        }
    }

    fn read_stdout(
        &self,
        run: &mut Run,
        db: &Db,
        run_notification: &NotificationSender,
        mut stdout: RealtimeLines,
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<(), AdapterError> {
        #[allow(clippy::unwrap_used)]
        let no_url = Url::parse("https://no.url.example.com").unwrap();

        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            run_notification.notify()?;
            match resp {
                Response::Triggered { run_id, info_url } => {
                    run.set_state(RunState::Running);
                    run.set_adapter_run_id(run_id);

                    let url = if let Some(url) = info_url {
                        run.set_adapter_info_url(&url);
                        Url::parse(&url)
                    } else {
                        Ok(no_url.clone())
                    };

                    db.update_run(run).map_err(AdapterError::UpdateRun)?;

                    // Try to add this CI run tothe job COB. If that fails, the
                    // function logs it. We won't do anything about a failure, as
                    // there's nothing useful we can do about it, as long as we
                    // let CI run, which want to do.
                    let url = url.unwrap_or(no_url);

                    if let Ok(mut known) = known_job_cobs.lock() {
                        known.create_run(
                            run.repo_id(),
                            run.whence().oid(),
                            run.broker_run_id().clone(),
                            &url,
                            false,
                        );
                    }
                }
                _ => {
                    return Err(AdapterError::NotTriggered(resp));
                }
            }
        } else {
            logger::adapter_no_first_response();
            return Err(AdapterError::NoFirstMessage);
        }

        if let Some(line) = stdout.line() {
            logger::adapter_stdout_line(&line);
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            run_notification.notify()?;
            match resp {
                Response::Finished { result } => {
                    run.set_result(result.clone());
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;

                    // Try to mark this CI run in job COB as finished.
                    // If that fails, the function logs it. We won't
                    // do anything about a failure, as there's nothing
                    // useful we can do about it.
                    let repo_id = run.repo_id();
                    let oid = run.whence().oid();
                    let run_id = run.broker_run_id().clone();
                    match &result {
                        RunResult::Success => succeeded(repo_id, oid, run_id),
                        RunResult::Failure => failed(repo_id, oid, run_id),
                    };
                }
                _ => {
                    return Err(AdapterError::NotFinished(resp));
                }
            }
        } else {
            logger::adapter_no_second_response();
            return Err(AdapterError::NoSecondMessage);
        }

        if let Some(line) = stdout.line() {
            logger::adapter_stdout_line(&line);
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            logger::adapter_too_many_responses();
            return Err(AdapterError::TooMany(resp));
        }

        Ok(())
    }

    fn log_stderr(&self, stderr: &[u8]) {
        for line in String::from_utf8_lossy(stderr).lines() {
            logger::adapter_stderr_line(line);
        }
    }
}

#[derive(Default)]
struct MaybeResult {
    error: Option<AdapterError>,
}

impl MaybeResult {
    fn set_error(&mut self, err: AdapterError) {
        self.error = Some(err);
    }

    fn error(self) -> Option<AdapterError> {
        self.error
    }
}

#[derive(Debug, thiserror::Error)]
pub enum AdapterError {
    /// No default adapter.
    #[error("the default adapter is not defined in the configuration")]
    NoDefaultAdapter,

    /// Can't serialize [`Adapters`] to JSON.
    #[error("failed to serialize adapters as JSON")]
    AdaptersToJson(#[source] Box<dyn std::error::Error>),

    /// Error from [`TimeoutCommand`] or [`RunningProcess`].
    #[error(transparent)]
    TimeoutCommand(#[from] crate::timeoutcmd::TimeoutError),

    /// Error from spawning a sub-process.
    #[error("failed to spawn a CI adapter sub-process: {0}")]
    SpawnAdapter(PathBuf, #[source] std::io::Error),

    /// Error creating Response from a string.
    #[error("failed to create a Response message from adapter output")]
    ParseResponse(#[source] MessageError),

    /// Request writing failure.
    #[error("failed to write request to adapter stdin")]
    RequestWrite(#[source] MessageError),

    /// Error getting the file handle for the adapter's stdin.
    #[error("failed to get handle for adapter's stdin")]
    StdinHandle,

    /// Error getting the file handle for the adapter's stdout.
    #[error("failed to get handle for adapter's stdout")]
    StdoutHandle,

    /// Error getting the file handle for the adapter's stderr.
    #[error("failed to get handle for adapter's stderr")]
    StderrHandle,

    /// Error reading adapter's stderr.
    #[error("failed to read the adapter's stderr")]
    ReadStderr(#[source] std::io::Error),

    #[error("failed to read from adapter stdout")]
    ReadLine(#[source] std::io::Error),

    /// Waiting for child process failed.
    #[error("failed to wait for child process to exit")]
    Wait(#[source] std::io::Error),

    /// Child process failed.
    #[error("child process failed with wait status {0:?}")]
    Failed(ExitStatus),

    /// Child process was terminated due to taking too long.
    #[error("child process was terminated due to taking too long")]
    FailedTimeout,

    /// Child process was killed.
    #[error("child process terminated by signal {0}")]
    Signal(i32),

    /// Child process did not terminate.
    #[error("child process did not terminate")]
    TimeoutWait(#[source] crate::timeoutcmd::TimeoutError),

    /// First message is not `Response::Triggered`
    #[error("adapter's first message is not 'triggered', but {0:?}")]
    NotTriggered(Response),

    /// There was no first response from adapter.
    #[error("adapter did not sent its first message")]
    NoFirstMessage,

    /// There was no second response from adapter.
    #[error("adapter did not sent its second message")]
    NoSecondMessage,

    /// Second message is not `Response::Finished`
    #[error("adapter's second message is not 'finished', but {0:?}")]
    NotFinished(Response),

    /// Too many messages from adapter.
    #[error("adapter sent too many messages: first extra is {0:#?}")]
    TooMany(Response),

    /// Could no update run in database.
    #[error("failed to update CI run information in database")]
    UpdateRun(#[source] DbError),

    /// Could not send notification of changes to CI runs.
    #[error(transparent)]
    Notif(#[from] crate::notif::NotificationError),

    /// Can't create temp directory for adapter config.
    #[error("failed to create temporary directory for adapter configuration")]
    AdapterConfigDir(#[source] std::io::Error),

    /// Can't serialize adapter configuration to YAML.
    #[error("can't convert adapter configuration to YAML")]
    AdapterConfigToYaml(#[source] Box<dyn std::error::Error>),

    /// Can't write adapter config.
    #[error("failed to write adapter configuration")]
    AdapterConfigWrite(#[source] std::io::Error),

    /// Can't lock mutex.
    #[error("failed to lock mutex")]
    Mutex,
}

impl AdapterError {
    fn adapters_to_json(err: serde_json::Error) -> Self {
        Self::AdaptersToJson(Box::new(err))
    }

    fn spawn_adapter(path: &Path, err: std::io::Error) -> Self {
        Self::SpawnAdapter(path.into(), err)
    }

    fn adapter_config_to_yaml(err: serde_norway::Error) -> Self {
        Self::AdapterConfigToYaml(Box::new(err))
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::{
        fs::write, io::ErrorKind, path::PathBuf, str::FromStr, sync::mpsc::channel, time::Duration,
    };

    use tempfile::{NamedTempFile, TempDir, tempdir};

    use radicle::prelude::RepoId;

    use super::{Adapter, Db, Run};
    use crate::{
        adapter::AdapterError,
        cob::KnownJobCobs,
        ergo::Oid,
        msg::{MessageError, Response, RunId, RunResult},
        notif::NotificationChannel,
        run::{RunBuilder, Whence},
        test::{TestResult, mock_adapter, trigger_request},
    };

    const MAX: Duration = Duration::from_secs(10);

    fn db() -> Result<Db, Box<dyn std::error::Error>> {
        let tmp = NamedTempFile::new()?;
        let db = Db::new(tmp.path())?;
        Ok(db)
    }

    fn run() -> Result<Run, Box<dyn std::error::Error>> {
        Ok(RunBuilder::default()
            .broker_run_id(RunId::default())
            .repo_id(RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?)
            .repo_name("test.repo")
            .whence(Whence::branch(
                "main",
                Oid::from_str("ff3099ba5de28d954c41d0b5a84316f943794ea4")?,
                Some("J. Random Hacker <random@example.com>"),
            ))
            .timestamp("2024-02-29T12:58:12+02:00".into())
            .build())
    }

    #[allow(clippy::unwrap_used)]
    fn known() -> Arc<Mutex<KnownJobCobs>> {
        Arc::new(Mutex::new(KnownJobCobs::new().unwrap()))
    }

    #[allow(clippy::unwrap_used)]
    fn adapter(tmp: &TempDir, shell: &'static str) -> PathBuf {
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, shell).unwrap();
        bin
    }

    #[test]
    fn adapter_reports_success() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        )?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
    }

    #[test]
    fn adapter_reports_failure() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"failure"}'
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );

        match x {
            Ok(_) => (),
            Err(AdapterError::RequestWrite(_)) => (),
            _ => panic!("unexpected result: {x:#?}"),
        }

        Ok(())
    }

    #[test]
    fn adapter_exits_nonzero() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"failure"}'
echo woe be me 1>&2
exit 1
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));

        Ok(())
    }

    #[test]
    fn adapter_is_killed_before_any_messages() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
kill -9 $$
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Signal(9))));

        Ok(())
    }

    #[test]
    fn adapter_ends_ok_before_first_message() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoFirstMessage)));

        Ok(())
    }

    #[test]
    fn adapter_is_killed_before_first_message() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
kill -9 $$
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Signal(9))));

        Ok(())
    }

    #[test]
    fn adapter_ends_ok_before_second_message() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));

        Ok(())
    }

    #[test]
    fn adapter_is_killed_after_second_message() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
kill -9 $$
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("Adapter::run result: {x:#?}");
        if let Err(AdapterError::Failed(x)) = x {
            use std::os::unix::process::ExitStatusExt;
            eprintln!("Adapter::run result: signal={:?}", x.signal());
        }
        assert!(matches!(x, Err(AdapterError::Signal(9))));

        Ok(())
    }

    #[test]
    fn adapter_produces_as_bad_message() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success","bad":"field"}'
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
            _ => panic!("unexpected result: {x:#?}"),
        }

        Ok(())
    }

    #[test]
    fn adapter_first_message_isnt_triggered() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
read
echo '{"response":"finished","result":"success"}'
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
            Err(AdapterError::NotTriggered(Response::Finished {
                result: RunResult::Success
            }))
        ));

        Ok(())
    }

    #[test]
    fn adapter_outputs_too_many_messages() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
echo '{"response":"finished","result":"success"}'
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
            Err(AdapterError::TooMany(Response::Finished {
                result: RunResult::Success
            }))
        ));

        Ok(())
    }

    #[test]
    fn adapter_does_not_exist() -> TestResult<()> {
        let tmp = tempdir()?;
        let bin = tmp.path().join("adapter.sh");

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
                assert_eq!(bin, filename);
                assert_eq!(e.kind(), ErrorKind::NotFound);
            }
            _ => panic!("expected a specific error"),
        }

        Ok(())
    }

    #[test]
    fn adapter_is_not_executable() -> TestResult<()> {
        const ADAPTER: &str = r#"#!/bin/sh
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#;

        let tmp = tempdir()?;
        let bin = tmp.path().join("adapter.sh");
        write(&bin, ADAPTER)?;

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
                assert_eq!(bin, filename);
                assert_eq!(e.kind(), ErrorKind::PermissionDenied);
            }
            _ => panic!("expected a specific error"),
        }

        Ok(())
    }

    #[test]
    fn adapter_has_bad_interpreter() -> TestResult<()> {
        // We test this with a shebang. However, the same kind of code
        // paths and errors should happen when a binary can't be
        // loaded due to missing dynamic linker or library or such.

        let tmp = tempdir()?;
        let bin = adapter(
            &tmp,
            r#"#!/bin/does-not-exist
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
"#,
        );

        let db = db()?;
        let mut run = run()?;
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
        let x = Adapter::new(&bin).run(
            &trigger_request()?,
            &mut run,
            &db,
            &sender,
            MAX,
            pid_tx,
            known(),
        );
        eprintln!("result from run: {x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
                assert_eq!(bin, filename);
                assert_eq!(e.kind(), ErrorKind::NotFound);
            }
            _ => panic!("expected a specific error"),
        }

        Ok(())
    }
}