Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
radicle-ci-broker src msg.rs
//! Messages for communicating with the adapter.
//!
//! The broker spawns an adapter child process, and sends it a request
//! via the child's stdin. The child sends responses via its stdout,
//! which the broker reads and processes. These messages are
//! represented using the types in this module.
//!
//! The types in this module are meant to be useful for anyone writing
//! a Radicle CI adapter.

#![deny(missing_docs)]

use std::{
    fmt,
    hash::{Hash, Hasher},
    io::{BufRead, BufReader, Read, Write},
    str::FromStr,
};

use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

use radicle::{
    Profile,
    identity::Did,
    node::{Alias, AliasStore},
    patch::{self, RevisionId},
    storage::{ReadRepository, ReadStorage, git::paths},
};
pub use radicle::{
    cob::patch::PatchId,
    prelude::{NodeId, RepoId},
};
pub use radicle_surf::Commit;

use crate::{
    ci_event::{CiEvent, CiEventV1},
    ergo::Oid,
    logger,
};

// This gets put into every [`Request`] message so the adapter can
// detect whether it's getting a message it knows how to handle.
const PROTOCOL_VERSION: usize = 1;

/// The type of a run identifier. For maximum generality, this is a
/// string rather than an integer.
///
/// # Example
/// ```rust
/// use radicle_ci_broker::msg::RunId;
/// let id = RunId::from("abracadabra");
/// assert_eq!(id.as_str(), "abracadabra");
/// println!("{id}");
/// ```
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RunId {
    // The identifier itself, kept as an opaque string.
    id: String,
}

impl Default for RunId {
    /// Create a run identifier from a freshly generated version 4 UUID.
    fn default() -> Self {
        let id = Uuid::new_v4().to_string();
        Self { id }
    }
}

impl Hash for RunId {
    /// Hash the identifier string, which is the only field.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let Self { id } = self;
        id.hash(state);
    }
}

impl From<&str> for RunId {
    /// Use the given string, copied, as the run identifier.
    fn from(id: &str) -> Self {
        let id = String::from(id);
        Self { id }
    }
}

impl TryFrom<Value> for RunId {
    type Error = ();

    /// Accept only a JSON string value as a run identifier; any other
    /// kind of JSON value is rejected.
    fn try_from(id: Value) -> Result<Self, Self::Error> {
        if let Value::String(text) = id {
            Ok(Self { id: text })
        } else {
            Err(())
        }
    }
}

impl fmt::Display for RunId {
    /// Write the identifier string as is.
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        f.write_str(&self.id)
    }
}

impl RunId {
    /// Borrow the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.id.as_str()
    }
}

/// The result of a CI run.
///
/// The variant names are serialized in snake case (`success`,
/// `failure`).
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum RunResult {
    /// CI run was successful.
    Success,

    /// CI run failed.
    Failure,
}

impl fmt::Display for RunResult {
    /// Write the result as a lowercase word: `success` or `failure`.
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        let word = match self {
            Self::Success => "success",
            Self::Failure => "failure",
        };
        f.write_str(word)
    }
}

/// Build a [`Request`].
///
/// Both the profile and the CI event must be set before a request
/// can be built; building without them returns an error.
#[derive(Debug, Default)]
pub struct RequestBuilder<'a> {
    // Node profile, used to access storage and aliases.
    profile: Option<&'a Profile>,
    // The CI event the request is built from.
    ci_event: Option<&'a CiEvent>,
}

impl<'a> RequestBuilder<'a> {
    /// Set the node profile to use.
    pub fn profile(mut self, profile: &'a Profile) -> Self {
        self.profile = Some(profile);
        self
    }

    /// Set the CI event to use.
    pub fn ci_event(mut self, event: &'a CiEvent) -> Self {
        self.ci_event = Some(event);
        self
    }

    /// Create a [`Request::Trigger``] message from a [`crate::ci_event::Civet`].
    pub fn build_trigger_from_ci_event(self) -> Result<Request, MessageError> {
        fn repository(repo: &RepoId, profile: &Profile) -> Result<Repository, MessageError> {
            let rad_repo = match profile.storage.repository(*repo) {
                Err(err) => {
                    return Err(MessageError::repository_error(err));
                }
                Ok(rad_repo) => rad_repo,
            };

            let project_info = match rad_repo.project() {
                Err(err) => {
                    return Err(MessageError::repository_error(err));
                }
                Ok(x) => x,
            };
            let identity = rad_repo
                .identity()
                .map_err(MessageError::repository_error)?;
            let delegates = rad_repo
                .delegates()
                .map_err(MessageError::repository_error)?;
            Ok(Repository {
                id: *repo,
                name: project_info.name().to_string(),
                description: project_info.description().to_string(),
                private: !identity.visibility().is_public(),
                default_branch: project_info.default_branch().to_string(),
                delegates: delegates.iter().copied().collect(),
            })
        }

        fn common_fields(
            event_type: EventType,
            repo: &RepoId,
            profile: &Profile,
        ) -> Result<EventCommonFields, MessageError> {
            let repository = match repository(repo, profile) {
                Err(err) => {
                    return Err(err)?;
                }
                Ok(x) => x,
            };
            Ok(EventCommonFields {
                version: PROTOCOL_VERSION,
                event_type,
                repository,
            })
        }

        fn author(node: &NodeId, profile: &Profile) -> Result<Author, MessageError> {
            let did = Did::from(*node);
            did_to_author(profile, &did)
        }

        fn commits(
            git_repo: &radicle_surf::Repository,
            tip: Oid,
            base: Oid,
        ) -> Result<Vec<Oid>, radicle_surf::Error> {
            // We have an object ID from the `radicle` crate. We need to
            // convert into a value of the type `radicle-surf` wants, which
            // is from `radicle-git-ext`. As of 2026-01-16, we have multiple
            // versions of `radicle-git-ext` in our dependency graph: the latest
            // version of `radicle-surf` depends on an older version of `radicle-git-ext`
            // thatn `radicle` itself does.

            // Unwrapping is OK here, because we know `tip` is OK.
            #[allow(clippy::unwrap_used)]
            let commit = {
                let ext_oid = radicle_surf::Oid::try_from(tip.as_ref()).unwrap();
                git_repo.commit(ext_oid).unwrap()
            };
            git_repo
                .history(commit)?
                .take_while(|c| {
                    if let Ok(c) = c {
                        c.id.as_bytes() != base.as_ref()
                    } else {
                        false
                    }
                })
                .filter_map(|result| {
                    if let Ok(commit) = result {
                        if let Ok(oid) = Oid::from_str(&commit.id.to_string()) {
                            Some(Ok(oid))
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                })
                .collect::<Result<Vec<Oid>, _>>()
        }

        fn patch_cob(
            rad_repo: &radicle::storage::git::Repository,
            patch_id: &PatchId,
        ) -> Result<radicle::cob::patch::Patch, MessageError> {
            let x = match patch::Patches::open(rad_repo) {
                Err(err) => {
                    return Err(MessageError::repository_error(err))?;
                }
                Ok(x) => x,
            };

            let x = match x.get(patch_id) {
                Err(err) => {
                    return Err(MessageError::cob_store_error(err))?;
                }
                Ok(x) => x,
            };

            let x = match x {
                None => {
                    logger::patch_cob_lookup(&rad_repo.id, patch_id);
                    return Err(MessageError::PatchCob(*patch_id));
                }
                Some(x) => x,
            };

            Ok(x)
        }

        fn revisions(
            patch_cob: &radicle::cob::patch::Patch,
            author: &Author,
        ) -> Result<Vec<Revision>, MessageError> {
            patch_cob
                .revisions()
                .map(|(rid, r)| {
                    Ok::<Revision, MessageError>(Revision {
                        id: rid.into(),
                        author: author.clone(),
                        description: r.description().to_string(),
                        base: *r.base(),
                        oid: r.head(),
                        timestamp: r.timestamp().as_secs(),
                    })
                })
                .collect::<Result<Vec<Revision>, MessageError>>()
        }

        fn patch_base(
            patch_cob: &radicle::cob::patch::Patch,
            patch_id: &PatchId,
            author: &Author,
        ) -> Result<Oid, MessageError> {
            let author_pk = radicle::crypto::PublicKey::from(author.id);
            let (_id, revision) = match patch_cob.latest_by(&author_pk) {
                None => {
                    return Err(MessageError::LatestPatchRevision(*patch_id));
                }
                Some(x) => x,
            };
            Ok(*revision.base())
        }

        let profile = self.profile.ok_or(MessageError::NoProfile)?;

        match self.ci_event {
            None => Err(MessageError::CiEventNotSet),
            Some(CiEvent::V1(CiEventV1::BranchCreated {
                from_node,
                repo,
                branch,
                tip,
            })) => {
                Ok(Request::Trigger {
                    common: common_fields(EventType::Push, repo, profile)?,
                    push: Some(PushEvent {
                        pusher: author(from_node, profile)?,
                        before: *tip, // Branch created: we only use the tip
                        after: *tip,
                        branch: branch.as_str().to_string(),
                        commits: vec![*tip], // Branch created, only use tip.
                    }),
                    patch: None,
                })
            }
            Some(CiEvent::V1(CiEventV1::BranchUpdated {
                from_node,
                repo,
                branch,
                tip,
                old_tip,
            })) => {
                let git_repo =
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))
                        .map_err(MessageError::radicle_surf_error)?;
                let mut commits =
                    commits(&git_repo, *tip, *old_tip).map_err(MessageError::radicle_surf_error)?;
                if commits.is_empty() {
                    commits = vec![*old_tip];
                }

                Ok(Request::Trigger {
                    common: common_fields(EventType::Push, repo, profile)?,
                    push: Some(PushEvent {
                        pusher: author(from_node, profile)?,
                        before: *tip, // Branch created: we only use the tip
                        after: *tip,
                        branch: branch.as_str().to_string(),
                        commits,
                    }),
                    patch: None,
                })
            }
            Some(CiEvent::V1(CiEventV1::BranchDeleted {
                from_node,
                repo,
                branch,
                tip,
            })) => {
                Ok(Request::Trigger {
                    common: common_fields(EventType::Push, repo, profile)?,
                    push: Some(PushEvent {
                        pusher: author(from_node, profile)?,
                        before: *tip, // Branch created: we only use the tip
                        after: *tip,
                        branch: branch.as_str().to_string(),
                        commits: vec![*tip],
                    }),
                    patch: None,
                })
            }
            Some(CiEvent::V1(CiEventV1::TagCreated {
                from_node,
                repo,
                tag,
                tip,
            })) => {
                Ok(Request::Trigger {
                    common: common_fields(EventType::Push, repo, profile)?,
                    push: Some(PushEvent {
                        pusher: author(from_node, profile)?,
                        before: *tip, // Branch created: we only use the tip
                        after: *tip,
                        branch: tag.as_str().to_string(),
                        commits: vec![*tip], // Branch created, only use tip.
                    }),
                    patch: None,
                })
            }
            Some(CiEvent::V1(CiEventV1::TagUpdated {
                from_node,
                repo,
                tag,
                tip,
                old_tip,
            })) => {
                let git_repo =
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))
                        .map_err(MessageError::radicle_surf_error)?;
                let mut commits =
                    commits(&git_repo, *tip, *old_tip).map_err(MessageError::radicle_surf_error)?;
                if commits.is_empty() {
                    commits = vec![*old_tip];
                }

                Ok(Request::Trigger {
                    common: common_fields(EventType::Push, repo, profile)?,
                    push: Some(PushEvent {
                        pusher: author(from_node, profile)?,
                        before: *tip, // Branch created: we only use the tip
                        after: *tip,
                        branch: tag.as_str().to_string(),
                        commits,
                    }),
                    patch: None,
                })
            }
            Some(CiEvent::V1(CiEventV1::TagDeleted {
                from_node,
                repo,
                tag,
                tip,
            })) => {
                Ok(Request::Trigger {
                    common: common_fields(EventType::Push, repo, profile)?,
                    push: Some(PushEvent {
                        pusher: author(from_node, profile)?,
                        before: *tip, // Branch created: we only use the tip
                        after: *tip,
                        branch: tag.as_str().to_string(),
                        commits: vec![*tip],
                    }),
                    patch: None,
                })
            }
            Some(CiEvent::V1(CiEventV1::PatchCreated {
                from_node,
                repo,
                patch: patch_id,
                new_tip,
            })) => {
                let rad_repo = profile
                    .storage
                    .repository(*repo)
                    .map_err(MessageError::repository_error)?;
                let git_repo =
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))
                        .map_err(MessageError::radicle_surf_error)?;
                let author = author(from_node, profile)?;
                let patch_cob = patch_cob(&rad_repo, patch_id)?;
                let revisions = revisions(&patch_cob, &author)?;
                let patch_base = patch_base(&patch_cob, patch_id, &author)?;
                let commits = commits(&git_repo, *new_tip, patch_base)
                    .map_err(MessageError::radicle_surf_error)?;

                Ok(Request::Trigger {
                    common: common_fields(EventType::Patch, repo, profile)?,
                    push: None,
                    patch: Some(PatchEvent {
                        action: PatchAction::Created,
                        patch: Patch {
                            id: **patch_id,
                            author,
                            title: patch_cob.title().to_string(),
                            state: State {
                                status: patch_cob.state().to_string(),
                                conflicts: match patch_cob.state() {
                                    patch::State::Open { conflicts, .. } => conflicts.to_vec(),
                                    _ => vec![],
                                },
                            },
                            before: patch_base,
                            after: *new_tip,
                            commits,
                            target: patch_cob
                                .target()
                                .head(&rad_repo)
                                .map_err(MessageError::repository_error)?,
                            labels: patch_cob.labels().map(|l| l.name().to_string()).collect(),
                            assignees: patch_cob.assignees().collect(),
                            revisions,
                        },
                    }),
                })
            }
            Some(CiEvent::V1(CiEventV1::PatchUpdated {
                from_node,
                repo,
                patch: patch_id,
                new_tip,
            })) => {
                let rad_repo = profile
                    .storage
                    .repository(*repo)
                    .map_err(MessageError::repository_error)?;
                let git_repo =
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))
                        .map_err(MessageError::radicle_surf_error)?;
                let author = author(from_node, profile)?;
                let patch_cob = patch_cob(&rad_repo, patch_id)?;
                let revisions = revisions(&patch_cob, &author)?;
                let patch_base = patch_base(&patch_cob, patch_id, &author)?;
                let commits = commits(&git_repo, *new_tip, patch_base)
                    .map_err(MessageError::radicle_surf_error)?;

                Ok(Request::Trigger {
                    common: common_fields(EventType::Patch, repo, profile)?,
                    push: None,
                    patch: Some(PatchEvent {
                        action: PatchAction::Updated,
                        patch: Patch {
                            id: **patch_id,
                            author,
                            title: patch_cob.title().to_string(),
                            state: State {
                                status: patch_cob.state().to_string(),
                                conflicts: match patch_cob.state() {
                                    patch::State::Open { conflicts, .. } => conflicts.to_vec(),
                                    _ => vec![],
                                },
                            },
                            before: patch_base,
                            after: *new_tip,
                            commits,
                            target: patch_cob
                                .target()
                                .head(&rad_repo)
                                .map_err(MessageError::repository_error)?,
                            labels: patch_cob.labels().map(|l| l.name().to_string()).collect(),
                            assignees: patch_cob.assignees().collect(),
                            revisions,
                        },
                    }),
                })
            }
            Some(event) => Err(MessageError::UnknownCiEvent(event.clone())),
        }
    }
}

/// A request message sent by the broker to its adapter child process.
///
/// The message is tagged with a `request` field that names the
/// variant in snake case.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "request")]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum Request {
    /// Trigger a run.
    Trigger {
        /// Common fields for all message variants.
        #[serde(flatten)]
        common: EventCommonFields,

        /// The push event, if any. For a tag event, the `branch` field
        /// of the push event holds the tag name.
        #[serde(flatten)]
        push: Option<PushEvent>,

        /// The patch event, if any.
        #[serde(flatten)]
        patch: Option<PatchEvent>,
    },
}

impl Request {
    /// Repository that the event concerns.
    pub fn repo(&self) -> RepoId {
        match self {
            Self::Trigger { common, .. } => common.repository.id,
        }
    }

    /// Return the commit the event concerns. In other words, the
    /// commit that CI should run against. This is the first commit
    /// listed in the push event, or, if there is no push event, in the
    /// patch event.
    ///
    /// # Errors
    ///
    /// Returns [`MessageError::NoCommits`] if the event lists no
    /// commits, and [`MessageError::UnknownRequest`] if the request
    /// has neither a push nor a patch event.
    pub fn commit(&self) -> Result<Oid, MessageError> {
        match self {
            Self::Trigger { push, patch, .. } => {
                // A push event takes precedence over a patch event.
                let commits = match (push, patch) {
                    (Some(push), _) => &push.commits,
                    (None, Some(patch)) => &patch.patch.commits,
                    (None, None) => return Err(MessageError::UnknownRequest),
                };
                commits.first().copied().ok_or(MessageError::NoCommits)
            }
        }
    }

    /// Serialize the request as pretty-printed JSON. No trailing
    /// newline is added. This is meant for the broker to use.
    ///
    /// # Errors
    ///
    /// Returns an error if the request can't be serialized.
    pub fn to_json_pretty(&self) -> Result<String, MessageError> {
        serde_json::to_string_pretty(&self).map_err(MessageError::serialize_request)
    }

    /// Serialize the request as a single-line JSON, including the
    /// newline. This is meant for the broker to use.
    ///
    /// # Errors
    ///
    /// Returns an error if the request can't be serialized, or
    /// [`MessageError::WriteRequest`] if writing fails.
    pub fn to_writer<W: Write>(&self, mut writer: W) -> Result<(), MessageError> {
        let mut line = serde_json::to_string(&self).map_err(MessageError::serialize_request)?;
        line.push('\n');
        // `write` may write only part of the buffer; `write_all`
        // keeps going until the whole line has been written.
        writer
            .write_all(line.as_bytes())
            .map_err(MessageError::WriteRequest)
    }

    /// Read a request from a reader. This is meant for the adapter to
    /// use. Only the first line is read and parsed.
    ///
    /// # Errors
    ///
    /// Returns [`MessageError::ReadLine`] if reading fails, or an
    /// error if the line is not a valid request.
    pub fn from_reader<R: Read>(reader: R) -> Result<Self, MessageError> {
        let mut line = String::new();
        let mut r = BufReader::new(reader);
        r.read_line(&mut line).map_err(MessageError::ReadLine)?;
        serde_json::from_str::<Self>(&line).map_err(MessageError::deserialize_request)
    }

    /// Parse a request from a string. This is meant for tests to use.
    ///
    /// # Errors
    ///
    /// Returns an error if the string is not a valid request.
    pub fn try_from_str(s: &str) -> Result<Self, MessageError> {
        serde_json::from_str::<Self>(s).map_err(MessageError::deserialize_request)
    }
}

// Describe the DID as an author, with the alias known to the profile,
// if any.
fn did_to_author(profile: &Profile, did: &Did) -> Result<Author, MessageError> {
    let known_alias = profile.aliases().alias(did);
    let author = Author {
        id: *did,
        alias: known_alias,
    };
    Ok(author)
}

impl fmt::Display for Request {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}",
            serde_json::to_string(&self).map_err(|_| fmt::Error)?
        )
    }
}

/// Type of event. The variant names are serialized in lowercase.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum EventType {
    /// A push event to a branch. The request builder in this module
    /// also uses this for tag events.
    Push,

    /// A new or changed patch.
    Patch,

    /// A new or changed tag.
    Tag,
}

/// Common fields in all variations of a [`Request`] message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventCommonFields {
    /// Version of the request message. This lets the adapter detect
    /// whether it knows how to handle the message.
    pub version: usize,

    /// The type of the event.
    pub event_type: EventType,

    /// The repository the event is related to.
    pub repository: Repository,
}

/// A push event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushEvent {
    /// The author of the change.
    pub pusher: Author,

    /// The commit on which the change is based.
    pub before: Oid,

    /// The tip commit reported by the CI event that caused the
    /// request.
    pub after: Oid,

    /// The branch where the push occurred. For tag events, this is
    /// the name of the tag.
    pub branch: String,

    /// The commits in the change.
    pub commits: Vec<Oid>,
}

/// An event related to a Radicle patch object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatchEvent {
    /// What action has happened to the patch: was it created or
    /// updated?
    pub action: PatchAction,

    /// Metadata about the patch.
    pub patch: Patch,
}

/// What action has happened to the patch?
///
/// NOTE(review): there is no serde rename attribute here, so serde
/// uses the variant names as written, while the `TryFrom<&str>`
/// implementation accepts lowercase names -- confirm which form
/// adapters expect.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatchAction {
    /// Patch has been created.
    Created,

    /// Patch has been updated.
    Updated,
}

#[cfg(test)]
impl PatchAction {
    // Name of the action as a lowercase word, for tests.
    fn as_str(&self) -> &str {
        if matches!(self, Self::Created) {
            "created"
        } else {
            "updated"
        }
    }
}

impl TryFrom<&str> for PatchAction {
    type Error = MessageError;

    /// Parse the lowercase name of an action. Any other string is
    /// rejected as an unknown patch action.
    fn try_from(value: &str) -> Result<Self, Self::Error> {
        let action = match value {
            "created" => Self::Created,
            "updated" => Self::Updated,
            unknown => return Err(Self::Error::UnknownPatchAction(unknown.into())),
        };
        Ok(action)
    }
}

/// Fields in a [`Request`] message describing the repository
/// concerned.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Repository {
    /// The unique repository id.
    pub id: RepoId,

    /// The name of the repository, taken from its project
    /// information.
    pub name: String,

    /// A description of the repository, taken from its project
    /// information.
    pub description: String,

    /// Is it a private repository? That is, is its visibility
    /// something other than public?
    pub private: bool,

    /// The default branch in the repository: the branch that gets
    /// updated when a change is merged.
    pub default_branch: String,

    /// The delegates of the repository: those who can actually merge
    /// the change.
    pub delegates: Vec<Did>,
}

/// Fields describing the author of a change.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Author {
    /// The DID of the author. This is guaranteed to be unique.
    pub id: Did,

    /// The alias, or name, of the author. This need not be unique.
    /// It is `None` if no alias is known for the author.
    pub alias: Option<Alias>,
}

impl std::fmt::Display for Author {
    /// Write the DID of the author, followed by the alias in
    /// parentheses if there is one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.alias {
            Some(alias) => write!(f, "{} ({alias})", self.id),
            None => write!(f, "{}", self.id),
        }
    }
}

/// The state of a patch.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct State {
    /// State of the patch, as the textual form of the state of the
    /// patch COB.
    pub status: String,

    /// Conflicts recorded for the patch. The request builder only
    /// fills this in for a patch in the open state; otherwise the
    /// list is empty.
    // NOTE(review): presumably each pair is a conflicting revision
    // and its head commit -- confirm against the `radicle` crate.
    pub conflicts: Vec<(RevisionId, Oid)>,
}

/// Revision of a patch.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Revision {
    /// The identifier of the revision, converted from the revision
    /// id in the patch COB.
    pub id: Oid,

    /// Author of the revision.
    pub author: Author,

    /// Description of the revision.
    pub description: String,

    /// Base commit on which the revision of the patch should be
    /// applied.
    pub base: Oid,

    /// The head commit of the revision.
    pub oid: Oid,

    /// Time stamp of the revision, in seconds.
    // NOTE(review): presumably seconds since the Unix epoch -- confirm
    // against the `radicle` crate.
    pub timestamp: u64,
}

impl std::fmt::Display for Revision {
    /// Write the identifier of the revision.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { id, .. } = self;
        write!(f, "{id}")
    }
}

/// Metadata about a Radicle patch.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Patch {
    /// The patch id.
    pub id: Oid,

    /// The author of the patch.
    pub author: Author,

    /// The title of the patch.
    pub title: String,

    /// The state of the patch.
    pub state: State,

    /// The commit preceding the patch. The request builder uses the
    /// base commit of the author's latest revision.
    pub before: Oid,

    /// The new tip commit of the patch, as reported by the CI event
    /// that caused the request.
    pub after: Oid,

    /// The list of commits in the patch.
    pub commits: Vec<Oid>,

    /// The head commit of the target of the patch, looked up in the
    /// repository when the request is built.
    pub target: Oid,

    /// Labels assigned to the patch.
    pub labels: Vec<String>,

    /// Who're in charge of the patch.
    pub assignees: Vec<Did>,

    /// List of revisions of the patch.
    pub revisions: Vec<Revision>,
}

/// A response message from the adapter child process to the broker.
///
/// The message is tagged with a `response` field that names the
/// variant in snake case. Unknown fields are rejected when parsing.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "response")]
pub enum Response {
    /// A CI run has been triggered.
    Triggered {
        /// The identifier for the CI run assigned by the adapter.
        run_id: RunId,

        /// Optional informational URL for the run.
        info_url: Option<String>,
    },

    /// A CI run has finished.
    Finished {
        /// The result of a CI run.
        result: RunResult,
    },
}

impl Response {
    /// Create a `Response::Triggered` message without an info URL.
    pub fn triggered(run_id: RunId) -> Self {
        Self::Triggered {
            run_id,
            info_url: None,
        }
    }

    /// Create a `Response::Triggered` message with an info URL.
    pub fn triggered_with_url(run_id: RunId, url: &str) -> Self {
        Self::Triggered {
            run_id,
            info_url: Some(url.into()),
        }
    }

    /// Create a `Response::Finished` message.
    pub fn finished(result: RunResult) -> Self {
        Self::Finished { result }
    }

    /// Does the message indicate a result for the CI run?
    pub fn result(&self) -> Option<&RunResult> {
        if let Self::Finished { result } = self {
            Some(result)
        } else {
            None
        }
    }

    /// Serialize a response as a single-line JSON, including the
    /// newline. This is meant for the adapter to use.
    pub fn to_writer<W: Write>(&self, mut writer: W) -> Result<(), MessageError> {
        let mut line = serde_json::to_string(&self).map_err(MessageError::serialize_response)?;
        line.push('\n');
        writer
            .write(line.as_bytes())
            .map_err(MessageError::WriteResponse)?;
        Ok(())
    }

    /// Serialize the response as a pretty JSON, including the newline.
    /// This is meant for the broker to use.
    pub fn to_json_pretty(&self) -> Result<String, MessageError> {
        serde_json::to_string_pretty(&self).map_err(MessageError::serialize_response)
    }

