Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
add job COB support
Merged liw opened 9 months ago
8 files changed +337 -9 d5f684b2 e1a741e3
modified Cargo.lock
@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
-
version = 3
+
version = 4

[[package]]
name = "adler2"
@@ -1318,6 +1318,7 @@ checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
dependencies = [
 "equivalent",
 "hashbrown 0.15.2",
+
 "serde",
]

[[package]]
@@ -1989,6 +1990,7 @@ dependencies = [
 "radicle",
 "radicle-crypto",
 "radicle-git-ext",
+
 "radicle-job",
 "radicle-surf",
 "regex",
 "rss",
@@ -2004,6 +2006,7 @@ dependencies = [
 "time",
 "tracing",
 "tracing-subscriber",
+
 "url",
 "uuid",
 "valuable",
]
@@ -2074,6 +2077,23 @@ dependencies = [
]

[[package]]
+
name = "radicle-job"
+
version = "0.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "dd15941f9d42cd8f0c6bf949b649eb4f310a389f5ecec1a3ac9d9d8dfcc8998d"
+
dependencies = [
+
 "indexmap",
+
 "nonempty 0.11.0",
+
 "once_cell",
+
 "qcheck",
+
 "radicle",
+
 "serde",
+
 "thiserror 2.0.12",
+
 "url",
+
 "uuid",
+
]
+

+
[[package]]
name = "radicle-ssh"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3111,6 +3131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
dependencies = [
 "getrandom 0.3.2",
+
 "serde",
]

[[package]]
modified Cargo.toml
@@ -2,7 +2,7 @@
name = "radicle-ci-broker"
version = "0.18.0"
edition = "2021"
-
rust-version = "1.80"
+
rust-version = "1.84"
authors = ["Lars Wirzenius <liw@liw.fi", "cloudhead <cloudhead@radicle.xyz>"]
description = "add integration to CI engins or systems to a Radicle node"
license = "MIT OR Apache-2.0"
@@ -18,6 +18,7 @@ html-page = "0.4.0"
nonempty = "0.11.0"
radicle-crypto = "0.12.0"
radicle-git-ext = "0.8.0"
+
radicle-job = "0.1.0"
radicle-surf = { version = "0.22.0", default-features = false, features = ["serde"] }
regex = "1.10.5"
rss = "2.0.9"
@@ -32,6 +33,7 @@ thiserror = "1.0.63"
time = { version = "0.3.36", features = ["formatting", "macros"] }
tracing = { version = "0.1.40", features = ["max_level_trace", "release_max_level_trace", "valuable"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "json", "valuable"] }
+
url = "2.5.4"
uuid = { version = "1.10.0", features = ["v4"] }
valuable = { version = "0.1.0", features = ["derive"] }

modified src/adapter.rs
@@ -18,12 +18,14 @@ use std::{

use serde::Serialize;
use tempfile::{tempdir, TempDir};
+
use url::Url;

use crate::{
+
    cob::{create_run, failed, succeeded},
    config::AdapterSpec,
    db::{Db, DbError},
    logger,
-
    msg::{MessageError, Request, Response},
+
    msg::{MessageError, Request, Response, RunResult},
    notif::NotificationSender,
    run::{Run, RunState},
    sensitive::Sensitive,
@@ -237,6 +239,9 @@ impl Adapter {
        run_notification: &NotificationSender,
        mut stdout: RealtimeLines,
    ) -> Result<(), AdapterError> {
+
        #[allow(clippy::unwrap_used)]
+
        let no_url = Url::parse("https://no.url.example.com").unwrap();
+

        if let Some(line) = stdout.line() {
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
            run_notification.notify()?;
@@ -244,10 +249,28 @@ impl Adapter {
                Response::Triggered { run_id, info_url } => {
                    run.set_state(RunState::Running);
                    run.set_adapter_run_id(run_id);
-
                    if let Some(url) = info_url {
+

+
                    let url = if let Some(url) = info_url {
                        run.set_adapter_info_url(&url);
-
                    }
+
                        Url::parse(&url)
+
                    } else {
+
                        Ok(no_url.clone())
+
                    };
+

                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
+

+
                    // Try to add this CI run tothe job COB. If that fails, the
+
                    // function logs it. We won't do anything about a failure, as
+
                    // there's nothing useful we can do about it, as long as we
+
                    // let CI run, which want to do.
+
                    let url = url.unwrap_or(no_url);
+
                    create_run(
+
                        run.repo_id(),
+
                        run.whence().oid(),
+
                        run.broker_run_id().clone(),
+
                        &url,
+
                    )
+
                    .ok();
                }
                _ => {
                    return Err(AdapterError::NotTriggered(resp));
@@ -264,8 +287,21 @@ impl Adapter {
            run_notification.notify()?;
            match resp {
                Response::Finished { result } => {
-
                    run.set_result(result);
+
                    run.set_result(result.clone());
                    db.update_run(run).map_err(AdapterError::UpdateRun)?;
+

+
                    // Try to mark this CI run in job COB as finished.
+
                    // If that fails, the function logs it. We won't
+
                    // do anything about a failure, as there's nothing
+
                    // useful we can do about it.
+
                    let repo_id = run.repo_id();
+
                    let oid = run.whence().oid();
+
                    let run_id = run.broker_run_id().clone();
+
                    match &result {
+
                        RunResult::Success => succeeded(repo_id, oid, run_id),
+
                        RunResult::Failure => failed(repo_id, oid, run_id),
+
                    }
+
                    .ok();
                }
                _ => {
                    return Err(AdapterError::NotFinished(resp));
modified src/broker.rs
@@ -15,6 +15,7 @@ use radicle::prelude::RepoId;

use crate::{
    adapter::Adapter,
+
    cob::create_job,
    db::{Db, DbError},
    logger,
    msg::{PatchEvent, PushEvent, Request, RunId},
@@ -72,7 +73,7 @@ impl Broker {
        run_notification: &NotificationSender,
    ) -> Result<Run, BrokerError> {
        logger::broker_start_run(trigger);
-
        let (common, whence) = match &trigger {
+
        let (common, whence, oid) = match &trigger {
            Request::Trigger {
                common,
                push:
@@ -85,7 +86,11 @@ impl Broker {
                patch: None,
            } => {
                let who = pusher.to_string();
-
                (common, Whence::branch(branch, *after, Some(who.as_str())))
+
                (
+
                    common,
+
                    Whence::branch(branch, *after, Some(who.as_str())),
+
                    *after,
+
                )
            }
            Request::Trigger {
                common,
@@ -101,6 +106,7 @@ impl Broker {
                (
                    common,
                    Whence::patch(patch.id, patch.after, revision, Some(who.as_str())),
+
                    patch.after,
                )
            }
            _ => panic!("neither a push nor a patch event"),
@@ -115,6 +121,12 @@ impl Broker {
            .build();
        self.db.push_run(&run)?;

+
        // Try to create a job COB. If that fails, the function logs
+
        // it. We won't do anything about a failure, as there's
+
        // nothing useful we can do about it, as long as we let CI
+
        // run, which want to do.
+
        create_job(trigger.repo(), oid).ok();
+

        // We run the adapter, but if that fails, we just
        // log the error. The `Run` value records the
        // result of the run.
added src/cob.rs
@@ -0,0 +1,210 @@
+
//! Create and update Radicle collaborative objects to notify about CI runs.
+
//!
+
//! Using the [radicle-job
+
//! crate](https://crates.io/crates/radicle-job), this module enables
+
//! the creation of a "job COB" for every CI run, and updating it when
+
//! the run ends so that the COB captures the result of the run.
+

+
use std::{collections::BTreeSet, str::FromStr, time::Duration};
+

+
use url::Url;
+
use uuid::Uuid;
+

+
use radicle::{
+
    git::Oid,
+
    node::{
+
        sync::{Announcer, AnnouncerConfig, ReplicationFactor},
+
        Handle, Node,
+
    },
+
    prelude::{Profile, ReadStorage, RepoId},
+
    storage::git::Repository,
+
};
+
use radicle_job::*;
+

+
use crate::{logger, msg::RunId};
+

+
/// Create a new job COB, for a given Git object in a given
+
/// repository. The new job can be found later by using the same oid,
+
/// it is not necessary to keep the job id (indeed, that is not
+
/// returned). Separate instances of the CI broker have no way of
+
/// communicating the job id.
+
pub fn create_job(repo_id: RepoId, oid: Oid) -> Result<(), JobError> {
+
    let profile = profile()?;
+
    let repo = repository(&profile, repo_id)?;
+
    let signer = profile.signer().map_err(JobError::Signer)?;
+

+
    let mut jobs = jobs(&repo)?;
+
    match job_for_commit(&jobs, oid) {
+
        Err(JobError::NoJob(_)) => {
+
            let job = jobs.create(oid, &signer).map_err(JobError::CreateJob)?;
+
            announce(&profile, repo_id)?;
+
            logger::job_create(&repo_id, &oid, job.id());
+
            Ok(())
+
        }
+
        Err(err) => Err(err),
+
        Ok(_) => Err(JobError::JobExists(oid)),
+
    }
+
}
+

+
/// Create a new run for an existing job. The run id should be the one
+
/// assigned by the CI broker, not the one by the adapter. The log URL
+
/// has to be the from the adapter.
+
pub fn create_run(repo_id: RepoId, oid: Oid, run_id: RunId, url: &Url) -> Result<(), JobError> {
+
    let uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::Uuid)?;
+

+
    let profile = profile()?;
+
    let repo = repository(&profile, repo_id)?;
+
    let signer = profile.signer().map_err(JobError::Signer)?;
+

+
    let mut jobs = jobs(&repo)?;
+
    let job_id = job_for_commit(&jobs, oid)?;
+
    let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
+
    job.run(uuid, url.clone(), &signer)
+
        .map_err(JobError::AddRun)?;
+
    announce(&profile, repo_id)?;
+

+
    logger::job_run_create(job_id, uuid);
+
    Ok(())
+
}
+

+
/// Mark a run as having finished successfully.
+
pub fn succeeded(repo_id: RepoId, oid: Oid, run_id: RunId) -> Result<(), JobError> {
+
    finish(repo_id, oid, run_id, Reason::Succeeded)
+
}
+

+
/// Mark a run as having finished in failure.
+
pub fn failed(repo_id: RepoId, oid: Oid, run_id: RunId) -> Result<(), JobError> {
+
    finish(repo_id, oid, run_id, Reason::Failed)
+
}
+

+
fn finish(repo_id: RepoId, oid: Oid, run_id: RunId, reason: Reason) -> Result<(), JobError> {
+
    let uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::Uuid)?;
+

+
    let profile = profile()?;
+
    let repo = repository(&profile, repo_id)?;
+
    let signer = profile.signer().map_err(JobError::Signer)?;
+

+
    let mut jobs = jobs(&repo)?;
+
    let job_id = job_for_commit(&jobs, oid)?;
+
    let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
+
    job.finish(uuid, reason, &signer)
+
        .map_err(JobError::Finish)?;
+
    announce(&profile, repo_id)?;
+

+
    logger::job_run_finished(job_id, uuid, reason);
+
    Ok(())
+
}
+

+
fn profile() -> Result<Profile, JobError> {
+
    Profile::load().map_err(JobError::Profile)
+
}
+

+
fn repository(profile: &Profile, repo_id: RepoId) -> Result<Repository, JobError> {
+
    profile
+
        .storage
+
        .repository(repo_id)
+
        .map_err(JobError::OpenRepository)
+
}
+

+
fn jobs<'a>(repo: &'a Repository) -> Result<Jobs<'a, Repository>, JobError> {
+
    Jobs::open(repo).map_err(JobError::Jobs)
+
}
+

+
fn job_for_commit<'a>(jobs: &Jobs<'a, Repository>, wanted: Oid) -> Result<JobId, JobError> {
+
    eprintln!("job_for_commit: wanted={wanted}");
+
    for item in jobs.all().map_err(JobError::AllJobs)? {
+
        let (job_id, job) = item.map_err(JobError::AllJobsJob)?;
+
        let job_id = JobId::from(job_id);
+
        eprintln!("job_for_commit: consider {job_id} with oid {}", job.oid());
+
        if job.oid() == &wanted {
+
            eprintln!("job_for_commit: wanted={wanted} => {job_id}");
+
            return Ok(job_id);
+
        }
+
    }
+

+
    Err(JobError::NoJob(wanted))
+
}
+

+
fn announce(profile: &Profile, repo_id: RepoId) -> Result<(), JobError> {
+
    const TIMEOUT: Duration = Duration::from_millis(5000);
+

+
    let mut node = Node::new(profile.home.socket());
+

+
    let (synced, unsynced) = node.seeds(repo_id).map_err(JobError::Seeds)?.iter().fold(
+
        (BTreeSet::new(), BTreeSet::new()),
+
        |(mut synced, mut unsynced), seed| {
+
            if seed.is_synced() {
+
                synced.insert(seed.nid);
+
            } else {
+
                unsynced.insert(seed.nid);
+
            }
+
            (synced, unsynced)
+
        },
+
    );
+

+
    let announcer = Announcer::new(AnnouncerConfig::public(
+
        *profile.id(),
+
        ReplicationFactor::MustReach(1),
+
        BTreeSet::new(),
+
        synced,
+
        unsynced,
+
    ))
+
    .map_err(|_| JobError::Announcer)?;
+

+
    node.announce(repo_id, TIMEOUT, announcer, |_, _| ())
+
        .map_err(JobError::Announce)?;
+

+
    Ok(())
+
}
+

+
/// Errors from managing job COBs.
+
#[derive(Debug, thiserror::Error)]
+
pub enum JobError {
+
    #[error("failed to load Radicle profile")]
+
    Profile(#[source] radicle::profile::Error),
+

+
    #[error("failed to open repository in Radicle node storage")]
+
    OpenRepository(#[source] radicle::storage::RepositoryError),
+

+
    #[error("failed to list job COBs in repository")]
+
    Jobs(#[source] radicle::storage::RepositoryError),
+

+
    #[error("failed to get all job COBs")]
+
    AllJobs(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to create a new job COB")]
+
    CreateJob(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to create a signer for Radicle repository")]
+
    Signer(#[source] radicle::profile::Error),
+

+
    #[error("couldn't get job when iterating")]
+
    AllJobsJob(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to get mutable job COB")]
+
    GetJobMut(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to add a run to a job COB")]
+
    AddRun(#[source] radicle::cob::store::Error),
+

+
    #[error("could not mark a run as finished")]
+
    Finish(#[source] radicle::cob::store::Error),
+

+
    #[error("failed to construct a UUID from a run id")]
+
    Uuid(#[source] uuid::Error),
+

+
    #[error("failed to find job COB for oid {0}")]
+
    NoJob(Oid),
+

+
    #[error("a job for Git object {0} already exists")]
+
    JobExists(Oid),
+

+
    #[error("failed to get seeds for node")]
+
    Seeds(#[source] radicle::node::Error),
+

+
    #[error("failed to announce COB change")]
+
    Announce(#[source] radicle::node::Error),
+

+
    #[error("failed to create a COB announcer")]
+
    Announcer,
+
}
modified src/lib.rs
@@ -12,6 +12,7 @@ pub mod adapter;
pub mod broker;
pub mod ci_event;
pub mod ci_event_source;
+
pub mod cob;
pub mod config;
pub mod db;
pub mod filter;
modified src/logger.rs
@@ -3,10 +3,13 @@
use std::{path::Path, process::ExitStatus, time::Duration};

use clap::ValueEnum;
-
use radicle::{identity::RepoId, node::Event, patch::PatchId};
use serde_json::Value;
use tracing::{debug, error, info, trace, warn, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
+
use uuid::Uuid;
+

+
use radicle::{git::Oid, identity::RepoId, node::Event, patch::PatchId};
+
use radicle_job::{JobId, Reason};

use crate::{
    adapter::Adapter,
@@ -131,6 +134,10 @@ enum Id {
    CibEndSuccess,
    CibStart,

+
    JobCreate,
+
    JobRunCreate,
+
    JobRunFinished,
+

    NodeEventSourceCreated,
    NodeEventSourceDisconnected,
    NodeEventSourceEnd,
@@ -820,6 +827,38 @@ pub fn adapter_did_not_exit(error: TimeoutError) {
    );
}

+
pub fn job_create(repo_id: &RepoId, oid: &Oid, job_id: &JobId) {
+
    debug!(
+
        msg_id = ?Id::JobCreate,
+
        kind = %Kind::StartRun,
+
        ?repo_id,
+
        ?oid,
+
        ?job_id,
+
        "created job COB",
+
    );
+
}
+

+
pub fn job_run_create(job_id: JobId, run_id: Uuid) {
+
    debug!(
+
        msg_id = ?Id::JobRunCreate,
+
        kind = %Kind::StartRun,
+
        ?job_id,
+
        ?run_id,
+
        "created run for job COB",
+
    );
+
}
+

+
pub fn job_run_finished(job_id: JobId, run_id: Uuid, reason: Reason) {
+
    debug!(
+
        msg_id = ?Id::JobRunFinished,
+
        kind = %Kind::FinishRun,
+
        ?job_id,
+
        ?run_id,
+
        ?reason,
+
        "marked run finished for job COB",
+
    );
+
}
+

pub fn timeoutcmd_request_termination(result: Result<(), std::sync::mpsc::SendError<()>>) {
    trace!(
        msg_id = ?Id::TimeoutRequestEnd,
modified src/run.rs
@@ -217,6 +217,13 @@ impl Whence {
}

impl Whence {
+
    pub fn oid(&self) -> Oid {
+
        match self {
+
            Self::Branch { commit, .. } => *commit,
+
            Self::Patch { commit, .. } => *commit,
+
        }
+
    }
+

    pub fn who(&self) -> Option<&str> {
        match self {
            Self::Branch {