Radish alpha
r
rad:z2UcCU1LgMshWvXj6hXSDDrwB8q8M
Radicle Job Collaborative Object
Radicle
Git
radicle-job src lib.rs
//! Radicle "job COB".
//!
//! A [`Job`] records results of automated processing of a repository for a
//! given Git commit, by one or more nodes.
//!
//! For each of these processes there is a corresponding [`Run`]. For
//! example, nodes that run CI for a repository might add a `Run` to a `Job` for
//! a specific commit. If there are several nodes running CI, they would each
//! add their own `Run` to the same `Job`.
//!
//! # Example
//!
//! ```
//! # use radicle::crypto::Signer;
//! # use radicle::git::{raw::Repository, Oid};
//! # use radicle::test;
//! # use url::Url;
//! # use uuid::Uuid;
//! #
//! # use radicle_job::{Jobs, Run, Runs, Reason};
//! #
//! # fn commit(repo: &Repository) -> Oid {
//! #     let tree = {
//! #         let tree = repo.treebuilder(None).unwrap();
//! #         let oid = tree.write().unwrap();
//! #         repo.find_tree(oid).unwrap()
//! #     };
//! #
//! #     let author = repo.signature().unwrap();
//! #     repo.commit(None, &author, &author, "Test Commit", &tree, &[])
//! #         .unwrap()
//! #         .into()
//! # }
//! #
//! # let test::setup::NodeWithRepo {
//! #     node: alice, repo, ..
//! # } = test::setup::NodeWithRepo::default();
//! # let oid = commit(&repo.backend);
//! # let repo = (&*repo).clone();
//! let mut jobs = Jobs::open(repo).unwrap();
//! #
//! # let test::setup::NodeWithRepo { node: bob, .. } = test::setup::NodeWithRepo::default();
//! let mut job = jobs.create(oid, &alice.signer).unwrap();
//! #
//! let uuid = Uuid::new_v4();
//! let log = Url::parse(&format!("https://example.com/ci/logs?run={}", uuid)).unwrap();
//! job.run(uuid, log, &alice.signer).unwrap();
//! ```

#![deny(missing_docs)]

use std::collections::HashMap;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;

use indexmap::IndexMap;
use once_cell::sync::Lazy;
use radicle::cob::store::Cob;
use radicle::cob::{self, store, EntryId, Evaluate, ObjectId, Op, TypeName};
use radicle::crypto;
use radicle::crypto::signature::Signer;
use radicle::node::device::Device;
use radicle::node::NodeId;
use radicle::prelude::ReadRepository;
use radicle::storage::{RepositoryError, SignRepository, WriteRepository};
use radicle::{cob::store::CobAction, git::Oid};
use serde::{Deserialize, Serialize};
use url::Url;
use uuid::Uuid;

pub mod display;
pub mod error;

/// Type name of a patch.
pub static TYPENAME: Lazy<TypeName> =
    Lazy::new(|| FromStr::from_str("xyz.radworks.job").expect("type name is valid"));

/// The identifier for a given [`Job`] collaborative object.
///
/// This is a thin wrapper around the [`ObjectId`] of the collaborative
/// object, and it can be converted to and from an [`ObjectId`].
///
/// When a [`Job`] is created, through [`Jobs::create`], the identifier is
/// also returned as part of [`JobMut::id`].
///
/// Identifiers can be used to retrieve a [`Job`] or [`JobMut`] through
/// [`Jobs::get`] and [`Jobs::get_mut`], respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct JobId(ObjectId);

impl JobId {
    fn as_object_id(&self) -> &ObjectId {
        &self.0
    }
}

impl fmt::Display for JobId {
    /// Write the wrapped [`ObjectId`] using its `Display` form.
    ///
    /// The identifier is written straight into the formatter's output, so no
    /// temporary `String` is allocated per call.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl FromStr for JobId {
    type Err = <ObjectId as FromStr>::Err;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        ObjectId::from_str(s).map(Self)
    }
}

impl From<JobId> for ObjectId {
    fn from(JobId(oid): JobId) -> Self {
        oid
    }
}

impl From<ObjectId> for JobId {
    fn from(oid: ObjectId) -> Self {
        Self(oid)
    }
}

/// A `Job` describes a generic task run for a given commit [`Oid`] by a set of
/// nodes.
///
/// A node may run the task many times, which are accumulated in its [`Runs`]
/// set. Each [`Run`] is identified by a [`Uuid`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Job {
    /// The commit this `Job` was requested for.
    oid: Oid,
    /// The runs recorded so far, grouped by the node that reported them.
    runs: HashMap<NodeId, Runs>,
}