    /// Read a response from a reader. This is meant for the broker to
    /// use.
    pub fn from_reader<R: Read + BufRead>(reader: &mut R) -> Result<Option<Self>, MessageError> {
        let mut line = String::new();
        let mut r = BufReader::new(reader);
        let n = r.read_line(&mut line).map_err(MessageError::ReadLine)?;
        if n == 0 {
            // Child's stdout was closed.
            Ok(None)
        } else {
            let req: Self = serde_json::from_slice(line.as_bytes())
                .map_err(MessageError::deserialize_response)?;
            Ok(Some(req))
        }
    }

    /// Read a response from a string slice. This is meant for the
    /// broker to use.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(line: &str) -> Result<Self, MessageError> {
        let req: Self =
            serde_json::from_slice(line.as_bytes()).map_err(MessageError::deserialize_response)?;
        Ok(req)
    }
}

/// All possible errors from the CI broker messages.
#[derive(Debug, thiserror::Error)]
pub enum MessageError {
    /// [`RequestBuilder`] does not have profile set.
    #[error("RequestBuilder must have profile set")]
    NoProfile,

    /// [`RequestBuilder`] does not have event set.
    #[error("RequestBuilder must have broker event set")]
    NoEvent,

    /// [`RequestBuilder`] does not have event handler set.
    #[error("RequestBuilder has no event handler set")]
    NoEventHandler,

    /// We got a CI event we don't know what to do with.
    #[error("programming error: unknown CI event {0:?}")]
    UnknownCiEvent(CiEvent),

    /// CI event was not set for [`RequestBuilder`].
    #[error("programming error: CI event was not set for request builder")]
    CiEventNotSet,

    /// Request message lacks commits to run CI on.
    #[error("unacceptable request message: lacks Git commits to run CI on")]
    NoCommits,

    /// Request message is neither a "push" nor a "patch"
    #[error("unacceptable request message: neither 'push' nor 'patch'")]
    UnknownRequest,

