Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
refactor: use sqlite::Connection::set_busy_timeout
Lars Wirzenius committed 1 year ago
commit 84d868c919aa1e28ec9b65333750a28ae2290343
parent 24ed10c6da3536883b8ff1d04662ea3b736d5983
1 file changed +26 -112
modified src/db.rs
@@ -13,10 +13,10 @@
use std::{
    fmt,
    path::{Path, PathBuf},
-
    time::{Duration, Instant},
+
    time::Duration,
};

-
use log::debug;
+
use log::{debug, trace};
use sqlite::{Connection, State, Statement};
use time::{macros::format_description, OffsetDateTime};
use uuid::Uuid;
@@ -24,7 +24,7 @@ use uuid::Uuid;
use crate::{event::BrokerEvent, msg::RunId, run::Run};

// how long to retry when SQL fails for busy database
-
const MAX_WAIT: Duration = Duration::from_millis(1000);
+
const MAX_WAIT: Duration = Duration::from_millis(5_000);

/// The CI broker database. It stores the data that needs to be
/// persistent, even if the process terminates.
@@ -39,16 +39,18 @@ impl Db {
    pub fn new<P: AsRef<Path>>(filename: P) -> Result<Self, DbError> {
        let filename = filename.as_ref();
        debug!("open database {}", filename.display());
-
        let db = Db {
+
        let mut db = Db {
            filename: filename.into(),
            conn: sqlite::open(filename).map_err(|e| DbError::open(filename, e))?,
        };

-
        {
-
            db.begin()?;
-
            db.create_tables()?;
-
            db.commit()?;
-
        }
+
        let ms = MAX_WAIT.as_millis().try_into().unwrap();
+
        trace!("set busy timeout to {ms} milliseconds");
+
        db.conn
+
            .set_busy_timeout(ms)
+
            .map_err(|e| DbError::busy_timer(filename, e))?;
+

+
        db.create_tables()?;

        Ok(db)
    }
@@ -93,63 +95,22 @@ impl Db {

    // Prepare a statement for execution.
    fn prepare<'a>(&'a self, sql: &str) -> Result<Stmt<'a>, DbError> {
-
        let started = Instant::now();
-
        let max_wait = MAX_WAIT;
-
        loop {
-
            match self.conn.prepare(sql) {
-
                Ok(stmt) => {
-
                    return Ok(Stmt::new(sql, stmt));
-
                }
-

-
                // An error about the database being busy (locked) is
-
                // probably due to another process modifying the same
-
                // database. We ignore it as the problem should go away in
-
                // a very short while.
-
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
-
                    if started.elapsed() < max_wait {
-
                        continue;
-
                    }
-
                    return Err(DbError::preapre(sql, &self.filename, e));
-
                }
-

-
                Err(e) => {
-
                    return Err(DbError::preapre(sql, &self.filename, e));
-
                }
-
            }
+
        trace!("prepare {}", sql);
+
        match self.conn.prepare(sql) {
+
            Ok(stmt) => Ok(Stmt::new(sql, stmt)),
+
            Err(e) => Err(DbError::preapre(sql, &self.filename, e)),
        }
    }

    // Execute a statement that doesn't return any rows with values.
    // This means basically any statement except SELECT.
    fn execute_valueless(stmt: &mut Stmt) -> Result<(), DbError> {
+
        trace!("execute {}", stmt.sql);
        stmt.stmt.reset().map_err(DbError::reset)?;
-

-
        let started = Instant::now();
-
        let max_wait = MAX_WAIT;
-
        loop {
-
            match stmt.stmt.next() {
-
                Ok(_) => {
-
                    break;
-
                }
-

-
                // An error about the database being busy (locked) is
-
                // probably due to another process modifying the same
-
                // database. We ignore it as the problem should go away in
-
                // a very short while.
-
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
-
                    if started.elapsed() < max_wait {
-
                        continue;
-
                    }
-
                    return Err(DbError::execute(&stmt.sql, e));
-
                }
-

-
                Err(e) => {
-
                    return Err(DbError::execute(&stmt.sql, e));
-
                }
-
            }
+
        match stmt.stmt.next() {
+
            Ok(_) => Ok(()),
+
            Err(e) => Err(DbError::execute(&stmt.sql, e)),
        }
-

-
        Ok(())
    }

    /// Create the counter with an initial value. Only use this if
@@ -180,9 +141,6 @@ impl Db {
        let mut select = self.prepare("SELECT counter FROM counter_test")?;
        let mut counter = None;

-
        let started = Instant::now();
-
        let max_wait = MAX_WAIT;
-

        loop {
            match select.stmt.next() {
                Ok(State::Row) => {
@@ -191,12 +149,6 @@ impl Db {
                Ok(State::Done) => {
                    break;
                }
-
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
-
                    if started.elapsed() < max_wait {
-
                        continue;
-
                    }
-
                    return Err(DbError::get_counter(&select.sql, e));
-
                }
                Err(e) => {
                    return Err(DbError::get_counter(&select.sql, e));
                }
@@ -210,9 +162,6 @@ impl Db {
    pub fn queued_events(&self) -> Result<Vec<QueueId>, DbError> {
        let mut select = self.prepare("SELECT id FROM event_queue")?;

-
        let started = Instant::now();
-
        let max_wait = MAX_WAIT;
-

        let mut ids = vec![];

        loop {
@@ -227,12 +176,6 @@ impl Db {
                Ok(State::Done) => {
                    break;
                }
-
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
-
                    if started.elapsed() < max_wait {
-
                        continue;
-
                    }
-
                    return Err(DbError::list_events(&select.sql, e));
-
                }
                Err(e) => {
                    return Err(DbError::list_events(&select.sql, e));
                }
@@ -250,9 +193,6 @@ impl Db {
            .bind((":id", id.as_str()))
            .map_err(|e| DbError::bind(&select.sql, e))?;

-
        let started = Instant::now();
-
        let max_wait = MAX_WAIT;
-

        let mut timestamp = None;
        let mut event = None;

@@ -277,12 +217,6 @@ impl Db {
                Ok(State::Done) => {
                    break;
                }
-
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
-
                    if started.elapsed() < max_wait {
-
                        continue;
-
                    }
-
                    return Err(DbError::get_event(&select.sql, e));
-
                }
                Err(e) => {
                    return Err(DbError::get_event(&select.sql, e));
                }
@@ -353,9 +287,6 @@ impl Db {
    pub fn list_runs(&self) -> Result<Vec<RunId>, DbError> {
        let mut select = self.prepare("SELECT run_id FROM ci_runs")?;

-
        let started = Instant::now();
-
        let max_wait = MAX_WAIT;
-

        let mut run_ids = vec![];

        loop {
@@ -372,12 +303,6 @@ impl Db {
                Ok(State::Done) => {
                    break;
                }
-
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
-
                    if started.elapsed() < max_wait {
-
                        continue;
-
                    }
-
                    return Err(DbError::list_runs(&select.sql, e));
-
                }
                Err(e) => {
                    return Err(DbError::list_runs(&select.sql, e));
                }
@@ -391,9 +316,6 @@ impl Db {
    pub fn get_all_runs(&self) -> Result<Vec<Run>, DbError> {
        let mut select = self.prepare("SELECT json FROM ci_runs")?;

-
        let started = Instant::now();
-
        let max_wait = MAX_WAIT;
-

        let mut runs = vec![];

        loop {
@@ -411,12 +333,6 @@ impl Db {
                Ok(State::Done) => {
                    break;
                }
-
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
-
                    if started.elapsed() < max_wait {
-
                        continue;
-
                    }
-
                    return Err(DbError::get_all_runs(&select.sql, e));
-
                }
                Err(e) => {
                    return Err(DbError::get_all_runs(&select.sql, e));
                }
@@ -434,9 +350,6 @@ impl Db {
            .bind((":id", id.to_string().as_str()))
            .map_err(|e| DbError::bind(&select.sql, e))?;

-
        let started = Instant::now();
-
        let max_wait = MAX_WAIT;
-

        let mut run = None;

        select.stmt.reset().unwrap();
@@ -455,12 +368,6 @@ impl Db {
                Ok(State::Done) => {
                    break;
                }
-
                Err(e) if e.code.unwrap() == sqlite3_sys::SQLITE_BUSY as isize => {
-
                    if started.elapsed() < max_wait {
-
                        continue;
-
                    }
-
                    return Err(DbError::get_run(&select.sql, e));
-
                }
                Err(e) => {
                    return Err(DbError::get_run(&select.sql, e));
                }
@@ -600,6 +507,9 @@ impl QueuedEvent {
/// All errors from this module.
#[derive(Debug, thiserror::Error)]
pub enum DbError {
+
    #[error("failed to set a busy timer one SQLite database {0}")]
+
    BusyTimer(PathBuf, #[source] sqlite::Error),
+

    #[error("failed to open SQLite database {0}")]
    Open(PathBuf, #[source] sqlite::Error),

@@ -662,6 +572,10 @@ pub enum DbError {
}

impl DbError {
+
    fn busy_timer(filename: &Path, e: sqlite::Error) -> Self {
+
        Self::BusyTimer(filename.into(), e)
+
    }
+

    fn open(filename: &Path, e: sqlite::Error) -> Self {
        Self::Open(filename.into(), e)
    }