/// A set of [`Run`]s, each identified by a [`Uuid`].
///
/// Iteration over this set is guaranteed to come in insertion order, which is
/// provided by the underlying [`IndexMap`].
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Runs(IndexMap<Uuid, Run>);

impl Runs {
    /// Insert a new [`Run`], identified by the given [`Uuid`].
    pub fn insert(&mut self, uuid: Uuid, run: Run) -> Option<Run> {
        self.0.insert(uuid, run)
    }

    /// Check that the set of runs contains the given [`Uuid`].
    pub fn contains_key(&self, uuid: &Uuid) -> bool {
        self.0.contains_key(uuid)
    }

    /// Get the [`Run`] identifier by the given [`Uuid`].
    ///
    /// Return `None` if the `Uuid` does not exist.`
    pub fn get(&self, uuid: &Uuid) -> Option<&Run> {
        self.0.get(uuid)
    }

    /// Get the `nth` [`Run`] of the set.
    pub fn get_index(&self, nth: usize) -> Option<(&Uuid, &Run)> {
        self.0.get_index(nth)
    }

    /// Get the latest [`Run`] and its corresponding [`Uuid`].
    pub fn latest(&self) -> Option<(&Uuid, &Run)> {
        self.0.iter().next_back()
    }

    /// Get all [`Run`]s that have started [`Status`].
    pub fn started(&self) -> Runs {
        self.iter()
            .filter_map(|(uuid, run)| run.is_started().then_some((*uuid, run.clone())))
            .collect()
    }

    /// Get all [`Run`]s that have finished [`Status`].
    pub fn finished(&self) -> Runs {
        self.iter()
            .filter_map(|(uuid, run)| run.is_finished().then_some((*uuid, run.clone())))
            .collect()
    }

    /// Get all [`Run`]s that have finished [`Status`] and have the succeeded
    /// [`Reason`].
    pub fn succeeded(&self) -> Runs {
        self.iter()
            .filter_map(|(uuid, run)| run.succeeded().then_some((*uuid, run.clone())))
            .collect()
    }

    /// Get all [`Run`]s that have finished [`Status`] and have the failed
    /// [`Reason`].
    pub fn failed(&self) -> Runs {
        self.iter()
            .filter_map(|(uuid, run)| run.failed().then_some((*uuid, run.clone())))
            .collect()
    }

    /// Partition all [`Run`]s into started, succeeded, and failed.
    pub fn partition(&self) -> (Runs, Runs, Runs) {
        let mut started = IndexMap::new();
        let mut succeeded = IndexMap::new();
        let mut failed = IndexMap::new();

        for (uuid, run) in self.0.iter() {
            match run.status {
                Status::Started => started.insert(*uuid, run.clone()),
                Status::Finished(Reason::Succeeded) => succeeded.insert(*uuid, run.clone()),
                Status::Finished(Reason::Failed) => failed.insert(*uuid, run.clone()),
            };
        }
        (Runs(started), Runs(succeeded), Runs(failed))
    }

    /// Check is the set of [`Runs`] empty.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Get the number of [`Runs`] in the set.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Iterate over the [`Run`]s and their corresponding [`Uuid`]s.
    ///
    /// The order of the iteration is guaranteed to be insertion order.
    pub fn iter(&self) -> impl Iterator<Item = (&Uuid, &Run)> {
        self.0.iter()
    }
}

impl FromIterator<(Uuid, Run)> for Runs {
    fn from_iter<T: IntoIterator<Item = (Uuid, Run)>>(iter: T) -> Self {
        Self(iter.into_iter().collect())
    }
}

impl<'a> IntoIterator for &'a Runs {
    type Item = (&'a Uuid, &'a Run);
    type IntoIter = indexmap::map::Iter<'a, Uuid, Run>;

    fn into_iter(self) -> Self::IntoIter {
        self.0.iter()
    }
}

impl IntoIterator for Runs {
    type Item = (Uuid, Run);
    type IntoIter = indexmap::map::IntoIter<Uuid, Run>;

    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}

