// Radish alpha
// Repository: Radicle CI broker (rad:zwTxygwuz5LDGBq255RA2CbNGrz8)
// Radicle / Git
// File: radicle-ci-broker/src/cob.rs
//! Create and update Radicle collaborative objects to notify about CI runs.
//!
//! Using the [radicle-job
//! crate](https://crates.io/crates/radicle-job), this module enables
//! the creation of a "job COB" for every CI run, and updating it when
//! the run ends so that the COB captures the result of the run.

use std::{
    collections::{BTreeSet, HashMap},
    str::FromStr,
    time::Duration,
};

use url::Url;
use uuid::Uuid;

use radicle::{
    node::{
        Handle, Node,
        sync::{Announcer, AnnouncerConfig, ReplicationFactor},
    },
    prelude::{Profile, ReadStorage, RepoId},
    storage::git::Repository,
};
use radicle_job::*;

use crate::{ergo::Oid, logger, msg::RunId};

/// Lookup cache for job COBs.
///
/// Maps a repository and a Git object id to the id of the job COB
/// for that pair, so that a known job can be returned without
/// opening the repository again.
///
/// This assumes job COBs are not removed from the node.
pub struct KnownJobCobs {
    // Radicle profile, loaded once when the cache is created. It is
    // used to open repositories, to get a signer, and to locate the
    // node control socket.
    profile: Profile,
    // Job COB ids found or created so far, keyed by repository and
    // Git object id.
    known: HashMap<(RepoId, Oid), JobId>,
}

impl KnownJobCobs {
    /// Create a new [`KnownJobCobs`].
    pub fn new() -> Result<Self, JobError> {
        let profile = Profile::load().map_err(JobError::profile)?;
        Ok(Self {
            profile,
            known: HashMap::new(),
        })
    }

    /// Look up job COB for a specific commit.
    pub fn create_job(&mut self, repo_id: RepoId, oid: Oid) -> Option<JobId> {
        let key = (repo_id, oid);
        if let Some(job_id) = self.known.get(&key) {
            Some(*job_id)
        } else if let Ok(job_id) = self.fallible_create_job(repo_id, oid) {
            self.known.insert(key, job_id);
            Some(job_id)
        } else {
            None
        }
    }

    fn fallible_create_job(&self, repo_id: RepoId, oid: Oid) -> Result<JobId, JobError> {
        let repo = repository(&self.profile, repo_id)?;
        let signer = self.profile.signer().map_err(JobError::signer)?;

        let mut jobs = jobs(&repo)?;
        match job_for_commit(&jobs, oid) {
            Err(JobError::NoJob(_)) => {
                let job = jobs.create(oid, &signer).map_err(JobError::create_job)?;
                announce(&self.profile, repo_id, false)?;
                logger::job_create(&repo_id, &oid, job.id());
                Ok(*job.id())
            }
            Err(err) => {
                logger::job_failure(
                    "failed to find job COB for Git object",
                    &repo_id,
                    &oid,
                    Some(&err),
                );
                Err(err)
            }
            Ok(job_id) => {
                logger::job_reuse(&repo_id, &oid, &job_id);
                Ok(job_id)
            }
        }
    }

    /// Create a new run for an existing job. The run id should be the one
    /// assigned by the CI broker, not the one by the adapter. The log URL
    /// has to be the from the adapter.
    pub fn create_run(
        &mut self,
        repo_id: RepoId,
        oid: Oid,
        run_id: RunId,
        url: &Url,
        announce: bool,
    ) {
        if let Some(job_id) = self.create_job(repo_id, oid)
            && let Err(err) = self.fallible_create_run(job_id, repo_id, run_id, url, announce)
        {
            logger::job_failure(
                "failed to add a run to a job COB",
                &repo_id,
                &oid,
                Some(&err),
            );
        }
    }

    fn fallible_create_run(
        &self,
        job_id: JobId,
        repo_id: RepoId,
        run_id: RunId,
        url: &Url,
        a: bool,
    ) -> Result<(), JobError> {
        let uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::uuid)?;

        let repo = repository(&self.profile, repo_id)?;
        let signer = self.profile.signer().map_err(JobError::signer)?;

        let mut jobs = jobs(&repo)?;
        let mut job = jobs.get_mut(&job_id).map_err(JobError::get_jobs_mut)?;
        job.run(uuid, url.clone(), &signer)
            .map_err(JobError::add_run)?;

        announce(&self.profile, repo_id, a)?;

        logger::job_run_create(job_id, uuid);
        Ok(())
    }
}

/// Record in the job COB for `oid` that the run `run_id` finished
/// successfully.
///
/// A failure to update the job COB is logged, not returned.
pub fn succeeded(repo_id: RepoId, oid: Oid, run_id: RunId) {
    let Err(err) = finish(repo_id, oid, run_id, Reason::Succeeded) else {
        return;
    };

    logger::job_failure(
        "failed to mark a run as succeeded",
        &repo_id,
        &oid,
        Some(&err),
    );
}

/// Record in the job COB for `oid` that the run `run_id` finished in
/// failure.
///
/// A failure to update the job COB is logged, not returned.
pub fn failed(repo_id: RepoId, oid: Oid, run_id: RunId) {
    let Err(err) = finish(repo_id, oid, run_id, Reason::Failed) else {
        return;
    };

    logger::job_failure("failed to mark a run as failed", &repo_id, &oid, Some(&err));
}

/// Mark the run `run_id` as finished, with the given reason, in the
/// job COB that refers to `oid`, then announce the change.
///
/// The run id must be parseable as a UUID, and a job COB for `oid`
/// must already exist in the repository; otherwise an error is
/// returned. The Radicle profile is loaded anew on every call.
fn finish(repo_id: RepoId, oid: Oid, run_id: RunId, reason: Reason) -> Result<(), JobError> {
    let run_uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::uuid)?;

    let node_profile = profile()?;
    let repo = repository(&node_profile, repo_id)?;
    let signer = node_profile.signer().map_err(JobError::signer)?;

    let mut job_store = jobs(&repo)?;
    let job_id = job_for_commit(&job_store, oid)?;
    let mut job = job_store
        .get_mut(&job_id)
        .map_err(JobError::get_jobs_mut)?;
    job.finish(run_uuid, reason, &signer)
        .map_err(JobError::finish)?;

    // Finishing a run is always announced.
    announce(&node_profile, repo_id, true)?;

    logger::job_run_finished(job_id, run_uuid, reason);
    Ok(())
}

/// Load the Radicle profile, turning a failure into
/// [`JobError::Profile`].
fn profile() -> Result<Profile, JobError> {
    let loaded = Profile::load().map_err(JobError::profile)?;
    Ok(loaded)
}

/// Open the repository `repo_id` in the storage of the given profile,
/// turning a failure into [`JobError::OpenRepository`].
fn repository(profile: &Profile, repo_id: RepoId) -> Result<Repository, JobError> {
    let storage = &profile.storage;
    storage
        .repository(repo_id)
        .map_err(JobError::open_repository)
}

/// Open the job COBs of a repository, turning a failure into
/// [`JobError::Jobs`]. The returned value borrows the repository.
fn jobs(repo: &Repository) -> Result<Jobs<'_, Repository>, JobError> {
    let store = Jobs::open(repo).map_err(JobError::jobs)?;
    Ok(store)
}

/// Find the id of the first job COB that refers to the Git object
/// `wanted`.
///
/// This iterates over all job COBs in the repository. Iteration
/// stops at the first match, or at the first job that can't be
/// loaded, which is returned as an error.
///
/// Returns [`JobError::NoJob`] if no job COB refers to `wanted`.
fn job_for_commit(jobs: &Jobs<'_, Repository>, wanted: Oid) -> Result<JobId, JobError> {
    let all = jobs.all().map_err(JobError::all_jobs)?;

    for entry in all {
        let (id, job) = entry.map_err(JobError::all_jobs_job)?;
        if job.oid() == &wanted {
            return Ok(JobId::from(id));
        }
    }

    Err(JobError::NoJob(wanted))
}

/// Announce changes to the repository `repo_id` via the local node,
/// if `announce` is true. If it is false, nothing is done and
/// `Ok(())` is returned.
///
/// The node is reached via the control socket of the given profile.
/// The seeds the node reports for the repository are split into
/// those that are synced and those that are not, and both sets are
/// handed to the announcer, which is configured with a replication
/// factor of `MustReach(1)`. The announcement is given a timeout of
/// five seconds.
fn announce(profile: &Profile, repo_id: RepoId, announce: bool) -> Result<(), JobError> {
    const TIMEOUT: Duration = Duration::from_millis(5000);

    if !announce {
        return Ok(());
    }

    let mut node = Node::new(profile.home.socket());

    // Partition the seeds of the repository by whether they are synced.
    let (synced, unsynced) = {
        let seeds = node.seeds_for(repo_id, []).map_err(JobError::seeds)?;
        let mut synced = BTreeSet::new();
        let mut unsynced = BTreeSet::new();
        for seed in seeds.iter() {
            if seed.is_synced() {
                synced.insert(seed.nid);
            } else {
                unsynced.insert(seed.nid);
            }
        }
        (synced, unsynced)
    };

    let config = AnnouncerConfig::public(
        *profile.id(),
        ReplicationFactor::MustReach(1),
        BTreeSet::new(),
        synced,
        unsynced,
    );
    // The underlying error is dropped; only the fact of the failure
    // is reported.
    let announcer = Announcer::new(config).map_err(|_| JobError::Announcer)?;

    // Progress reports from the node are ignored.
    node.announce(repo_id, [], TIMEOUT, announcer, |_, _| ())
        .map_err(JobError::announce)?;

    Ok(())
}

/// Errors from managing job COBs.
///
/// Most variants carry the underlying error as a boxed source. The
/// private constructor functions on this type do the boxing, so that
/// they can be passed directly to `map_err`.
#[derive(Debug, thiserror::Error)]
pub enum JobError {
    /// Loading the Radicle profile failed.
    #[error("failed to load Radicle profile")]
    Profile(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Opening a repository in the storage of the profile failed.
    #[error("failed to open repository in Radicle node storage")]
    OpenRepository(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Opening the job COBs of a repository failed.
    #[error("failed to list job COBs in repository")]
    Jobs(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Starting to iterate over all job COBs failed.
    #[error("failed to get all job COBs")]
    AllJobs(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Creating a new job COB failed.
    #[error("failed to create a new job COB")]
    CreateJob(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Getting a signer from the profile failed.
    #[error("failed to create a signer for Radicle repository")]
    Signer(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// An individual job COB could not be retrieved while iterating
    /// over all of them.
    #[error("couldn't get job when iterating")]
    AllJobsJob(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Getting a job COB for modification failed.
    #[error("failed to get mutable job COB")]
    GetJobMut(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Adding a run to a job COB failed.
    #[error("failed to add a run to a job COB")]
    AddRun(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Marking a run as finished in a job COB failed.
    #[error("could not mark a run as finished")]
    Finish(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// The run id could not be parsed as a UUID.
    #[error("failed to construct a UUID from a run id")]
    Uuid(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// No job COB in the repository refers to the given Git object.
    #[error("failed to find job COB for oid {0}")]
    NoJob(Oid),

    /// A job COB for the given Git object already exists.
    ///
    /// NOTE(review): nothing in the visible part of this file
    /// constructs this variant; check whether it is still needed.
    #[error("a job for Git object {0} already exists")]
    JobExists(Oid),

    /// Asking the node for the seeds of a repository failed.
    #[error("failed to get seeds for node")]
    Seeds(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Announcing a change via the node failed.
    #[error("failed to announce COB change")]
    Announce(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Constructing the announcer failed. The underlying error is
    /// not kept.
    #[error("failed to create a COB announcer")]
    Announcer,
}

/// Constructors for the [`JobError`] variants that wrap an underlying
/// error. Each takes the concrete error type, so that it can be
/// passed directly to `map_err`.
impl JobError {
    /// Box an error so that it can be stored as the source of a variant.
    fn boxed<E>(err: E) -> Box<dyn std::error::Error + Send + 'static>
    where
        E: std::error::Error + Send + 'static,
    {
        Box::new(err)
    }

    /// Wrap a profile loading error as [`JobError::Profile`].
    fn profile(err: radicle::profile::Error) -> Self {
        Self::Profile(Self::boxed(err))
    }

    /// Wrap a repository error as [`JobError::OpenRepository`].
    fn open_repository(err: radicle::storage::RepositoryError) -> Self {
        Self::OpenRepository(Self::boxed(err))
    }

    /// Wrap a repository error as [`JobError::Jobs`].
    fn jobs(err: radicle::storage::RepositoryError) -> Self {
        Self::Jobs(Self::boxed(err))
    }

    /// Wrap a COB store error as [`JobError::AllJobs`].
    fn all_jobs(err: radicle::cob::store::Error) -> Self {
        Self::AllJobs(Self::boxed(err))
    }

    /// Wrap a COB store error as [`JobError::AllJobsJob`].
    fn all_jobs_job(err: radicle::cob::store::Error) -> Self {
        Self::AllJobsJob(Self::boxed(err))
    }

    /// Wrap a COB store error as [`JobError::GetJobMut`].
    fn get_jobs_mut(err: radicle::cob::store::Error) -> Self {
        Self::GetJobMut(Self::boxed(err))
    }

    /// Wrap a COB store error as [`JobError::AddRun`].
    fn add_run(err: radicle::cob::store::Error) -> Self {
        Self::AddRun(Self::boxed(err))
    }

    /// Wrap a COB store error as [`JobError::Finish`].
    fn finish(err: radicle::cob::store::Error) -> Self {
        Self::Finish(Self::boxed(err))
    }

    /// Wrap a COB store error as [`JobError::CreateJob`].
    fn create_job(err: radicle::cob::store::Error) -> Self {
        Self::CreateJob(Self::boxed(err))
    }

    /// Wrap a signer error as [`JobError::Signer`].
    fn signer(err: radicle::profile::SignerError) -> Self {
        Self::Signer(Self::boxed(err))
    }

    /// Wrap a UUID parsing error as [`JobError::Uuid`].
    fn uuid(err: uuid::Error) -> Self {
        Self::Uuid(Self::boxed(err))
    }

    /// Wrap a node error as [`JobError::Seeds`].
    fn seeds(err: radicle::node::Error) -> Self {
        Self::Seeds(Self::boxed(err))
    }

    /// Wrap a node error as [`JobError::Announce`].
    fn announce(err: radicle::node::Error) -> Self {
        Self::Announce(Self::boxed(err))
    }
}