Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: announce COB changes to other nodes
Lars Wirzenius committed 9 months ago
commit e1a741e33a9c3a3a6778c53f43a57c156aa25bdf
parent c55375e
1 file changed +49 -1
modified src/cob.rs
@@ -5,13 +5,17 @@
//! the creation of a "job COB" for every CI run, and updating it when
//! the run ends so that the COB captures the result of the run.

-
use std::str::FromStr;
+
use std::{collections::BTreeSet, str::FromStr, time::Duration};

use url::Url;
use uuid::Uuid;

use radicle::{
    git::Oid,
+
    node::{
+
        sync::{Announcer, AnnouncerConfig, ReplicationFactor},
+
        Handle, Node,
+
    },
    prelude::{Profile, ReadStorage, RepoId},
    storage::git::Repository,
};
@@ -33,6 +37,7 @@ pub fn create_job(repo_id: RepoId, oid: Oid) -> Result<(), JobError> {
    match job_for_commit(&jobs, oid) {
        Err(JobError::NoJob(_)) => {
            let job = jobs.create(oid, &signer).map_err(JobError::CreateJob)?;
+
            announce(&profile, repo_id)?;
            logger::job_create(&repo_id, &oid, job.id());
            Ok(())
        }
@@ -56,6 +61,7 @@ pub fn create_run(repo_id: RepoId, oid: Oid, run_id: RunId, url: &Url) -> Result
    let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
    job.run(uuid, url.clone(), &signer)
        .map_err(JobError::AddRun)?;
+
    announce(&profile, repo_id)?;

    logger::job_run_create(job_id, uuid);
    Ok(())
@@ -83,6 +89,7 @@ fn finish(repo_id: RepoId, oid: Oid, run_id: RunId, reason: Reason) -> Result<()
    let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
    job.finish(uuid, reason, &signer)
        .map_err(JobError::Finish)?;
+
    announce(&profile, repo_id)?;

    logger::job_run_finished(job_id, uuid, reason);
    Ok(())
@@ -118,6 +125,38 @@ fn job_for_commit<'a>(jobs: &Jobs<'a, Repository>, wanted: Oid) -> Result<JobId,
    Err(JobError::NoJob(wanted))
}

+
fn announce(profile: &Profile, repo_id: RepoId) -> Result<(), JobError> {
+
    const TIMEOUT: Duration = Duration::from_millis(5000);
+

+
    let mut node = Node::new(profile.home.socket());
+

+
    let (synced, unsynced) = node.seeds(repo_id).map_err(JobError::Seeds)?.iter().fold(
+
        (BTreeSet::new(), BTreeSet::new()),
+
        |(mut synced, mut unsynced), seed| {
+
            if seed.is_synced() {
+
                synced.insert(seed.nid);
+
            } else {
+
                unsynced.insert(seed.nid);
+
            }
+
            (synced, unsynced)
+
        },
+
    );
+

+
    let announcer = Announcer::new(AnnouncerConfig::public(
+
        *profile.id(),
+
        ReplicationFactor::MustReach(1),
+
        BTreeSet::new(),
+
        synced,
+
        unsynced,
+
    ))
+
    .map_err(|_| JobError::Announcer)?;
+

+
    node.announce(repo_id, TIMEOUT, announcer, |_, _| ())
+
        .map_err(JobError::Announce)?;
+

+
    Ok(())
+
}
+

/// Errors from managing job COBs.
#[derive(Debug, thiserror::Error)]
pub enum JobError {
@@ -159,4 +198,13 @@ pub enum JobError {

    #[error("a job for Git object {0} already exists")]
    JobExists(Oid),
+

+
    #[error("failed to get seeds for node")]
+
    Seeds(#[source] radicle::node::Error),
+

+
    #[error("failed to announce COB change")]
+
    Announce(#[source] radicle::node::Error),
+

+
    #[error("failed to create a COB announcer")]
+
    Announcer,
}