Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
radicle-ci-broker src bin cibtoolcmd event.rs
use std::io::Write;

use clap::ValueEnum;

use radicle::{git::BranchName, patch::PatchId, storage::git::Repository};
use radicle_ci_broker::{
    ergo, filter::EventFilter, node_event_source::NodeEventSource, refs::branch_ref,
    refs::ref_string, util::read_file_as_objectid,
};

use super::*;

/// List events in the queue, waiting to be processed.
#[derive(Parser)]
pub struct ListEvents {
    /// Show more details about the event using Rust debug formatting.
    #[clap(long, hide = true)]
    verbose: bool,

    /// List events as JSON.
    #[clap(long)]
    json: bool,
}

impl Leaf for ListEvents {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;
        let event_ids = db.queued_ci_events()?;
        if self.json {
            let events: Result<Vec<QueuedCiEvent>, DbError> = event_ids
                .iter()
                .filter_map(|id| match db.get_queued_ci_event(id) {
                    Ok(Some(event)) => Some(Ok(event)),
                    Err(e) => Some(Err(e)),
                    _ => None,
                })
                .collect();
            let events = events?;
            let json =
                serde_json::to_string_pretty(&events).map_err(CibToolError::EventListToJson)?;
            println!("{json}");
        } else if self.verbose {
            for id in event_ids {
                if let Some(e) = db.get_queued_ci_event(&id)? {
                    println!("{id}: {e:?}");
                } else {
                    println!("{id}: No such event");
                }
            }
        } else {
            for id in event_ids {
                println!("{id}");
            }
        }
        Ok(())
    }
}

/// Show the number of events in the queue.
#[derive(Parser)]
pub struct CountEvents {}

impl Leaf for CountEvents {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;
        println!("{}", db.queued_ci_events()?.len());
        Ok(())
    }
}

/// Add an event to the queue.
#[derive(Parser)]
pub struct AddEvent {
    /// Set the repository the event refers to. Can be a RID, or the
    /// repository name.
    #[clap(long)]
    repo: String,

    /// Set the name of the ref the event refers to. Default is the
    /// default branch.
    #[clap(long, alias = "ref")]
    name: Option<String>,

    /// Set the commit the event refers to. Can be the SHA1 commit id,
    /// or a symbolic Git revision, as understood by `git rev-parse`.
    /// For example, `HEAD`.
    #[clap(long, default_value = "HEAD")]
    commit: String,

    /// Type of event to create.
    #[clap(long)]
    kind: EventKind,

    /// The base commit referred to by the event. The value is parsed
    /// the same way as `--commit` value.
    #[clap(long)]
    base: Option<String>,

    // The patch id to use for a patch event.
    #[clap(long)]
    patch_id: Option<PatchId>,

    // Read the patch id to use for a patch event from this file.
    #[clap(long)]
    patch_id_file: Option<PathBuf>,

    /// Write the event to this file, as JSON, instead of adding it to
    /// the queue.
    #[clap(long)]
    output: Option<PathBuf>,

    /// Write the event ID to this file, after adding the event to the
    /// queue.
    #[clap(long)]
    id_file: Option<PathBuf>,
}

impl AddEvent {
    fn branch(&self, r: &ergo::Radicle, repo: &Repository) -> Result<BranchName, TriggerError> {
        if let Some(name) = &self.name {
            Ok(branch_ref(&ref_string(name)?)?)
        } else {
            let project = r.project(&repo.id).map_err(TriggerError::Ergonomic)?;
            Ok(project.default_branch().clone())
        }
    }
}

impl Leaf for AddEvent {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let r = ergo::Radicle::new().map_err(EventError::Ergonomic)?;

        let radicle = ergo::Radicle::new().map_err(CibToolError::Ergo)?;
        let nid = *radicle.profile().id();

        let repo = r
            .repository_by_name(&self.repo)
            .map_err(EventError::Ergonomic)?;