/// The collaborative object actions that are used for Radicle COB operations.
///
/// Each action is applied on behalf of the node that authored the operation
/// carrying it.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Action {
    /// Request a [`Job`] be created for the given [`Oid`].
    ///
    /// Note this is expected only once, as the first action, for initializing
    /// the [`Job`]. Any `Request` seen after that is ignored.
    Request {
        /// The commit the [`Job`] corresponds to.
        oid: Oid,
    },
    /// Notify that the node has started a [`Run`].
    ///
    /// If the node already has a [`Run`] with the same [`Uuid`], the action
    /// has no effect.
    Run {
        /// The [`Uuid`] that identifies this particular [`Run`] of the node.
        uuid: Uuid,
        /// The [`Url`] where the node will log any information or data.
        log: Url,
    },
    /// Notify that the node has finished a [`Run`].
    ///
    /// If the node has no [`Run`] with the given [`Uuid`], the action has no
    /// effect.
    Finished {
        /// The [`Uuid`] that identifies the [`Run`] that is finished.
        uuid: Uuid,
        /// The [`Reason`] which the node finished with.
        reason: Reason,
    },
}

impl CobAction for Action {
    /// Report the Git objects this action names as parents.
    ///
    /// Only [`Action::Request`] reports one: the commit it was requested for.
    /// The other actions report none.
    fn parents(&self) -> Vec<radicle::git::Oid> {
        match self {
            Action::Run { .. } | Action::Finished { .. } => Vec::new(),
            Action::Request { oid } => vec![*oid],
        }
    }
}

impl Job {
    /// Build a [`Job`] for the commit `oid`, with no runs recorded yet.
    fn new(oid: Oid) -> Self {
        let runs = HashMap::new();
        Self { oid, runs }
    }

    /// Get the [`Oid`] this [`Job`] is running tasks for.
    pub fn oid(&self) -> &Oid {
        &self.oid
    }

    /// Get all the nodes that have started, but not finished, [`Runs`].
    ///
    /// Nodes without any such run are left out of the result.
    pub fn started(&self) -> HashMap<NodeId, Runs> {
        self.filter_map_by(Runs::started)
    }

    /// Get all the nodes that have started, and finished, [`Runs`].
    ///
    /// Nodes without any such run are left out of the result.
    pub fn finished(&self) -> HashMap<NodeId, Runs> {
        self.filter_map_by(Runs::finished)
    }

    /// Get all the nodes that have succeeded [`Runs`].
    ///
    /// Nodes without any such run are left out of the result.
    pub fn succeeded(&self) -> HashMap<NodeId, Runs> {
        self.filter_map_by(Runs::succeeded)
    }

    /// Get all the nodes that have failed [`Runs`].
    ///
    /// Nodes without any such run are left out of the result.
    pub fn failed(&self) -> HashMap<NodeId, Runs> {
        self.filter_map_by(Runs::failed)
    }

    /// Get all nodes' started, succeeded, and failed runs – respectively.
    pub fn partition(&self) -> HashMap<NodeId, (Runs, Runs, Runs)> {
        let mut partitioned = HashMap::with_capacity(self.runs.len());
        for (node, runs) in &self.runs {
            partitioned.insert(*node, runs.partition());
        }
        partitioned
    }

    /// Get the latest [`Run`] of the given [`NodeId`].
    ///
    /// Returns `None` if the node has no runs recorded.
    pub fn latest_of(&self, node: &NodeId) -> Option<(&Uuid, &Run)> {
        let runs = self.runs.get(node)?;
        runs.latest()
    }

