Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: whenever run info changes, update it in database
Lars Wirzenius committed 1 year ago
commit d5159a7400703ea991407e5b0c17b9313a3ee979
parent 9a4a39f73b9a6d05913b3f5a98158ac26f91a3c2
2 files changed +53 -28
modified src/adapter.rs
@@ -18,6 +18,7 @@ use std::{
use log::{debug, error};

use crate::{
+
    db::{Db, DbError},
    msg::{MessageError, Request, Response},
    run::{Run, RunState},
};
@@ -50,15 +51,21 @@ impl Adapter {
        self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
    }

-
    pub fn run(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
+
    pub fn run(&self, trigger: &Request, run: &mut Run, db: &Db) -> Result<(), AdapterError> {
        debug!("running adapter");
+

        run.set_state(RunState::Triggered);
-
        let x = self.run_helper(trigger, run);
+
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
+

+
        let x = self.run_helper(trigger, run, db);
+

        run.set_state(RunState::Finished);
+
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
+

        x
    }

-
    fn run_helper(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
+
    fn run_helper(&self, trigger: &Request, run: &mut Run, db: &Db) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
@@ -99,6 +106,7 @@ impl Adapter {
                    if let Some(url) = info_url {
                        run.set_adapter_info_url(&url);
                    }
+
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
                _ => return Err(AdapterError::NotTriggered(resp)),
            }
@@ -113,6 +121,7 @@ impl Adapter {
            match resp {
                Response::Finished { result } => {
                    run.set_result(result);
+
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
                }
                _ => return Err(AdapterError::NotFinished(resp)),
            }
@@ -204,18 +213,22 @@ pub enum AdapterError {
    /// Too many messages from adapter.
    #[error("adapter sent too many messages: first extra is {0:#?}")]
    TooMany(Response),
+

+
    /// Could no update run in database.
+
    #[error("failed to update CI run information in database")]
+
    UpdateRun(#[source] DbError),
}

#[cfg(test)]
mod test {
    use std::{fs::write, io::ErrorKind};

-
    use tempfile::tempdir;
+
    use tempfile::{tempdir, NamedTempFile};

    use radicle::git::Oid;
    use radicle::prelude::RepoId;

-
    use super::{Adapter, Run};
+
    use super::{Adapter, Db, Run};
    use crate::{
        adapter::AdapterError,
        msg::{MessageError, Response, RunResult},
@@ -223,6 +236,12 @@ mod test {
        test::{log_in_tests, mock_adapter, trigger_request, TestResult},
    };

+
    fn db() -> anyhow::Result<Db> {
+
        let tmp = NamedTempFile::new()?;
+
        let db = Db::new(tmp.path())?;
+
        Ok(db)
+
    }
+

    fn run() -> anyhow::Result<Run> {
        Ok(Run::new(
            RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?,
@@ -250,8 +269,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db)?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -271,8 +291,9 @@ echo '{"response":"finished","result":"failure"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);

        match x {
            Ok(_) => (),
@@ -299,8 +320,9 @@ exit 1
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));
@@ -320,8 +342,9 @@ kill -9 $BASHPID
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -345,8 +368,9 @@ kill -9 $BASHPID
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -368,8 +392,9 @@ kill -9 $BASHPID
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -390,8 +415,9 @@ echo '{"response":"finished","result":"success","bad":"field"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
@@ -414,8 +440,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -442,8 +469,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -462,8 +490,9 @@ echo '{"response":"finished","result":"success"}'
        let tmp = tempdir()?;
        let bin = tmp.path().join("adapter.sh");

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -490,8 +519,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        write(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -522,8 +552,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, ADAPTER)?;

+
        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
modified src/broker.rs
@@ -17,7 +17,7 @@ use radicle::prelude::RepoId;
use crate::{
    adapter::Adapter,
    db::{Db, DbError},
-
    msg::{PatchEvent, PushEvent, Request, RunId},
+
    msg::{PatchEvent, PushEvent, Request},
    run::{Run, Whence},
};

@@ -68,7 +68,7 @@ impl Broker {
    pub fn execute_ci(&mut self, trigger: &Request) -> Result<Run, BrokerError> {
        info!("Start CI run");
        debug!("Start CI run on {trigger:#?}");
-
        let mut run = match trigger {
+
        let run = match trigger {
            Request::Trigger {
                common,
                push,
@@ -99,12 +99,13 @@ impl Broker {
                    };

                    let mut run = Run::new(*rid, &common.repository.name, whence, now()?);
+
                    self.db.push_run(&run)?;

                    // We run the adapter, but if that fails, we just
                    // log the error. The `Run` value records the
                    // result of the run.
                    debug!("broker runs adapter");
-
                    if let Err(e) = adapter.run(trigger, &mut run) {
+
                    if let Err(e) = adapter.run(trigger, &mut run, &self.db) {
                        error!("failed to run adapter or it failed to run CI: {e}");
                        let mut e = e.source();
                        while let Some(source) = e {
@@ -122,14 +123,7 @@ impl Broker {
        };
        info!("Finish CI run: {run:?}");

-
        // If the adapter never gave us a run ID, it has not been set.
-
        // In that case, we invent one so that it can be used by the
-
        // database as a key.
-
        if run.adapter_run_id().is_none() {
-
            run.set_adapter_run_id(RunId::default());
-
        }
-

-
        self.db.push_run(&run)?;
+
        self.db.update_run(&run)?;

        Ok(run)
    }