    /// Failed to serialize a request message as JSON. This should
    /// never happen and likely indicates a programming failure.
    #[error("failed to serialize a request into JSON to a file handle")]
    SerializeRequest(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Failed to serialize a response message as JSON. This should never
    /// happen and likely indicates a programming failure.
    #[error("failed to serialize a request into JSON to a file handle")]
    SerializeResponse(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Failed to write the serialized request message to an open file.
    #[error("failed to write JSON to file handle")]
    WriteRequest(#[source] std::io::Error),

    /// Failed to write the serialized response message to an open
    /// file.
    #[error("failed to write JSON to file handle")]
    WriteResponse(#[source] std::io::Error),

    /// Failed to read a line of JSON from an open file.
    #[error("failed to read line from file handle")]
    ReadLine(#[source] std::io::Error),

    /// Failed to parse JSON as a request or a response.
    #[error("failed to read a JSON request from a file handle")]
    DeserializeRequest(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Failed to parse JSON as a response or a response.
    #[error("failed to read a JSON response from a file handle")]
    DeserializeResponse(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Error retrieving context to generate trigger.
    #[error("could not generate trigger from event")]
    Trigger,

    /// Error looking up the patch COB.
    #[error("could look up patch COB {0}: not found?")]
    PatchCob(PatchId),

    /// Error looking up latest revision for a patch COB.
    #[error("failed to look up latest revision for patch {0}")]
    LatestPatchRevision(PatchId),

    /// Error from Radicle repository.
    #[error("error from Radicle repository")]
    RepositoryError(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Error from Radicle COB.
    #[error("error from Radicle collaborative object")]
    CobStoreError(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Error from `radicle-surf` crate.
    #[error("error from radicle-surf")]
    RadicleSurfError(#[source] Box<dyn std::error::Error + Send + 'static>),

    /// Trying to create a PatchAction from an invalid value.
    #[error("invalid patch action {0:?}")]
    UnknownPatchAction(String),
}

// Private constructors that box an underlying error into the matching
// `MessageError` variant. They are shaped to be passed straight to
// `map_err`.
impl MessageError {
    // Move any sendable error onto the heap as a trait object.
    fn boxed<E>(err: E) -> Box<dyn std::error::Error + Send + 'static>
    where
        E: std::error::Error + Send + 'static,
    {
        Box::new(err)
    }

    // JSON serialization of a request failed.
    fn serialize_request(err: serde_json::Error) -> Self {
        Self::SerializeRequest(Self::boxed(err))
    }

    // JSON serialization of a response failed.
    fn serialize_response(err: serde_json::Error) -> Self {
        Self::SerializeResponse(Self::boxed(err))
    }

    // JSON parsing of a request failed.
    fn deserialize_request(err: serde_json::Error) -> Self {
        Self::DeserializeRequest(Self::boxed(err))
    }

    // JSON parsing of a response failed.
    fn deserialize_response(err: serde_json::Error) -> Self {
        Self::DeserializeResponse(Self::boxed(err))
    }

    // An operation on the Radicle repository failed.
    fn repository_error(err: radicle::storage::RepositoryError) -> Self {
        Self::RepositoryError(Self::boxed(err))
    }

    // An operation on the COB store failed.
    fn cob_store_error(err: radicle::cob::store::Error) -> Self {
        Self::CobStoreError(Self::boxed(err))
    }

    // A `radicle-surf` operation failed.
    fn radicle_surf_error(err: radicle_surf::Error) -> Self {
        Self::RadicleSurfError(Self::boxed(err))
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)] // OK in tests: panic is fine
#[allow(missing_docs)]
pub mod trigger_from_ci_event_tests {
    use crate::ci_event::{CiEvent, CiEventV1};
    use crate::msg::{EventType, Request, RequestBuilder};
    use git_ref_format_core::RefString;
    use radicle::cob::Title;
    use radicle::patch::{MergeTarget, Patches};
    use radicle::prelude::Did;
    use radicle::storage::ReadRepository;

    use crate::test::{MockNode, TestResult};

    // A `BranchCreated` CI event must be turned into a push trigger
    // request, with no patch part.
    #[test]
    fn trigger_push_from_branch_created() -> TestResult<()> {
        let mock_node = MockNode::new()?;
        let profile = mock_node.profile()?;

        // Create a new commit on top of the repository head; it acts
        // as the tip of the created branch.
        let project = mock_node.node().project();
        let (_, repo_head) = project.repo.head()?;
        let cmt = radicle::test::fixtures::commit(
            "my test commit",
            &[repo_head.into()],
            &project.backend,
        );

        let ci_event = CiEvent::V1(CiEventV1::BranchCreated {
            from_node: *profile.id(),
            repo: project.id,
            branch: RefString::try_from("master")?,
            tip: cmt,
        });

        let req = RequestBuilder::default()
            .profile(&profile)
            .ci_event(&ci_event)
            .build_trigger_from_ci_event()?;
        let Request::Trigger {
            common,
            push,
            patch,
        } = req;

        // Only the push part may be present.
        assert!(patch.is_none());
        assert!(push.is_some());
        assert_eq!(common.event_type, EventType::Push);
        assert_eq!(common.repository.id, project.id);
        assert_eq!(common.repository.name, project.repo.project()?.name());

        let push = push.unwrap();
        assert_eq!(push.after, cmt);
        assert_eq!(push.before, cmt); // in this case of branch creation
        // NOTE(review): "master" contains no "$nid", so the replace
        // below is a no-op and the expected branch is just "master".
        assert_eq!(
            push.branch,
            "master".replace("$nid", &profile.id().to_string())
        );
        assert_eq!(push.commits, vec![cmt]);
        assert_eq!(push.pusher.id, Did::from(profile.id()));

        Ok(())
    }

    // A `BranchUpdated` CI event must be turned into a push trigger
    // request, with no patch part.
    #[test]
    fn trigger_push_from_branch_updated() -> TestResult<()> {
        let mock_node = MockNode::new()?;
        let profile = mock_node.profile()?;

        // Create a new commit on top of the repository head; the
        // branch moves from `repo_head` to this commit.
        let project = mock_node.node().project();
        let (_, repo_head) = project.repo.head()?;
        let cmt = radicle::test::fixtures::commit(
            "my test commit",
            &[repo_head.into()],
            &project.backend,
        );

        let ci_event = CiEvent::V1(CiEventV1::BranchUpdated {
            from_node: *profile.id(),
            repo: project.id,
            branch: RefString::try_from("master")?,
            old_tip: repo_head,
            tip: cmt,
        });

        let req = RequestBuilder::default()
            .profile(&profile)
            .ci_event(&ci_event)
            .build_trigger_from_ci_event()?;
        let Request::Trigger {
            common,
            push,
            patch,
        } = req;

        // Only the push part may be present.
        assert!(patch.is_none());
        assert!(push.is_some());
        assert_eq!(common.event_type, EventType::Push);
        assert_eq!(common.repository.id, project.id);
        assert_eq!(common.repository.name, project.repo.project()?.name());

        let push = push.unwrap();
        assert_eq!(push.after, cmt);
        // NOTE(review): this expects `before` to equal the new tip even
        // though the event's `old_tip` is `repo_head`; the assertion looks
        // copied from the branch-creation test — confirm it is intended.
        assert_eq!(push.before, cmt);
        // NOTE(review): "master" contains no "$nid", so the replace
        // below is a no-op and the expected branch is just "master".
        assert_eq!(
            push.branch,
            "master".replace("$nid", &profile.id().to_string())
        );
        assert_eq!(push.commits, vec![cmt]);
        assert_eq!(push.pusher.id, Did::from(profile.id()));

        Ok(())
    }

    // A `PatchCreated` CI event must be turned into a patch trigger
    // request, with no push part.
    #[test]
    fn trigger_patch_from_patch_created() -> TestResult<()> {
        let mock_node = MockNode::new()?;
        let profile = mock_node.profile()?;

        // Create a new commit on top of the repository head; the patch
        // proposes this commit against `repo_head`.
        let project = mock_node.node().project();
        let (_, repo_head) = project.repo.head()?;
        let cmt = radicle::test::fixtures::commit(
            "my test commit",
            &[repo_head.into()],
            &project.backend,
        );

        let node = mock_node.node();

        // Create the patch COB the event refers to.
        let mut patches = Patches::open(&project.repo)?;
        let mut cache = radicle::cob::cache::NoCache;
        let patch_cob = patches.create(
            Title::new("my patch title").unwrap(),
            "my patch description",
            MergeTarget::Delegates,
            repo_head,
            cmt,
            &[],
            &mut cache,
            &node.signer,
        )?;

        let ci_event = CiEvent::V1(CiEventV1::PatchCreated {
            from_node: *profile.id(),
            repo: project.id,
            patch: *patch_cob.id(),
            new_tip: cmt,
        });

        let req = RequestBuilder::default()
            .profile(&profile)
            .ci_event(&ci_event)
            .build_trigger_from_ci_event()?;
        let Request::Trigger {
            common,
            push,
            patch,
        } = req;

        // Only the patch part may be present.
        assert!(patch.is_some());
        assert!(push.is_none());
        assert_eq!(common.event_type, EventType::Patch);
        assert_eq!(common.repository.id, project.id);
        assert_eq!(common.repository.name, project.repo.project()?.name());

        // The patch part must mirror the patch COB created above.
        let patch = patch.unwrap();
        assert_eq!(patch.action.as_str(), "created");
        assert_eq!(patch.patch.id.to_string(), patch_cob.id.to_string());
        assert_eq!(patch.patch.title, patch_cob.title());
        assert_eq!(patch.patch.state.status, patch_cob.state().to_string());
        assert_eq!(patch.patch.target, repo_head);
        assert_eq!(patch.patch.revisions.len(), 1);
        let rev = patch.patch.revisions.first().unwrap();
        // The only revision is expected to carry the same ID as the
        // patch itself.
        assert_eq!(rev.id.to_string(), patch_cob.id.to_string());
        assert_eq!(rev.base, repo_head);
        assert_eq!(rev.oid, cmt);
        assert_eq!(rev.author.id, Did::from(profile.id()));
        assert_eq!(rev.description, patch_cob.description());
        assert_eq!(rev.timestamp, patch_cob.timestamp().as_secs());
        assert_eq!(patch.patch.after, cmt);
        assert_eq!(patch.patch.before, repo_head);
        assert_eq!(patch.patch.commits, vec![cmt]);

        Ok(())
    }

    // A `PatchUpdated` CI event must be turned into a patch trigger
    // request, with no push part.
    #[test]
    fn trigger_patch_from_patch_updated() -> TestResult<()> {
        let mock_node = MockNode::new()?;
        let profile = mock_node.profile()?;

        // Create a new commit on top of the repository head; the patch
        // proposes this commit against `repo_head`.
        let project = mock_node.node().project();
        let (_, repo_head) = project.repo.head()?;
        let cmt = radicle::test::fixtures::commit(
            "my test commit",
            &[repo_head.into()],
            &project.backend,
        );

        let node = mock_node.node();

        // Create the patch COB the event refers to. The patch is only
        // created here, never revised, so it has a single revision.
        let mut patches = Patches::open(&project.repo)?;
        let mut cache = radicle::cob::cache::NoCache;
        let patch_cob = patches.create(
            Title::new("my patch title").unwrap(),
            "my patch description",
            MergeTarget::Delegates,
            repo_head,
            cmt,
            &[],
            &mut cache,
            &node.signer,
        )?;

        let ci_event = CiEvent::V1(CiEventV1::PatchUpdated {
            from_node: *profile.id(),
            repo: project.id,
            patch: *patch_cob.id(),
            new_tip: cmt,
        });

        let req = RequestBuilder::default()
            .profile(&profile)
            .ci_event(&ci_event)
            .build_trigger_from_ci_event()?;
        let Request::Trigger {
            common,
            push,
            patch,
        } = req;

        // Only the patch part may be present.
        assert!(patch.is_some());
        assert!(push.is_none());
        assert_eq!(common.event_type, EventType::Patch);
        assert_eq!(common.repository.id, project.id);
        assert_eq!(common.repository.name, project.repo.project()?.name());

        // The patch part must mirror the patch COB created above; only
        // the action differs from the "created" case.
        let patch = patch.unwrap();
        assert_eq!(patch.action.as_str(), "updated");
        assert_eq!(patch.patch.id.to_string(), patch_cob.id.to_string());
        assert_eq!(patch.patch.title, patch_cob.title());
        assert_eq!(patch.patch.state.status, patch_cob.state().to_string());
        assert_eq!(patch.patch.target, repo_head);
        assert_eq!(patch.patch.revisions.len(), 1);
        let rev = patch.patch.revisions.first().unwrap();
        // The only revision is expected to carry the same ID as the
        // patch itself.
        assert_eq!(rev.id.to_string(), patch_cob.id.to_string());
        assert_eq!(rev.base, repo_head);
        assert_eq!(rev.oid, cmt);
        assert_eq!(rev.author.id, Did::from(profile.id()));
        assert_eq!(rev.description, patch_cob.description());
        assert_eq!(rev.timestamp, patch_cob.timestamp().as_secs());
        assert_eq!(patch.patch.after, cmt);
        assert_eq!(patch.patch.before, repo_head);
        assert_eq!(patch.patch.commits, vec![cmt]);

        Ok(())
    }
}

/// Helper functions for writing adapters.
pub mod helper {

    use std::{
        fs::{File, OpenOptions},
        io::Write,
        path::{Path, PathBuf},
        process::Command,
    };

    use nonempty::{NonEmpty, nonempty};
    use radicle::prelude::{Profile, RepoId};

    use time::{OffsetDateTime, macros::format_description};

    use super::{MessageError, Oid, Request, Response, RunId, RunResult};

    /// Exit code to indicate we didn't get one from the process.
    pub const NO_EXIT: i32 = 999;

    /// Read a single request message from the process's stdin.
    ///
    /// # Errors
    ///
    /// Any failure to read or parse the request is reported as
    /// [`MessageHelperError::ReadRequest`].
    pub fn read_request() -> Result<Request, MessageHelperError> {
        Request::from_reader(std::io::stdin()).map_err(MessageHelperError::ReadRequest)
    }

    // Send a response message to the process's stdout. On failure the
    // error carries a copy of the response that could not be written.
    fn write_response(resp: &Response) -> Result<(), MessageHelperError> {
        match resp.to_writer(std::io::stdout()) {
            Ok(()) => Ok(()),
            Err(err) => Err(MessageHelperError::WriteResponse(
                resp.clone(),
                Box::new(err),
            )),
        }
    }

    /// Write a "triggered" response for the given run to stdout.
    ///
    /// When `info_url` is given, it is included in the response.
    pub fn write_triggered(
        run_id: &RunId,
        info_url: Option<&str>,
    ) -> Result<(), MessageHelperError> {
        let id = run_id.clone();
        let response = match info_url {
            Some(url) => Response::triggered_with_url(id, url),
            None => Response::triggered(id),
        };
        write_response(&response)
    }

    /// Write a "finished" response with a failure result to stdout.
    pub fn write_failed() -> Result<(), MessageHelperError> {
        let response = Response::Finished {
            result: RunResult::Failure,
        };
        write_response(&response)
    }

    /// Write a "finished" response with a success result to stdout.
    pub fn write_succeeded() -> Result<(), MessageHelperError> {
        let response = Response::Finished {
            result: RunResult::Success,
        };
        write_response(&response)
    }

    /// Get sources from the local node.
    ///
    /// Loads the Radicle profile, clones the repository `repoid` from
    /// the profile's storage into `src`, and checks out `commit`
    /// there. With `dry_run` set, the Git commands are only logged to
    /// `adminlog`, not run.
    pub fn get_sources(
        adminlog: &mut AdminLog,
        dry_run: bool,
        repoid: RepoId,
        commit: Oid,
        src: &Path,
    ) -> Result<(), MessageHelperError> {
        // The repository lives in the node's storage directory, in a
        // subdirectory named after the canonical form of its ID.
        let repo_path = Profile::load()
            .map_err(MessageHelperError::Profile)?
            .storage
            .path()
            .join(repoid.canonical());

        git_clone(adminlog, dry_run, &repo_path, src)?;
        git_checkout(adminlog, dry_run, commit, src)
    }

    /// Clone the repository at `repo_path` into `src` using
    /// `git clone`, run from the current directory.
    fn git_clone(
        adminlog: &mut AdminLog,
        dry_run: bool,
        repo_path: &Path,
        src: &Path,
    ) -> Result<(), MessageHelperError> {
        // Paths that are not valid UTF-8 are converted lossily.
        let origin = repo_path.to_string_lossy();
        let dest = src.to_string_lossy();
        runcmd(
            adminlog,
            dry_run,
            &nonempty!["git", "clone", &origin, &dest],
            Path::new("."),
        )
        .map(|_| ())
    }

    // Check out `commit` in the working tree at `src`. The
    // `advice.detachedHead` setting is turned off first.
    fn git_checkout(
        adminlog: &mut AdminLog,
        dry_run: bool,
        commit: Oid,
        src: &Path,
    ) -> Result<(), MessageHelperError> {
        runcmd(
            adminlog,
            dry_run,
            &nonempty!["git", "config", "advice.detachedHead", "false"],
            src,
        )?;

        let rev = commit.to_string();
        runcmd(adminlog, dry_run, &nonempty!["git", "checkout", &rev], src).map(|_| ())
    }

    /// Run a program and capture its output.
    ///
    /// The command `argv` is run via `bash` in the directory `cwd`,
    /// with its stderr redirected to its stdout. Returns the exit code
    /// and the captured stdout. If the process has no exit code,
    /// [`NO_EXIT`] is returned in its place.
    ///
    /// With `dry_run` set, nothing is run: the command is only logged
    /// and `(0, vec![])` is returned.
    pub fn runcmd(
        adminlog: &mut AdminLog,
        dry_run: bool,
        argv: &NonEmpty<&str>,
        cwd: &Path,
    ) -> Result<(i32, Vec<u8>), MessageHelperError> {
        if dry_run {
            adminlog.writeln(&format!("runcmd: pretend to run: argv={argv:?}"))?;
            return Ok((0, Vec::new()));
        }

        adminlog.writeln(&format!("runcmd: argv={argv:?}"))?;

        // `bash -c '"$@" 2>&1' -- argv...` runs argv as given and merges
        // its stderr into its stdout.
        let mut cmd = Command::new("bash");
        cmd.arg("-c")
            .arg(r#""$@" 2>&1"#)
            .arg("--")
            .args(argv)
            .current_dir(cwd);
        let output = cmd
            .output()
            .map_err(|err| MessageHelperError::Command("bash", err))?;

        let exit = match output.status.code() {
            Some(code) => code,
            None => NO_EXIT,
        };
        adminlog.writeln(&format!("runcmd: exit={exit}"))?;

        // Only a failing command has its output copied to the log.
        if exit != 0 {
            indented(adminlog, "stdout", &output.stdout);
            indented(adminlog, "stderr", &output.stderr);
        }

        Ok((exit, output.stdout))
    }

    /// Log `bytes` as text under the heading `msg`, with every line
    /// indented by four spaces.
    ///
    /// Nothing is logged when `bytes` is empty. Bytes that are not
    /// valid UTF-8 are converted lossily. Failures to write to the log
    /// are ignored.
    pub fn indented(adminlog: &mut AdminLog, msg: &str, bytes: &[u8]) {
        if bytes.is_empty() {
            return;
        }

        let _ = adminlog.writeln(&format!("{msg}:"));
        let decoded = String::from_utf8_lossy(bytes);
        for row in decoded.lines() {
            let _ = adminlog.writeln(&format!("    {row}"));
        }
    }

    /// A log for the administrator, whose duty it is to keep the
    /// software running.
    ///
    /// A log writes to at most one destination: a named file, stderr,
    /// or an in-memory buffer. The default value writes nowhere.
    #[derive(Debug, Default)]
    pub struct AdminLog {
        // Name of the log file, kept for error messages.
        filename: Option<PathBuf>,
        // Open log file, if logging to a file.
        file: Option<File>,
        // Write to stderr? Only consulted when there is no file.
        stderr: bool,
        // Captured output; only consulted when there is neither a file
        // nor stderr output.
        buffer: Option<Vec<u8>>,
    }

    impl AdminLog {
        /// Create an admin log that doesn't write to a file, and
        /// neither to stderr.
        pub fn null() -> Self {
            Self::default()
        }

        /// Create an admin log that writes to stderr.
        pub fn stderr() -> Self {
            Self {
                stderr: true,
                ..Self::default()
            }
        }

        /// Capture output into a buffer.
        pub fn capture() -> Self {
            Self {
                buffer: Some(Vec::new()),
                ..Self::default()
            }
        }

        /// Return current buffer created by `capture`. Returns `None`
        /// for a log that does not capture.
        pub fn capture_buffer(&self) -> Option<&[u8]> {
            self.buffer.as_deref()
        }

        /// Create an admin log that writes to a named file. The file
        /// is created if it doesn't exist, and is appended to if it
        /// does.
        ///
        /// # Errors
        ///
        /// Returns [`LogError::OpenLogFile`] if the file can't be
        /// opened.
        pub fn open(filename: &Path) -> Result<Self, LogError> {
            let file = OpenOptions::new()
                .append(true)
                .create(true)
                .open(filename)
                .map_err(|e| LogError::OpenLogFile(filename.into(), e))?;
            Ok(Self {
                filename: Some(filename.into()),
                file: Some(file),
                stderr: false,
                buffer: None,
            })
        }

        /// Write a line to the admin log. The line is prefixed with a
        /// UTC time stamp in brackets and terminated with a newline.
        ///
        /// # Errors
        ///
        /// Returns a [`LogError`] if the time stamp can't be formatted
        /// or the line can't be written. Nothing is written if the
        /// time stamp can't be formatted.
        pub fn writeln(&mut self, text: &str) -> Result<(), LogError> {
            // Assemble the whole line first and hand it over in one
            // piece, so that a line is not emitted as several separate
            // writes that other writers to the same destination could
            // interleave with.
            let line = format!("[{}] {text}\n", now()?);
            self.write(&line)
        }

        // Write text to whichever destination is configured: the file
        // if there is one, else stderr if enabled, else the capture
        // buffer if there is one. Otherwise the text is dropped.
        fn write(&mut self, msg: &str) -> Result<(), LogError> {
            if let Some(file) = &mut self.file {
                // `open` always sets the file name together with the
                // file; fall back to an empty path rather than panic.
                file.write_all(msg.as_bytes()).map_err(|e| {
                    LogError::WriteLogFile(self.filename.clone().unwrap_or_default(), e)
                })?;
            } else if self.stderr {
                std::io::stderr()
                    .write_all(msg.as_bytes())
                    .map_err(LogError::WriteLogStderr)?;
            } else if let Some(buf) = self.buffer.as_mut() {
                buf.extend_from_slice(msg.as_bytes());
            }
            Ok(())
        }
    }

    // Format the current UTC time as a time stamp for log lines.
    fn now() -> Result<String, LogError> {
        let layout = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
        let utc = OffsetDateTime::now_utc();
        utc.format(layout).map_err(LogError::TimeFormat)
    }

    /// Possible errors from using the admin log.
    #[derive(Debug, thiserror::Error)]
    pub enum LogError {
        /// Can't open named file.
        #[error("failed to open log file {0}")]
        OpenLogFile(PathBuf, #[source] std::io::Error),

        /// Can't write to file.
        #[error("failed to write to log file {0}")]
        WriteLogFile(PathBuf, #[source] std::io::Error),

        /// Can't write to file.
        #[error("failed to write to log file {0}")]
        WriteLogStderr(#[source] std::io::Error),

        /// Can' format time stamp.
        #[error("failed to format time stamp")]
        TimeFormat(#[source] time::error::Format),
    }

    /// Possible errors from this module.
    #[derive(Debug, thiserror::Error)]
    pub enum MessageHelperError {
        /// Error reading request from stdin. The message includes the
        /// debug representation of the underlying error.
        #[error("failed to read request from stdin: {0:?}")]
        ReadRequest(#[source] MessageError),

        /// Error writing response to stdout. Carries the response that
        /// could not be written, whose debug representation is included
        /// in the message.
        #[error("failed to write response to stdout: {0:?}")]
        WriteResponse(Response, #[source] Box<MessageError>),

        /// Can't load Radicle profile.
        #[error("failed to load Radicle profile")]
        Profile(#[source] radicle::profile::Error),

        /// Can't run command and capture its output. Carries the name
        /// of the program that could not be run.
        #[error("failed to run command {0}")]
        Command(&'static str, #[source] std::io::Error),

        /// Admin log error. Reported as the wrapped [`LogError`] itself.
        #[error(transparent)]
        AdminLog(#[from] LogError),
    }
}