    /// Get the latest [`Run`]s of all [`NodeId`]s.
    ///
    /// Nodes whose set of runs is empty are skipped.
    pub fn latest(&self) -> impl Iterator<Item = (&NodeId, &Uuid, &Run)> + '_ {
        self.runs.iter().filter_map(|(node, runs)| {
            let (uuid, run) = runs.latest()?;
            Some((node, uuid, run))
        })
    }

    /// Get the raw `HashMap` of the node runs.
    pub fn runs(&self) -> &HashMap<NodeId, Runs> {
        &self.runs
    }

    /// Get the [`Runs`] of a given [`NodeId`].
    pub fn runs_of(&self, node: &NodeId) -> Option<&Runs> {
        self.runs.get(node)
    }

    /// Apply `p` to the [`Runs`] of every node, keeping only the nodes for
    /// which the result is non-empty.
    fn filter_map_by<P>(&self, p: P) -> HashMap<NodeId, Runs>
    where
        P: Fn(&Runs) -> Runs,
    {
        let mut selected = HashMap::new();
        for (node, runs) in &self.runs {
            let kept = p(runs);
            if !kept.is_empty() {
                selected.insert(*node, kept);
            }
        }
        selected
    }

    /// Record `run` under `uuid` for `node`, unless the node already has a run
    /// with that `uuid`.
    ///
    /// Returns `true` if the run was recorded, and `false` if it was ignored.
    fn insert(&mut self, node: NodeId, uuid: Uuid, run: Run) -> bool {
        let runs = self.runs.entry(node).or_default();
        let is_new = !runs.contains_key(&uuid);
        if is_new {
            runs.insert(uuid, run);
        }
        is_new
    }

    /// Mark the run of `node` identified by `uuid` as finished with `reason`,
    /// recording `timestamp` as the time of the update.
    ///
    /// Returns `false`, leaving the job untouched, if the node has no such
    /// run.
    fn update(
        &mut self,
        node: NodeId,
        uuid: Uuid,
        reason: Reason,
        timestamp: cob::Timestamp,
    ) -> bool {
        let Some(run) = self
            .runs
            .get_mut(&node)
            .and_then(|runs| runs.0.get_mut(&uuid))
        else {
            return false;
        };
        *run = run.clone().finish(reason, timestamp);
        true
    }

    /// Apply a single `action`, authored by `node` at `timestamp`, to the job.
    fn action(&mut self, node: NodeId, action: Action, timestamp: cob::Timestamp) {
        match action {
            Action::Run { uuid, log } => {
                self.insert(node, uuid, Run::new(log, timestamp));
            }
            Action::Finished { uuid, reason } => {
                self.update(node, uuid, reason, timestamp);
            }
            // The job is already bound to its `oid`, so any further request
            // action is superfluous and is ignored.
            Action::Request { .. } => {}
        }
    }
}

impl store::CobWithType for Job {
    fn type_name() -> &'static TypeName {
        &TYPENAME
    }
}

impl store::Cob for Job {
    type Action = Action;
    type Error = error::Build;

    fn from_root<R: ReadRepository>(op: Op<Self::Action>, repo: &R) -> Result<Self, Self::Error> {
        let mut actions = op.actions.into_iter();
        let Some(Action::Request { oid }) = actions.next() else {
            return Err(error::Build::Initial);
        };
        repo.commit(oid)
            .map_err(|err| error::Build::MissingCommit { oid, err })?;
        let mut runs = Self::new(oid);
        for action in actions {
            runs.action(op.author, action, op.timestamp);
        }
        Ok(runs)
    }

    fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a radicle::cob::Entry>>(
        &mut self,
        op: Op<Self::Action>,
        _concurrent: I,
        _repo: &R,
    ) -> Result<(), Self::Error> {
        for action in op.actions {
            self.action(op.author, action, op.timestamp);
        }
        Ok(())
    }
}

impl<R: ReadRepository> Evaluate<R> for Job {
    type Error = error::Apply;

    fn init(entry: &radicle::cob::Entry, store: &R) -> Result<Self, Self::Error> {
        let op = Op::try_from(entry)?;
        let object = Job::from_root(op, store)?;
        Ok(object)
    }

    fn apply<'a, I: Iterator<Item = (&'a Oid, &'a radicle::cob::Entry)>>(
        &mut self,
        entry: &radicle::cob::Entry,
        concurrent: I,
        store: &R,
    ) -> Result<(), Self::Error> {
        let op = Op::try_from(entry)?;
        self.op(op, concurrent.map(|(_, e)| e), store)
            .map_err(error::Apply::from)
    }
}

/// A `Run` represents a task run for a [`Job`]. A `Run` is initially created
/// with a [`Status::Started`].
///
/// The `Run` can then be marked as [`Status::Finished`] with a given
/// [`Reason`].
///
/// The `Run` also contains a [`Url`] so that any extra metadata, for example
/// logs, can be tracked outside of the `Run` itself.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Run {
    /// The status of the run.
    ///
    /// A run can be in one of three states:
    ///   - Started
    ///   - Succeeded – implies that it has finished
    ///   - Failed – implies that it has finished
    status: Status,
    /// The [`Url`] of the [`Run`] where information is logged by the node.
    log: Url,
    /// The timestamp of the last operation to affect the run, i.e. the one
    /// that started it or, once finished, the one that finished it.
    timestamp: cob::Timestamp,
}

impl Run {
    /// Create a new `Run` in the [`Status::Started`] state, with a [`Url`]
    /// that ideally points to the log of that process.
    pub fn new(log: Url, timestamp: cob::Timestamp) -> Self {
        let status = Status::Started;
        Self {
            status,
            log,
            timestamp,
        }
    }

