Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
fix: capture and log all stderr from adapter
Merged liw opened 1 year ago

Previously, we failed to capture all of stderr from the adapter if the adapter sent messages to its stdout could not be parsed, or otherwise meant the CI broker deduced the CI run had failed. Now we read all of stderr (module time limits) before returning an error result.

Signed-off-by: Lars Wirzenius liw@liw.fi

3 files changed +84 -27 7a754fe1 09db9b76
modified ci-broker.md
@@ -588,7 +588,7 @@ error output. This makes it easier to debug adapter problems.
_Who:_ `adapter-devs`, `node-ops`

~~~scenario
-
given a Radicle node, with CI configured with broker.yaml and adapter dummy.sh
+
given a Radicle node, with CI configured with broker.yaml and adapter broken-adapter.sh
given a Git repository xyzzy in the Radicle node
given the Radicle node emits a refsUpdated event for xyzzy
when I run ./env.sh synthetic-events synt.sock event.json --log log.txt
@@ -598,9 +598,22 @@ when I run ./env.sh cibtool --db ci-broker.db event list --json

given a directory reports
when I run ./env.sh cib --config broker.yaml queued
+
then stderr contains "Rivendell"
then stderr contains "Mordor"
~~~

+

+
This adapter outputs a broken response message, and after that
+
something to its stderr. The CI broker is meant to read and log both.
+

+
~~~{#broken-adapter.sh .file .sh}
+
#!/bin/bash
+
set -euo pipefail
+
cat > /dev/null
+
echo '{"response":"Rivendell"}}'
+
echo "This is an adapter error: Mordor" 1>&2
+
~~~
+

## Allows setting minimum log level

_What:_ The node admin should be able to set the minimum log level for
modified src/adapter.rs
@@ -15,6 +15,8 @@ use std::{
    time::Duration,
};

+
//use tracing::Instrument;
+

use crate::{
    db::{Db, DbError},
    logger,
@@ -22,7 +24,7 @@ use crate::{
    notif::NotificationSender,
    run::{Run, RunState},
    sensitive::Sensitive,
-
    timeoutcmd::{TimeoutCommand, TimeoutError},
+
    timeoutcmd::{LineReceiver, TimeoutCommand, TimeoutError},
};

const NOT_EXITED: i32 = 999;
@@ -102,7 +104,52 @@ impl Adapter {
        run_notification.notify()?;

        let stdout = child.stdout();
+
        let stderr = child.stderr();
+

+
        let mut outcome = MaybeResult::default();
+

+
        if let Err(err) = self.read_stdout(run, db, run_notification, stdout) {
+
            outcome.set_error(err);
+
        }
+

+
        self.read_stderr(stderr);
+

+
        if outcome.has_error() {
+
            child.kill().ok();
+
        }
+

+
        let wait_result = child.wait().expect("FIXME");
+
        logger::debug2(format!(
+
            "wait result? {wait_result:?} status.code: {:?}",
+
            wait_result.status().code()
+
        ));
+
        if wait_result.timed_out() {
+
            logger::adapter_did_not_exit_voluntarily();
+
            outcome.set_error(AdapterError::Failed(NOT_EXITED));
+
        } else if let Some(exit) = wait_result.status().code() {
+
            logger::adapter_result(exit);
+
            if exit != 0 {
+
                outcome.set_error(AdapterError::Failed(exit));
+
            }
+
        } else {
+
            logger::adapter_did_not_exit();
+
            outcome.set_error(AdapterError::Signal);
+
        }

+
        if let Some(err) = outcome.error() {
+
            Err(err)
+
        } else {
+
            Ok(())
+
        }
+
    }
+

+
    fn read_stdout(
+
        &self,
+
        run: &mut Run,
+
        db: &Db,
+
        run_notification: &NotificationSender,
+
        stdout: &LineReceiver,
+
    ) -> Result<(), AdapterError> {
        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            run_notification.notify()?;
@@ -116,13 +163,11 @@ impl Adapter {
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
                _ => {
-
                    child.kill().ok();
                    return Err(AdapterError::NotTriggered(resp));
                }
            }
        } else {
            logger::adapter_no_first_response();
-
            child.kill().ok();
            return Err(AdapterError::NoFirstMessage);
        }

@@ -135,49 +180,48 @@ impl Adapter {
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
                _ => {
-
                    child.kill().ok();
                    return Err(AdapterError::NotFinished(resp));
                }
            }
        } else {
            logger::adapter_no_second_response();
-
            child.kill().ok();
            return Err(AdapterError::NoSecondMessage);
        }

        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            logger::adapter_too_many_responses();
-
            child.kill().ok();
            return Err(AdapterError::TooMany(resp));
        }

-
        let stderr = child.stderr();
+
        Ok(())
+
    }
+

+
    fn read_stderr(&self, stderr: &LineReceiver) {
        while let Some(line) = stderr.line() {
            logger::adapter_stderr_line(&line);
        }
+
    }
+
}

-
        let result = child.wait().expect("FIXME");
-

-
        logger::debug2(format!(
-
            "wait result? {result:?} status.code: {:?}",
-
            result.status().code()
-
        ));
+
#[derive(Default)]
+
struct MaybeResult {
+
    error: Option<AdapterError>,
+
}

-
        if result.timed_out() {
-
            logger::adapter_did_not_exit_voluntarily();
-
            return Err(AdapterError::Failed(NOT_EXITED));
-
        } else if let Some(exit) = result.status().code() {
-
            logger::adapter_result(exit);
-
            if exit != 0 {
-
                return Err(AdapterError::Failed(exit));
-
            }
-
        } else {
-
            logger::adapter_did_not_exit();
-
            return Err(AdapterError::Signal);
+
impl MaybeResult {
+
    fn set_error(&mut self, err: AdapterError) {
+
        if self.error.is_none() {
+
            self.error = Some(err);
        }
+
    }

-
        Ok(())
+
    fn has_error(&self) -> bool {
+
        self.error.is_some()
+
    }
+

+
    fn error(self) -> Option<AdapterError> {
+
        self.error
    }
}

modified src/timeoutcmd.rs
@@ -138,7 +138,7 @@ impl TimeoutCommand {
            spawn(move || NonBlockingReader::new("stdout", stdout, stdout_lines_tx).read_to_end());
        let stdout_lines = LineReceiver::new("stdout", stdout_lines_rx, stdout_termination_rx);

-
        // Set up thread to capture child stdout.
+
        // Set up thread to capture child stderr.
        let stderr = child.stderr.take().ok_or(TimeoutError::TakeStderr)?;
        let (stderr_termination_tx, stderr_termination_rx) = sync_channel(1);
        let (stderr_lines_tx, stderr_lines_rx) = sync_channel(MAX_OUTPUT_BYTES);