use std::io::Write;
use clap::ValueEnum;
use radicle::{git::BranchName, patch::PatchId, storage::git::Repository};
use radicle_ci_broker::{
ergo, filter::EventFilter, node_event_source::NodeEventSource, refs::branch_ref,
refs::ref_string, util::read_file_as_objectid,
};
use super::*;
/// List events in the queue, waiting to be processed.
#[derive(Parser)]
pub struct ListEvents {
/// Show more details about the event using Rust debug formatting.
#[clap(long, hide = true)]
verbose: bool,
/// List events as JSON.
#[clap(long)]
json: bool,
}
impl Leaf for ListEvents {
fn run(&self, args: &Args) -> Result<(), CibToolError> {
let db = args.open_db()?;
let event_ids = db.queued_ci_events()?;
if self.json {
let events: Result<Vec<QueuedCiEvent>, DbError> = event_ids
.iter()
.filter_map(|id| match db.get_queued_ci_event(id) {
Ok(Some(event)) => Some(Ok(event)),
Err(e) => Some(Err(e)),
_ => None,
})
.collect();
let events = events?;
let json =
serde_json::to_string_pretty(&events).map_err(CibToolError::EventListToJson)?;
println!("{json}");
} else if self.verbose {
for id in event_ids {
if let Some(e) = db.get_queued_ci_event(&id)? {
println!("{id}: {e:?}");
} else {
println!("{id}: No such event");
}
}
} else {
for id in event_ids {
println!("{id}");
}
}
Ok(())
}
}
/// Show the number of events in the queue.
#[derive(Parser)]
pub struct CountEvents {}
impl Leaf for CountEvents {
fn run(&self, args: &Args) -> Result<(), CibToolError> {
let db = args.open_db()?;
println!("{}", db.queued_ci_events()?.len());
Ok(())
}
}
/// Add an event to the queue.
#[derive(Parser)]
pub struct AddEvent {
/// Set the repository the event refers to. Can be a RID, or the
/// repository name.
#[clap(long)]
repo: String,
/// Set the name of the ref the event refers to. Default is the
/// default branch.
#[clap(long, alias = "ref")]
name: Option<String>,
/// Set the commit the event refers to. Can be the SHA1 commit id,
/// or a symbolic Git revision, as understood by `git rev-parse`.
/// For example, `HEAD`.
#[clap(long, default_value = "HEAD")]
commit: String,
/// Type of event to create.
#[clap(long)]
kind: EventKind,
/// The base commit referred to by the event. The value is parsed
/// the same way as `--commit` value.
#[clap(long)]
base: Option<String>,
// The patch id to use for a patch event.
#[clap(long)]
patch_id: Option<PatchId>,
// Read the patch id to use for a patch event from this file.
#[clap(long)]
patch_id_file: Option<PathBuf>,
/// Write the event to this file, as JSON, instead of adding it to
/// the queue.
#[clap(long)]
output: Option<PathBuf>,
/// Write the event ID to this file, after adding the event to the
/// queue.
#[clap(long)]
id_file: Option<PathBuf>,
}
impl AddEvent {
fn branch(&self, r: &ergo::Radicle, repo: &Repository) -> Result<BranchName, TriggerError> {
if let Some(name) = &self.name {
Ok(branch_ref(&ref_string(name)?)?)
} else {
let project = r.project(&repo.id).map_err(TriggerError::Ergonomic)?;
Ok(project.default_branch().clone())
}
}
}
impl Leaf for AddEvent {
fn run(&self, args: &Args) -> Result<(), CibToolError> {
let r = ergo::Radicle::new().map_err(EventError::Ergonomic)?;
let radicle = ergo::Radicle::new().map_err(CibToolError::Ergo)?;
let nid = *radicle.profile().id();
let repo = r
.repository_by_name(&self.repo)
.map_err(EventError::Ergonomic)?;
let oid = r
.resolve_commit(&repo.id, &self.commit)
.map_err(EventError::Ergonomic)?;
let branch_name = self.branch(&r, &repo)?;
let event = match &self.kind {
EventKind::BranchCreated => {
if self.base.is_some() {
return Err(CibToolError::NoBaseAllowed);
} else {
CiEvent::branch_created(nid, repo.id, &branch_name, oid)
.map_err(CibToolError::CiEvent)?
}
}
EventKind::BranchUpdated => {
if let Some(base) = &self.base {
let base = if let Ok(base) = Oid::from_str(base) {
base
} else {
r.resolve_commit(&repo.id, base)
.map_err(EventError::Ergonomic)?
};
CiEvent::branch_updated(nid, repo.id, &branch_name, oid, base)
.map_err(CibToolError::CiEvent)?
} else {
return Err(CibToolError::BaseRequired);
}
}
EventKind::BranchDeleted => CiEvent::branch_deleted(nid, repo.id, &branch_name, oid)
.map_err(CibToolError::CiEvent)?,
EventKind::PatchCreated => {
if let Some(patch_id) = &self.patch_id {
CiEvent::patch_created(nid, repo.id, *patch_id, oid)
} else if let Some(filename) = &self.patch_id_file {
let patch_id = read_file_as_objectid(filename)?;
CiEvent::patch_created(nid, repo.id, patch_id, oid)
} else {
return Err(CibToolError::PatchIdRequired);
}
}
EventKind::PatchUpdated => {
if let Some(patch_id) = &self.patch_id {
CiEvent::patch_updated(nid, repo.id, *patch_id, oid)
} else if let Some(filename) = &self.patch_id_file {
let patch_id = read_file_as_objectid(filename)?;
CiEvent::patch_created(nid, repo.id, patch_id, oid)
} else {
return Err(CibToolError::PatchIdRequired);
}
}
};
if let Some(output) = &self.output {
let json = serde_json::to_string_pretty(&event)
.map_err(|e| CibToolError::EventToJson(event.clone(), e))?;
std::fs::write(output, json.as_bytes())
.map_err(|e| CibToolError::Write(output.into(), e))?;
} else {
let db = args.open_db()?;
let id = db.push_queued_ci_event(event)?;
println!("{id}");
if let Some(filename) = &self.id_file {
write(filename, id.to_string().as_bytes())
.map_err(|e| CibToolError::WriteEventId(filename.into(), e))?;
}
}
Ok(())
}
}
#[derive(Debug, Clone, ValueEnum)]
enum EventKind {
BranchCreated,
BranchUpdated,
BranchDeleted,
PatchCreated,
PatchUpdated,
}
/// Show an event in the queue.
#[derive(Parser)]
pub struct ShowEvent {
/// ID of event to show.
#[clap(long, required_unless_present = "id_file")]
id: Option<QueueId>,
/// Show event as JSON? Default is a debugging format useful for
/// programmers.
#[clap(long)]
json: bool,
/// Write output to this file.
#[clap(long)]
output: Option<PathBuf>,
/// Read ID of event to show from this file.
#[clap(long)]
id_file: Option<PathBuf>,
}
impl Leaf for ShowEvent {
fn run(&self, args: &Args) -> Result<(), CibToolError> {
let db = args.open_db()?;
let id = if let Some(id) = &self.id {
id.clone()
} else if let Some(filename) = &self.id_file {
let id = read(filename).map_err(|e| CibToolError::ReadEventId(filename.into(), e))?;
let id = String::from_utf8_lossy(&id).to_string();
QueueId::from(&id)
} else {
return Err(CibToolError::MissingId);
};
if let Some(event) = db.get_queued_ci_event(&id)? {
if self.json {
let json = serde_json::to_string_pretty(&event.event())
.map_err(|e| CibToolError::EventToJson(event.event().clone(), e))?;
if let Some(filename) = &self.output {
std::fs::write(filename, json.as_bytes())
.map_err(|e| CibToolError::Write(filename.into(), e))?;
} else {
println!("{json}");
}
} else {
println!("{event:#?}");
}
}
Ok(())
}
}
/// Remove an event from the queue.
#[derive(Parser)]
pub struct RemoveEvent {
/// ID of event to remove.
#[clap(long)]
id: Option<QueueId>,
/// Remove all queued events.
#[clap(long)]
all: bool,
/// Read ID of event to remove from this file.
#[clap(long)]
id_file: Option<PathBuf>,
}
impl Leaf for RemoveEvent {
fn run(&self, args: &Args) -> Result<(), CibToolError> {
let db = args.open_db()?;
let ids = if let Some(id) = &self.id {
vec![id.clone()]
} else if let Some(filename) = &self.id_file {
let id = read(filename).map_err(|e| CibToolError::ReadEventId(filename.into(), e))?;
let id = String::from_utf8_lossy(&id).to_string();
vec![QueueId::from(&id)]
} else if self.all {
db.queued_ci_events()?
} else {
return Err(CibToolError::MissingId);
};
for id in ids {
db.remove_queued_ci_event(&id)?;
}
Ok(())
}
}
/// Add a shutdown event to the queue.
///
/// The shutdown event causes the CI broker to terminate.
#[derive(Parser)]
pub struct Shutdown {
/// Write ID of the event to this file, after adding the event to
/// the queue.
#[clap(long)]
id_file: Option<PathBuf>,
}
impl Leaf for Shutdown {
fn run(&self, args: &Args) -> Result<(), CibToolError> {
let db = args.open_db()?;
let id = db.push_queued_ci_event(CiEvent::V1(CiEventV1::Shutdown))?;
if let Some(filename) = &self.id_file {
write(filename, id.to_string().as_bytes())
.map_err(|e| CibToolError::WriteEventId(filename.into(), e))?;
}
Ok(())
}
}
/// Add a run termination event to the queue.
///
/// The termination event causes the CI broker to terminate a specific run.
#[derive(Parser)]
pub struct Terminate {
/// Terminate a run given its broker run id.
run_id: RunId,
}
impl Leaf for Terminate {
fn run(&self, args: &Args) -> Result<(), CibToolError> {
let db = args.open_db()?;
db.push_queued_ci_event(CiEvent::V1(CiEventV1::Terminate(self.run_id.clone())))?;
Ok(())
}
}
/// Record node events from the node.
///
/// The events are written to the standard output or to the specified
/// file, as one JSON object per line.
#[derive(Parser)]
pub struct RecordEvents {
/// Write events to this file.
#[clap(long)]
output: Option<PathBuf>,
}
impl Leaf for RecordEvents {
fn run(&self, _args: &Args) -> Result<(), CibToolError> {
let radicle = ergo::Radicle::new().map_err(CibToolError::Ergo)?;
let mut source =
NodeEventSource::new(radicle.profile()).map_err(CibToolError::EventSubscribe)?;
let mut file = if let Some(filename) = &self.output {
Some(
std::fs::File::create(filename)
.map_err(|e| CibToolError::CreateEventsFile(filename.into(), e))?,
)
} else {
None
};
loop {
match source.node_event() {
Err(e) => return Err(CibToolError::GetNodeEvent(e)),
Ok(None) => break,
Ok(Some(event)) => {
println!("got event {event:?}");
let mut json = serde_json::to_string(&event)
.map_err(|e| CibToolError::NodeEevnetToJson(event.clone(), e))?;
if let Some(file) = &mut file {
json.push('\n');
file.write_all(json.as_bytes()).map_err(|e| {
CibToolError::WriteEvent(self.output.as_ref().unwrap().clone(), e)
})?;
} else {
println!("{json}");
}
}
}
}
Ok(())
}
}
/// Convert node events into CI events.
///
/// Node events are read from the specified file. Note that one node
/// event can result in any number of broker events.
///
/// The CI events are written to the standard output or to the
/// specified file, as one JSON object per line.
#[derive(Parser)]
pub struct CiEvents {
/// Write CI events to this file.
#[clap(long)]
output: Option<PathBuf>,
/// Read node events from this file.
input: PathBuf,
}
impl Leaf for CiEvents {
fn run(&self, _args: &Args) -> Result<(), CibToolError> {
let radicle = ergo::Radicle::new().map_err(CibToolError::Ergo)?;
let bytes = std::fs::read(&self.input)
.map_err(|e| CibToolError::ReadEvents(self.input.clone(), e))?;
let text = String::from_utf8(bytes)
.map_err(|e| CibToolError::NodeEventNotUtf8(self.input.clone(), e))?;
let mut node_events = vec![];
for line in text.lines() {
let event: radicle::node::Event = serde_json::from_str(line)
.map_err(|e| CibToolError::JsonToNodeEvent(self.input.clone(), e))?;
node_events.push(event);
}
let mut ci_events: Vec<CiEvent> = vec![];
for node_event in node_events.iter() {
if let Ok(mut cevs) = CiEvent::from_node_event(node_event, radicle.profile()) {
ci_events.append(&mut cevs);
}
}
let mut jsons = vec![];
for ci_event in ci_events.iter() {
jsons.push(
serde_json::to_string(ci_event)
.map_err(|err| CibToolError::EventToJson(ci_event.clone(), err))?,
);
}
if let Some(filename) = &self.output {
let mut file = std::fs::File::create(filename)
.map_err(|e| CibToolError::CreateBrokerEventsFile(filename.into(), e))?;
for json in jsons {
let json = format!("{json}\n");
file.write_all(json.as_bytes()).map_err(|e| {
CibToolError::WriteEvent(self.output.as_ref().unwrap().clone(), e)
})?;
}
} else {
for json in jsons {
println!("{json}");
}
}
Ok(())
}
}
/// Filter broker events recorded in a file.
///
/// Those broker events allowed by the filter are written to the
/// standard output, as one JSON object per line.
#[derive(Parser)]
pub struct FilterEvents {
/// Read event filter from this YAML file.
filters: PathBuf,
/// Read broker events from this file.
input: PathBuf,
/// Show why the decision was made.
#[clap(long)]
explain: bool,
}
impl Leaf for FilterEvents {
fn run(&self, _args: &Args) -> Result<(), CibToolError> {
let filters = EventFilter::from_file(&self.filters)
.map_err(|e| CibToolError::ReadFilters(self.filters.clone(), e))?;
let bytes = std::fs::read(&self.input)
.map_err(|e| CibToolError::ReadBrokerEvents(self.input.clone(), e))?;
let text = String::from_utf8(bytes)
.map_err(|e| CibToolError::BrokerEventNotUtf8(self.input.clone(), e))?;
let mut ci_events = vec![];
for line in text.lines() {
let event: CiEvent = serde_json::from_str(line)
.map_err(|e| CibToolError::JsonToNodeEvent(self.input.clone(), e))?;
ci_events.push(event);
}
for event in ci_events.iter() {
for filter in filters.iter() {
let decision = filter.decide(event);
if self.explain {
println!("event={event:#?}");
println!("filter={filter:#?}");
decision.print(0);
println!();
}
if decision.allowed() {
let json = serde_json::to_string_pretty(event)
.map_err(|e| CibToolError::EventToJson(event.clone(), e))?;
println!("{json}");
}
}
}
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub enum EventError {
#[error(transparent)]
Ergonomic(#[from] radicle_ci_broker::ergo::ErgoError),
#[error(transparent)]
RefError(#[from] radicle_ci_broker::refs::RefError),
}