Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: notify page generation about changes to runs more often
Lars Wirzenius committed 1 year ago
commit 0f90e97a3ebec4ea7011a36167291de7f7226651
parent 10ee855966a11b04110349511d83cbf8aa0acd76
3 files changed +77 -21
modified src/adapter.rs
@@ -19,6 +19,7 @@ use crate::{
    db::{Db, DbError},
    logger,
    msg::{MessageError, Request, Response},
+
    notif::NotificationSender,
    run::{Run, RunState},
};

@@ -50,11 +51,17 @@ impl Adapter {
        self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
    }

-
    pub fn run(&self, trigger: &Request, run: &mut Run, db: &Db) -> Result<(), AdapterError> {
+
    pub fn run(
+
        &self,
+
        trigger: &Request,
+
        run: &mut Run,
+
        db: &Db,
+
        run_notification: &NotificationSender,
+
    ) -> Result<(), AdapterError> {
        run.set_state(RunState::Triggered);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

-
        let x = self.run_helper(trigger, run, db);
+
        let x = self.run_helper(trigger, run, db, run_notification);

        run.set_state(RunState::Finished);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
@@ -62,7 +69,13 @@ impl Adapter {
        x
    }

-
    fn run_helper(&self, trigger: &Request, run: &mut Run, db: &Db) -> Result<(), AdapterError> {
+
    fn run_helper(
+
        &self,
+
        trigger: &Request,
+
        run: &mut Run,
+
        db: &Db,
+
        run_notification: &NotificationSender,
+
    ) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
@@ -74,6 +87,8 @@ impl Adapter {
            .spawn()
            .map_err(|e| AdapterError::SpawnAdapter(self.bin.clone(), e))?;

+
        run_notification.notify()?;
+

        // Write the request to trigger a run to the child's stdin.
        // Then close the pipe to prevent the child from trying to
        // read another message that will never be sent.
@@ -93,6 +108,7 @@ impl Adapter {
        if let Some(line) = lines.next() {
            let line = line.map_err(AdapterError::ReadLine)?;
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
+
            run_notification.notify()?;
            match resp {
                Response::Triggered { run_id, info_url } => {
                    run.set_state(RunState::Running);
@@ -111,6 +127,7 @@ impl Adapter {
        if let Some(line) = lines.next() {
            let line = line.map_err(AdapterError::ReadLine)?;
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
+
            run_notification.notify()?;
            match resp {
                Response::Finished { result } => {
                    run.set_result(result);
@@ -208,6 +225,10 @@ pub enum AdapterError {
    /// Could no update run in database.
    #[error("failed to update CI run information in database")]
    UpdateRun(#[source] DbError),
+

+
    /// Could not send notification of changes to CI runs.
+
    #[error(transparent)]
+
    Notif(#[from] crate::notif::NotificationError),
}

#[cfg(test)]
@@ -223,6 +244,7 @@ mod test {
    use crate::{
        adapter::AdapterError,
        msg::{MessageError, Response, RunResult},
+
        notif::NotificationChannel,
        run::Whence,
        test::{mock_adapter, trigger_request, TestResult},
    };
@@ -260,7 +282,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db)?;
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender)?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -280,7 +304,9 @@ echo '{"response":"finished","result":"failure"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);

        match x {
            Ok(_) => (),
@@ -307,7 +333,9 @@ exit 1

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));
@@ -327,7 +355,9 @@ kill -9 $BASHPID

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -351,7 +381,9 @@ kill -9 $BASHPID

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -373,7 +405,9 @@ kill -9 $BASHPID

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -394,7 +428,9 @@ echo '{"response":"finished","result":"success","bad":"field"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
@@ -417,7 +453,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -444,7 +482,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -463,7 +503,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -490,7 +532,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -521,7 +565,9 @@ echo '{"response":"finished","result":"success"}'

        let db = db()?;
        let mut run = run()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
modified src/broker.rs
@@ -17,6 +17,7 @@ use crate::{
    db::{Db, DbError},
    logger,
    msg::{PatchEvent, PushEvent, Request},
+
    notif::NotificationSender,
    run::{Run, Whence},
};

@@ -64,7 +65,11 @@ impl Broker {
    }

    #[allow(clippy::result_large_err)]
-
    pub fn execute_ci(&mut self, trigger: &Request) -> Result<Run, BrokerError> {
+
    pub fn execute_ci(
+
        &mut self,
+
        trigger: &Request,
+
        run_notification: &NotificationSender,
+
    ) -> Result<Run, BrokerError> {
        logger::broker_start_run(trigger);
        let run = match trigger {
            Request::Trigger {
@@ -102,7 +107,7 @@ impl Broker {
                    // We run the adapter, but if that fails, we just
                    // log the error. The `Run` value records the
                    // result of the run.
-
                    if let Err(e) = adapter.run(trigger, &mut run, &self.db) {
+
                    if let Err(e) = adapter.run(trigger, &mut run, &self.db, run_notification) {
                        logger::error("failed to run adapter or it failed to run CI", &e);
                    }

@@ -173,6 +178,7 @@ mod test {
    use super::{Adapter, Broker, RepoId};
    use crate::{
        msg::{RunId, RunResult},
+
        notif::NotificationChannel,
        run::RunState,
        test::{mock_adapter, trigger_request, TestResult},
    };
@@ -283,7 +289,9 @@ echo '{"response":"finished","result":"success"}'

        let trigger = trigger_request()?;

-
        let x = broker.execute_ci(&trigger);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = broker.execute_ci(&trigger, &sender);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
@@ -314,7 +322,9 @@ exit 1

        let trigger = trigger_request()?;

-
        let x = broker.execute_ci(&trigger);
+
        let mut channel = NotificationChannel::new_run();
+
        let sender = channel.tx()?;
+
        let x = broker.execute_ci(&trigger, &sender);
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
modified src/queueproc.rs
@@ -134,7 +134,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger)
+
                    .execute_ci(&trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
@@ -152,7 +152,7 @@ impl QueueProcessor {
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
-
                    .execute_ci(&trigger)
+
                    .execute_ci(&trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }