Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: add module for creating or updating job COBs
Lars Wirzenius committed 9 months ago
commit 6a4c0260756e27829c12a4dfa1aab4ba9fc8845a
parent ce5e594
5 files changed +205 -1
modified Cargo.lock
@@ -2006,6 +2006,7 @@ dependencies = [
 "time",
 "tracing",
 "tracing-subscriber",
+
 "url",
 "uuid",
 "valuable",
]
modified Cargo.toml
@@ -33,6 +33,7 @@ thiserror = "1.0.63"
time = { version = "0.3.36", features = ["formatting", "macros"] }
tracing = { version = "0.1.40", features = ["max_level_trace", "release_max_level_trace", "valuable"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "json", "valuable"] }
+
url = "2.5.4"
uuid = { version = "1.10.0", features = ["v4"] }
valuable = { version = "0.1.0", features = ["derive"] }

added src/cob.rs
@@ -0,0 +1,162 @@
+
//! Create and update Radicle collaborative objects to notify about CI runs.
+
//!
+
//! Using the [radicle-job
+
//! crate](https://crates.io/crates/radicle-job), this module enables
+
//! the creation of a "job COB" for every CI run, and updating it when
+
//! the run ends so that the COB captures the result of the run.
+

+
use std::str::FromStr;
+

+
use url::Url;
+
use uuid::Uuid;
+

+
use radicle::{
+
    git::Oid,
+
    prelude::{Profile, ReadStorage, RepoId},
+
    storage::git::Repository,
+
};
+
use radicle_job::*;
+

+
use crate::{logger, msg::RunId};
+

+
/// Create a new job COB, for a given Git object in a given
+
/// repository. The new job can be found later by using the same oid,
+
/// it is not necessary to keep the job id (indeed, that is not
+
/// returned). Separate instances of the CI broker have no way of
+
/// communicating the job id.
+
pub fn create_job(repo_id: RepoId, oid: Oid) -> Result<(), JobError> {
+
    let profile = profile()?;
+
    let repo = repository(&profile, repo_id)?;
+
    let signer = profile.signer().map_err(JobError::Signer)?;
+

+
    let mut jobs = jobs(&repo)?;
+
    match job_for_commit(&jobs, oid) {
+
        Err(JobError::NoJob(_)) => {
+
            let job = jobs.create(oid, &signer).map_err(JobError::CreateJob)?;
+
            logger::job_create(&repo_id, &oid, job.id());
+
            Ok(())
+
        }
+
        Err(err) => Err(err),
+
        Ok(_) => Err(JobError::JobExists(oid)),
+
    }
+
}
+

+
/// Create a new run for an existing job. The run id should be the one
+
/// assigned by the CI broker, not the one by the adapter. The log URL
+
/// has to be the from the adapter.
+
pub fn create_run(repo_id: RepoId, oid: Oid, run_id: RunId, url: &Url) -> Result<(), JobError> {
+
    let uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::Uuid)?;
+

+
    let profile = profile()?;
+
    let repo = repository(&profile, repo_id)?;
+
    let signer = profile.signer().map_err(JobError::Signer)?;
+

+
    let mut jobs = jobs(&repo)?;
+
    let job_id = job_for_commit(&jobs, oid)?;
+
    let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
+
    job.run(uuid, url.clone(), &signer)
+
        .map_err(JobError::AddRun)?;
+

+
    logger::job_run_create(job_id, uuid);
+
    Ok(())
+
}
+

+
/// Mark a run as having finished successfully.
+
pub fn succeeded(repo_id: RepoId, oid: Oid, run_id: RunId) -> Result<(), JobError> {
+
    finish(repo_id, oid, run_id, Reason::Succeeded)
+
}
+

+
/// Mark a run as having finished in failure.
+
pub fn failed(repo_id: RepoId, oid: Oid, run_id: RunId) -> Result<(), JobError> {
+
    finish(repo_id, oid, run_id, Reason::Failed)
+
}
+

+
fn finish(repo_id: RepoId, oid: Oid, run_id: RunId, reason: Reason) -> Result<(), JobError> {
+
    let uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::Uuid)?;
+

+
    let profile = profile()?;
+
    let repo = repository(&profile, repo_id)?;
+
    let signer = profile.signer().map_err(JobError::Signer)?;
+

+
    let mut jobs = jobs(&repo)?;
+
    let job_id = job_for_commit(&jobs, oid)?;
+
    let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
+
    job.finish(uuid, reason, &signer)
+
        .map_err(JobError::Finish)?;
+

+
    logger::job_run_finished(job_id, uuid, reason);
+
    Ok(())
+
}
+

+
fn profile() -> Result<Profile, JobError> {
+
    Profile::load().map_err(JobError::Profile)
+
}
+

+
fn repository(profile: &Profile, repo_id: RepoId) -> Result<Repository, JobError> {
+
    profile
+
        .storage
+
        .repository(repo_id)
+
        .map_err(JobError::OpenRepository)
+
}
+

+
fn jobs<'a>(repo: &'a Repository) -> Result<Jobs<'a, Repository>, JobError> {
+
    Jobs::open(repo).map_err(JobError::Jobs)
+
}
+

+
fn job_for_commit<'a>(jobs: &Jobs<'a, Repository>, wanted: Oid) -> Result<JobId, JobError> {
+
    eprintln!("job_for_commit: wanted={wanted}");
+
    for item in jobs.all().map_err(JobError::AllJobs)? {
+
        let (job_id, job) = item.map_err(JobError::AllJobsJob)?;
+
        let job_id = JobId::from(job_id);
+
        eprintln!("job_for_commit: consider {job_id} with oid {}", job.oid());
+
        if job.oid() == &wanted {
+
            eprintln!("job_for_commit: wanted={wanted} => {job_id}");
+
            return Ok(job_id);
+
        }
+
    }
+

+
    Err(JobError::NoJob(wanted))
+
}
+

+
/// Errors from managing job COBs.
+
#[derive(Debug, thiserror::Error)]
+
pub enum JobError {
+
    #[error("failed to load Radicle profile")]
+
    Profile(#[source] radicle::profile::Error),
+

+
    #[error("failed to open repository in Radicle node storage")]
+
    OpenRepository(#[source] radicle::storage::RepositoryError),
+

+
    #[error("failed to list job COBs in repository")]
+
    Jobs(#[source] radicle::storage::RepositoryError),
+

+
    #[error("failed to get all job COBs")]
+
    AllJobs(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to create a new job COB")]
+
    CreateJob(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to create a signer for Radicle repository")]
+
    Signer(#[source] radicle::profile::Error),
+

+
    #[error("couldn't get job when iterating")]
+
    AllJobsJob(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to get mutable job COB")]
+
    GetJobMut(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to add a run to a job COB")]
+
    AddRun(#[source] radicle::cob::store::Error),
+

+
    #[error("could not mark a run as finished")]
+
    Finish(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to construct a UUID from a run id")]
+
    Uuid(#[source] uuid::Error),
+

+
    #[error("failed to find job COB for oid {0}")]
+
    NoJob(Oid),
+

+
    #[error("a job for Git object {0} already exists")]
+
    JobExists(Oid),
+
}
modified src/lib.rs
@@ -12,6 +12,7 @@ pub mod adapter;
pub mod broker;
pub mod ci_event;
pub mod ci_event_source;
+
pub mod cob;
pub mod config;
pub mod db;
pub mod filter;
modified src/logger.rs
@@ -3,10 +3,13 @@
use std::{path::Path, process::ExitStatus, time::Duration};

use clap::ValueEnum;
-
use radicle::{identity::RepoId, node::Event, patch::PatchId};
use serde_json::Value;
use tracing::{debug, error, info, trace, warn, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
+
use uuid::Uuid;
+

+
use radicle::{git::Oid, identity::RepoId, node::Event, patch::PatchId};
+
use radicle_job::{JobId, Reason};

use crate::{
    adapter::Adapter,
@@ -131,6 +134,10 @@ enum Id {
    CibEndSuccess,
    CibStart,

+
    JobCreate,
+
    JobRunCreate,
+
    JobRunFinished,
+

    NodeEventSourceCreated,
    NodeEventSourceDisconnected,
    NodeEventSourceEnd,
@@ -820,6 +827,38 @@ pub fn adapter_did_not_exit(error: TimeoutError) {
    );
}

+
pub fn job_create(repo_id: &RepoId, oid: &Oid, job_id: &JobId) {
+
    debug!(
+
        msg_id = ?Id::JobCreate,
+
        kind = %Kind::StartRun,
+
        ?repo_id,
+
        ?oid,
+
        ?job_id,
+
        "created job COB",
+
    );
+
}
+

+
pub fn job_run_create(job_id: JobId, run_id: Uuid) {
+
    debug!(
+
        msg_id = ?Id::JobRunCreate,
+
        kind = %Kind::StartRun,
+
        ?job_id,
+
        ?run_id,
+
        "created run for job COB",
+
    );
+
}
+

+
pub fn job_run_finished(job_id: JobId, run_id: Uuid, reason: Reason) {
+
    debug!(
+
        msg_id = ?Id::JobRunFinished,
+
        kind = %Kind::FinishRun,
+
        ?job_id,
+
        ?run_id,
+
        ?reason,
+
        "marked run finished for job COB",
+
    );
+
}
+

pub fn timeoutcmd_request_termination(result: Result<(), std::sync::mpsc::SendError<()>>) {
    trace!(
        msg_id = ?Id::TimeoutRequestEnd,