Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: update status page in a background thread
Lars Wirzenius committed 2 years ago
commit 5d3aa44a92be306481612d8c8f10c85e9c9feefa
parent 2dafe88a84dc031485a71b405f3cda43fa8e00fa
6 files changed +145 -92
modified src/adapter.rs
@@ -19,6 +19,7 @@ use std::{
use crate::{
    msg::{MessageError, Request, Response},
    run::{Run, RunState},
+
    status::Status,
};

/// An external executable that runs CI on request.
@@ -47,14 +48,26 @@ impl Adapter {
        self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
    }

-
    pub fn run(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
-
        run.set_state(RunState::Running);
-
        let x = self.run_helper(trigger, run);
+
    pub fn run(
+
        &self,
+
        trigger: &Request,
+
        run: &mut Run,
+
        status: &mut Status,
+
    ) -> Result<(), AdapterError> {
+
        run.set_state(RunState::Triggered);
+
        status.ci_run(run);
+
        let x = self.run_helper(trigger, run, status);
        run.set_state(RunState::Finished);
+
        status.ci_run(run);
        x
    }

-
    fn run_helper(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
+
    fn run_helper(
+
        &self,
+
        trigger: &Request,
+
        run: &mut Run,
+
        status: &mut Status,
+
    ) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
@@ -84,7 +97,9 @@ impl Adapter {
            let resp = Response::from_str(&line)?;
            match resp {
                Response::Triggered { run_id } => {
+
                    run.set_state(RunState::Running);
                    run.set_adapter_run_id(run_id);
+
                    status.ci_run(run);
                }
                _ => return Err(AdapterError::NotTriggered(resp)),
            }
@@ -96,6 +111,7 @@ impl Adapter {
            match resp {
                Response::Finished { result } => {
                    run.set_result(result);
+
                    status.ci_run(run);
                }
                _ => return Err(AdapterError::NotTriggered(resp)),
            }
@@ -157,7 +173,7 @@ pub enum AdapterError {

#[cfg(test)]
mod test {
-
    use std::{fs::write, io::ErrorKind};
+
    use std::{fs::write, io::ErrorKind, path::Path};

    use tempfile::tempdir;

@@ -165,6 +181,7 @@ mod test {
    use crate::{
        adapter::AdapterError,
        msg::{MessageError, Response, RunResult},
+
        status::Status,
        test::{mock_adapter, trigger_request, TestResult},
    };

@@ -180,7 +197,8 @@ echo '{"response":"finished","result":"success"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -198,7 +216,8 @@ echo '{"response":"finished","result":"failure"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?;
        assert_eq!(run.result(), Some(&RunResult::Failure));

        Ok(())
@@ -216,7 +235,8 @@ echo '{"response":"finished","result":{"error":"error message\nsecond line"}}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?;
        assert_eq!(
            run.result(),
            Some(&RunResult::Error("error message\nsecond line".into()))
@@ -236,7 +256,8 @@ kill -9 $BASHPID
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
        assert!(matches!(x, Err(AdapterError::Failed(_))));

        Ok(())
@@ -254,7 +275,8 @@ kill -9 $BASHPID
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
        assert!(matches!(x, Err(AdapterError::Failed(_))));

        Ok(())
@@ -273,7 +295,8 @@ kill -9 $BASHPID
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
        assert!(matches!(x, Err(AdapterError::Failed(_))));

        Ok(())
@@ -291,7 +314,8 @@ echo '{"response":"finished","result":"success","bad":"field"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
        assert!(matches!(
            x,
            Err(AdapterError::Message(MessageError::DeserializeResponse(_)))
@@ -311,7 +335,8 @@ echo '{"response":"finished","result":"success"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
        assert!(matches!(
            x,
            Err(AdapterError::NotTriggered(Response::Finished {
@@ -335,7 +360,8 @@ echo '{"response":"finished","result":"success"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
        assert!(matches!(
            x,
            Err(AdapterError::TooMany(Response::Finished {
@@ -352,7 +378,8 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");

        let mut run = Run::default();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
                assert_eq!(bin, filename);
@@ -376,7 +403,8 @@ echo '{"response":"finished","result":"success"}'
        write(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
                assert_eq!(bin, filename);
@@ -404,7 +432,8 @@ echo '{"response":"finished","result":"success"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = Run::default();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
                assert_eq!(bin, filename);
modified src/bin/ci-broker.rs
@@ -1,22 +1,20 @@
use std::{
    error::Error,
    path::{Path, PathBuf},
+
    thread::{sleep, spawn},
+
    time::Duration,
};

use log::{debug, info};

use radicle::prelude::Profile;
use radicle_ci_broker::{
-
    adapter::Adapter,
-
    broker::Broker,
-
    config::Config,
-
    error::BrokerError,
-
    event::{BrokerEvent, NodeEventSource},
-
    msg::Request,
-
    run::Run,
-
    status::StatusBuilder,
+
    adapter::Adapter, broker::Broker, config::Config, error::BrokerError, event::NodeEventSource,
+
    msg::Request, status::Status,
};

+
const STATUS_UPDATE_DELAY: Duration = Duration::from_millis(1000);
+

fn main() {
    if let Err(e) = fallible_main() {
        eprintln!("ERROR: {}", e);
@@ -76,41 +74,29 @@ fn fallible_main() -> Result<(), BrokerError> {
    }
    debug!("added filters to node event source");

+
    // Spawn a thread that updates the status page.
+
    let mut status = Status::new(config.status_page().unwrap_or(Path::new("/dev/null")));
+
    let s2 = status.clone();
+
    let _status_thread = spawn(move || status_updater(s2));
+

    // This loop ends when there's an error, e.g., failure to read an
    // event from the node.
-
    let mut counter = 0;
-
    let mut status = StatusBuilder::new(config.status_page().unwrap_or(Path::new("/dev/null")));
    loop {
-
        status.write()?;
        debug!("waiting for event from node");
        for e in source.event()? {
            status.broker_event(&e);
-
            status.write()?;
-

-
            counter += 1;
            debug!("broker event {e:#?}");
-
            let BrokerEvent::RefChanged {
-
                rid,
-
                oid,
-
                name: _,
-
                old: _,
-
            } = e;
-

            let req = Request::trigger(&profile, &e)?;
-
            let mut run = Run::default();
-
            if let Some(adapter) = broker.adapter(&rid) {
-
                adapter.run(&req, &mut run)?;
-
                status.ci_run(run.adapter_run_id().unwrap(), run.result().unwrap());
-
                println!(
-
                    "Run CI run #{}: {}, {} -> {}",
-
                    counter,
-
                    rid,
-
                    oid,
-
                    run.result().unwrap()
-
                );
-
            } else {
-
                eprintln!("ERROR: no adapter available for repository {rid}: not running CI");
-
            }
+
            broker.execute_ci(&req, &mut status)?;
+
        }
+
    }
+
}
+

+
fn status_updater(mut status: Status) {
+
    loop {
+
        if let Err(e) = status.write() {
+
            eprintln!("ERROR: failed to update status page: {e}");
        }
+
        sleep(STATUS_UPDATE_DELAY);
    }
}
modified src/bin/status.rs
@@ -14,6 +14,6 @@ fn main() {
}

fn fallible_main() -> Result<(), StatusError> {
-
    StatusBuilder::new(Path::new("status.json")).write()?;
+
    Status::new(Path::new("status.json")).write()?;
    Ok(())
}
modified src/broker.rs
@@ -7,7 +7,7 @@ use std::collections::HashMap;

use radicle::prelude::RepoId;

-
use crate::{adapter::Adapter, error::BrokerError, msg::Request, run::Run};
+
use crate::{adapter::Adapter, error::BrokerError, msg::Request, run::Run, status::Status};

/// A CI broker.
///
@@ -38,7 +38,7 @@ impl Broker {
    }

    #[allow(clippy::result_large_err)]
-
    pub fn execute_ci(&self, trigger: &Request) -> Result<Run, BrokerError> {
+
    pub fn execute_ci(&self, trigger: &Request, status: &mut Status) -> Result<Run, BrokerError> {
        let adapter = match trigger {
            Request::Trigger {
                common,
@@ -55,7 +55,7 @@ impl Broker {
        };

        let mut run = Run::default();
-
        adapter.run(trigger, &mut run)?;
+
        adapter.run(trigger, &mut run, status)?;

        Ok(run)
    }
@@ -63,12 +63,15 @@ impl Broker {

#[cfg(test)]
mod test {
+
    use std::path::Path;
+

    use tempfile::tempdir;

    use super::{Adapter, Broker, RepoId};
    use crate::{
        msg::{RunId, RunResult},
        run::RunState,
+
        status::Status,
        test::{mock_adapter, trigger_request, TestResult},
    };

@@ -154,7 +157,8 @@ echo '{"response":"finished","result":"success"}'

        let trigger = trigger_request()?;

-
        let x = broker.execute_ci(&trigger);
+
        let mut status = Status::new(Path::new("/dev/null"));
+
        let x = broker.execute_ci(&trigger, &mut status);
        assert!(x.is_ok());
        let run = x.unwrap();
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
modified src/run.rs
@@ -63,7 +63,6 @@ impl Run {

/// State of CI run.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)]
-
#[serde(untagged)]
#[serde(rename_all = "snake_case")]
pub enum RunState {
    /// Run has been triggered, but has not yet been started.
@@ -78,6 +77,22 @@ pub enum RunState {

impl fmt::Display for RunState {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
-
        write!(f, "{}", serde_json::to_string(self).unwrap())
+
        let s = match self {
+
            Self::Finished => "finished",
+
            Self::Running => "running",
+
            Self::Triggered => "triggered",
+
        };
+
        write!(f, "{}", s)
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+

+
    #[test]
+
    fn serialize_run_state() {
+
        let s = serde_json::to_string(&RunState::Finished).unwrap();
+
        assert_eq!(s, r#""finished""#);
    }
}
modified src/status.rs
@@ -1,25 +1,38 @@
-
use std::path::{Path, PathBuf};
+
use std::{
+
    path::{Path, PathBuf},
+
    sync::{Arc, Mutex, MutexGuard},
+
};

use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};

-
use crate::{
-
    event::BrokerEvent,
-
    msg::{RunId, RunResult},
-
};
+
use crate::{event::BrokerEvent, run::Run};

-
#[derive(Serialize)]
-
pub struct Status {
+
#[derive(Debug, Serialize)]
+
struct StatusData {
    timestamp: String,
+
    broker_event_counter: usize,
    ci_broker_version: &'static str,
    ci_broker_git_commit: &'static str,
    latest_broker_event: Option<BrokerEvent>,
-
    latest_ci_run: Option<RunId>,
-
    latest_ci_run_result: Option<RunResult>,
+
    latest_ci_run: Option<Run>,
}

-
impl Status {
-
    pub fn write(&self, filename: &Path) -> Result<(), StatusError> {
+
impl Default for StatusData {
+
    fn default() -> Self {
+
        Self {
+
            timestamp: "".into(),
+
            broker_event_counter: 0,
+
            ci_broker_version: env!("CARGO_PKG_VERSION"),
+
            ci_broker_git_commit: env!("GIT_HEAD"),
+
            latest_broker_event: None,
+
            latest_ci_run: None,
+
        }
+
    }
+
}
+

+
impl StatusData {
+
    fn write(&self, filename: &Path) -> Result<(), StatusError> {
        let s = serde_json::to_string_pretty(&self).map_err(StatusError::serialize)?;
        std::fs::write(filename, s.as_bytes())
            .map_err(|e| StatusError::status_write(filename, e))?;
@@ -27,42 +40,39 @@ impl Status {
    }
}

-
pub struct StatusBuilder {
+
pub struct Status {
    filename: PathBuf,
-
    latest_broker_event: Option<BrokerEvent>,
-
    latest_ci_run: Option<RunId>,
-
    latest_ci_run_result: Option<RunResult>,
+
    status: Arc<Mutex<StatusData>>,
}

-
impl StatusBuilder {
+
impl Status {
    pub fn new(filename: &Path) -> Self {
        Self {
            filename: filename.into(),
-
            latest_ci_run: None,
-
            latest_ci_run_result: None,
-
            latest_broker_event: None,
+
            status: Arc::new(Mutex::new(StatusData::default())),
        }
    }

+
    fn lock(&mut self) -> MutexGuard<StatusData> {
+
        self.status.lock().expect("lock StatusGuard::status")
+
    }
+

    pub fn broker_event(&mut self, event: &BrokerEvent) {
-
        self.latest_broker_event = Some(event.clone());
+
        let mut status = self.lock();
+
        status.latest_broker_event = Some(event.clone());
+
        status.broker_event_counter += 1;
    }

-
    pub fn ci_run(&mut self, run_id: &RunId, result: &RunResult) {
-
        self.latest_ci_run = Some(run_id.clone());
-
        self.latest_ci_run_result = Some(result.clone());
+
    pub fn ci_run(&mut self, run: &Run) {
+
        let mut status = self.lock();
+
        status.latest_ci_run = Some(run.clone());
    }

-
    pub fn write(&self) -> Result<(), StatusError> {
-
        let status = Status {
-
            timestamp: Self::now()?,
-
            ci_broker_version: env!("CARGO_PKG_VERSION"),
-
            ci_broker_git_commit: env!("GIT_HEAD"),
-
            latest_broker_event: self.latest_broker_event.clone(),
-
            latest_ci_run: self.latest_ci_run.clone(),
-
            latest_ci_run_result: self.latest_ci_run_result.clone(),
-
        };
-
        status.write(&self.filename)?;
+
    pub fn write(&mut self) -> Result<(), StatusError> {
+
        let filename = self.filename.clone();
+
        let mut status = self.lock();
+
        status.timestamp = Self::now()?;
+
        status.write(&filename)?;
        Ok(())
    }

@@ -74,6 +84,15 @@ impl StatusBuilder {
    }
}

+
impl Clone for Status {
+
    fn clone(&self) -> Self {
+
        Self {
+
            filename: self.filename.clone(),
+
            status: Arc::clone(&self.status),
+
        }
+
    }
+
}
+

#[derive(Debug, thiserror::Error)]
pub enum StatusError {
    #[error("failed to format current time stamp")]