Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: cache job COB ID lookup
Lars Wirzenius committed 4 months ago
commit e541019e3a496e18a7068e1a136e7610684837f5
parent 21db3db
6 files changed +274 -66
modified src/adapter.rs
@@ -13,7 +13,7 @@ use std::{
    os::unix::process::ExitStatusExt,
    path::{Path, PathBuf},
    process::{Command, ExitStatus},
-
    sync::mpsc::Sender,
+
    sync::{Arc, Mutex, mpsc::Sender},
    time::Duration,
};

@@ -22,7 +22,7 @@ use tempfile::{TempDir, tempdir};
use url::Url;

use crate::{
-
    cob::{create_run, failed, succeeded},
+
    cob::{KnownJobCobs, failed, succeeded},
    config::AdapterSpec,
    db::{Db, DbError},
    logger,
@@ -136,6 +136,7 @@ impl Adapter {
        self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
    }

+
    #[allow(clippy::too_many_arguments)]
    pub fn run(
        &self,
        trigger: &Request,
@@ -144,11 +145,20 @@ impl Adapter {
        run_notification: &NotificationSender,
        max_run_time: Duration,
        child_info: Sender<ChildInfo>,
+
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<(), AdapterError> {
        run.set_state(RunState::Triggered);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

-
        let x = self.run_helper(trigger, run, db, run_notification, max_run_time, child_info);
+
        let x = self.run_helper(
+
            trigger,
+
            run,
+
            db,
+
            run_notification,
+
            max_run_time,
+
            child_info,
+
            known_job_cobs,
+
        );

        run.set_state(RunState::Finished);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
@@ -160,6 +170,7 @@ impl Adapter {
        x
    }

+
    #[allow(clippy::too_many_arguments)]
    fn run_helper(
        &self,
        trigger: &Request,
@@ -168,6 +179,7 @@ impl Adapter {
        run_notification: &NotificationSender,
        max_run_time: Duration,
        child_pid: Sender<ChildInfo>,
+
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

@@ -201,7 +213,7 @@ impl Adapter {

        let mut outcome = MaybeResult::default();

-
        if let Err(err) = self.read_stdout(run, db, run_notification, stdout) {
+
        if let Err(err) = self.read_stdout(run, db, run_notification, stdout, known_job_cobs) {
            outcome.set_error(err);
        }

@@ -246,6 +258,7 @@ impl Adapter {
        db: &Db,
        run_notification: &NotificationSender,
        mut stdout: RealtimeLines,
+
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<(), AdapterError> {
        #[allow(clippy::unwrap_used)]
        let no_url = Url::parse("https://no.url.example.com").unwrap();
@@ -272,12 +285,15 @@ impl Adapter {
                    // there's nothing useful we can do about it, as long as we
                    // let CI run, which want to do.
                    let url = url.unwrap_or(no_url);
-
                    create_run(
-
                        run.repo_id(),
-
                        run.whence().oid(),
-
                        run.broker_run_id().clone(),
-
                        &url,
-
                    );
+

+
                    if let Ok(mut known) = known_job_cobs.lock() {
+
                        known.create_run(
+
                            run.repo_id(),
+
                            run.whence().oid(),
+
                            run.broker_run_id().clone(),
+
                            &url,
+
                        );
+
                    }
                }
                _ => {
                    return Err(AdapterError::NotTriggered(resp));
@@ -454,10 +470,15 @@ pub enum AdapterError {
    /// Can't write adapter config.
    #[error("failed to write adapter configuration")]
    AdapterConfigWrite(#[source] std::io::Error),
+

+
    /// Can't lock mutex.
+
    #[error("failed to lock mutex")]
+
    Mutex,
}

#[cfg(test)]
mod test {
+
    use super::*;
    use std::{fs::write, io::ErrorKind, path::PathBuf, sync::mpsc::channel, time::Duration};

    use tempfile::{NamedTempFile, TempDir, tempdir};
@@ -468,6 +489,7 @@ mod test {
    use super::{Adapter, Db, Run};
    use crate::{
        adapter::AdapterError,
+
        cob::KnownJobCobs,
        msg::{MessageError, Response, RunId, RunResult},
        notif::NotificationChannel,
        run::{RunBuilder, Whence},
@@ -497,6 +519,11 @@ mod test {
    }

    #[allow(clippy::unwrap_used)]
+
    fn known() -> Arc<Mutex<KnownJobCobs>> {
+
        Arc::new(Mutex::new(KnownJobCobs::new().unwrap()))
+
    }
+

+
    #[allow(clippy::unwrap_used)]
    fn adapter(tmp: &TempDir, shell: &'static str) -> PathBuf {
        let bin = tmp.path().join("adapter.sh");
        mock_adapter(&bin, shell).unwrap();
@@ -520,7 +547,15 @@ echo '{"response":"finished","result":"success"}'
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx)?;
+
        Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        )?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -543,7 +578,15 @@ echo '{"response":"finished","result":"failure"}'
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );

        match x {
            Ok(_) => (),
@@ -573,7 +616,15 @@ exit 1
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));
@@ -596,7 +647,15 @@ kill -9 $$
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Signal(9))));

@@ -617,7 +676,15 @@ kill -9 $$
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoFirstMessage)));

@@ -641,7 +708,15 @@ kill -9 $$
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Signal(9))));

@@ -664,7 +739,15 @@ echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));

@@ -689,7 +772,15 @@ kill -9 $$
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("Adapter::run result: {x:#?}");
        if let Err(AdapterError::Failed(x)) = x {
            use std::os::unix::process::ExitStatusExt;
@@ -717,7 +808,15 @@ echo '{"response":"finished","result":"success","bad":"field"}'
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
@@ -743,7 +842,15 @@ echo '{"response":"finished","result":"success"}'
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -773,7 +880,15 @@ echo '{"response":"finished","result":"success"}'
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -795,7 +910,15 @@ echo '{"response":"finished","result":"success"}'
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -825,7 +948,15 @@ echo '{"response":"finished","result":"success"}'
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -859,7 +990,15 @@ echo '{"response":"finished","result":"success"}'
        let (pid_tx, _) = channel();
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX, pid_tx);
+
        let x = Adapter::new(&bin).run(
+
            &trigger_request()?,
+
            &mut run,
+
            &db,
+
            &sender,
+
            MAX,
+
            pid_tx,
+
            known(),
+
        );
        eprintln!("result from run: {x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
modified src/bin/cibtool.rs
@@ -250,6 +250,9 @@ enum RunSubCmd {

#[derive(Debug, thiserror::Error)]
enum CibToolError {
+
    #[error("failed to create cache of job COBs")]
+
    KnownJobCobs(#[source] radicle_ci_broker::cob::JobError),
+

    #[error("failed to look up node profile")]
    Profile(#[source] radicle::profile::Error),

modified src/bin/cibtoolcmd/cob.rs
@@ -1,3 +1,5 @@
+
use std::sync::{Arc, Mutex};
+

use url::Url;

use radicle::{git::Oid, identity::RepoId};
@@ -22,9 +24,13 @@ impl Leaf for CobCmd {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
        let url = Url::parse("https://liw.fi").unwrap();
        let run_id = RunId::default();
+
        let known = Arc::new(Mutex::new(
+
            KnownJobCobs::new().map_err(CibToolError::KnownJobCobs)?,
+
        ));
        for i in 0..self.n {
            println!("{i}");
-
            create_run(self.repo_id, self.oid, run_id.clone(), &url);
+
            let mut known = known.lock().unwrap();
+
            known.create_run(self.repo_id, self.oid, run_id.clone(), &url);
        }
        Ok(())
    }
modified src/broker.rs
@@ -5,7 +5,7 @@

use std::{
    path::{Path, PathBuf},
-
    sync::mpsc::Sender,
+
    sync::{Arc, Mutex, mpsc::Sender},
    time::Duration,
};

@@ -16,7 +16,7 @@ use radicle::prelude::RepoId;

use crate::{
    adapter::Adapter,
-
    cob::create_job,
+
    cob::KnownJobCobs,
    db::{Db, DbError},
    logger,
    msg::{PatchEvent, PushEvent, Request, RunId},
@@ -60,6 +60,7 @@ impl Broker {
        trigger: &Request,
        run_notification: &NotificationSender,
        child_info: Sender<ChildInfo>,
+
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<Run, BrokerError> {
        let broker_run_id = RunId::default();
        let span = span!(Level::TRACE, "execute_ci_run", %broker_run_id,).entered();
@@ -70,6 +71,7 @@ impl Broker {
                trigger,
                run_notification,
                child_info,
+
                known_job_cobs,
            )
        })?;
        Ok(run)
@@ -82,6 +84,7 @@ impl Broker {
        trigger: &Request,
        run_notification: &NotificationSender,
        child_info: Sender<ChildInfo>,
+
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<Run, BrokerError> {
        logger::broker_start_run(trigger, &broker_run_id);
        let (common, whence, oid) = match &trigger {
@@ -136,8 +139,10 @@ impl Broker {
        // it. We won't do anything about a failure, as there's
        // nothing useful we can do about it, as long as we let CI
        // run, which want to do.
-
        if let Some(job_id) = create_job(trigger.repo(), oid) {
-
            run.set_job_id(job_id);
+
        if let Ok(mut known) = known_job_cobs.lock() {
+
            if let Some(job_id) = known.create_job(trigger.repo(), oid) {
+
                run.set_job_id(job_id);
+
            }
        }

        // We run the adapter, but if that fails, we just
@@ -150,6 +155,7 @@ impl Broker {
            run_notification,
            self.max_run_time,
            child_info,
+
            known_job_cobs,
        ) {
            logger::error("failed to run adapter or it failed to run CI", &e);
        }
@@ -209,6 +215,7 @@ pub enum BrokerError {

#[cfg(test)]
mod test {
+
    use super::*;
    use std::{path::Path, sync::mpsc::channel, time::Duration};
    use tempfile::tempdir;

@@ -224,6 +231,11 @@ mod test {
        Ok(Broker::new(filename, Duration::from_secs(1))?)
    }

+
    #[allow(clippy::unwrap_used)]
+
    fn known() -> Arc<Mutex<KnownJobCobs>> {
+
        Arc::new(Mutex::new(KnownJobCobs::new().unwrap()))
+
    }
+

    #[test]
    fn executes_adapter() -> TestResult<()> {
        const ADAPTER: &str = r#"#!/bin/sh
@@ -246,7 +258,7 @@ echo '{"response":"finished","result":"success"}'

        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = broker.execute_ci(&adapter, &trigger, &sender, pid_tx);
+
        let x = broker.execute_ci(&adapter, &trigger, &sender, pid_tx, known());
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
@@ -280,7 +292,7 @@ exit 1

        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = broker.execute_ci(&adapter, &trigger, &sender, pid_tx);
+
        let x = broker.execute_ci(&adapter, &trigger, &sender, pid_tx, known());
        assert!(x.is_ok());
        let run = x?;
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
modified src/cob.rs
@@ -5,7 +5,11 @@
//! the creation of a "job COB" for every CI run, and updating it when
//! the run ends so that the COB captures the result of the run.

-
use std::{collections::BTreeSet, str::FromStr, time::Duration};
+
use std::{
+
    collections::{BTreeSet, HashMap},
+
    str::FromStr,
+
    time::Duration,
+
};

use url::Url;
use uuid::Uuid;
@@ -23,22 +27,46 @@ use radicle_job::*;

use crate::{logger, msg::RunId};

-
/// Create a new job COB, for a given Git object in a given
-
/// repository. The new job can be found later by using the same oid,
-
/// it is not necessary to keep the job id (indeed, that is not
-
/// returned). Separate instances of the CI broker have no way of
-
/// communicating the job id.
-
pub fn create_job(repo_id: RepoId, oid: Oid) -> Option<JobId> {
-
    fn fallible_create(repo_id: RepoId, oid: Oid) -> Result<JobId, JobError> {
-
        let profile = profile()?;
-
        let repo = repository(&profile, repo_id)?;
-
        let signer = profile.signer().map_err(JobError::Signer)?;
+
/// Lookup cache for job COBs.
+
///
+
/// This assumes job COBs are not removed from the node.
+
pub struct KnownJobCobs {
+
    profile: Profile,
+
    known: HashMap<(RepoId, Oid), JobId>,
+
}
+

+
impl KnownJobCobs {
+
    /// Create a new [`KnownJobCobs`].
+
    pub fn new() -> Result<Self, JobError> {
+
        let profile = Profile::load().map_err(JobError::Profile)?;
+
        Ok(Self {
+
            profile,
+
            known: HashMap::new(),
+
        })
+
    }
+

+
    /// Look up job COB for a specific commit.
+
    pub fn create_job(&mut self, repo_id: RepoId, oid: Oid) -> Option<JobId> {
+
        let key = (repo_id, oid);
+
        if let Some(job_id) = self.known.get(&key) {
+
            Some(*job_id)
+
        } else if let Ok(job_id) = self.fallible_create_job(repo_id, oid) {
+
            self.known.insert(key, job_id);
+
            Some(job_id)
+
        } else {
+
            None
+
        }
+
    }
+

+
    fn fallible_create_job(&self, repo_id: RepoId, oid: Oid) -> Result<JobId, JobError> {
+
        let repo = repository(&self.profile, repo_id)?;
+
        let signer = self.profile.signer().map_err(JobError::Signer)?;

        let mut jobs = jobs(&repo)?;
        match job_for_commit(&jobs, oid) {
            Err(JobError::NoJob(_)) => {
                let job = jobs.create(oid, &signer).map_err(JobError::CreateJob)?;
-
                announce(&profile, repo_id)?;
+
                announce(&self.profile, repo_id)?;
                logger::job_create(&repo_id, &oid, job.id());
                Ok(*job.id())
            }
@@ -58,39 +86,43 @@ pub fn create_job(repo_id: RepoId, oid: Oid) -> Option<JobId> {
        }
    }

-
    fallible_create(repo_id, oid).ok()
-
}
+
    /// Create a new run for an existing job. The run id should be the one
+
    /// assigned by the CI broker, not the one by the adapter. The log URL
+
    /// has to be the from the adapter.
+
    pub fn create_run(&mut self, repo_id: RepoId, oid: Oid, run_id: RunId, url: &Url) {
+
        if let Some(job_id) = self.create_job(repo_id, oid) {
+
            if let Err(err) = self.fallible_create_run(job_id, repo_id, run_id, url) {
+
                logger::job_failure(
+
                    "failed to add a run to a job COB",
+
                    &repo_id,
+
                    &oid,
+
                    Some(&err),
+
                );
+
            }
+
        }
+
    }

-
/// Create a new run for an existing job. The run id should be the one
-
/// assigned by the CI broker, not the one by the adapter. The log URL
-
/// has to be the from the adapter.
-
pub fn create_run(repo_id: RepoId, oid: Oid, run_id: RunId, url: &Url) {
-
    fn fallible_crate(repo_id: RepoId, oid: Oid, run_id: RunId, url: &Url) -> Result<(), JobError> {
+
    fn fallible_create_run(
+
        &self,
+
        job_id: JobId,
+
        repo_id: RepoId,
+
        run_id: RunId,
+
        url: &Url,
+
    ) -> Result<(), JobError> {
        let uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::Uuid)?;

-
        let profile = profile()?;
-
        let repo = repository(&profile, repo_id)?;
-
        let signer = profile.signer().map_err(JobError::Signer)?;
+
        let repo = repository(&self.profile, repo_id)?;
+
        let signer = self.profile.signer().map_err(JobError::Signer)?;

        let mut jobs = jobs(&repo)?;
-
        let job_id = job_for_commit(&jobs, oid)?;
        let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
        job.run(uuid, url.clone(), &signer)
            .map_err(JobError::AddRun)?;
-
        announce(&profile, repo_id)?;
+
        announce(&self.profile, repo_id)?;

        logger::job_run_create(job_id, uuid);
        Ok(())
    }
-

-
    if let Err(err) = fallible_crate(repo_id, oid, run_id, url) {
-
        logger::job_failure(
-
            "failed to add a run to a job COB",
-
            &repo_id,
-
            &oid,
-
            Some(&err),
-
        );
-
    }
}

/// Mark a run as having finished successfully.
modified src/queueproc.rs
@@ -19,6 +19,7 @@ use crate::{
    adapter::{Adapter, Adapters},
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
+
    cob::KnownJobCobs,
    db::{Db, DbError, QueuedCiEvent},
    filter::{EventFilter, Trigger},
    logger,
@@ -74,6 +75,9 @@ impl QueueProcessorBuilder {
            current: CurrentlyPicked::default(),
            child_pid_tx,
            child_pid_rx,
+
            known_job_cobs: Arc::new(Mutex::new(
+
                KnownJobCobs::new().map_err(QueueError::KnownJobCobs)?,
+
            )),
        })
    }

@@ -146,6 +150,7 @@ pub struct QueueProcessor {
    current: CurrentlyPicked,
    child_pid_tx: Sender<ChildInfo>,
    child_pid_rx: Receiver<ChildInfo>,
+
    known_job_cobs: Arc<Mutex<KnownJobCobs>>,
}

impl QueueProcessor {
@@ -196,7 +201,8 @@ impl QueueProcessor {
                            let p = self.processor()?;
                            let repoid = qe.event().repository().copied();
                            self.current.insert(qe.event().repository());
-
                            let h = spawn(move || p.pick_and_process_one(qe, adapters));
+
                            let known = self.known_job_cobs.clone();
+
                            let h = spawn(move || p.pick_and_process_one(qe, adapters, known));
                            handles.push((repoid, h));
                        }
                        Ok(None) => {}
@@ -409,6 +415,7 @@ impl Processor {
        &self,
        qe: QueuedCiEvent,
        adapters: Vec<Adapter>,
+
        known_job_cobs: Arc<Mutex<KnownJobCobs>>,
    ) -> Result<MaybeShutdown, QueueError> {
        for adapter in adapters.iter() {
            self.run_tx.notify()?;
@@ -428,7 +435,13 @@ impl Processor {
                    let trigger = trigger?;

                    self.broker
-
                        .execute_ci(adapter, &trigger, &self.run_tx, self.child_pid_tx.clone())
+
                        .execute_ci(
+
                            adapter,
+
                            &trigger,
+
                            &self.run_tx,
+
                            self.child_pid_tx.clone(),
+
                            known_job_cobs.clone(),
+
                        )
                        .map_err(QueueError::execute_ci)?;
                }
            }
@@ -515,6 +528,9 @@ impl ChildInfo {

#[derive(Debug, thiserror::Error)]
pub enum QueueError {
+
    #[error("failed to create cache of job COBs")]
+
    KnownJobCobs(#[source] crate::cob::JobError),
+

    #[error("failed to load node profile")]
    Profile(#[source] radicle::profile::Error),