        let oid = r
            .resolve_commit(&repo.id, &self.commit)
            .map_err(EventError::Ergonomic)?;

        let branch_name = self.branch(&r, &repo)?;

        let event = match &self.kind {
            EventKind::BranchCreated => {
                if self.base.is_some() {
                    return Err(CibToolError::NoBaseAllowed);
                } else {
                    CiEvent::branch_created(nid, repo.id, &branch_name, oid)
                        .map_err(CibToolError::CiEvent)?
                }
            }
            EventKind::BranchUpdated => {
                if let Some(base) = &self.base {
                    let base = if let Ok(base) = Oid::from_str(base) {
                        base
                    } else {
                        r.resolve_commit(&repo.id, base)
                            .map_err(EventError::Ergonomic)?
                    };
                    CiEvent::branch_updated(nid, repo.id, &branch_name, oid, base)
                        .map_err(CibToolError::CiEvent)?
                } else {
                    return Err(CibToolError::BaseRequired);
                }
            }
            EventKind::BranchDeleted => CiEvent::branch_deleted(nid, repo.id, &branch_name, oid)
                .map_err(CibToolError::CiEvent)?,
            EventKind::PatchCreated => {
                if let Some(patch_id) = &self.patch_id {
                    CiEvent::patch_created(nid, repo.id, *patch_id, oid)
                } else if let Some(filename) = &self.patch_id_file {
                    let patch_id = read_file_as_objectid(filename)?;
                    CiEvent::patch_created(nid, repo.id, patch_id, oid)
                } else {
                    return Err(CibToolError::PatchIdRequired);
                }
            }
            EventKind::PatchUpdated => {
                if let Some(patch_id) = &self.patch_id {
                    CiEvent::patch_updated(nid, repo.id, *patch_id, oid)
                } else if let Some(filename) = &self.patch_id_file {
                    let patch_id = read_file_as_objectid(filename)?;
                    CiEvent::patch_created(nid, repo.id, patch_id, oid)
                } else {
                    return Err(CibToolError::PatchIdRequired);
                }
            }
        };

        if let Some(output) = &self.output {
            let json = serde_json::to_string_pretty(&event)
                .map_err(|e| CibToolError::EventToJson(event.clone(), e))?;
            std::fs::write(output, json.as_bytes())
                .map_err(|e| CibToolError::Write(output.into(), e))?;
        } else {
            let db = args.open_db()?;
            let id = db.push_queued_ci_event(event)?;
            println!("{id}");

            if let Some(filename) = &self.id_file {
                write(filename, id.to_string().as_bytes())
                    .map_err(|e| CibToolError::WriteEventId(filename.into(), e))?;
            }
        }
        Ok(())
    }
}

#[derive(Debug, Clone, ValueEnum)]
enum EventKind {
    BranchCreated,
    BranchUpdated,
    BranchDeleted,
    PatchCreated,
    PatchUpdated,
}

/// Show an event in the queue.
#[derive(Parser)]
pub struct ShowEvent {
    /// ID of event to show.
    #[clap(long, required_unless_present = "id_file")]
    id: Option<QueueId>,

    /// Show event as JSON? Default is a debugging format useful for
    /// programmers.
    #[clap(long)]
    json: bool,

    /// Write output to this file.
    #[clap(long)]
    output: Option<PathBuf>,

    /// Read ID of event to show from this file.
    #[clap(long)]
    id_file: Option<PathBuf>,
}

impl Leaf for ShowEvent {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;

        let id = if let Some(id) = &self.id {
            id.clone()
        } else if let Some(filename) = &self.id_file {
            let id = read(filename).map_err(|e| CibToolError::ReadEventId(filename.into(), e))?;
            let id = String::from_utf8_lossy(&id).to_string();
            QueueId::from(&id)
        } else {
            return Err(CibToolError::MissingId);
        };

