Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: add new CI broker management tool
Lars Wirzenius committed 1 year ago
commit 21ef36aa234ce83fa950e2ccd665568a10ea0fb7
parent 1fbb4ccbd596a98021dfbf797a24f83573acc57d
2 files changed +463 -1
added src/bin/cibtool.rs
@@ -0,0 +1,462 @@
+
use std::{
+
    error::Error,
+
    fs::{read, write},
+
    path::PathBuf,
+
    process::exit,
+
    str::FromStr,
+
};
+

+
use clap::Parser;
+

+
use radicle::{
+
    git::RefString,
+
    prelude::{NodeId, RepoId},
+
    storage::ReadStorage,
+
    Profile, Storage,
+
};
+
use radicle_git_ext::Oid;
+

+
use radicle_ci_broker::{
+
    db::{Db, DbError, QueueId},
+
    error::BrokerError,
+
    event::BrokerEvent,
+
};
+

+
fn main() {
+
    if let Err(e) = fallible_main() {
+
        eprintln!("ERROR: {}", e);
+
        let mut e = e.source();
+
        while let Some(source) = e {
+
            eprintln!("caused by: {}", source);
+
            e = source.source();
+
        }
+
        exit(1);
+
    }
+
}
+

+
fn fallible_main() -> Result<(), CibToolError> {
+
    pretty_env_logger::init();
+

+
    let args = Args::parse();
+
    args.run()?;
+

+
    Ok(())
+
}
+

+
#[derive(Parser)]
+
struct Args {
+
    #[clap(long)]
+
    db: PathBuf,
+

+
    #[clap(subcommand)]
+
    cmd: Cmd,
+
}
+

+
impl Args {
+
    fn run(&self) -> Result<(), CibToolError> {
+
        match &self.cmd {
+
            Cmd::Counter(x) => x.run(self)?,
+
            Cmd::Event(x) => x.run(self)?,
+
        }
+
        Ok(())
+
    }
+

+
    fn open_db(&self) -> Result<Db, CibToolError> {
+
        Ok(Db::new(&self.db)?)
+
    }
+
}
+

+
#[derive(Parser)]
+
enum Cmd {
+
    Counter(Counter),
+
    Event(Event),
+
}
+

+
#[derive(Parser)]
+
struct Counter {
+
    #[clap(subcommand)]
+
    cmd: CounterCmd,
+
}
+