    /// Mark the `Run` as finished with the given `reason`, replacing its
    /// timestamp and keeping its log [`Url`].
    fn finish(self, reason: Reason, timestamp: cob::Timestamp) -> Self {
        Self {
            status: Status::Finished(reason),
            timestamp,
            ..self
        }
    }

    /// Return URL for the log of this run.
    pub fn log(&self) -> &Url {
        &self.log
    }

    /// Return latest [`Status`] of the `Run`.
    pub fn status(&self) -> &Status {
        &self.status
    }

    /// Return the timestamp of the last update to the `Run`.
    pub fn timestamp(&self) -> &cob::Timestamp {
        &self.timestamp
    }

    /// Returns `true` if the status of the `Run` is [`Status::Started`].
    pub fn is_started(&self) -> bool {
        matches!(self.status, Status::Started)
    }

    /// Returns `true` if the status of the `Run` is [`Status::Finished`].
    pub fn is_finished(&self) -> bool {
        matches!(self.status, Status::Finished(_))
    }

    /// Returns `true` if the status of the `Run` is [`Status::Finished`] and
    /// the reason for finishing is [`Reason::Succeeded`].
    pub fn succeeded(&self) -> bool {
        matches!(self.status, Status::Finished(Reason::Succeeded))
    }

    /// Returns `true` if the status of the `Run` is [`Status::Finished`] and
    /// the reason for finishing is [`Reason::Failed`].
    pub fn failed(&self) -> bool {
        matches!(self.status, Status::Finished(Reason::Failed))
    }
}

/// The status of a [`Run`].
///
/// A [`Run`] is created as [`Status::Started`] and becomes
/// [`Status::Finished`] once it is marked as finished.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Status {
    /// The [`Run`] has started.
    Started,
    /// The [`Run`] has finished with the given [`Reason`].
    Finished(Reason),
}

/// The outcome with which a [`Run`] finished, carried by
/// [`Status::Finished`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Reason {
    /// The [`Run`] finished and failed.
    Failed,
    /// The [`Run`] finished and succeeded.
    Succeeded,
}

/// The storage for all [`Job`] items.
///
/// To get a handle for [`Jobs`] use [`Jobs::open`].
///
/// The read-only operations for [`Jobs`] are:
///
///   - [`Jobs::counts`]
///   - [`Jobs::get`]
///   - [`Jobs::find_by_commit`]
///
/// The write operations for [`Jobs`] are:
///
///   - [`Jobs::create`]
///   - [`Jobs::get_mut`]
pub struct Jobs<'a, R> {
    /// The underlying COB store, typed for [`Job`]s.
    raw: store::Store<'a, Job, R>,
}

impl<'a, R> Deref for Jobs<'a, R> {
    type Target = store::Store<'a, Job, R>;

    /// Expose the underlying COB store.
    fn deref(&self) -> &Self::Target {
        let Self { raw } = self;
        raw
    }
}

impl<'a, R> Jobs<'a, R>
where
    R: ReadRepository + cob::Store<Namespace = NodeId>,
{
    /// Open a jobs store.
    ///
    /// The identity head of the `repository` is read first, and is then handed
    /// to the opened COB store.
    pub fn open(repository: &'a R) -> Result<Self, RepositoryError> {
        let head = repository.identity_head()?;
        let store = store::Store::open(repository)?;

        Ok(Self {
            raw: store.identity(head),
        })
    }

    /// Return the number of [`Job`]s in the store.
    ///
    /// The number is obtained by iterating over all the entries of the store.
    pub fn counts(&self) -> Result<usize, store::Error> {
        let jobs = self.all()?;
        Ok(jobs.count())
    }

    /// Get a [`Job`], given its [`JobId`] identifier.
    ///
    /// Returns `Ok(None)` if the store has no [`Job`] under that identifier.
    pub fn get(&self, id: &JobId) -> Result<Option<Job>, store::Error> {
        let object_id = id.as_object_id();
        self.raw.get(object_id)
    }

    /// Find the [`Job`]s that are associated with the `wanted` commit.
    pub fn find_by_commit(&self, wanted: Oid) -> Result<FindByCommit<'a>, store::Error> {
        let found = FindByCommit::new(self, wanted)?;
        Ok(found)
    }
}