        if let Some(event) = db.get_queued_ci_event(&id)? {
            if self.json {
                let json = serde_json::to_string_pretty(&event.event())
                    .map_err(|e| CibToolError::EventToJson(event.event().clone(), e))?;
                if let Some(filename) = &self.output {
                    std::fs::write(filename, json.as_bytes())
                        .map_err(|e| CibToolError::Write(filename.into(), e))?;
                } else {
                    println!("{json}");
                }
            } else {
                println!("{event:#?}");
            }
        }
        Ok(())
    }
}

/// Remove an event from the queue.
#[derive(Parser)]
pub struct RemoveEvent {
    /// ID of event to remove.
    #[clap(long)]
    id: Option<QueueId>,

    /// Remove all queued events.
    #[clap(long)]
    all: bool,

    /// Read ID of event to remove from this file.
    #[clap(long)]
    id_file: Option<PathBuf>,
}

impl Leaf for RemoveEvent {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;

        let ids = if let Some(id) = &self.id {
            vec![id.clone()]
        } else if let Some(filename) = &self.id_file {
            let id = read(filename).map_err(|e| CibToolError::ReadEventId(filename.into(), e))?;
            let id = String::from_utf8_lossy(&id).to_string();
            vec![QueueId::from(&id)]
        } else if self.all {
            db.queued_ci_events()?
        } else {
            return Err(CibToolError::MissingId);
        };

        for id in ids {
            db.remove_queued_ci_event(&id)?;
        }
        Ok(())
    }
}

/// Add a shutdown event to the queue.
///
/// The shutdown event causes the CI broker to terminate.
#[derive(Parser)]
pub struct Shutdown {
    /// Write ID of the event to this file, after adding the event to
    /// the queue.
    #[clap(long)]
    id_file: Option<PathBuf>,
}

impl Leaf for Shutdown {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;
        let id = db.push_queued_ci_event(CiEvent::V1(CiEventV1::Shutdown))?;

        if let Some(filename) = &self.id_file {
            write(filename, id.to_string().as_bytes())
                .map_err(|e| CibToolError::WriteEventId(filename.into(), e))?;
        }

        Ok(())
    }
}

/// Add a run termination event to the queue.
///
/// The termination event causes the CI broker to terminate a specific run.
#[derive(Parser)]
pub struct Terminate {
    /// Terminate a run given its broker run id.
    run_id: RunId,
}

impl Leaf for Terminate {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;
        db.push_queued_ci_event(CiEvent::V1(CiEventV1::Terminate(self.run_id.clone())))?;
        Ok(())
    }
}

/// Record node events from the node.
///
/// The events are written to the standard output or to the specified
/// file, as one JSON object per line.
#[derive(Parser)]
pub struct RecordEvents {
    /// Write events to this file.
    #[clap(long)]
    output: Option<PathBuf>,
}

impl Leaf for RecordEvents {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
        let radicle = ergo::Radicle::new().map_err(CibToolError::Ergo)?;
        let mut source =
            NodeEventSource::new(radicle.profile()).map_err(CibToolError::EventSubscribe)?;
        let mut file = if let Some(filename) = &self.output {
            Some(
                std::fs::File::create(filename)
                    .map_err(|e| CibToolError::CreateEventsFile(filename.into(), e))?,
            )
        } else {
            None
        };

        loop {
            match source.node_event() {
                Err(e) => return Err(CibToolError::GetNodeEvent(e)),
                Ok(None) => break,
                Ok(Some(event)) => {
                    println!("got event {event:?}");
                    let mut json = serde_json::to_string(&event)
                        .map_err(|e| CibToolError::NodeEevnetToJson(event.clone(), e))?;
                    if let Some(file) = &mut file {
                        json.push('\n');
                        file.write_all(json.as_bytes()).map_err(|e| {
                            CibToolError::WriteEvent(self.output.as_ref().unwrap().clone(), e)
                        })?;
                    } else {
                        println!("{json}");
                    }
                }
            }
        }

        Ok(())
    }
}