+
impl Counter {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        match &self.cmd {
+
            CounterCmd::Show(x) => x.run(args)?,
+
            CounterCmd::Count(x) => x.run(args)?,
+
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
enum CounterCmd {
+
    Show(ShowCounter),
+
    Count(CountCounter),
+
}
+

+
#[derive(Parser)]
+
struct ShowCounter {}
+

+
impl ShowCounter {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        let counter = db.get_counter()?;
+
        println!("{}", counter.unwrap_or(0));
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct CountCounter {
+
    #[clap(long)]
+
    goal: i64,
+
}
+

+
impl CountCounter {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        Self::inc(&db, self.goal)?;
+

+
        Ok(())
+
    }
+

+
    fn inc(db: &Db, goal: i64) -> Result<(), DbError> {
+
        let mut prev: i64 = -1;
+
        loop {
+
            db.begin()?;
+
            println!("BEGIN");
+

+
            let current = db.get_counter()?;
+
            println!("  current as read={current:?}");
+
            let current = current.unwrap_or(0);
+
            println!("  current: {current}; prev={prev}");
+
            if current < prev {
+
                panic!("current < prev");
+
            }
+
            if current >= goal {
+
                println!("GOAL");
+
                db.rollback()?;
+
                println!("ROLLBACK");
+
                break;
+
            }
+

+
            let new = current + 1;
+
            if (new == 1 && db.create_counter(new).is_err()) || db.update_counter(new).is_err() {
+
                db.rollback()?;
+
                println!("ROLLBACK");
+
            } else {
+
                println!("  increment to {new}");
+
                db.commit()?;
+
                println!("COMMIT");
+
                prev = new;
+
            }
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct Event {
+
    #[clap(subcommand)]
+
    cmd: EventCmd,
+
}
+

+
impl Event {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        match &self.cmd {
+
            EventCmd::Add(x) => x.run(args)?,
+
            EventCmd::Shutdown(x) => x.run(args)?,
+
            EventCmd::List(x) => x.run(args)?,
+
            EventCmd::Count(x) => x.run(args)?,
+
            EventCmd::Show(x) => x.run(args)?,
+
            EventCmd::Remove(x) => x.run(args)?,
+
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
enum EventCmd {
+
    List(ListEvents),
+
    Count(CountEvents),
+
    Add(AddEvent),
+
    Shutdown(Shutdown),
+
    Show(ShowEvent),
+
    Remove(RemoveEvent),
+
}
+

+
#[derive(Parser)]
+
struct ListEvents {
+
    #[clap(long)]
+
    verbose: bool,
+
}
+

+
impl ListEvents {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        for id in db.queued_events()? {
+
            if self.verbose {
+
                let e = db.get_queued_event(&id)?;
+
                println!("{id}: {:?}", e);
+
            } else {
+
                println!("{id}");
+
            }
+
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct CountEvents {}
+

+
impl CountEvents {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        println!("{}", db.queued_events()?.len());
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct AddEvent {
+
    #[clap(long)]
+
    repo: String,
+

+
    #[clap(long, alias = "ref")]
+
    name: String,
+

+
    #[clap(long)]
+
    commit: String,
+

+
    #[clap(long)]
+
    base: Option<Oid>,
+

+
    #[clap(long)]
+
    output: Option<PathBuf>,
+

+
    #[clap(long)]
+
    id_file: Option<PathBuf>,
+
}
+

+
impl AddEvent {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let rid = if let Ok(rid) = RepoId::from_urn(&self.repo) {
+
            rid
+
        } else {
+
            self.lookup_rid(&self.repo)?
+
        };
+

+
        let oid = if let Ok(rid) = Oid::from_str(&self.commit) {
+
            rid
+
        } else {
+
            self.lookup_commit(rid, &self.commit)?
+
        };
+

+
        let name = format!(
+
            "refs/namespaces/{}/refs/heads/{}",
+
            self.lookup_nid()?,
+
            self.name.as_str()
+
        );
+
        let name = RefString::try_from(name).expect("RefString");
+

+
        let event = BrokerEvent::new(&rid, &name, &oid, self.base);
+

+
        if let Some(output) = &self.output {
+
            let json = serde_json::to_string_pretty(&event)
+
                .map_err(|e| CibToolError::EventToJson(event.clone(), e))?;
+
            std::fs::write(output, json.as_bytes())
+
                .map_err(|e| CibToolError::Write(output.into(), e))?;
+
        } else {
+
            let db = args.open_db()?;
+
            let id = db.push_queued_event(event)?;
+
            println!("{id}");
+

+
            if let Some(filename) = &self.id_file {
+
                write(filename, id.to_string().as_bytes()).expect("write id file");
+
            }
+
        }
+
        Ok(())
+
    }
+

+
    fn lookup_nid(&self) -> Result<NodeId, CibToolError> {
+
        let profile = Profile::load().map_err(CibToolError::Profile)?;
+
        Ok(*profile.id())
+
    }
+

+
    fn lookup_rid(&self, wanted: &str) -> Result<RepoId, CibToolError> {
+
        let profile = Profile::load().map_err(CibToolError::Profile)?;
+
        let storage =
+
            Storage::open(profile.storage(), profile.info()).map_err(CibToolError::Storage)?;
+

+
        let mut rid = None;
+
        let repo_infos = storage.repositories().map_err(CibToolError::Repositories)?;
+
        for ri in repo_infos {
+
            let project = ri
+
                .doc
+
                .project()
+
                .map_err(|e| CibToolError::Project(ri.rid, e))?;
+

+
            if project.name() == wanted {
+
                if rid.is_some() {
+
                    return Err(CibToolError::DuplicateRepositories(wanted.into()));
+
                }
+
                rid = Some(ri.rid);
+
            }
+
        }
+

+
        if let Some(rid) = rid {
+
            Ok(rid)
+
        } else {
+
            Err(CibToolError::NotFound(wanted.into()))
+
        }
+
    }
+

+
    fn lookup_commit(&self, rid: RepoId, gitref: &str) -> Result<Oid, CibToolError> {
+
        let profile = Profile::load().map_err(CibToolError::Profile)?;
+
        let storage =
+
            Storage::open(profile.storage(), profile.info()).map_err(CibToolError::Storage)?;
+
        let repo = storage
+
            .repository(rid)
+
            .map_err(|e| CibToolError::RepoOpen(rid, e))?;
+
        let object = repo
+
            .backend
+
            .revparse_single(gitref)
+
            .map_err(|e| CibToolError::RevParse(gitref.into(), e))?;
+

+
        Ok(object.id().into())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct ShowEvent {
+
    #[clap(long, required_unless_present = "id_file")]
+
    id: Option<QueueId>,
+

+
    #[clap(long)]
+
    json: bool,
+

+
    #[clap(long)]
+
    output: Option<PathBuf>,
+

+
    #[clap(long)]
+
    id_file: Option<PathBuf>,
+
}
+

+
impl ShowEvent {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+

+
        let id = if let Some(id) = &self.id {
+
            id.clone()
+
        } else {
+
            assert!(self.id_file.is_some());
+
            let file = self.id_file.as_ref().unwrap();
+
            let id = read(file).expect("read id file");
+
            let id = String::from_utf8_lossy(&id).to_string();
+
            QueueId::from(&id)
+
        };
+

+
        if let Some(event) = db.get_queued_event(&id)? {
+
            if self.json {
+
                let json = serde_json::to_string_pretty(&event.event())
+
                    .map_err(|e| CibToolError::EventToJson(event.event().clone(), e))?;
+
                if let Some(filename) = &self.output {
+
                    std::fs::write(filename, json.as_bytes())
+
                        .map_err(|e| CibToolError::Write(filename.into(), e))?;
+
                } else {
+
                    println!("{json}");
+
                }
+
            } else {
+
                println!("{event:#?}");
+
            }
+
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct RemoveEvent {
+
    #[clap(long, required_unless_present = "id_file")]
+
    id: Option<QueueId>,
+

+
    #[clap(long)]
+
    id_file: Option<PathBuf>,
+
}
+

+
impl RemoveEvent {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+

+
        let id = if let Some(id) = &self.id {
+
            id.clone()
+
        } else {
+
            assert!(self.id_file.is_some());
+
            let file = self.id_file.as_ref().unwrap();
+
            let id = read(file).expect("read id file");
+
            let id = String::from_utf8_lossy(&id).to_string();
+
            QueueId::from(&id)
+
        };
+

+
        db.remove_queued_event(&id)?;
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct Shutdown {
+
    #[clap(long)]
+
    id_file: Option<PathBuf>,
+
}
+

+
impl Shutdown {
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        let id = db.push_queued_event(BrokerEvent::Shutdown)?;
+

+
        if let Some(filename) = &self.id_file {
+
            write(filename, id.to_string().as_bytes()).expect("write id file");
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
enum CibToolError {
+
    #[error("failed to look up node profile")]
+
    Profile(#[source] radicle::profile::Error),
+

+
    #[error("failed to look up open node storage")]
+
    Storage(#[source] radicle::storage::Error),
+

+
    #[error("failed to list repositories in node storage")]
+
    Repositories(#[source] radicle::storage::Error),
+

+
    #[error("failed to look up project info for repository {0}")]
+
    Project(RepoId, #[source] radicle::identity::doc::PayloadError),
+

+
    #[error("node has more than one repository called {0}")]
+
    DuplicateRepositories(String),
+

+
    #[error("node has no repository called: {0}")]
+
    NotFound(String),
+

+
    #[error("failed to open git repository in node storage: {0}")]
+
    RepoOpen(RepoId, #[source] radicle::storage::RepositoryError),
+

+
    #[error("failed to parse git ref as a commit id: {0}")]
+
    RevParse(String, #[source] radicle::git::raw::Error),
+

+
    #[error(transparent)]
+
    Broker(#[from] BrokerError),
+

+
    #[error(transparent)]
+
    Db(#[from] DbError),
+

+
    #[error("failed to serialize broker event to JSON: {0:#?}")]
+
    EventToJson(BrokerEvent, #[source] serde_json::Error),
+

+
    #[error("failed to write file: {0}")]
+
    Write(PathBuf, #[source] std::io::Error),
+
}
modified src/event.rs
@@ -322,7 +322,7 @@ pub enum BrokerEvent {
}

impl BrokerEvent {
-
    fn new(rid: &RepoId, name: &RefString, oid: &Oid, old: Option<Oid>) -> Self {
+
    pub fn new(rid: &RepoId, name: &RefString, oid: &Oid, old: Option<Oid>) -> Self {
        Self::RefChanged {
            rid: *rid,
            name: name.clone(),