/// [`Iterator`] for finding each [`Job`] where the [`Job::oid`] matches the
/// wanted commit. See [`Jobs::find_by_commit`].
///
/// Errors produced while iterating over the store are yielded as items.
pub struct FindByCommit<'a> {
    /// All the jobs of the store, which are filtered lazily during iteration.
    jobs: Box<dyn Iterator<Item = Result<(ObjectId, Job), cob::store::Error>> + 'a>,
    /// The commit that a [`Job`] must have been requested for to be yielded.
    needle: Oid,
}

impl<'a> FindByCommit<'a> {
    /// Create the iterator over all the entries of `jobs`, searching for the
    /// commit `needle`.
    fn new<R>(jobs: &Jobs<'a, R>, needle: Oid) -> Result<Self, cob::store::Error>
    where
        R: ReadRepository + cob::Store<Namespace = NodeId>,
    {
        let all = jobs.all()?;
        Ok(Self {
            jobs: Box::new(all),
            needle,
        })
    }

    /// Check whether `job` was requested for the commit being searched for.
    fn wanted(&self, job: &Job) -> bool {
        *job.oid() == self.needle
    }
}

impl Iterator for FindByCommit<'_> {
    type Item = Result<(JobId, Job), cob::store::Error>;

    /// Advance to the next [`Job`] that matches the wanted commit.
    ///
    /// Jobs for other commits are skipped, while errors are yielded as they
    /// are encountered. Returns `None` once the underlying iterator is
    /// exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.jobs.next()? {
                Err(err) => return Some(Err(err)),
                Ok((id, job)) => {
                    if self.wanted(&job) {
                        return Some(Ok((JobId::from(id), job)));
                    }
                }
            }
        }
    }
}

impl<'a, R> Jobs<'a, R>
where
    R: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
{
    /// Get a [`JobMut`], given its [`JobId`] identifier.
    pub fn get_mut<'g>(&'g mut self, id: &JobId) -> Result<JobMut<'a, 'g, R>, store::Error> {
        let job = self
            .raw
            .get(id.as_object_id())?
            .ok_or_else(move || store::Error::NotFound(TYPENAME.clone(), (*id).into()))?;

        Ok(JobMut {
            id: *id,
            job,
            store: self,
        })
    }

    /// Create a new [`Job`] in the repository.
    pub fn create<'g, G>(
        &'g mut self,
        oid: Oid,
        signer: &Device<G>,
    ) -> Result<JobMut<'a, 'g, R>, store::Error>
    where
        G: Signer<crypto::Signature>,
    {
        let (id, job) = store::Transaction::initial::<_, _, Transaction<R>>(
            "Request job",
            &mut self.raw,
            signer,
            |tx, _| {
                tx.request(oid)?;
                Ok(())
            },
        )?;

        Ok(JobMut {
            id: id.into(),
            job,
            store: self,
        })
    }
}

/// A `JobMut` is a [`Job`] where the underlying `Job` can be mutated by
/// applying actions to it.
///
/// It dereferences to the [`Job`] it holds, so all the read-only methods of
/// [`Job`] are available on it.
pub struct JobMut<'a, 'g, R> {
    /// The identifier of the underlying [`Job`] collaborative object.
    pub id: JobId,

    /// The in-memory state of the [`Job`], replaced after each transaction
    /// and on reload.
    job: Job,
    /// The store that the [`Job`] is read from and written to.
    store: &'g mut Jobs<'a, R>,
}

impl<R> Deref for JobMut<'_, '_, R> {
    type Target = Job;

    /// Expose the in-memory [`Job`].
    fn deref(&self) -> &Self::Target {
        let Self { job, .. } = self;
        job
    }
}

