Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
refactor(src/bin/ci-broker.rs): use new modules in main program
Lars Wirzenius committed 2 years ago
commit 9886cfd48b7e53b95120dc00dce4225b05df99e4
parent f298cc3ebd8bcb72ce530558b0d112c6da856e93
1 file changed +50 -64
modified src/bin/ci-broker.rs
@@ -1,18 +1,16 @@
-
use std::{
-
    error::Error,
-
    io::BufReader,
-
    path::PathBuf,
-
    process::{Command, Stdio},
-
};
+
use std::{error::Error, path::PathBuf};

use log::{debug, info};

use radicle::prelude::Profile;
use radicle_ci_broker::{
-
    config::{Adapter, Config},
+
    adapter::Adapter,
+
    broker::Broker,
+
    config::Config,
    error::BrokerError,
    event::{BrokerEvent, NodeEventSource},
-
    msg::{Request, Response, RunResult},
+
    msg::Request,
+
    run::Run,
};

fn main() {
@@ -40,82 +38,70 @@ fn fallible_main() -> Result<(), BrokerError> {
    let config = Config::load(&filename)?;
    debug!("loaded configuration: {:#?}", config);

+
    let mut broker = Broker::default();
+
    debug!("created broker");
+

+
    // FIXME: this is broken. the config file only lists how to invoke
+
    // each adapter, not what adapter to use for each repo.
+
    // for (rid, spec) in config.adapters.iter() {
+
    //     debug!("setting adapter for {rid:?} to {spec:#?}");
+
    //     let rid = RepoId::from_urn(rid).map_err(|e| BrokerError::BadRepoId(rid.into(), e))?;
+
    //     let adapter = Adapter::new(&spec.command).with_environment(spec.envs());
+
    //     broker.set_repository_adapter(&rid, &adapter);
+
    // }
+
    // debug!("set per-repository adapters");
+

+
    let spec =
+
        config
+
            .adapter(&config.default_adapter)
+
            .ok_or(BrokerError::UnknownDefaultAdapter(
+
                config.default_adapter.clone(),
+
            ))?;
+
    let adapter = Adapter::new(&spec.command).with_environment(spec.envs());
+
    broker.set_default_adapter(&adapter);
+
    debug!("set default adapter");
+

    let profile = Profile::load()?;
+
    debug!("loaded profile");
+

    let mut source = NodeEventSource::new(&profile)?;
+
    debug!("created node event source");
+

    for filter in config.filters.iter() {
        source.allow(filter.clone());
    }
-

-
    let default = &config.default_adapter;
-
    let adapter = config
-
        .adapter(default)
-
        .ok_or(BrokerError::UnknownDefaultAdapter(default.clone()))?;
+
    debug!("added filters to node event source");

    // This loop ends when there's an error, e.g., failure to read an
    // event from the node.
    let mut counter = 0;
    loop {
+
        debug!("waiting for event from node");
        for e in source.event()? {
            counter += 1;
-
            let req = Request::trigger(&profile, &e)?;
-
            let ret = match run(adapter, req)? {
-
                None => "unknown".into(),
-
                Some(RunResult::Error(e)) => format!("ERROR: {}", e),
-
                Some(RunResult::Failure) => "FAILED".into(),
-
                Some(RunResult::Success) => "OK".into(),
-
                Some(x) => format!("UNKNOWN: {:#?}", x),
-
            };
+
            debug!("broker event {e:#?}");
            let BrokerEvent::RefChanged {
                rid,
                oid,
                name: _,
                old: _,
            } = e;
-
            println!("Run CI run #{}: {}, {} -> {}", counter, rid, oid, ret,);
-
        }
-
    }
-
}

-
fn run(adapter: &Adapter, req: Request) -> Result<Option<RunResult>, BrokerError> {
-
    info!("Trigger on {}", req);
-

-
    debug!("Spawning child process");
-
    let mut child = Command::new(&adapter.command)
-
        .stdin(Stdio::piped())
-
        .stdout(Stdio::piped())
-
        .envs(adapter.envs())
-
        .spawn()
-
        .map_err(|e| BrokerError::SpawnAdapter(adapter.command.clone(), e))?;
-

-
    {
-
        let stdin = child.stdin.take().expect("get stdin");
-
        req.to_writer(stdin)?;
-
    }
-

-
    let mut id = None;
-
    let stdout = child.stdout.take().expect("get stdout");
-
    let mut stdout = BufReader::new(stdout);
-
    let mut ret = None;
-
    while let Some(resp) = Response::from_reader(&mut stdout)? {
-
        debug!("received from adapter: {:#?}", resp);
-
        match resp {
-
            Response::Triggered { run_id } => {
-
                info!("CI run triggered, id is {}", run_id);
-
                id = Some(run_id);
-
            }
-
            Response::Finished { result } => {
-
                if let Some(id) = &id {
-
                    info!("CI run {} finished: {}", id, result);
-
                    ret = Some(result);
-
                } else {
-
                    info!("Unknown CI run finished: {}", result);
-
                    ret = Some(result);
-
                }
+
            let req = Request::trigger(&profile, &e)?;
+
            let mut run = Run::default();
+
            if let Some(adapter) = broker.adapter(&rid) {
+
                adapter.run(&req, &mut run)?;
+
            } else {
+
                return Err(BrokerError::NoAdapter(rid));
            }
+

+
            println!(
+
                "Run CI run #{}: {}, {} -> {}",
+
                counter,
+
                rid,
+
                oid,
+
                run.result().unwrap()
+
            );
        }
    }
-

-
    child.wait().expect("wait for child");
-

-
    Ok(ret)
}