Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
radicle: Simple database migration system
cloudhead committed 2 years ago
commit d9a84de5eeea8b2e49ddb7178f0e82ff877ad558
parent f4dc224b3a63a19d3ab6da84ff7f54a403d4b87c
6 files changed +115 -28
modified radicle/src/node/address/store.rs
@@ -17,6 +17,9 @@ pub enum Error {
    Internal(#[from] sql::Error),
    #[error("alias error: {0}")]
    InvalidAlias(#[from] AliasError),
+
    /// No rows returned in query result.
+
    #[error("no rows returned")]
+
    NoRows,
}

/// Address store.
@@ -121,8 +124,7 @@ impl Store for Database {
            .prepare("SELECT COUNT(*) FROM addresses")?
            .into_iter()
            .next()
-
            .unwrap()
-
            .unwrap();
+
            .ok_or(Error::NoRows)??;
        let count = row.read::<i64, _>(0) as usize;

        Ok(count)
@@ -168,9 +170,8 @@ impl Store for Database {
                stmt.bind((5, timestamp as i64))?;
                stmt.next()?;
            }
-
            Ok(db.change_count() > 0)
+
            Ok::<_, Error>(db.change_count() > 0)
        })
-
        .map_err(Error::from)
    }

    fn remove(&mut self, node: &NodeId) -> Result<bool, Error> {
modified radicle/src/node/db.rs
@@ -1,24 +1,45 @@
+
//! # Note on database migrations
+
//!
+
//! The `user_version` field in the database SQLite header is used to keep track of the database
+
//! version. It starts with `0`, which means no tables exist yet, and is incremented everytime a
+
//! migration is applied. In turn, migrations are named after their version numbers, so the first
+
//! migration is `1.sql`, the second one is `2.sql` and so on.
+
//!
+
//! The database schema is contained within the first migration. See [`version`], [`bump`] and
+
//! [`migrate`] for how this works.
use std::path::Path;
use std::{fmt, time};

use sqlite as sql;
use thiserror::Error;

+
use crate::sql::transaction;
+

/// How long to wait for the database lock to be released before failing a read.
const DB_READ_TIMEOUT: time::Duration = time::Duration::from_secs(3);
/// How long to wait for the database lock to be released before failing a write.
const DB_WRITE_TIMEOUT: time::Duration = time::Duration::from_secs(6);

+
/// Database migrations.
+
/// The first migration is the creation of the initial tables.
+
const MIGRATIONS: &[&str] = &[
+
    include_str!("db/migrations/1.sql"),
+
    include_str!("db/migrations/2.sql"),
+
];
+

#[derive(Error, Debug)]
pub enum Error {
    /// An Internal error.
    #[error("internal error: {0}")]
    Internal(#[from] sql::Error),
+
    /// No rows returned in query result.
+
    #[error("no rows returned")]
+
    NoRows,
}

/// A file-backed database storing information about the network.
pub struct Database {
-
    pub db: sql::Connection,
+
    pub db: sql::ConnectionThreadSafe,
}

impl fmt::Debug for Database {
@@ -27,29 +48,22 @@ impl fmt::Debug for Database {
    }
}

-
impl From<sql::Connection> for Database {
-
    fn from(db: sql::Connection) -> Self {
+
impl From<sql::ConnectionThreadSafe> for Database {
+
    fn from(db: sql::ConnectionThreadSafe) -> Self {
        Self { db }
    }
}

impl Database {
-
    const SCHEMA: &'static str = include_str!("db/schema.sql");
    const PRAGMA: &'static str = "PRAGMA foreign_keys = ON";

-
    /// Open an address book at the given path. Creates a new address book if it
+
    /// Open a database at the given path. Creates a new database if it
    /// doesn't exist.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
-
        let mut db = sql::Connection::open_with_flags(
-
            path,
-
            sqlite::OpenFlags::new()
-
                .with_create()
-
                .with_read_write()
-
                .with_full_mutex(),
-
        )?;
+
        let mut db = sql::Connection::open_thread_safe(path)?;
        db.set_busy_timeout(DB_WRITE_TIMEOUT.as_millis() as usize)?;
        db.execute(Self::PRAGMA)?;
-
        db.execute(Self::SCHEMA)?;
+
        migrate(&db)?;

        Ok(Self { db })
    }
@@ -57,21 +71,90 @@ impl Database {
    /// Same as [`Self::open`], but in read-only mode. This is useful to have multiple
    /// open databases, as no locking is required.
    pub fn reader<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
-
        let mut db =
-
            sql::Connection::open_with_flags(path, sqlite::OpenFlags::new().with_read_only())?;
+
        let mut db = sql::Connection::open_thread_safe_with_flags(
+
            path,
+
            sqlite::OpenFlags::new().with_read_only(),
+
        )?;
        db.set_busy_timeout(DB_READ_TIMEOUT.as_millis() as usize)?;
        db.execute(Self::PRAGMA)?;
-
        db.execute(Self::SCHEMA)?;

        Ok(Self { db })
    }

-
    /// Create a new in-memory address book.
+
    /// Create a new in-memory database.
    pub fn memory() -> Result<Self, Error> {
-
        let db = sql::Connection::open(":memory:")?;
+
        let db = sql::Connection::open_thread_safe(":memory:")?;
        db.execute(Self::PRAGMA)?;
-
        db.execute(Self::SCHEMA)?;
+
        migrate(&db)?;

        Ok(Self { db })
    }
+

+
    /// Get the database version. This is updated on schema changes.
+
    pub fn version(&self) -> Result<usize, Error> {
+
        version(&self.db)
+
    }
+

+
    /// Bump the database version.
+
    pub fn bump(&self) -> Result<usize, Error> {
+
        transaction(&self.db, bump)
+
    }
+
}
+

+
/// Get the `user_version` value from the database header.
+
pub fn version(db: &sql::Connection) -> Result<usize, Error> {
+
    let version = db
+
        .prepare("PRAGMA user_version")?
+
        .into_iter()
+
        .next()
+
        .ok_or(Error::NoRows)??
+
        .read::<i64, _>(0);
+

+
    Ok(version as usize)
+
}
+

+
/// Bump the `user_version` value.
+
pub fn bump(db: &sql::Connection) -> Result<usize, Error> {
+
    let old = version(db)?;
+
    let new = old + 1;
+

+
    db.execute(format!("PRAGMA user_version = {new}"))?;
+

+
    Ok(new as usize)
+
}
+

+
/// Migrate the database to the latest schema.
+
pub fn migrate(db: &sql::Connection) -> Result<usize, Error> {
+
    let mut version = version(db)?;
+
    for (i, migration) in MIGRATIONS.iter().enumerate() {
+
        if i >= version {
+
            transaction(db, |db| {
+
                db.execute(migration)?;
+
                version = bump(db)?;
+

+
                Ok::<_, Error>(())
+
            })?;
+
        }
+
    }
+
    Ok(version)
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+

+
    #[test]
+
    fn test_version() {
+
        let n = MIGRATIONS.len();
+
        let db = Database::memory().unwrap();
+
        assert_eq!(db.version().unwrap(), n);
+

+
        let v = db.bump().unwrap();
+
        assert_eq!(v, n + 1);
+
        assert_eq!(db.version().unwrap(), n + 1);
+

+
        let v = db.bump().unwrap();
+
        assert_eq!(v, n + 2);
+
        assert_eq!(db.version().unwrap(), n + 2);
+
    }
}
added radicle/src/node/db/migrations/1.sql
@@ -0,0 +1 @@
+
../schema.sql

\ No newline at end of file
added radicle/src/node/db/migrations/2.sql
@@ -0,0 +1,3 @@
+
-- Add the "penalty" column.
+
-- Higher numbers reduce the chances that we connect to this node.
+
alter table "nodes" add column "penalty" integer not null default 0;
modified radicle/src/node/routing.rs
@@ -140,9 +140,8 @@ impl Store for Database {
                };
                results.push((*id, result));
            }
-
            Ok(results)
+
            Ok::<_, Error>(results)
        })
-
        .map_err(Error::from)
    }

    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error> {
modified radicle/src/sql.rs
@@ -10,10 +10,10 @@ use crate::node::Address;

/// Run an SQL query inside a transaction.
/// Commits the transaction on success, and rolls back on error.
-
pub fn transaction<T>(
+
pub fn transaction<T, E: From<sql::Error>>(
    db: &sql::Connection,
-
    query: impl FnOnce(&sql::Connection) -> Result<T, sql::Error>,
-
) -> Result<T, sql::Error> {
+
    query: impl FnOnce(&sql::Connection) -> Result<T, E>,
+
) -> Result<T, E> {
    db.execute("BEGIN")?;

    match query(db) {