impl<'a, 'g, R> JobMut<'a, 'g, R>
where
    R: WriteRepository + cob::Store<Namespace = NodeId>,
{
    /// Create a new `JobMut`.
    pub fn new(id: JobId, job: Job, store: &'g mut Jobs<'a, R>) -> Self {
        Self { id, job, store }
    }

    /// The COB identifier for the underlying [`Job`].
    pub fn id(&self) -> &JobId {
        &self.id
    }

    /// Reload the [`Job`] data from underlying storage.
    pub fn reload(&mut self) -> Result<(), store::Error> {
        self.job = self
            .store
            .get(&self.id)?
            .ok_or_else(|| store::Error::NotFound(TYPENAME.clone(), *self.id.as_object_id()))?;

        Ok(())
    }

    /// Start a new [`Run`] for the node, where the run is identified by the
    /// given [`Uuid`].
    pub fn run<G>(
        &mut self,
        uuid: Uuid,
        log: Url,
        signer: &Device<G>,
    ) -> Result<EntryId, store::Error>
    where
        G: Signer<crypto::Signature>,
    {
        self.transaction("Run node job", signer, |tx| tx.run(uuid, log))
    }

    /// Finish a [`Run`], identified by the given [`Uuid`], for the node, with
    /// the provided [`Reason`].
    pub fn finish<G>(
        &mut self,
        uuid: Uuid,
        reason: Reason,
        signer: &Device<G>,
    ) -> Result<EntryId, store::Error>
    where
        G: Signer<crypto::Signature>,
    {
        self.transaction("Finished node job", signer, |tx| tx.finish(uuid, reason))
    }

    /// Apply COB operations to a `JobMut`.
    fn transaction<G, F>(
        &mut self,
        message: &str,
        signer: &Device<G>,
        operations: F,
    ) -> Result<EntryId, store::Error>
    where
        G: Signer<crypto::Signature>,
        F: FnOnce(&mut Transaction<R>) -> Result<(), store::Error>,
    {
        let mut tx = Transaction::default();
        operations(&mut tx)?;

        let (job, commit) =
            tx.0.commit(message, self.id.into(), &mut self.store.raw, signer)?;
        self.job = job;

        Ok(commit)
    }
}

/// An update for the `Job` COB. This applies a change to the
/// in-memory computed representation of the COB.
///
/// This is a wrapper around a [`store::Transaction`] for [`Job`]s, adding
/// helpers for pushing each kind of [`Action`].
struct Transaction<R: ReadRepository>(store::Transaction<Job, R>);

impl<R> From<store::Transaction<Job, R>> for Transaction<R>
where
    R: ReadRepository,
{
    fn from(tx: store::Transaction<Job, R>) -> Self {
        Self(tx)
    }
}

impl<R> From<Transaction<R>> for store::Transaction<Job, R>
where
    R: ReadRepository,
{
    /// Unwrap the store transaction.
    fn from(tx: Transaction<R>) -> Self {
        tx.0
    }
}

impl<R> Default for Transaction<R>
where
    R: ReadRepository,
{
    fn default() -> Self {
        Self(Default::default())
    }
}

impl<R> Deref for Transaction<R>
where
    R: ReadRepository,
{
    type Target = store::Transaction<Job, R>;

    /// Borrow the wrapped store transaction.
    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}

impl<R> DerefMut for Transaction<R>
where
    R: ReadRepository,
{
    /// Mutably borrow the wrapped store transaction.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(inner) = self;
        inner
    }
}