/// Convert node events into CI events.
///
/// Node events are read from the specified file. Note that one node
/// event can result in any number of broker events.
///
/// The CI events are written to the standard output or to the
/// specified file, as one JSON object per line.
#[derive(Parser)]
pub struct CiEvents {
    /// Write CI events to this file.
    #[clap(long)]
    output: Option<PathBuf>,

    /// Read node events from this file.
    input: PathBuf,
}

impl Leaf for CiEvents {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
        let radicle = ergo::Radicle::new().map_err(CibToolError::Ergo)?;

        let bytes = std::fs::read(&self.input)
            .map_err(|e| CibToolError::ReadEvents(self.input.clone(), e))?;
        let text = String::from_utf8(bytes)
            .map_err(|e| CibToolError::NodeEventNotUtf8(self.input.clone(), e))?;

        let mut node_events = vec![];
        for line in text.lines() {
            let event: radicle::node::Event = serde_json::from_str(line)
                .map_err(|e| CibToolError::JsonToNodeEvent(self.input.clone(), e))?;
            node_events.push(event);
        }

        let mut ci_events: Vec<CiEvent> = vec![];
        for node_event in node_events.iter() {
            if let Ok(mut cevs) = CiEvent::from_node_event(node_event, radicle.profile()) {
                ci_events.append(&mut cevs);
            }
        }

        let mut jsons = vec![];
        for ci_event in ci_events.iter() {
            jsons.push(
                serde_json::to_string(ci_event)
                    .map_err(|err| CibToolError::EventToJson(ci_event.clone(), err))?,
            );
        }

        if let Some(filename) = &self.output {
            let mut file = std::fs::File::create(filename)
                .map_err(|e| CibToolError::CreateBrokerEventsFile(filename.into(), e))?;
            for json in jsons {
                let json = format!("{json}\n");
                file.write_all(json.as_bytes()).map_err(|e| {
                    CibToolError::WriteEvent(self.output.as_ref().unwrap().clone(), e)
                })?;
            }
        } else {
            for json in jsons {
                println!("{json}");
            }
        }

        Ok(())
    }
}

/// Filter broker events recorded in a file.
///
/// Those broker events allowed by the filter are written to the
/// standard output, as one JSON object per line.
#[derive(Parser)]
pub struct FilterEvents {
    /// Read event filter from this YAML file.
    filters: PathBuf,

    /// Read broker events from this file.
    input: PathBuf,

    /// Show why the decision was made.
    #[clap(long)]
    explain: bool,
}

impl Leaf for FilterEvents {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
        let filters = EventFilter::from_file(&self.filters)
            .map_err(|e| CibToolError::ReadFilters(self.filters.clone(), e))?;

        let bytes = std::fs::read(&self.input)
            .map_err(|e| CibToolError::ReadBrokerEvents(self.input.clone(), e))?;
        let text = String::from_utf8(bytes)
            .map_err(|e| CibToolError::BrokerEventNotUtf8(self.input.clone(), e))?;

        let mut ci_events = vec![];
        for line in text.lines() {
            let event: CiEvent = serde_json::from_str(line)
                .map_err(|e| CibToolError::JsonToNodeEvent(self.input.clone(), e))?;
            ci_events.push(event);
        }

        for event in ci_events.iter() {
            for filter in filters.iter() {
                let decision = filter.decide(event);
                if self.explain {
                    println!("event={event:#?}");
                    println!("filter={filter:#?}");
                    decision.print(0);
                    println!();
                }
                if decision.allowed() {
                    let json = serde_json::to_string_pretty(event)
                        .map_err(|e| CibToolError::EventToJson(event.clone(), e))?;
                    println!("{json}");
                }
            }
        }

        Ok(())
    }
}

#[derive(Debug, thiserror::Error)]
pub enum EventError {
    #[error(transparent)]
    Ergonomic(#[from] radicle_ci_broker::ergo::ErgoError),

    #[error(transparent)]
    RefError(#[from] radicle_ci_broker::refs::RefError),
}