Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: rewrite database abstraction
Lars Wirzenius committed 1 year ago
commit 1386c94189c60ddb3037e5946c59e2c5f058f3e2
parent 969f4fb2fb4786364a0f6a273a1eafe95a9f6e10
3 files changed +625 -83
modified Cargo.lock
@@ -1648,6 +1648,7 @@ dependencies = [
 "serde_json",
 "serde_yaml 0.9.34+deprecated",
 "sqlite",
+
 "sqlite3-sys",
 "subplot-build",
 "subplotlib",
 "tempfile",
modified Cargo.toml
@@ -27,6 +27,7 @@ thiserror = "1.0.50"
time = { version = "0.3.34", features = ["formatting", "macros"] }
uuid = { version = "1.7.0", features = ["v4"] }
regex = "1.10.4"
+
sqlite3-sys = "0.15.0"

[dependencies.radicle]
version = "0.11.0"
modified src/db.rs
@@ -1,118 +1,618 @@
-
//! Persistent database for CI run information.
+
//! Database abstraction for the Radicle CI broker.
+
//!
+
//! This module is a wrapper around the [SQLite](https://sqlite.org/)
+
//! database. It is meant to suffice for the Radicle CI broker, and
+
//! does not try to be a more generic wrapper.
+
//!
+
//! The database stores the following kinds of data:
+
//!
+
//! - "counter": This is used for testing the implementation of
+
//!   concurrent access to the database. It is not useful for anything
+
//!   else.

use std::{
    fmt,
    path::{Path, PathBuf},
+
    time::{Duration, Instant},
};

-
use log::info;
-
use sqlite::{Connection, State};
+
use log::debug;
+
use sqlite::{Connection, State, Statement};
+
use time::{macros::format_description, OffsetDateTime};
+
use uuid::Uuid;

-
use crate::run::Run;
+
use crate::{event::BrokerEvent, msg::RunId, run::Run};

-
const CREATE_TABLES: &str =
-
    "CREATE TABLE IF NOT EXISTS ci_runs (run_id TEXT PRIMARY KEY, json TEXT)";
-

-
const INSERT_ROW: &str = "INSERT OR REPLACE INTO ci_runs (run_id, json) VALUES (:id, :json)";
-

-
const ALL_RUNS: &str = "SELECT json FROM ci_runs";
+
const MAX_WAIT: u64 = 1000; // how long to retry when SQL fails for busy database

+
/// The CI broker database. It stores the data that needs to be
+
/// persistent, even if the process terminates.
pub struct Db {
    filename: PathBuf,
    conn: Connection,
}

-
impl fmt::Debug for Db {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
-
        write!(f, "<Db:{}>", self.filename.display())
+
impl Db {
+
    /// Open or create a database. It is created if it doesn't exist.
+
    /// If it is created, tables are created.
+
    pub fn new<P: AsRef<Path>>(filename: P) -> Result<Self, DbError> {
+
        let filename = filename.as_ref();
+
        debug!("open database {}", filename.display());
+
        let db = Db {
+
            filename: filename.into(),
+
            conn: sqlite::open(filename).map_err(|e| DbError::open(filename, e))?,
+
        };
+

+
        {
+
            db.begin()?;
+
            db.create_tables()?;
+
            db.commit()?;
+
        }
+

+
        Ok(db)
    }
-
}

-
impl Db {
-
    pub fn new(filename: &Path) -> Result<Self, DbError> {
-
        info!("open database {}", filename.display());
-
        let conn = sqlite::open(filename).map_err(|e| DbError::open(filename, e))?;
+
    fn create_tables(&self) -> Result<(), DbError> {
+
        const TABLES: &[&str] = &[
+
            "CREATE TABLE IF NOT EXISTS counter_test (counter INT)",
+
            "CREATE TABLE IF NOT EXISTS event_queue (id TEXT PRIMARY KEY, timestamp TEXT, event TEXT, status TEXT)",
+
            "CREATE TABLE IF NOT EXISTS ci_runs (run_id TEXT PRIMARY KEY, json TEXT)",
+
        ];

-
        info!("create tables");
-
        conn.execute(CREATE_TABLES)
-
            .map_err(|e| DbError::create_tables(CREATE_TABLES, e))?;
+
        for table in TABLES.iter() {
+
            let mut stmt = self.prepare(table)?;
+
            Self::execute_valueless(&mut stmt)?;
+
        }

-
        info!("database setup OK");
-
        Ok(Db {
-
            conn,
-
            filename: filename.into(),
-
        })
+
        Ok(())
+
    }
+

+
    /// Name of database file.
+
    pub fn filename(&self) -> &Path {
+
        &self.filename
+
    }
+

+
    /// Start a transaction.
+
    pub fn begin(&self) -> Result<(), DbError> {
+
        let mut stmt = self.prepare("BEGIN TRANSACTION")?;
+
        Self::execute_valueless(&mut stmt)
+
    }
+

+
    /// Commit a transaction.
+
    pub fn commit(&self) -> Result<(), DbError> {
+
        let mut stmt = self.prepare("COMMIT")?;
+
        Self::execute_valueless(&mut stmt)
+
    }
+

+
    /// Roll back a transaction.
+
    pub fn rollback(&self) -> Result<(), DbError> {
+
        let mut stmt = self.prepare("ROLLBACK")?;
+
        Self::execute_valueless(&mut stmt)
+
    }
+

+
    // Prepare a statement for execution.
+
    fn prepare<'a>(&'a self, sql: &str) -> Result<Stmt<'a>, DbError> {
+
        let started = Instant::now();
+
        let max_wait = Duration::from_millis(MAX_WAIT);
+
        loop {
+
            match self.conn.prepare(sql) {
+
                Ok(stmt) => {
+
                    return Ok(Stmt::new(sql, stmt));
+
                }
+

+
                // An error about the database being busy (locked) is
+
                // probably due to another process modifying the same
+
                // database. We ignore it as the problem should go away in
+
                // a very short while.
+
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
+
                    if started.elapsed() < max_wait {
+
                        continue;
+
                    }
+
                    return Err(DbError::preapre(sql, &self.filename, e));
+
                }
+

+
                Err(e) => {
+
                    return Err(DbError::preapre(sql, &self.filename, e));
+
                }
+
            }
+
        }
+
    }
+

+
    // Execute a statement that doesn't return any rows with values.
+
    // This means basically any statement except S
+
    fn execute_valueless(stmt: &mut Stmt) -> Result<(), DbError> {
+
        stmt.stmt.reset().map_err(DbError::reset)?;
+

+
        let started = Instant::now();
+
        let max_wait = Duration::from_millis(MAX_WAIT);
+
        loop {
+
            match stmt.stmt.next() {
+
                Ok(_) => {
+
                    break;
+
                }
+

+
                // An error about the database being busy (locked) is
+
                // probably due to another process modifying the same
+
                // database. We ignore it as the problem should go away in
+
                // a very short while.
+
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
+
                    if started.elapsed() < max_wait {
+
                        continue;
+
                    }
+
                    return Err(DbError::execute(&stmt.sql, e));
+
                }
+

+
                Err(e) => {
+
                    return Err(DbError::execute(&stmt.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(())
+
    }
+

+
    /// Create the counter with an initial value. Only use this if
+
    /// there isn't a counter row already.
+
    pub fn create_counter(&self, counter: i64) -> Result<(), DbError> {
+
        let mut insert = self.prepare("INSERT INTO counter_test (counter) VALUES (:1)")?;
+
        insert.stmt.bind((1, counter)).unwrap();
+
        match insert.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::insert_counter(&insert.sql, e)),
+
        }
+
        Ok(())
+
    }
+

+
    /// Update the counter to have a new value.
+
    pub fn update_counter(&self, counter: i64) -> Result<(), DbError> {
+
        let mut update = self.prepare("UPDATE counter_test SET counter = :1")?;
+
        update.stmt.bind((1, counter)).unwrap();
+
        match update.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::update_counter(&update.sql, e)),
+
        }
+
        Ok(())
    }

-
    pub fn push_run(&mut self, run: &Run) -> Result<(), DbError> {
-
        let json = serde_json::to_string(&run).map_err(DbError::to_json)?;
+
    /// Return the current value of the counter, if any.
+
    pub fn get_counter(&self) -> Result<Option<i64>, DbError> {
+
        let mut select = self.prepare("SELECT counter FROM counter_test")?;
+
        let mut counter = None;
+

+
        let started = Instant::now();
+
        let max_wait = Duration::from_millis(MAX_WAIT);
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    counter = Some(select.stmt.read("counter").unwrap());
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
+
                    if started.elapsed() < max_wait {
+
                        continue;
+
                    }
+
                    return Err(DbError::get_counter(&select.sql, e));
+
                }
+
                Err(e) => {
+
                    return Err(DbError::get_counter(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(counter)
+
    }
+

+
    /// Return list of broker events currently in the queue.
+
    pub fn queued_events(&self) -> Result<Vec<QueueId>, DbError> {
+
        let mut select = self.prepare("SELECT id FROM event_queue")?;
+

+
        let started = Instant::now();
+
        let max_wait = Duration::from_millis(MAX_WAIT);
+

+
        let mut ids = vec![];
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    let id: String = select
+
                        .stmt
+
                        .read("id")
+
                        .map_err(|e| DbError::list_events(&select.sql, e))?;
+
                    ids.push(QueueId::from(&id));
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
+
                    if started.elapsed() < max_wait {
+
                        continue;
+
                    }
+
                    return Err(DbError::list_events(&select.sql, e));
+
                }
+
                Err(e) => {
+
                    return Err(DbError::list_events(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(ids)
+
    }

-
        let mut stmt = self
-
            .conn
-
            .prepare(INSERT_ROW)
-
            .map_err(|e| DbError::prepare(INSERT_ROW, e))?;
+
    /// Return a specific event, given is id, if one exists.
+
    pub fn get_queued_event(&self, id: &QueueId) -> Result<Option<QueuedEvent>, DbError> {
+
        let mut select = self.prepare("SELECT timestamp, event FROM event_queue WHERE id = :id")?;
+
        select
+
            .stmt
+
            .bind((":id", id.as_str()))
+
            .map_err(|e| DbError::bind(&select.sql, e))?;
+

+
        let started = Instant::now();
+
        let max_wait = Duration::from_millis(MAX_WAIT);
+

+
        let mut timestamp = None;
+
        let mut event = None;
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    timestamp = Some(
+
                        select
+
                            .stmt
+
                            .read("timestamp")
+
                            .map_err(|e| DbError::get_event(&select.sql, e))?,
+
                    );
+
                    let json: String = select
+
                        .stmt
+
                        .read("event")
+
                        .map_err(|e| DbError::get_event(&select.sql, e))?;
+
                    event = Some(
+
                        serde_json::from_str(&json)
+
                            .map_err(|e| DbError::event_from_json(&json, e))?,
+
                    );
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
+
                    if started.elapsed() < max_wait {
+
                        continue;
+
                    }
+
                    return Err(DbError::get_event(&select.sql, e));
+
                }
+
                Err(e) => {
+
                    return Err(DbError::get_event(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        if timestamp.is_some() && event.is_some() {
+
            let qe = QueuedEvent::new(id.clone(), timestamp.unwrap(), event.unwrap());
+
            Ok(Some(qe))
+
        } else {
+
            Ok(None)
+
        }
+
    }
+

+
    /// Add a new event to the event queue, returning its id.
+
    pub fn push_queued_event(&self, event: BrokerEvent) -> Result<QueueId, DbError> {
+
        let json = serde_json::to_string(&event).expect("serialize BrokerEvent to JSON");
+

+
        let id = QueueId::default();
+
        let ts = now();
+
        let status = "FIXME";
+

+
        let mut insert = self.prepare(
+
            "INSERT INTO event_queue (id, timestamp, event, status) VALUES (:id, :ts, :e, :s)",
+
        )?;
+
        insert
+
            .stmt
+
            .bind((":id", id.as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+
        insert
+
            .stmt
+
            .bind((":ts", ts.as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+
        insert
+
            .stmt
+
            .bind((":e", json.as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+
        insert
+
            .stmt
+
            .bind((":s", status))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+
        match insert.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::push_event(&insert.sql, e)),
+
        }
+

+
        Ok(id)
+
    }

-
        let run_id = format!("{}", run.adapter_run_id().unwrap());
-
        stmt.bind((":id", run_id.as_str()))
-
            .map_err(|e| DbError::bind(":id", e))?;
-
        stmt.bind((":json", json.as_str()))
-
            .map_err(|e| DbError::bind(":json", e))?;
-
        stmt.next().map_err(DbError::insert_run)?;
+
    /// Remove event from queue, given its id. It's OK if the event is
+
    /// not in the queue, that is just silently ignored.
+
    pub fn remove_queued_event(&self, id: &QueueId) -> Result<(), DbError> {
+
        let mut remove = self.prepare("DELETE FROM event_queue WHERE id = :id")?;
+
        remove
+
            .stmt
+
            .bind((":id", id.as_str()))
+
            .map_err(|e| DbError::bind(&remove.sql, e))?;
+

+
        match remove.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::remove_event(&remove.sql, e)),
+
        }

        Ok(())
    }

-
    pub fn all_runs(&mut self) -> Result<Vec<Run>, DbError> {
-
        let mut stmt = self
-
            .conn
-
            .prepare(ALL_RUNS)
-
            .map_err(|e| DbError::prepare(ALL_RUNS, e))?;
+
    /// Return list of CI runs currently in the database.
+
    pub fn list_runs(&self) -> Result<Vec<RunId>, DbError> {
+
        let mut select = self.prepare("SELECT json FROM ci_runs")?;
+

+
        let started = Instant::now();
+
        let max_wait = Duration::from_millis(MAX_WAIT);

        let mut runs = vec![];
-
        while let Ok(State::Row) = stmt.next() {
-
            let json: String = stmt.read("json").map_err(DbError::get_run)?;
-
            let run: Run = serde_json::from_str(&json).map_err(DbError::from_json)?;
-
            runs.push(run);
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    let json: String = select
+
                        .stmt
+
                        .read("event")
+
                        .map_err(|e| DbError::get_event(&select.sql, e))?;
+
                    let run = serde_json::from_str(&json)
+
                        .map_err(|e| DbError::run_from_json(&json, e))?;
+
                    runs.push(run);
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
+
                    if started.elapsed() < max_wait {
+
                        continue;
+
                    }
+
                    return Err(DbError::list_runs(&select.sql, e));
+
                }
+
                Err(e) => {
+
                    return Err(DbError::list_runs(&select.sql, e));
+
                }
+
            }
        }

        Ok(runs)
    }
+

+
    /// Return a specific CI run, given is id, if one exists.
+
    pub fn get_run(&self, id: &RunId) -> Result<Option<Run>, DbError> {
+
        let mut select = self.prepare("SELECT json FROM ci_runs WHERE run_id = :id")?;
+
        select
+
            .stmt
+
            .bind((":id", id.to_string().as_str()))
+
            .map_err(|e| DbError::bind(&select.sql, e))?;
+

+
        let started = Instant::now();
+
        let max_wait = Duration::from_millis(MAX_WAIT);
+

+
        let mut run = None;
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    let json: String = select
+
                        .stmt
+
                        .read("event")
+
                        .map_err(|e| DbError::get_event(&select.sql, e))?;
+
                    run = Some(
+
                        serde_json::from_str(&json)
+
                            .map_err(|e| DbError::run_from_json(&json, e))?,
+
                    );
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
+
                    if started.elapsed() < max_wait {
+
                        continue;
+
                    }
+
                    return Err(DbError::get_run(&select.sql, e));
+
                }
+
                Err(e) => {
+
                    return Err(DbError::get_run(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(run)
+
    }
+

+
    /// Add a new CI run to the database, returning its id.
+
    pub fn push_run(&self, run: Run) -> Result<RunId, DbError> {
+
        let json = serde_json::to_string(&run).expect("serialize BrokerEvent to JSON");
+
        let id = RunId::default();
+

+
        let mut insert = self.prepare("INSERT INTO ci_runs (run_id, json) VALUES (:id, :json)")?;
+
        insert
+
            .stmt
+
            .bind((":id", id.to_string().as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+
        insert
+
            .stmt
+
            .bind((":json", json.as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+

+
        match insert.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::push_run(&insert.sql, e)),
+
        }
+

+
        Ok(id)
+
    }
+

+
    /// Remove a CI run from database, given its id. It's OK if the run is
+
    /// not in the database, that is just silently ignored.
+
    pub fn remove_run(&self, id: &RunId) -> Result<(), DbError> {
+
        let mut remove = self.prepare("DELETE FROM ci_runs WHERE id = :id")?;
+
        remove
+
            .stmt
+
            .bind((":id", id.to_string().as_str()))
+
            .map_err(|e| DbError::bind(&remove.sql, e))?;
+

+
        match remove.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::remove_run(&remove.sql, e)),
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
fn now() -> String {
+
    let fmt =
+
        format_description!("[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:6]Z");
+
    OffsetDateTime::now_utc().format(fmt).expect("format time")
+
}
+

+
// A wrapper around a statement that remembers its text form.
+
struct Stmt<'a> {
+
    sql: String,
+
    stmt: Statement<'a>,
+
}
+

+
impl<'a> Stmt<'a> {
+
    fn new(sql: &str, stmt: Statement<'a>) -> Self {
+
        Self {
+
            sql: sql.into(),
+
            stmt,
+
        }
+
    }
+
}
+

+
/// An identifier for an event in the event queue in the database.
+
#[derive(Clone, Debug)]
+
pub struct QueueId {
+
    id: String,
+
}
+

+
impl fmt::Display for QueueId {
+
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+
        write!(f, "{}", self.id)
+
    }
+
}
+

+
impl Default for QueueId {
+
    fn default() -> Self {
+
        Self {
+
            id: Uuid::new_v4().to_string(),
+
        }
+
    }
+
}
+

+
impl From<&str> for QueueId {
+
    fn from(id: &str) -> Self {
+
        Self { id: id.into() }
+
    }
+
}
+

+
impl From<&String> for QueueId {
+
    fn from(id: &String) -> Self {
+
        Self { id: id.into() }
+
    }
+
}
+

+
impl QueueId {
+
    fn as_str(&self) -> &str {
+
        self.id.as_str()
+
    }
+
}
+

+
#[derive(Clone, Debug)]
+
pub struct QueuedEvent {
+
    id: QueueId,
+
    ts: String,
+
    event: BrokerEvent,
+
}
+

+
impl QueuedEvent {
+
    fn new(id: QueueId, ts: String, event: BrokerEvent) -> Self {
+
        Self { id, ts, event }
+
    }
+

+
    pub fn id(&self) -> &QueueId {
+
        &self.id
+
    }
+

+
    pub fn timestamp(&self) -> &str {
+
        &self.ts
+
    }
+

+
    pub fn event(&self) -> &BrokerEvent {
+
        &self.event
+
    }
}

/// All errors from this module.
#[derive(Debug, thiserror::Error)]
pub enum DbError {
-
    /// Error opening a database file.
    #[error("failed to open SQLite database {0}")]
    Open(PathBuf, #[source] sqlite::Error),

-
    /// Error creating tables.
-
    #[error("failed to create tables: {0}")]
-
    CreateTables(&'static str, #[source] sqlite::Error),
+
    #[error("failed to prepare SQL statement SQLite database {0}: {1}")]
+
    Prepare(String, PathBuf, #[source] sqlite::Error),
+

+
    #[error("failed to reset connection to SQLite")]
+
    Reset(#[source] sqlite::Error),
+

+
    #[error("failed to execute SQL statement in SQLite: {0}")]
+
    Execute(String, #[source] sqlite::Error),
+

+
    #[error("failed to bind a value in SQL statement in SQLite: {0}")]
+
    Bind(String, #[source] sqlite::Error),
+

+
    #[error("failed to insert a counter into database")]
+
    InsertCounter(String, #[source] sqlite::Error),
+

+
    #[error("failed to update a counter in database")]
+
    UpdateCounter(String, #[source] sqlite::Error),
+

+
    #[error("failed to retrieve a counter from database")]
+
    GetCounter(String, #[source] sqlite::Error),
+

+
    #[error("failed to insert an event into database")]
+
    InsertEvent(String, #[source] sqlite::Error),

-
    /// Error preparing an SQL statement.
-
    #[error("failed to prepare SQL statement {0}")]
-
    Prepare(&'static str, #[source] sqlite::Error),
+
    #[error("failed to list queued events in database")]
+
    ListEvents(String, #[source] sqlite::Error),

-
    /// Error binding a value to an SQL statement placeholder.
-
    #[error("failed to bind a value to SQL statement placeholder {0}")]
-
    Bind(&'static str, #[source] sqlite::Error),
+
    #[error("failed to retrieve a queued event in database")]
+
    GetEvent(String, #[source] sqlite::Error),

-
    /// Error inserting or updating a run in SQL database.
-
    #[error("failed to insert or update a run in SQL database")]
-
    InsertRun(#[source] sqlite::Error),
+
    #[error("failed to parse queued event as JSON: {0}")]
+
    EventFromJson(String, #[source] serde_json::Error),

-
    /// Error getting a run from SQL query.
-
    #[error("failed to get CI run from SQL query result")]
-
    GetRun(#[source] sqlite::Error),
+
    #[error("failed to insert an event into queue")]
+
    PushEvent(String, #[source] sqlite::Error),

-
    /// Error serializing a [`Run`]` into a string.
-
    #[error("failed to serialize a CI run into JSON")]
-
    ToJson(#[source] serde_json::Error),
+
    #[error("failed to remove an event from queue")]
+
    RemoveEvent(String, #[source] sqlite::Error),

-
    /// Error deserializing a [`Run`]` from a string.
-
    #[error("failed to parse JSON as a CI run")]
-
    FromJson(#[source] serde_json::Error),
+
    #[error("failed to list CI runs in database")]
+
    ListRuns(String, #[source] sqlite::Error),
+

+
    #[error("failed to parse CI run as JSON: {0}")]
+
    RunFromJson(String, #[source] serde_json::Error),
+

+
    #[error("failed to retrieve a CI run from database")]
+
    GetRun(String, #[source] sqlite::Error),
+

+
    #[error("failed to insert a CI run into database")]
+
    PushRun(String, #[source] sqlite::Error),
+

+
    #[error("failed to remove a CI run from database")]
+
    RemoveRun(String, #[source] sqlite::Error),
}

impl DbError {
@@ -120,31 +620,71 @@ impl DbError {
        Self::Open(filename.into(), e)
    }

-
    fn create_tables(query: &'static str, e: sqlite::Error) -> Self {
-
        Self::CreateTables(query, e)
+
    fn preapre(sql: &str, filename: &Path, e: sqlite::Error) -> Self {
+
        Self::Prepare(sql.into(), filename.into(), e)
+
    }
+

+
    fn reset(e: sqlite::Error) -> Self {
+
        Self::Reset(e)
+
    }
+

+
    fn execute(sql: &str, e: sqlite::Error) -> Self {
+
        Self::Execute(sql.into(), e)
+
    }
+

+
    fn bind(sql: &str, e: sqlite::Error) -> Self {
+
        Self::Bind(sql.into(), e)
+
    }
+

+
    fn insert_counter(sql: &str, e: sqlite::Error) -> Self {
+
        Self::InsertCounter(sql.into(), e)
+
    }
+

+
    fn update_counter(sql: &str, e: sqlite::Error) -> Self {
+
        Self::UpdateCounter(sql.into(), e)
+
    }
+

+
    fn get_counter(sql: &str, e: sqlite::Error) -> Self {
+
        Self::GetCounter(sql.into(), e)
+
    }
+

+
    fn list_events(sql: &str, e: sqlite::Error) -> Self {
+
        Self::ListEvents(sql.into(), e)
+
    }
+

+
    fn get_event(sql: &str, e: sqlite::Error) -> Self {
+
        Self::GetEvent(sql.into(), e)
+
    }
+

+
    fn event_from_json(json: &str, e: serde_json::Error) -> Self {
+
        Self::EventFromJson(json.into(), e)
+
    }
+

+
    fn push_event(sql: &str, e: sqlite::Error) -> Self {
+
        Self::PushEvent(sql.into(), e)
    }

-
    fn prepare(stmt: &'static str, e: sqlite::Error) -> Self {
-
        Self::Prepare(stmt, e)
+
    fn remove_event(sql: &str, e: sqlite::Error) -> Self {
+
        Self::RemoveEvent(sql.into(), e)
    }

-
    fn bind(placeholder: &'static str, e: sqlite::Error) -> Self {
-
        Self::Bind(placeholder, e)
+
    fn list_runs(sql: &str, e: sqlite::Error) -> Self {
+
        Self::ListRuns(sql.into(), e)
    }

-
    fn insert_run(e: sqlite::Error) -> Self {
-
        Self::InsertRun(e)
+
    fn run_from_json(json: &str, e: serde_json::Error) -> Self {
+
        Self::RunFromJson(json.into(), e)
    }

-
    fn to_json(e: serde_json::Error) -> Self {
-
        Self::ToJson(e)
+
    fn get_run(sql: &str, e: sqlite::Error) -> Self {
+
        Self::GetRun(sql.into(), e)
    }

-
    fn from_json(e: serde_json::Error) -> Self {
-
        Self::FromJson(e)
+
    fn push_run(sql: &str, e: sqlite::Error) -> Self {
+
        Self::PushRun(sql.into(), e)
    }

-
    fn get_run(e: sqlite::Error) -> Self {
-
        Self::GetRun(e)
+
    fn remove_run(sql: &str, e: sqlite::Error) -> Self {
+
        Self::RemoveRun(sql.into(), e)
    }
}