impl<R> Transaction<R>
where
    R: ReadRepository,
{
    /// Push an [`Action::Request`] for the commit `oid` onto the transaction.
    fn request(&mut self, oid: Oid) -> Result<(), store::Error> {
        let action = Action::Request { oid };
        self.0.push(action)
    }

    /// Push an [`Action::Run`], for the run identified by `uuid` and logging
    /// to `log`, onto the transaction.
    fn run(&mut self, uuid: Uuid, log: Url) -> Result<(), store::Error> {
        let action = Action::Run { uuid, log };
        self.0.push(action)
    }

    /// Push an [`Action::Finished`], for the run identified by `uuid` with the
    /// given `reason`, onto the transaction.
    fn finish(&mut self, uuid: Uuid, reason: Reason) -> Result<(), store::Error> {
        let action = Action::Finished { uuid, reason };
        self.0.push(action)
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
    use radicle::git::{raw::Repository, Oid};
    use radicle::test;
    use url::Url;
    use uuid::Uuid;

    use crate::{Jobs, Reason, Run, Runs, Status};

    /// Generate a fresh run identifier together with a log URL that embeds it.
    fn node_run() -> (Uuid, Url) {
        let uuid = Uuid::new_v4();
        let log = Url::parse(&format!("https://example.com/ci/logs?run={uuid}")).unwrap();
        (uuid, log)
    }

    /// Create a parentless commit with an empty tree in `repo`, and return its
    /// object identifier.
    fn commit(repo: &Repository) -> Oid {
        let tree = {
            let tree = repo.treebuilder(None).unwrap();
            let oid = tree.write().unwrap();
            repo.find_tree(oid).unwrap()
        };

        let author = repo.signature().unwrap();
        repo.commit(None, &author, &author, "Test Commit", &tree, &[])
            .unwrap()
            .into()
    }

    /// Two nodes each start a run for the same job and then finish it, one
    /// succeeding and one failing; the job's views are checked along the way.
    #[test]
    fn e2e() {
        let test::setup::NodeWithRepo {
            node: alice, repo, ..
        } = test::setup::NodeWithRepo::default();
        let oid = commit(&repo.backend);
        let mut jobs = Jobs::open(&*repo).unwrap();

        // Only bob's node, i.e. its signer, is used from the second setup.
        let test::setup::NodeWithRepo { node: bob, .. } = test::setup::NodeWithRepo::default();
        let mut job = jobs.create(oid, &alice.signer).unwrap();

        let (alice_uuid, alice_log) = node_run();
        job.run(alice_uuid, alice_log.clone(), &alice.signer)
            .unwrap();

        let (bob_uuid, bob_log) = node_run();
        job.run(bob_uuid, bob_log.clone(), &bob.signer).unwrap();

        // Both runs are recorded as started, each under its own node.
        let alice_runs = job.runs_of(alice.signer.public_key()).unwrap();
        assert!(alice_runs.contains_key(&alice_uuid));
        let run = alice_runs.get(&alice_uuid).unwrap();
        assert_eq!(run.status, Status::Started);
        assert_eq!(run.log, alice_log);

        let bob_runs = job.runs_of(bob.signer.public_key()).unwrap();
        assert!(bob_runs.contains_key(&bob_uuid));
        let run = bob_runs.get(&bob_uuid).unwrap();
        assert_eq!(run.status, Status::Started);
        assert_eq!(run.log, bob_log);

        job.finish(alice_uuid, Reason::Succeeded, &alice.signer)
            .unwrap();

        // Only alice has finished at this point.
        let finished = job.finished();
        assert!(finished.contains_key(alice.signer.public_key()));
        assert!(!finished.contains_key(bob.signer.public_key()));

        job.finish(bob_uuid, Reason::Failed, &bob.signer).unwrap();

        // Alice succeeded, bob failed, and nothing is left in progress.
        let succeeded = job.succeeded();
        assert!(succeeded.contains_key(alice.signer.public_key()));
        assert!(!succeeded.contains_key(bob.signer.public_key()));
        let failed = job.failed();
        assert!(!failed.contains_key(alice.signer.public_key()));
        assert!(failed.contains_key(bob.signer.public_key()));
        let started = job.started();
        assert!(started.is_empty());
    }

    /// Creating a job for an arbitrary commit identifier, for which no commit
    /// was created in the repository, fails.
    #[test]
    fn missing_commit() {
        let test::setup::NodeWithRepo {
            node: alice, repo, ..
        } = test::setup::NodeWithRepo::default();
        let mut jobs = Jobs::open(&*repo).unwrap();
        let oid = test::arbitrary::oid();
        let job = jobs.create(oid, &alice.signer);
        assert!(job.is_err())
    }

    /// Creating a job twice for the same commit, with the same signer, yields
    /// the same identifier and the same job.
    #[test]
    fn idempotent_create() {
        let test::setup::NodeWithRepo {
            node: alice, repo, ..
        } = test::setup::NodeWithRepo::default();
        let oid = commit(&repo.backend);
        let mut jobs = Jobs::open(&*repo).unwrap();
        // Each `JobMut` is dropped at the end of its block, so that `jobs`
        // can be borrowed again.
        let job1 = {
            let job1 = jobs.create(oid, &alice.signer).unwrap();
            job1.id
        };
        let job2 = {
            let job2 = jobs.create(oid, &alice.signer).unwrap();
            job2.id
        };

        assert_eq!(job1, job2);
        assert_eq!(jobs.get(&job1).unwrap(), jobs.get(&job2).unwrap());
    }

    /// Iterating over `Runs` yields the identifiers in the order in which they
    /// were inserted.
    #[test]
    fn runs_insertion_order_iteration() {
        let mut runs = Runs::default();
        let uuids = (0..10).map(|_| Uuid::new_v4()).collect::<Vec<_>>();
        for uuid in &uuids {
            runs.insert(
                *uuid,
                Run {
                    status: Status::Started,
                    log: Url::parse("https://example.com/ci/logs").unwrap(),
                    timestamp: radicle::cob::Timestamp::from_secs(1358182),
                },
            );
        }

        assert_eq!(
            uuids,
            runs.iter()
                .map(|(uuid, _)| uuid)
                .copied()
                .collect::<Vec<_>>()
        )
    }
}