//! Messages for communicating with the adapter.
//!
//! The broker spawns an adapter child process, and sends it a request
//! via the child's stdin. The child sends responses via its stdout,
//! which the broker reads and processes. These messages are
//! represented using the types in this module.
//!
//! The types in this module are meant to be useful for anyone writing
//! a Radicle CI adapter.
#![deny(missing_docs)]
use std::{
fmt,
hash::{Hash, Hasher},
io::{BufRead, BufReader, Read, Write},
str::FromStr,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use radicle::{
Profile,
identity::Did,
node::{Alias, AliasStore},
patch::{self, RevisionId},
storage::{ReadRepository, ReadStorage, git::paths},
};
pub use radicle::{
cob::patch::PatchId,
prelude::{NodeId, RepoId},
};
pub use radicle_surf::Commit;
use crate::{
ci_event::{CiEvent, CiEventV1},
ergo::Oid,
logger,
};
// This gets put into every [`Request`] message so the adapter can
// detect its getting a message it knows how to handle.
const PROTOCOL_VERSION: usize = 1;
/// The type of a run identifier. For maximum generality, this is a
/// string rather than an integer.
///
/// # Example
/// ```rust
/// use radicle_ci_broker::msg::RunId;
/// let id = RunId::from("abracadabra");
/// println!("{}", id.to_string());
/// ```
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RunId {
id: String,
}
impl Default for RunId {
fn default() -> Self {
Self {
id: Uuid::new_v4().to_string(),
}
}
}
impl Hash for RunId {
fn hash<H: Hasher>(&self, h: &mut H) {
self.id.hash(h);
}
}
impl From<&str> for RunId {
fn from(id: &str) -> Self {
Self { id: id.into() }
}
}
impl TryFrom<Value> for RunId {
type Error = ();
fn try_from(id: Value) -> Result<Self, Self::Error> {
match id {
Value::String(s) => Ok(Self::from(s.as_str())),
_ => Err(()),
}
}
}
impl fmt::Display for RunId {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{}", self.id)
}
}
impl RunId {
/// Return representation of identifier as a string slice.
pub fn as_str(&self) -> &str {
&self.id
}
}
/// The result of a CI run.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum RunResult {
/// CI run was successful.
Success,
/// CI run failed.
Failure,
}
impl fmt::Display for RunResult {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match self {
Self::Failure => write!(f, "failure"),
Self::Success => write!(f, "success"),
}
}
}
/// Build a [`Request`].
#[derive(Debug, Default)]
pub struct RequestBuilder<'a> {
profile: Option<&'a Profile>,
ci_event: Option<&'a CiEvent>,
}
impl<'a> RequestBuilder<'a> {
/// Set the node profile to use.
pub fn profile(mut self, profile: &'a Profile) -> Self {
self.profile = Some(profile);
self
}
/// Set the CI event to use.
pub fn ci_event(mut self, event: &'a CiEvent) -> Self {
self.ci_event = Some(event);
self
}
/// Create a [`Request::Trigger``] message from a [`crate::ci_event::Civet`].
pub fn build_trigger_from_ci_event(self) -> Result<Request, MessageError> {
fn repository(repo: &RepoId, profile: &Profile) -> Result<Repository, MessageError> {
let rad_repo = match profile.storage.repository(*repo) {
Err(err) => {
return Err(MessageError::repository_error(err));
}
Ok(rad_repo) => rad_repo,
};
let project_info = match rad_repo.project() {
Err(err) => {
return Err(MessageError::repository_error(err));
}
Ok(x) => x,
};
let identity = rad_repo
.identity()
.map_err(MessageError::repository_error)?;
let delegates = rad_repo
.delegates()
.map_err(MessageError::repository_error)?;
Ok(Repository {
id: *repo,
name: project_info.name().to_string(),
description: project_info.description().to_string(),
private: !identity.visibility().is_public(),
default_branch: project_info.default_branch().to_string(),
delegates: delegates.iter().copied().collect(),
})
}
fn common_fields(
event_type: EventType,
repo: &RepoId,
profile: &Profile,
) -> Result<EventCommonFields, MessageError> {
let repository = match repository(repo, profile) {
Err(err) => {
return Err(err)?;
}
Ok(x) => x,
};
Ok(EventCommonFields {
version: PROTOCOL_VERSION,
event_type,
repository,
})
}
fn author(node: &NodeId, profile: &Profile) -> Result<Author, MessageError> {
let did = Did::from(*node);
did_to_author(profile, &did)
}
fn commits(
git_repo: &radicle_surf::Repository,
tip: Oid,
base: Oid,
) -> Result<Vec<Oid>, radicle_surf::Error> {
// We have an object ID from the `radicle` crate. We need to
// convert into a value of the type `radicle-surf` wants, which
// is from `radicle-git-ext`. As of 2026-01-16, we have multiple
// versions of `radicle-git-ext` in our dependency graph: the latest
// version of `radicle-surf` depends on an older version of `radicle-git-ext`
// thatn `radicle` itself does.
// Unwrapping is OK here, because we know `tip` is OK.
#[allow(clippy::unwrap_used)]
let commit = {
let ext_oid = radicle_surf::Oid::try_from(tip.as_ref()).unwrap();
git_repo.commit(ext_oid).unwrap()
};
git_repo
.history(commit)?
.take_while(|c| {
if let Ok(c) = c {
c.id.as_bytes() != base.as_ref()
} else {
false
}
})
.filter_map(|result| {
if let Ok(commit) = result {
if let Ok(oid) = Oid::from_str(&commit.id.to_string()) {
Some(Ok(oid))
} else {
None
}
} else {
None
}
})
.collect::<Result<Vec<Oid>, _>>()
}
fn patch_cob(
rad_repo: &radicle::storage::git::Repository,
patch_id: &PatchId,
) -> Result<radicle::cob::patch::Patch, MessageError> {
let x = match patch::Patches::open(rad_repo) {
Err(err) => {
return Err(MessageError::repository_error(err))?;
}
Ok(x) => x,
};
let x = match x.get(patch_id) {
Err(err) => {
return Err(MessageError::cob_store_error(err))?;
}
Ok(x) => x,
};
let x = match x {
None => {
logger::patch_cob_lookup(&rad_repo.id, patch_id);
return Err(MessageError::PatchCob(*patch_id));
}
Some(x) => x,
};
Ok(x)
}
fn revisions(
patch_cob: &radicle::cob::patch::Patch,
author: &Author,
) -> Result<Vec<Revision>, MessageError> {
patch_cob
.revisions()
.map(|(rid, r)| {
Ok::<Revision, MessageError>(Revision {
id: rid.into(),
author: author.clone(),
description: r.description().to_string(),
base: *r.base(),
oid: r.head(),
timestamp: r.timestamp().as_secs(),
})
})
.collect::<Result<Vec<Revision>, MessageError>>()
}
fn patch_base(
patch_cob: &radicle::cob::patch::Patch,
patch_id: &PatchId,
author: &Author,
) -> Result<Oid, MessageError> {
let author_pk = radicle::crypto::PublicKey::from(author.id);
let (_id, revision) = match patch_cob.latest_by(&author_pk) {
None => {
return Err(MessageError::LatestPatchRevision(*patch_id));
}
Some(x) => x,
};
Ok(*revision.base())
}
let profile = self.profile.ok_or(MessageError::NoProfile)?;
match self.ci_event {
None => Err(MessageError::CiEventNotSet),
Some(CiEvent::V1(CiEventV1::BranchCreated {
from_node,
repo,
branch,
tip,
})) => {
Ok(Request::Trigger {
common: common_fields(EventType::Push, repo, profile)?,
push: Some(PushEvent {
pusher: author(from_node, profile)?,
before: *tip, // Branch created: we only use the tip
after: *tip,
branch: branch.as_str().to_string(),
commits: vec![*tip], // Branch created, only use tip.
}),
patch: None,
})
}
Some(CiEvent::V1(CiEventV1::BranchUpdated {
from_node,
repo,
branch,
tip,
old_tip,
})) => {
let git_repo =
radicle_surf::Repository::open(paths::repository(&profile.storage, repo))
.map_err(MessageError::radicle_surf_error)?;
let mut commits =
commits(&git_repo, *tip, *old_tip).map_err(MessageError::radicle_surf_error)?;
if commits.is_empty() {
commits = vec![*old_tip];
}
Ok(Request::Trigger {
common: common_fields(EventType::Push, repo, profile)?,
push: Some(PushEvent {
pusher: author(from_node, profile)?,
before: *tip, // Branch created: we only use the tip
after: *tip,
branch: branch.as_str().to_string(),
commits,
}),
patch: None,
})
}
Some(CiEvent::V1(CiEventV1::BranchDeleted {
from_node,
repo,
branch,
tip,
})) => {
Ok(Request::Trigger {
common: common_fields(EventType::Push, repo, profile)?,
push: Some(PushEvent {
pusher: author(from_node, profile)?,
before: *tip, // Branch created: we only use the tip
after: *tip,
branch: branch.as_str().to_string(),
commits: vec![*tip],
}),
patch: None,
})
}
Some(CiEvent::V1(CiEventV1::TagCreated {
from_node,
repo,
tag,
tip,
})) => {
Ok(Request::Trigger {
common: common_fields(EventType::Push, repo, profile)?,
push: Some(PushEvent {
pusher: author(from_node, profile)?,
before: *tip, // Branch created: we only use the tip
after: *tip,
branch: tag.as_str().to_string(),
commits: vec![*tip], // Branch created, only use tip.
}),
patch: None,
})
}
Some(CiEvent::V1(CiEventV1::TagUpdated {
from_node,
repo,
tag,
tip,
old_tip,
})) => {
let git_repo =
radicle_surf::Repository::open(paths::repository(&profile.storage, repo))
.map_err(MessageError::radicle_surf_error)?;
let mut commits =
commits(&git_repo, *tip, *old_tip).map_err(MessageError::radicle_surf_error)?;
if commits.is_empty() {
commits = vec![*old_tip];
}
Ok(Request::Trigger {
common: common_fields(EventType::Push, repo, profile)?,
push: Some(PushEvent {
pusher: author(from_node, profile)?,
before: *tip, // Branch created: we only use the tip
after: *tip,
branch: tag.as_str().to_string(),
commits,
}),
patch: None,
})
}
Some(CiEvent::V1(CiEventV1::TagDeleted {
from_node,
repo,
tag,
tip,
})) => {
Ok(Request::Trigger {
common: common_fields(EventType::Push, repo, profile)?,
push: Some(PushEvent {
pusher: author(from_node, profile)?,
before: *tip, // Branch created: we only use the tip
after: *tip,
branch: tag.as_str().to_string(),
commits: vec![*tip],
}),
patch: None,
})
}
Some(CiEvent::V1(CiEventV1::PatchCreated {
from_node,
repo,
patch: patch_id,
new_tip,
})) => {
let rad_repo = profile
.storage
.repository(*repo)
.map_err(MessageError::repository_error)?;
let git_repo =
radicle_surf::Repository::open(paths::repository(&profile.storage, repo))
.map_err(MessageError::radicle_surf_error)?;
let author = author(from_node, profile)?;
let patch_cob = patch_cob(&rad_repo, patch_id)?;
let revisions = revisions(&patch_cob, &author)?;
let patch_base = patch_base(&patch_cob, patch_id, &author)?;
let commits = commits(&git_repo, *new_tip, patch_base)
.map_err(MessageError::radicle_surf_error)?;
Ok(Request::Trigger {
common: common_fields(EventType::Patch, repo, profile)?,
push: None,
patch: Some(PatchEvent {
action: PatchAction::Created,
patch: Patch {
id: **patch_id,
author,
title: patch_cob.title().to_string(),
state: State {
status: patch_cob.state().to_string(),
conflicts: match patch_cob.state() {
patch::State::Open { conflicts, .. } => conflicts.to_vec(),
_ => vec![],
},
},
before: patch_base,
after: *new_tip,
commits,
target: patch_cob
.target()
.head(&rad_repo)
.map_err(MessageError::repository_error)?,
labels: patch_cob.labels().map(|l| l.name().to_string()).collect(),
assignees: patch_cob.assignees().collect(),
revisions,
},
}),
})
}
Some(CiEvent::V1(CiEventV1::PatchUpdated {
from_node,
repo,
patch: patch_id,
new_tip,
})) => {
let rad_repo = profile
.storage
.repository(*repo)
.map_err(MessageError::repository_error)?;
let git_repo =
radicle_surf::Repository::open(paths::repository(&profile.storage, repo))
.map_err(MessageError::radicle_surf_error)?;
let author = author(from_node, profile)?;
let patch_cob = patch_cob(&rad_repo, patch_id)?;
let revisions = revisions(&patch_cob, &author)?;
let patch_base = patch_base(&patch_cob, patch_id, &author)?;
let commits = commits(&git_repo, *new_tip, patch_base)
.map_err(MessageError::radicle_surf_error)?;
Ok(Request::Trigger {
common: common_fields(EventType::Patch, repo, profile)?,
push: None,
patch: Some(PatchEvent {
action: PatchAction::Updated,
patch: Patch {
id: **patch_id,
author,
title: patch_cob.title().to_string(),
state: State {
status: patch_cob.state().to_string(),
conflicts: match patch_cob.state() {
patch::State::Open { conflicts, .. } => conflicts.to_vec(),
_ => vec![],
},
},
before: patch_base,
after: *new_tip,
commits,
target: patch_cob
.target()
.head(&rad_repo)
.map_err(MessageError::repository_error)?,
labels: patch_cob.labels().map(|l| l.name().to_string()).collect(),
assignees: patch_cob.assignees().collect(),
revisions,
},
}),
})
}
Some(event) => Err(MessageError::UnknownCiEvent(event.clone())),
}
}
}
/// A request message sent by the broker to its adapter child process.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "request")]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum Request {
/// Trigger a run.
Trigger {
/// Common fields for all message variants.
#[serde(flatten)]
common: EventCommonFields,
/// The push event, if any. `branch` may tag name if tag event.
#[serde(flatten)]
push: Option<PushEvent>,
/// The patch event, if any.
#[serde(flatten)]
patch: Option<PatchEvent>,
},
}
impl Request {
/// Repository that the event concerns.
pub fn repo(&self) -> RepoId {
match self {
Self::Trigger {
common,
push: _,
patch: _,
} => common.repository.id,
}
}
/// Return the commit the event concerns. In other words, the
/// commit that CI should run against.
pub fn commit(&self) -> Result<Oid, MessageError> {
match self {
Self::Trigger {
common: _,
push,
patch,
} => {
if let Some(push) = push {
if let Some(oid) = push.commits.first() {
Ok(*oid)
} else {
Err(MessageError::NoCommits)
}
} else if let Some(patch) = patch {
if let Some(oid) = patch.patch.commits.first() {
Ok(*oid)
} else {
Err(MessageError::NoCommits)
}
} else {
Err(MessageError::UnknownRequest)
}
}
}
}
/// Serialize the request as a pretty JSON, including the newline.
/// This is meant for the broker to use.
pub fn to_json_pretty(&self) -> Result<String, MessageError> {
serde_json::to_string_pretty(&self).map_err(MessageError::serialize_request)
}
/// Serialize the request as a single-line JSON, including the
/// newline. This is meant for the broker to use.
pub fn to_writer<W: Write>(&self, mut writer: W) -> Result<(), MessageError> {
let mut line = serde_json::to_string(&self).map_err(MessageError::serialize_request)?;
line.push('\n');
writer
.write(line.as_bytes())
.map_err(MessageError::WriteRequest)?;
Ok(())
}
/// Read a request from a reader. This is meant for the adapter to
/// use.
pub fn from_reader<R: Read>(reader: R) -> Result<Self, MessageError> {
let mut line = String::new();
let mut r = BufReader::new(reader);
r.read_line(&mut line).map_err(MessageError::ReadLine)?;
let req: Self =
serde_json::from_slice(line.as_bytes()).map_err(MessageError::deserialize_request)?;
Ok(req)
}
/// Parse a request from a string. This is meant for tests to use.
pub fn try_from_str(s: &str) -> Result<Self, MessageError> {
let req: Self =
serde_json::from_slice(s.as_bytes()).map_err(MessageError::deserialize_request)?;
Ok(req)
}
}
fn did_to_author(profile: &Profile, did: &Did) -> Result<Author, MessageError> {
let alias = profile.aliases().alias(did);
Ok(Author { id: *did, alias })
}
impl fmt::Display for Request {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
serde_json::to_string(&self).map_err(|_| fmt::Error)?
)
}
}
/// Type of event.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum EventType {
/// A push event to a branch.
Push,
/// A new or changed patch.
Patch,
/// A new or changed tag.
Tag,
}
/// Common fields in all variations of a [`Request`] message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventCommonFields {
/// Version of the request message.
pub version: usize,
/// The type of the event.
pub event_type: EventType,
/// The repository the event is related to.
pub repository: Repository,
}
/// A push event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushEvent {
/// The author of the change.
pub pusher: Author,
/// The commit on which the change is based.
pub before: Oid,
/// FIXME
pub after: Oid,
/// The branch where the push occurred.
pub branch: String,
/// The commits in the change.
pub commits: Vec<Oid>,
}
/// An event related to a Radicle patch object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatchEvent {
/// What action has happened to the patch.
pub action: PatchAction,
/// Metadata about the patch.
pub patch: Patch,
}
/// What action has happened to the patch?
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatchAction {
/// Patch has been created.
Created,
/// Patch has been updated.
Updated,
}
#[cfg(test)]
impl PatchAction {
fn as_str(&self) -> &str {
match self {
Self::Created => "created",
Self::Updated => "updated",
}
}
}
impl TryFrom<&str> for PatchAction {
type Error = MessageError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match value {
"created" => Ok(Self::Created),
"updated" => Ok(Self::Updated),
_ => Err(Self::Error::UnknownPatchAction(value.into())),
}
}
}
/// Fields in a [`Request`] message describing the repository
/// concerned.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Repository {
/// The unique repository id.
pub id: RepoId,
/// The name of the repository.
pub name: String,
/// A description of the repository.
pub description: String,
/// Is it a private repository?
pub private: bool,
/// The default branch in the repository: the branch that gets
/// updated when a change is merged.
pub default_branch: String,
/// The delegates of the repository: those who can actually merge
/// the change.
pub delegates: Vec<Did>,
}
/// Fields describing the author of a change.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Author {
/// The DID of the author. This is guaranteed to be unique.
pub id: Did,
/// The alias, or name, of the author. This need not be unique.
pub alias: Option<Alias>,
}
impl std::fmt::Display for Author {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.id)?;
if let Some(alias) = &self.alias {
write!(f, " ({alias})")?;
}
Ok(())
}
}
/// The state of a patch.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct State {
/// State of the patch.
pub status: String,
/// FIXME.
pub conflicts: Vec<(RevisionId, Oid)>,
}
/// Revision of a patch.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Revision {
/// FIXME.
pub id: Oid,
/// Author of the revision.
pub author: Author,
/// Description of the revision.
pub description: String,
/// Base commit on which the revision of the patch should be
/// applied.
pub base: Oid,
/// FIXME.
pub oid: Oid,
/// Time stamp of the revision.
pub timestamp: u64,
}
impl std::fmt::Display for Revision {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.id)
}
}
/// Metadata about a Radicle patch.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Patch {
/// The patch id.
pub id: Oid,
/// The author of the patch.
pub author: Author,
/// The title of the patch.
pub title: String,
/// The state of the patch.
pub state: State,
/// The commit preceding the patch.
pub before: Oid,
/// FIXME.
pub after: Oid,
/// The list of commits in the patch.
pub commits: Vec<Oid>,
/// FIXME.
pub target: Oid,
/// Labels assigned to the patch.
pub labels: Vec<String>,
/// Who're in charge of the patch.
pub assignees: Vec<Did>,
/// List of revisions of the patch.
pub revisions: Vec<Revision>,
}
/// A response message from the adapter child process to the broker.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "response")]
pub enum Response {
/// A CI run has been triggered.
Triggered {
/// The identifier for the CI run assigned by the adapter.
run_id: RunId,
/// Optional informational URL for the run.
info_url: Option<String>,
},
/// A CI run has finished.
Finished {
/// The result of a CI run.
result: RunResult,
},
}
impl Response {
/// Create a `Response::Triggered` message without an info URL.
pub fn triggered(run_id: RunId) -> Self {
Self::Triggered {
run_id,
info_url: None,
}
}
/// Create a `Response::Triggered` message with an info URL.
pub fn triggered_with_url(run_id: RunId, url: &str) -> Self {
Self::Triggered {
run_id,
info_url: Some(url.into()),
}
}
/// Create a `Response::Finished` message.
pub fn finished(result: RunResult) -> Self {
Self::Finished { result }
}
/// Does the message indicate a result for the CI run?
pub fn result(&self) -> Option<&RunResult> {
if let Self::Finished { result } = self {
Some(result)
} else {
None
}
}
/// Serialize a response as a single-line JSON, including the
/// newline. This is meant for the adapter to use.
pub fn to_writer<W: Write>(&self, mut writer: W) -> Result<(), MessageError> {
let mut line = serde_json::to_string(&self).map_err(MessageError::serialize_response)?;
line.push('\n');
writer
.write(line.as_bytes())
.map_err(MessageError::WriteResponse)?;
Ok(())
}
/// Serialize the response as a pretty JSON, including the newline.
/// This is meant for the broker to use.
pub fn to_json_pretty(&self) -> Result<String, MessageError> {
serde_json::to_string_pretty(&self).map_err(MessageError::serialize_response)
}
/// Read a response from a reader. This is meant for the broker to
/// use.
pub fn from_reader<R: Read + BufRead>(reader: &mut R) -> Result<Option<Self>, MessageError> {
let mut line = String::new();
let mut r = BufReader::new(reader);
let n = r.read_line(&mut line).map_err(MessageError::ReadLine)?;
if n == 0 {
// Child's stdout was closed.
Ok(None)
} else {
let req: Self = serde_json::from_slice(line.as_bytes())
.map_err(MessageError::deserialize_response)?;
Ok(Some(req))
}
}
/// Read a response from a string slice. This is meant for the
/// broker to use.
#[allow(clippy::should_implement_trait)]
pub fn from_str(line: &str) -> Result<Self, MessageError> {
let req: Self =
serde_json::from_slice(line.as_bytes()).map_err(MessageError::deserialize_response)?;
Ok(req)
}
}
/// All possible errors from the CI broker messages.
#[derive(Debug, thiserror::Error)]
pub enum MessageError {
/// [`RequestBuilder`] does not have profile set.
#[error("RequestBuilder must have profile set")]
NoProfile,
/// [`RequestBuilder`] does not have event set.
#[error("RequestBuilder must have broker event set")]
NoEvent,
/// [`RequestBuilder`] does not have event handler set.
#[error("RequestBuilder has no event handler set")]
NoEventHandler,
/// We got a CI event we don't know what to do with.
#[error("programming error: unknown CI event {0:?}")]
UnknownCiEvent(CiEvent),
/// CI event was not set for [`RequestBuilder`].
#[error("programming error: CI event was not set for request builder")]
CiEventNotSet,
/// Request message lacks commits to run CI on.
#[error("unacceptable request message: lacks Git commits to run CI on")]
NoCommits,
/// Request message is neither a "push" nor a "patch"
#[error("unacceptable request message: neither 'push' nor 'patch'")]
UnknownRequest,
/// Failed to serialize a request message as JSON. This should
/// never happen and likely indicates a programming failure.
#[error("failed to serialize a request into JSON to a file handle")]
SerializeRequest(#[source] Box<dyn std::error::Error + Send + 'static>),
/// Failed to serialize a response message as JSON. This should never
/// happen and likely indicates a programming failure.
#[error("failed to serialize a request into JSON to a file handle")]
SerializeResponse(#[source] Box<dyn std::error::Error + Send + 'static>),
/// Failed to write the serialized request message to an open file.
#[error("failed to write JSON to file handle")]
WriteRequest(#[source] std::io::Error),
/// Failed to write the serialized response message to an open
/// file.
#[error("failed to write JSON to file handle")]
WriteResponse(#[source] std::io::Error),
/// Failed to read a line of JSON from an open file.
#[error("failed to read line from file handle")]
ReadLine(#[source] std::io::Error),
/// Failed to parse JSON as a request or a response.
#[error("failed to read a JSON request from a file handle")]
DeserializeRequest(#[source] Box<dyn std::error::Error + Send + 'static>),
/// Failed to parse JSON as a response or a response.
#[error("failed to read a JSON response from a file handle")]
DeserializeResponse(#[source] Box<dyn std::error::Error + Send + 'static>),
/// Error retrieving context to generate trigger.
#[error("could not generate trigger from event")]
Trigger,
/// Error looking up the patch COB.
#[error("could look up patch COB {0}: not found?")]
PatchCob(PatchId),
/// Error looking up latest revision for a patch COB.
#[error("failed to look up latest revision for patch {0}")]
LatestPatchRevision(PatchId),
/// Error from Radicle repository.
#[error("error from Radicle repository")]
RepositoryError(#[source] Box<dyn std::error::Error + Send + 'static>),
/// Error from Radicle COB.
#[error("error from Radicle collaborative object")]
CobStoreError(#[source] Box<dyn std::error::Error + Send + 'static>),
/// Error from `radicle-surf` crate.
#[error("error from radicle-surf")]
RadicleSurfError(#[source] Box<dyn std::error::Error + Send + 'static>),
/// Trying to create a PatchAction from an invalid value.
#[error("invalid patch action {0:?}")]
UnknownPatchAction(String),
}
impl MessageError {
fn serialize_request(err: serde_json::Error) -> Self {
Self::SerializeRequest(Box::new(err))
}
fn serialize_response(err: serde_json::Error) -> Self {
Self::SerializeResponse(Box::new(err))
}
fn deserialize_request(err: serde_json::Error) -> Self {
Self::DeserializeRequest(Box::new(err))
}
fn deserialize_response(err: serde_json::Error) -> Self {
Self::DeserializeResponse(Box::new(err))
}
fn repository_error(err: radicle::storage::RepositoryError) -> Self {
Self::RepositoryError(Box::new(err))
}
fn cob_store_error(err: radicle::cob::store::Error) -> Self {
Self::CobStoreError(Box::new(err))
}
fn radicle_surf_error(err: radicle_surf::Error) -> Self {
Self::RadicleSurfError(Box::new(err))
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)] // OK in tests: panic is fine
#[allow(missing_docs)]
pub mod trigger_from_ci_event_tests {
use crate::ci_event::{CiEvent, CiEventV1};
use crate::msg::{EventType, Request, RequestBuilder};
use git_ref_format_core::RefString;
use radicle::cob::Title;
use radicle::patch::{MergeTarget, Patches};
use radicle::prelude::Did;
use radicle::storage::ReadRepository;
use crate::test::{MockNode, TestResult};
#[test]
fn trigger_push_from_branch_created() -> TestResult<()> {
let mock_node = MockNode::new()?;
let profile = mock_node.profile()?;
let project = mock_node.node().project();
let (_, repo_head) = project.repo.head()?;
let cmt = radicle::test::fixtures::commit(
"my test commit",
&[repo_head.into()],
&project.backend,
);
let ci_event = CiEvent::V1(CiEventV1::BranchCreated {
from_node: *profile.id(),
repo: project.id,
branch: RefString::try_from("master")?,
tip: cmt,
});
let req = RequestBuilder::default()
.profile(&profile)
.ci_event(&ci_event)
.build_trigger_from_ci_event()?;
let Request::Trigger {
common,
push,
patch,
} = req;
assert!(patch.is_none());
assert!(push.is_some());
assert_eq!(common.event_type, EventType::Push);
assert_eq!(common.repository.id, project.id);
assert_eq!(common.repository.name, project.repo.project()?.name());
let push = push.unwrap();
assert_eq!(push.after, cmt);
assert_eq!(push.before, cmt); // in this case of branch creation
assert_eq!(
push.branch,
"master".replace("$nid", &profile.id().to_string())
);
assert_eq!(push.commits, vec![cmt]);
assert_eq!(push.pusher.id, Did::from(profile.id()));
Ok(())
}
#[test]
fn trigger_push_from_branch_updated() -> TestResult<()> {
let mock_node = MockNode::new()?;
let profile = mock_node.profile()?;
let project = mock_node.node().project();
let (_, repo_head) = project.repo.head()?;
let cmt = radicle::test::fixtures::commit(
"my test commit",
&[repo_head.into()],
&project.backend,
);
let ci_event = CiEvent::V1(CiEventV1::BranchUpdated {
from_node: *profile.id(),
repo: project.id,
branch: RefString::try_from("master")?,
old_tip: repo_head,
tip: cmt,
});
let req = RequestBuilder::default()
.profile(&profile)
.ci_event(&ci_event)
.build_trigger_from_ci_event()?;
let Request::Trigger {
common,
push,
patch,
} = req;
assert!(patch.is_none());
assert!(push.is_some());
assert_eq!(common.event_type, EventType::Push);
assert_eq!(common.repository.id, project.id);
assert_eq!(common.repository.name, project.repo.project()?.name());
let push = push.unwrap();
assert_eq!(push.after, cmt);
assert_eq!(push.before, cmt); // in this case of branch creation
assert_eq!(
push.branch,
"master".replace("$nid", &profile.id().to_string())
);
assert_eq!(push.commits, vec![cmt]);
assert_eq!(push.pusher.id, Did::from(profile.id()));
Ok(())
}
#[test]
fn trigger_patch_from_patch_created() -> TestResult<()> {
let mock_node = MockNode::new()?;
let profile = mock_node.profile()?;
let project = mock_node.node().project();
let (_, repo_head) = project.repo.head()?;
let cmt = radicle::test::fixtures::commit(
"my test commit",
&[repo_head.into()],
&project.backend,
);
let node = mock_node.node();
let mut patches = Patches::open(&project.repo)?;
let mut cache = radicle::cob::cache::NoCache;
let patch_cob = patches.create(
Title::new("my patch title").unwrap(),
"my patch description",
MergeTarget::Delegates,
repo_head,
cmt,
&[],
&mut cache,
&node.signer,
)?;
let ci_event = CiEvent::V1(CiEventV1::PatchCreated {
from_node: *profile.id(),
repo: project.id,
patch: *patch_cob.id(),
new_tip: cmt,
});
let req = RequestBuilder::default()
.profile(&profile)
.ci_event(&ci_event)
.build_trigger_from_ci_event()?;
let Request::Trigger {
common,
push,
patch,
} = req;
assert!(patch.is_some());
assert!(push.is_none());
assert_eq!(common.event_type, EventType::Patch);
assert_eq!(common.repository.id, project.id);
assert_eq!(common.repository.name, project.repo.project()?.name());
let patch = patch.unwrap();
assert_eq!(patch.action.as_str(), "created");
assert_eq!(patch.patch.id.to_string(), patch_cob.id.to_string());
assert_eq!(patch.patch.title, patch_cob.title());
assert_eq!(patch.patch.state.status, patch_cob.state().to_string());
assert_eq!(patch.patch.target, repo_head);
assert_eq!(patch.patch.revisions.len(), 1);
let rev = patch.patch.revisions.first().unwrap();
assert_eq!(rev.id.to_string(), patch_cob.id.to_string());
assert_eq!(rev.base, repo_head);
assert_eq!(rev.oid, cmt);
assert_eq!(rev.author.id, Did::from(profile.id()));
assert_eq!(rev.description, patch_cob.description());
assert_eq!(rev.timestamp, patch_cob.timestamp().as_secs());
assert_eq!(patch.patch.after, cmt);
assert_eq!(patch.patch.before, repo_head);
assert_eq!(patch.patch.commits, vec![cmt]);
Ok(())
}
#[test]
fn trigger_patch_from_patch_updated() -> TestResult<()> {
let mock_node = MockNode::new()?;
let profile = mock_node.profile()?;
let project = mock_node.node().project();
let (_, repo_head) = project.repo.head()?;
let cmt = radicle::test::fixtures::commit(
"my test commit",
&[repo_head.into()],
&project.backend,
);
let node = mock_node.node();
let mut patches = Patches::open(&project.repo)?;
let mut cache = radicle::cob::cache::NoCache;
let patch_cob = patches.create(
Title::new("my patch title").unwrap(),
"my patch description",
MergeTarget::Delegates,
repo_head,
cmt,
&[],
&mut cache,
&node.signer,
)?;
let ci_event = CiEvent::V1(CiEventV1::PatchUpdated {
from_node: *profile.id(),
repo: project.id,
patch: *patch_cob.id(),
new_tip: cmt,
});
let req = RequestBuilder::default()
.profile(&profile)
.ci_event(&ci_event)
.build_trigger_from_ci_event()?;
let Request::Trigger {
common,
push,
patch,
} = req;
assert!(patch.is_some());
assert!(push.is_none());
assert_eq!(common.event_type, EventType::Patch);
assert_eq!(common.repository.id, project.id);
assert_eq!(common.repository.name, project.repo.project()?.name());
let patch = patch.unwrap();
assert_eq!(patch.action.as_str(), "updated");
assert_eq!(patch.patch.id.to_string(), patch_cob.id.to_string());
assert_eq!(patch.patch.title, patch_cob.title());
assert_eq!(patch.patch.state.status, patch_cob.state().to_string());
assert_eq!(patch.patch.target, repo_head);
assert_eq!(patch.patch.revisions.len(), 1);
let rev = patch.patch.revisions.first().unwrap();
assert_eq!(rev.id.to_string(), patch_cob.id.to_string());
assert_eq!(rev.base, repo_head);
assert_eq!(rev.oid, cmt);
assert_eq!(rev.author.id, Did::from(profile.id()));
assert_eq!(rev.description, patch_cob.description());
assert_eq!(rev.timestamp, patch_cob.timestamp().as_secs());
assert_eq!(patch.patch.after, cmt);
assert_eq!(patch.patch.before, repo_head);
assert_eq!(patch.patch.commits, vec![cmt]);
Ok(())
}
}
/// Helper functions for writing adapters.
pub mod helper {
use std::{
fs::{File, OpenOptions},
io::Write,
path::{Path, PathBuf},
process::Command,
};
use nonempty::{NonEmpty, nonempty};
use radicle::prelude::{Profile, RepoId};
use time::{OffsetDateTime, macros::format_description};
use super::{MessageError, Oid, Request, Response, RunId, RunResult};
/// Exit code to indicate we didn't get one from the process.
pub const NO_EXIT: i32 = 999;
/// Read a request from stdin.
pub fn read_request() -> Result<Request, MessageHelperError> {
let req =
Request::from_reader(std::io::stdin()).map_err(MessageHelperError::ReadRequest)?;
Ok(req)
}
// Write response to stdout.
fn write_response(resp: &Response) -> Result<(), MessageHelperError> {
resp.to_writer(std::io::stdout())
.map_err(|e| MessageHelperError::WriteResponse(resp.clone(), Box::new(e)))?;
Ok(())
}
/// Write a "triggered" response to stdout.
pub fn write_triggered(
run_id: &RunId,
info_url: Option<&str>,
) -> Result<(), MessageHelperError> {
let response = if let Some(url) = info_url {
Response::triggered_with_url(run_id.clone(), url)
} else {
Response::triggered(run_id.clone())
};
write_response(&response)?;
Ok(())
}
/// Write a message indicating failure to stdout.
pub fn write_failed() -> Result<(), MessageHelperError> {
write_response(&Response::Finished {
result: RunResult::Failure,
})?;
Ok(())
}
/// Write a message indicating success to stdout.
pub fn write_succeeded() -> Result<(), MessageHelperError> {
write_response(&Response::Finished {
result: RunResult::Success,
})?;
Ok(())
}
/// Get sources from the local node.
pub fn get_sources(
adminlog: &mut AdminLog,
dry_run: bool,
repoid: RepoId,
commit: Oid,
src: &Path,
) -> Result<(), MessageHelperError> {
let profile = Profile::load().map_err(MessageHelperError::Profile)?;
let storage = profile.storage.path();
let repo_path = storage.join(repoid.canonical());
git_clone(adminlog, dry_run, &repo_path, src)?;
git_checkout(adminlog, dry_run, commit, src)?;
Ok(())
}
/// Run `git clone` for the repository.
fn git_clone(
adminlog: &mut AdminLog,
dry_run: bool,
repo_path: &Path,
src: &Path,
) -> Result<(), MessageHelperError> {
let repo_path = repo_path.to_string_lossy();
let src = src.to_string_lossy();
runcmd(
adminlog,
dry_run,
&nonempty!["git", "clone", &repo_path, &src],
Path::new("."),
)?;
Ok(())
}
// Check out the requested commit.
fn git_checkout(
adminlog: &mut AdminLog,
dry_run: bool,
commit: Oid,
src: &Path,
) -> Result<(), MessageHelperError> {
runcmd(
adminlog,
dry_run,
&nonempty!["git", "config", "advice.detachedHead", "false"],
src,
)?;
let commit = commit.to_string();
runcmd(
adminlog,
dry_run,
&nonempty!["git", "checkout", &commit],
src,
)?;
Ok(())
}
/// Run a program.
pub fn runcmd(
adminlog: &mut AdminLog,
dry_run: bool,
argv: &NonEmpty<&str>,
cwd: &Path,
) -> Result<(i32, Vec<u8>), MessageHelperError> {
if dry_run {
adminlog
.writeln(&format!("runcmd: pretend to run: argv={argv:?}"))
.map_err(MessageHelperError::AdminLog)?;
return Ok((0, vec![]));
}
adminlog.writeln(&format!("runcmd: argv={argv:?}"))?;
let output = Command::new("bash")
.arg("-c")
.arg(r#""$@" 2>&1"#)
.arg("--")
.args(argv)
.current_dir(cwd)
.output()
.map_err(|err| MessageHelperError::Command("bash", err))?;
let exit = output.status.code().unwrap_or(NO_EXIT);
adminlog.writeln(&format!("runcmd: exit={exit}"))?;
if exit != 0 {
indented(adminlog, "stdout", &output.stdout);
indented(adminlog, "stderr", &output.stderr);
}
Ok((exit, output.stdout))
}
/// Log a string with every line indented.
pub fn indented(adminlog: &mut AdminLog, msg: &str, bytes: &[u8]) {
if !bytes.is_empty() {
adminlog.writeln(&format!("{msg}:")).ok();
let text = String::from_utf8_lossy(bytes);
for line in text.lines() {
adminlog.writeln(&format!(" {line}")).ok();
}
}
}
/// A log for the administrator, whose duty it is to keep the
/// software running.
#[derive(Debug, Default)]
pub struct AdminLog {
filename: Option<PathBuf>,
file: Option<File>,
stderr: bool,
buffer: Option<Vec<u8>>,
}
impl AdminLog {
/// Create an admin log that doesn't write to a file, and
/// neither to stderr.
pub fn null() -> Self {
Self::default()
}
/// Create an admin log that writes to stderr.
pub fn stderr() -> Self {
Self {
filename: None,
file: None,
stderr: true,
buffer: None,
}
}
/// Capture output into a buffer.
pub fn capture() -> Self {
Self {
filename: None,
file: None,
stderr: false,
buffer: Some(vec![]),
}
}
/// Return current buffer created by `capture`.
pub fn capture_buffer(&self) -> Option<&[u8]> {
self.buffer.as_deref()
}
/// Create an admin log that writes to a named file.
pub fn open(filename: &Path) -> Result<Self, LogError> {
let file = OpenOptions::new()
.append(true)
.create(true)
.open(filename)
.map_err(|e| LogError::OpenLogFile(filename.into(), e))?;
Ok(Self {
filename: Some(filename.into()),
file: Some(file),
stderr: false,
buffer: None,
})
}
/// Write a line to the admin log.
pub fn writeln(&mut self, text: &str) -> Result<(), LogError> {
self.write("[")?;
self.write(&now()?)?;
self.write("] ")?;
self.write(text)?;
self.write("\n")?;
Ok(())
}
fn write(&mut self, msg: &str) -> Result<(), LogError> {
if let Some(file) = &mut self.file {
#[allow(clippy::unwrap_used)] // we know it's OK
file.write_all(msg.as_bytes())
.map_err(|e| LogError::WriteLogFile(self.filename.clone().unwrap(), e))?;
} else if self.stderr {
std::io::stderr()
.write_all(msg.as_bytes())
.map_err(LogError::WriteLogStderr)?;
} else if let Some(buf) = self.buffer.as_mut() {
buf.extend_from_slice(msg.as_bytes());
}
Ok(())
}
}
fn now() -> Result<String, LogError> {
let fmt = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
OffsetDateTime::now_utc()
.format(fmt)
.map_err(LogError::TimeFormat)
}
/// Possible errors from using the admin log.
#[derive(Debug, thiserror::Error)]
pub enum LogError {
/// Can't open named file.
#[error("failed to open log file {0}")]
OpenLogFile(PathBuf, #[source] std::io::Error),
/// Can't write to file.
#[error("failed to write to log file {0}")]
WriteLogFile(PathBuf, #[source] std::io::Error),
/// Can't write to file.
#[error("failed to write to log file {0}")]
WriteLogStderr(#[source] std::io::Error),
/// Can' format time stamp.
#[error("failed to format time stamp")]
TimeFormat(#[source] time::error::Format),
}
/// Possible errors from this module.
#[derive(Debug, thiserror::Error)]
pub enum MessageHelperError {
/// Error reading request from stdin.
#[error("failed to read request from stdin: {0:?}")]
ReadRequest(#[source] MessageError),
/// Error writing response to stdout.
#[error("failed to write response to stdout: {0:?}")]
WriteResponse(Response, #[source] Box<MessageError>),
/// Can't load Radicle profile.
#[error("failed to load Radicle profile")]
Profile(#[source] radicle::profile::Error),
/// Can't run command and capture its output.
#[error("failed to run command {0}")]
Command(&'static str, #[source] std::io::Error),
/// Admin log error.
#[error(transparent)]
AdminLog(#[from] LogError),
}
}