Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat! make list of CI runs in broker be persistent
Lars Wirzenius committed 2 years ago
commit 23be5b2133a904b4cde3fe8c1f3a147cbbce6b6c
parent 173231d46c14625013660db8420d7cdf224bb521
10 files changed +264 -13
modified Cargo.lock
@@ -1169,6 +1169,7 @@ dependencies = [
 "serde",
 "serde_json",
 "serde_yaml",
+
 "sqlite",
 "tempfile",
 "thiserror",
 "time",
modified Cargo.toml
@@ -16,6 +16,7 @@ radicle-surf = { version = "0.18.0", default-features = false, features = ["serd
uuid = { version = "1.7.0", features = ["v4"] }
time = { version = "0.3.34", features = ["formatting", "macros"] }
html-page = "0.1.0"
+
sqlite = "0.32.0"

[dependencies.radicle]
git = "https://seed.radicle.xyz/z3gqcJUoA1n9HaHKufZs5FCSGazv5.git"
modified src/bin/ci-broker.rs
@@ -43,8 +43,11 @@ fn fallible_main() -> Result<(), BrokerError> {
    let config = Config::load(&filename)?;
    debug!("loaded configuration: {:#?}", config);

-
    let mut broker = Broker::default();
-
    debug!("created broker");
+
    let mut broker = Broker::new(config.db())?;
+
    debug!(
+
        "created broker, db has {} CI runs",
+
        broker.all_runs()?.len()
+
    );

    // FIXME: this is broken. the config file only lists how to invoke
    // each adapter, not what adapter to use for each repo.
@@ -80,6 +83,7 @@ fn fallible_main() -> Result<(), BrokerError> {
    // Spawn a thread that updates the status pages.
    let mut page = PageBuilder::default()
        .node_alias(&profile.config.node.alias)
+
        .runs(broker.all_runs()?)
        .build()?;
    let page2 = page.clone();
    let report_dir = if let Some(dir) = &config.report_dir {
added src/bin/list_runs.rs
@@ -0,0 +1,30 @@
+
use std::{error::Error, path::PathBuf};
+

+
use radicle_ci_broker::{db::Db, error::BrokerError};
+

+
fn main() {
+
    if let Err(e) = fallible_main() {
+
        eprintln!("ERROR: {}", e);
+
        let mut e = e.source();
+
        while let Some(source) = e {
+
            eprintln!("caused by: {}", source);
+
            e = source.source();
+
        }
+
    }
+
}
+

+
fn fallible_main() -> Result<(), BrokerError> {
+
    let mut args = std::env::args().skip(1);
+
    let filename: PathBuf = if let Some(filename) = args.next() {
+
        PathBuf::from(filename)
+
    } else {
+
        return Err(BrokerError::Usage);
+
    };
+

+
    let mut db = Db::new(&filename)?;
+
    for run in db.all_runs()? {
+
        println!("{run:#?}");
+
    }
+

+
    Ok(())
+
}
modified src/broker.rs
@@ -3,14 +3,16 @@
//! This is type and module of its own to facilitate automated
//! testing.

-
use std::collections::HashMap;
+
use std::{collections::HashMap, path::Path};

+
use log::debug;
use time::{macros::format_description, OffsetDateTime};

use radicle::prelude::RepoId;

use crate::{
    adapter::Adapter,
+
    db::Db,
    error::BrokerError,
    msg::{PatchEvent, PushEvent, Request},
    pages::StatusPage,
@@ -22,13 +24,29 @@ use crate::{
/// The broker gets repository change events from the local Radicle
/// node, and executes the appropriate adapter to run CI on the
/// repository.
-
#[derive(Debug, Default)]
+
#[derive(Debug)]
pub struct Broker {
    default_adapter: Option<Adapter>,
    adapters: HashMap<RepoId, Adapter>,
+
    db: Db,
}

impl Broker {
+
    #[allow(clippy::result_large_err)]
+
    pub fn new(db_filename: &Path) -> Result<Self, BrokerError> {
+
        debug!("broker database in {}", db_filename.display());
+
        Ok(Self {
+
            default_adapter: None,
+
            adapters: HashMap::new(),
+
            db: Db::new(db_filename)?,
+
        })
+
    }
+

+
    #[allow(clippy::result_large_err)]
+
    pub fn all_runs(&mut self) -> Result<Vec<Run>, BrokerError> {
+
        self.db.all_runs().map_err(BrokerError::Db)
+
    }
+

    pub fn set_default_adapter(&mut self, adapter: &Adapter) {
        self.default_adapter = Some(adapter.clone());
    }
@@ -47,7 +65,7 @@ impl Broker {

    #[allow(clippy::result_large_err)]
    pub fn execute_ci(
-
        &self,
+
        &mut self,
        trigger: &Request,
        status: &mut StatusPage,
    ) -> Result<Run, BrokerError> {
@@ -82,6 +100,7 @@ impl Broker {
                }
            }
        };
+
        self.db.push_run(&run)?;

        Ok(run)
    }
@@ -94,6 +113,7 @@ fn now() -> String {

#[cfg(test)]
mod test {
+
    use std::path::Path;
    use tempfile::tempdir;

    use super::{Adapter, Broker, RepoId};
@@ -104,6 +124,10 @@ mod test {
        test::{mock_adapter, trigger_request, TestResult},
    };

+
    fn broker(filename: &Path) -> Broker {
+
        Broker::new(filename).unwrap()
+
    }
+

    fn rid() -> RepoId {
        const RID: &str = "rad:zwTxygwuz5LDGBq255RA2CbNGrz8";
        RepoId::from_urn(RID).unwrap()
@@ -123,7 +147,9 @@ mod test {

    #[test]
    fn has_no_adapters_initially() -> TestResult<()> {
-
        let broker = Broker::default();
+
        let tmp = tempdir().unwrap();
+
        let db = tmp.path().join("db.db");
+
        let broker = broker(&db);
        let rid = rid();
        assert_eq!(broker.adapter(&rid), None);
        Ok(())
@@ -131,7 +157,10 @@ mod test {

    #[test]
    fn adds_adapter() -> TestResult<()> {
-
        let mut broker = Broker::default();
+
        let tmp = tempdir().unwrap();
+
        let db = tmp.path().join("db.db");
+
        let mut broker = broker(&db);
+

        let adapter = Adapter::default();
        let rid = rid();
        broker.set_repository_adapter(&rid, &adapter);
@@ -141,7 +170,10 @@ mod test {

    #[test]
    fn does_not_find_unknown_repo() -> TestResult<()> {
-
        let mut broker = Broker::default();
+
        let tmp = tempdir().unwrap();
+
        let db = tmp.path().join("db.db");
+
        let mut broker = broker(&db);
+

        let adapter = Adapter::default();
        let rid = rid();
        let rid2 = rid2();
@@ -152,14 +184,20 @@ mod test {

    #[test]
    fn does_not_have_a_default_adapter_initially() -> TestResult<()> {
-
        let broker = Broker::default();
+
        let tmp = tempdir().unwrap();
+
        let db = tmp.path().join("db.db");
+
        let broker = broker(&db);
+

        assert_eq!(broker.default_adapter(), None);
        Ok(())
    }

    #[test]
    fn sets_a_default_adapter_initially() -> TestResult<()> {
-
        let mut broker = Broker::default();
+
        let tmp = tempdir().unwrap();
+
        let db = tmp.path().join("db.db");
+
        let mut broker = broker(&db);
+

        let adapter = Adapter::default();
        broker.set_default_adapter(&adapter);
        assert_eq!(broker.default_adapter(), Some(&adapter));
@@ -168,7 +206,10 @@ mod test {

    #[test]
    fn finds_default_adapter_for_unknown_repo() -> TestResult<()> {
-
        let mut broker = Broker::default();
+
        let tmp = tempdir().unwrap();
+
        let db = tmp.path().join("db.db");
+
        let mut broker = broker(&db);
+

        let adapter = Adapter::default();
        broker.set_default_adapter(&adapter);

@@ -188,7 +229,9 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");
        let adapter = mock_adapter(&bin, ADAPTER)?;

-
        let mut broker = Broker::default();
+
        let tmp = tempdir().unwrap();
+
        let db = tmp.path().join("db.db");
+
        let mut broker = broker(&db);
        broker.set_default_adapter(&adapter);

        let trigger = trigger_request()?;
modified src/config.rs
@@ -18,6 +18,7 @@ pub struct Config {
    pub filters: Vec<EventFilter>,
    pub report_dir: Option<PathBuf>,
    pub status_update_interval_seconds: Option<u64>,
+
    pub db: PathBuf,
}

impl Config {
@@ -35,6 +36,10 @@ impl Config {
        self.status_update_interval_seconds
            .unwrap_or(DEFAULT_STATUS_PAGE_UPDATE_INTERVAL)
    }
+

+
    pub fn db(&self) -> &Path {
+
        &self.db
+
    }
}

#[derive(Debug, Serialize, Deserialize)]
added src/db.rs
@@ -0,0 +1,148 @@
+
//! Persistent database for CI run information.
+

+
use std::{
+
    fmt,
+
    path::{Path, PathBuf},
+
};
+

+
use sqlite::{Connection, State};
+

+
use crate::run::Run;
+

+
const CREATE_TABLES: &str =
+
    "CREATE TABLE IF NOT EXISTS ci_runs (run_id TEXT PRIMARY KEY, json TEXT)";
+

+
const INSERT_ROW: &str = "INSERT OR REPLACE INTO ci_runs (run_id, json) VALUES (:id, :json)";
+

+
const ALL_RUNS: &str = "SELECT json FROM ci_runs";
+

+
pub struct Db {
+
    filename: PathBuf,
+
    conn: Connection,
+
}
+

+
impl fmt::Debug for Db {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
+
        write!(f, "<Db:{}>", self.filename.display())
+
    }
+
}
+

+
impl Db {
+
    pub fn new(filename: &Path) -> Result<Self, DbError> {
+
        eprintln!("open {}", filename.display());
+
        let conn = sqlite::open(filename).map_err(|e| DbError::open(filename, e))?;
+

+
        eprintln!("create tables");
+
        conn.execute(CREATE_TABLES)
+
            .map_err(|e| DbError::create_tables(CREATE_TABLES, e))?;
+

+
        Ok(Db {
+
            conn,
+
            filename: filename.into(),
+
        })
+
    }
+

+
    pub fn push_run(&mut self, run: &Run) -> Result<(), DbError> {
+
        let json = serde_json::to_string(&run).map_err(DbError::to_json)?;
+

+
        let mut stmt = self
+
            .conn
+
            .prepare(INSERT_ROW)
+
            .map_err(|e| DbError::prepare(INSERT_ROW, e))?;
+

+
        let run_id = format!("{}", run.adapter_run_id().unwrap());
+
        stmt.bind((":id", run_id.as_str()))
+
            .map_err(|e| DbError::bind(":id", e))?;
+
        stmt.bind((":json", json.as_str()))
+
            .map_err(|e| DbError::bind(":json", e))?;
+
        stmt.next().map_err(DbError::insert_run)?;
+

+
        Ok(())
+
    }
+

+
    pub fn all_runs(&mut self) -> Result<Vec<Run>, DbError> {
+
        let mut stmt = self
+
            .conn
+
            .prepare(ALL_RUNS)
+
            .map_err(|e| DbError::prepare(ALL_RUNS, e))?;
+

+
        let mut runs = vec![];
+
        while let Ok(State::Row) = stmt.next() {
+
            let json: String = stmt.read("json").map_err(DbError::get_run)?;
+
            let run: Run = serde_json::from_str(&json).map_err(DbError::from_json)?;
+
            runs.push(run);
+
        }
+

+
        Ok(runs)
+
    }
+
}
+

+
/// All errors from this module.
+
#[derive(Debug, thiserror::Error)]
+
pub enum DbError {
+
    /// Error opening a database file.
+
    #[error("failed to open SQLite database {0}")]
+
    Open(PathBuf, #[source] sqlite::Error),
+

+
    /// Error creating tables.
+
    #[error("failed to create tables: {0}")]
+
    CreateTables(&'static str, #[source] sqlite::Error),
+

+
    /// Error preparing an SQL statement.
+
    #[error("failed to prepare SQL statement {0}")]
+
    Prepare(&'static str, #[source] sqlite::Error),
+

+
    /// Error binding a value to an SQL statement placeholder.
+
    #[error("failed to bind a value to SQL statement placeholder {0}")]
+
    Bind(&'static str, #[source] sqlite::Error),
+

+
    /// Error inserting or updating a run in SQL database.
+
    #[error("failed to insert or update a run in SQL database")]
+
    InsertRun(#[source] sqlite::Error),
+

+
    /// Error getting a run from SQL query.
+
    #[error("failed to get CI run from SQL query result")]
+
    GetRun(#[source] sqlite::Error),
+

+
    /// Error serializing a [`Run`]` into a string.
+
    #[error("failed to serialize a CI run into JSON")]
+
    ToJson(#[source] serde_json::Error),
+

+
    /// Error deserializing a [`Run`]` from a string.
+
    #[error("failed to parse JSON as a CI run")]
+
    FromJson(#[source] serde_json::Error),
+
}
+

+
impl DbError {
+
    fn open(filename: &Path, e: sqlite::Error) -> Self {
+
        Self::Open(filename.into(), e)
+
    }
+

+
    fn create_tables(query: &'static str, e: sqlite::Error) -> Self {
+
        Self::CreateTables(query, e)
+
    }
+

+
    fn prepare(stmt: &'static str, e: sqlite::Error) -> Self {
+
        Self::Prepare(stmt, e)
+
    }
+

+
    fn bind(placeholder: &'static str, e: sqlite::Error) -> Self {
+
        Self::Bind(placeholder, e)
+
    }
+

+
    fn insert_run(e: sqlite::Error) -> Self {
+
        Self::InsertRun(e)
+
    }
+

+
    fn to_json(e: serde_json::Error) -> Self {
+
        Self::ToJson(e)
+
    }
+

+
    fn from_json(e: serde_json::Error) -> Self {
+
        Self::FromJson(e)
+
    }
+

+
    fn get_run(e: sqlite::Error) -> Self {
+
        Self::GetRun(e)
+
    }
+
}
modified src/error.rs
@@ -11,6 +11,7 @@ use radicle::prelude::RepoId;
use crate::{
    adapter::AdapterError,
    config::ConfigError,
+
    db::DbError,
    msg::{MessageError, Request},
    pages::PageError,
};
@@ -66,4 +67,8 @@ pub enum BrokerError {
    /// Status page error.
    #[error(transparent)]
    StatusPage(#[from] PageError),
+

+
    /// Database error.
+
    #[error(transparent)]
+
    Db(#[from] DbError),
}
modified src/lib.rs
@@ -8,6 +8,7 @@
pub mod adapter;
pub mod broker;
pub mod config;
+
pub mod db;
pub mod error;
pub mod event;
pub mod msg;
modified src/pages.rs
@@ -15,6 +15,7 @@ use std::{
};

use html_page::{Document, Element, Tag};
+
use log::info;
use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};

@@ -46,6 +47,7 @@ pub enum PageError {
#[derive(Default)]
pub struct PageBuilder {
    node_alias: Option<String>,
+
    runs: Vec<Run>,
}

impl PageBuilder {
@@ -54,13 +56,24 @@ impl PageBuilder {
        self
    }

+
    pub fn runs(mut self, runs: Vec<Run>) -> Self {
+
        self.runs = runs;
+
        self
+
    }
+

    pub fn build(self) -> Result<StatusPage, PageError> {
+
        let mut runs = HashMap::new();
+
        for run in self.runs.iter() {
+
            runs.insert(run.adapter_run_id().unwrap().clone(), run.clone());
+
        }
+
        info!("broker database had {} CI runs", runs.len());
+

        Ok(StatusPage::new(PageData {
            timestamp: now(),
            ci_broker_version: env!("CARGO_PKG_VERSION"),
            ci_broker_git_commit: env!("GIT_HEAD"),
            node_alias: self.node_alias.ok_or(PageError::NoAlias)?,
-
            runs: HashMap::new(),
+
            runs,
            broker_event_counter: 0,
            latest_broker_event: None,
            latest_ci_run: None,