Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Fix sync regressions with refs db
Merged did:key:z6MksFqX...wzpT opened 2 years ago

See commits.

14 files changed +326 -86 bc247dff bd8e0ebc
modified radicle-cli/src/commands/ls.rs
@@ -1,6 +1,6 @@
use std::ffi::OsString;

-
use radicle::storage::git::RepositoryInfo;
+
use radicle::storage::{ReadStorage, RepositoryInfo};

use crate::terminal as term;
use crate::terminal::args::{Args, Error, Help};
modified radicle-cli/src/commands/node.rs
@@ -13,6 +13,8 @@ use crate::terminal as term;
use crate::terminal::args::{Args, Error, Help};
use crate::terminal::Element as _;

+
#[path = "node/commands.rs"]
+
mod commands;
#[path = "node/control.rs"]
pub mod control;
#[path = "node/events.rs"]
@@ -35,6 +37,7 @@ Usage
    rad node routing [--rid <rid>] [--nid <nid>] [--json] [<option>...]
    rad node events [--timeout <secs>] [-n <count>] [<option>...]
    rad node config [--addresses]
+
    rad node db <command> [<option>..]

    For `<node-option>` see `radicle-node --help`.

@@ -73,6 +76,9 @@ pub enum Operation {
    Config {
        addresses: bool,
    },
+
    Db {
+
        args: Vec<OsString>,
+
    },
    Events {
        timeout: time::Duration,
        count: usize,
@@ -100,6 +106,7 @@ pub enum Operation {
pub enum OperationName {
    Connect,
    Config,
+
    Db,
    Events,
    Routing,
    Logs,
@@ -136,6 +143,7 @@ impl Args for Options {
                }
                Value(val) if op.is_none() => match val.to_string_lossy().as_ref() {
                    "connect" => op = Some(OperationName::Connect),
+
                    "db" => op = Some(OperationName::Db),
                    "events" => op = Some(OperationName::Events),
                    "logs" => op = Some(OperationName::Logs),
                    "config" => op = Some(OperationName::Config),
@@ -188,6 +196,9 @@ impl Args for Options {
                Value(val) if matches!(op, Some(OperationName::Start)) => {
                    options.push(val);
                }
+
                Value(val) if matches!(op, Some(OperationName::Db)) => {
+
                    options.push(val);
+
                }
                _ => return Err(anyhow!(arg.unexpected())),
            }
        }
@@ -200,6 +211,7 @@ impl Args for Options {
                timeout,
            },
            OperationName::Config => Operation::Config { addresses },
+
            OperationName::Db => Operation::Db { args: options },
            OperationName::Events => Operation::Events { timeout, count },
            OperationName::Routing => Operation::Routing { rid, nid, json },
            OperationName::Logs => Operation::Logs { lines },
@@ -235,6 +247,9 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
                control::config(&node)?;
            }
        }
+
        Operation::Db { args } => {
+
            commands::db(&profile, args)?;
+
        }
        Operation::Sessions => {
            let sessions = control::sessions(&node)?;
            if let Some(table) = sessions {
added radicle-cli/src/commands/node/commands.rs
@@ -0,0 +1,49 @@
+
use std::ffi::OsString;
+

+
use anyhow::anyhow;
+
use radicle::Profile;
+
use radicle_term as term;
+

+
/// Operations understood by the `db` sub-command parser in this module.
#[derive(PartialEq, Eq)]
+
pub enum Operation {
+
    /// Execute `query` on the database.
    Exec { query: String },
+
}
+

+
/// Run a `rad node db` sub-command.
///
/// `args` are the arguments that follow `db` on the command line. The only
/// operation understood is `exec <query>`, which executes `query` on the
/// database returned by `profile.database_mut()` and prints how many rows
/// were changed.
///
/// # Errors
///
/// Fails if no operation or an unknown operation is given, if `exec` is
/// missing its query, if the query is not valid UTF-8, if an unexpected
/// argument is encountered, or if opening the database or executing the
/// query fails.
pub fn db(profile: &Profile, args: Vec<OsString>) -> anyhow::Result<()> {
+
    use lexopt::prelude::*;
+

+
    let mut parser = lexopt::Parser::from_args(args);
+
    let mut op: Option<Operation> = None;
+

+
    while let Some(arg) = parser.next()? {
+
        match arg {
+
            Value(cmd) if op.is_none() => match cmd.to_string_lossy().as_ref() {
+
                "exec" => {
+
                    let val = parser
+
                        .value()
+
                        .map_err(|_| anyhow!("a query to execute must be provided for `exec`"))?;
+
                    // Reject non-UTF-8 input instead of converting it lossily: the text is
                    // executed on the database, so it must not be silently rewritten.
                    let query = val
+
                        .into_string()
+
                        .map_err(|_| anyhow!("the query passed to `exec` must be valid UTF-8"))?;
+
                    op = Some(Operation::Exec { query });
+
                }
+
                unknown => anyhow::bail!("unknown operation '{unknown}'"),
+
            },
+
            _ => return Err(anyhow!(arg.unexpected())),
+
        }
+
    }
+

+
    match op.ok_or_else(|| anyhow!("a command must be provided, eg. `rad node db exec`"))? {
+
        Operation::Exec { query } => {
+
            let db = profile.database_mut()?;
+
            db.execute(query)?;
+

+
            // Report the number of rows changed by the statement that was just executed.
            let changed = db.change_count();
+
            if changed > 0 {
+
                term::success!("{changed} row(s) affected.");
+
            } else {
+
                term::print(term::format::italic("No rows affected."));
+
            }
+
        }
+
    }
+
    Ok(())
+
}
modified radicle-fetch/src/state.rs
@@ -670,6 +670,10 @@ impl<'a, S> RemoteRepository for Cached<'a, S> {
            .map(|id| self.remote(id).map(|remote| (*id, remote)))
            .collect::<Result<_, _>>()
    }
+

+
    /// Return the [`RefsAt`] list by delegating to the repository behind `self.handle`.
    fn remote_refs_at(&self) -> Result<Vec<RefsAt>, storage::refs::Error> {
+
        self.handle.repo.remote_refs_at()
+
    }
}

impl<'a, S> ValidateRepository for Cached<'a, S> {
modified radicle-httpd/src/api/v1/stats.rs
@@ -4,6 +4,8 @@ use axum::routing::get;
use axum::{Json, Router};
use serde_json::json;

+
use radicle::storage::ReadStorage;
+

use crate::api::error::Error;
use crate::api::Context;

modified radicle-node/src/service.rs
@@ -555,6 +555,19 @@ where
        let nid = self.node_id();
        self.started_at = Some(time);

+
        // Populate refs database. This is only useful as part of the upgrade process for nodes
+
        // that have been online since before the refs database was created.
+
        match self.db.refs().count() {
+
            Ok(0) => {
+
                info!(target: "service", "Empty refs database, populating from storage..");
+
                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
+
                    error!(target: "service", "Failed to populate refs database: {e}");
+
                }
+
            }
+
            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
+
            Err(e) => error!(target: "service", "Error checking refs database: {e}"),
+
        }
+

        // Ensure that our local node is in our address database.
        self.db
            .addresses_mut()
@@ -1038,7 +1051,7 @@ where
                }

                // It's possible for a fetch to succeed but nothing was updated.
-
                if updated.is_empty() {
+
                if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
                    debug!(target: "service", "Nothing to announce, no refs were updated..");
                } else {
                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
@@ -1058,7 +1071,6 @@ where
                }
            }
        }
-

        // We can now try to dequeue another fetch.
        self.dequeue_fetch();
    }
modified radicle/src/node/db.rs
@@ -7,6 +7,7 @@
//!
//! The database schema is contained within the first migration. See [`version`], [`bump`] and
//! [`migrate`] for how this works.
+
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use std::{fmt, time};
@@ -45,6 +46,14 @@ pub struct Database {
    pub db: Arc<sql::ConnectionThreadSafe>,
}

+
/// Gives a [`Database`] direct access to the methods of the connection stored in its `db` field.
impl Deref for Database {
+
    type Target = sql::ConnectionThreadSafe;
+

+
    /// Borrow the connection held in `self.db`.
    fn deref(&self) -> &Self::Target {
+
        &self.db
+
    }
+
}
+

impl fmt::Debug for Database {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Database").finish()
modified radicle/src/node/refs/store.rs
@@ -10,6 +10,8 @@ use crate::git::{Oid, Qualified};
use crate::node::Database;
use crate::node::NodeId;
use crate::prelude::RepoId;
+
use crate::storage;
+
use crate::storage::{ReadRepository, ReadStorage, RemoteRepository, RepositoryError};

#[derive(Error, Debug)]
pub enum Error {
@@ -19,12 +21,25 @@ pub enum Error {
    /// Timestamp error.
    #[error("invalid timestamp: {0}")]
    Timestamp(#[from] TryFromIntError),
+
    /// Repository error.
+
    #[error("repository error: {0}")]
+
    Repository(#[from] RepositoryError),
+
    /// Storage error.
+
    #[error("storage error: {0}")]
+
    Storage(#[from] storage::Error),
+
    /// Storage refs error.
+
    #[error("storage refs error: {0}")]
+
    Refs(#[from] storage::refs::Error),
+
    /// No rows returned in query result.
+
    #[error("no rows returned")]
+
    NoRows,
}

/// Refs store.
///
/// Used to cache git references.
pub trait Store {
+
    /// Set a reference under a remote namespace to the given [`Oid`].
    fn set(
        &mut self,
        repo: &RepoId,
@@ -33,16 +48,28 @@ pub trait Store {
        oid: Oid,
        timestamp: LocalTime,
    ) -> Result<bool, Error>;
-

+
    /// Get a reference's [`Oid`] and timestamp.
    fn get(
        &self,
        repo: &RepoId,
        namespace: &NodeId,
        refname: &Qualified,
    ) -> Result<Option<(Oid, LocalTime)>, Error>;
-

-
    fn delete(&self, repo: &RepoId, namespace: &NodeId, refname: &Qualified)
-
        -> Result<bool, Error>;
+
    /// Delete a reference.
+
    fn delete(
+
        &mut self,
+
        repo: &RepoId,
+
        namespace: &NodeId,
+
        refname: &Qualified,
+
    ) -> Result<bool, Error>;
+
    /// Populate the database from storage.
+
    fn populate<S: ReadStorage>(&mut self, storage: &S) -> Result<(), Error>;
+
    /// Return the number of references.
+
    fn count(&self) -> Result<usize, Error>;
+
    /// Check if there are any references.
+
    fn is_empty(&self) -> Result<bool, Error> {
+
        self.count().map(|l| l == 0)
+
    }
}

impl Store for Database {
@@ -103,7 +130,7 @@ impl Store for Database {
    }

    fn delete(
-
        &self,
+
        &mut self,
        repo: &RepoId,
        namespace: &NodeId,
        refname: &Qualified,
@@ -119,6 +146,30 @@ impl Store for Database {

        Ok(self.db.change_count() > 0)
    }
+

+
    /// Return the result of `SELECT COUNT(*) FROM refs`.
    ///
    /// Yields [`Error::NoRows`] if the statement produces no row at all; errors from
    /// preparing the statement or from reading the row are propagated.
    fn count(&self) -> Result<usize, Error> {
+
        let row = self
+
            .db
+
            .prepare("SELECT COUNT(*) FROM refs")?
+
            .into_iter()
+
            .next()
+
            // The first `?` handles the missing row, the second the row's own result.
            .ok_or(Error::NoRows)??;
+
        // NOTE(review): `as usize` converts the `i64` without a range check — confirm
        // this is acceptable on every supported target.
        let count = row.read::<i64, _>(0) as usize;
+

+
        Ok(count)
+
    }
+

+
    /// Fill the store from every repository listed by `storage.repositories()`.
    ///
    /// For each `RefsAt` returned by a repository's `remote_refs_at()`, the reference
    /// `refs_at.path()` under the namespace `refs_at.remote` is set to `refs_at.at`.
    /// The first error encountered aborts the operation.
    fn populate<S: ReadStorage>(&mut self, storage: &S) -> Result<(), Error> {
+
        // A single timestamp is shared by every entry written during this call.
        let now = LocalTime::now();
+

+
        for info in storage.repositories()? {
+
            let repo = storage.repository(info.rid)?;
+
            for refs_at in repo.remote_refs_at()? {
+
                self.set(&repo.id(), &refs_at.remote, refs_at.path(), refs_at.at, now)?;
+
            }
+
        }
+
        Ok(())
+
    }
}

#[cfg(test)]
@@ -129,6 +180,32 @@ mod test {
    use localtime::{LocalDuration, LocalTime};

    #[test]
+
    fn test_count() {
+
        let mut db = Database::memory().unwrap();
+
        let oid = arbitrary::oid();
+

+
        let repo = arbitrary::gen::<RepoId>(1);
+
        let namespace = arbitrary::gen::<NodeId>(1);
+
        let refname1 = qualified!("refs/heads/master");
+
        let refname2 = qualified!("refs/heads/main");
+
        let timestamp = LocalTime::now();
+

+
        assert!(db.is_empty().unwrap());
+
        assert_eq!(db.count().unwrap(), 0);
+

+
        assert!(db
+
            .set(&repo, &namespace, &refname1, oid, timestamp)
+
            .unwrap());
+
        assert!(!db.is_empty().unwrap());
+
        assert_eq!(db.count().unwrap(), 1);
+

+
        assert!(db
+
            .set(&repo, &namespace, &refname2, oid, timestamp)
+
            .unwrap());
+
        assert_eq!(db.count().unwrap(), 2);
+
    }
+

+
    #[test]
    fn test_set_and_delete() {
        let mut db = Database::memory().unwrap();
        let oid = arbitrary::oid();
modified radicle/src/storage.rs
@@ -25,11 +25,25 @@ use crate::storage::git::NAMESPACES_GLOB;
use crate::storage::refs::Refs;

use self::git::UserInfo;
-
use self::refs::SignedRefs;
+
use self::refs::{RefsAt, SignedRefs};

pub type BranchName = git::RefString;
pub type Inventory = BTreeSet<RepoId>;

+
/// Basic repository information.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct RepositoryInfo<V> {
+
    /// Repository identifier.
+
    pub rid: RepoId,
+
    /// Head of default branch.
+
    pub head: Oid,
+
    /// Identity document.
+
    pub doc: Doc<V>,
+
    /// Local signed refs, if any.
+
    /// Repositories with this set to `None` are ones that are seeded but not forked.
+
    pub refs: Option<refs::SignedRefsAt>,
+
}
+

/// Describes one or more namespaces.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub enum Namespaces {
@@ -400,6 +414,8 @@ pub trait ReadStorage {
    /// Get the inventory of repositories hosted under this storage.
    /// This function should typically only return public repositories.
    fn inventory(&self) -> Result<Inventory, Error>;
+
    /// Return all repositories (public and private).
+
    fn repositories(&self) -> Result<Vec<RepositoryInfo<Verified>>, Error>;
    /// Insert this repository into the inventory.
    fn insert(&self, rid: RepoId);
    /// Open or create a read-only repository.
@@ -585,6 +601,9 @@ pub trait RemoteRepository {

    /// Get all remotes.
    fn remotes(&self) -> Result<Remotes<Verified>, refs::Error>;
+

+
    /// Get [`RefsAt`] of all remotes.
+
    fn remote_refs_at(&self) -> Result<Vec<RefsAt>, refs::Error>;
}

pub trait ValidateRepository
@@ -671,6 +690,10 @@ where
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, RepositoryError> {
        self.deref().repository(rid)
    }
+

+
    /// Forward to the `repositories` implementation of the dereferenced storage.
    fn repositories(&self) -> Result<Vec<RepositoryInfo<Verified>>, Error> {
+
        self.deref().repositories()
+
    }
}

impl<T, S> WriteStorage for T
modified radicle/src/storage/git.rs
@@ -20,8 +20,8 @@ use crate::identity::{Identity, Project};
use crate::storage::refs;
use crate::storage::refs::{Refs, SignedRefs, SignedRefsAt};
use crate::storage::{
-
    Inventory, ReadRepository, ReadStorage, Remote, Remotes, RepositoryError, SetHead,
-
    SignRepository, WriteRepository, WriteStorage,
+
    Inventory, ReadRepository, ReadStorage, Remote, Remotes, RepositoryError, RepositoryInfo,
+
    SetHead, SignRepository, WriteRepository, WriteStorage,
};

pub use crate::git::{
@@ -29,6 +29,7 @@ pub use crate::git::{
};
pub use crate::storage::Error;

+
use super::refs::RefsAt;
use super::{RemoteId, RemoteRepository, ValidateRepository};

pub static NAMESPACES_GLOB: Lazy<git::refspec::PatternString> =
@@ -43,20 +44,6 @@ pub static CANONICAL_IDENTITY: Lazy<git::Qualified> = Lazy::new(|| {
    )
});

-
/// Basic repository information.
-
#[derive(Debug, Clone, PartialEq, Eq)]
-
pub struct RepositoryInfo<V> {
-
    /// Repository identifier.
-
    pub rid: RepoId,
-
    /// Head of default branch.
-
    pub head: Oid,
-
    /// Identity document.
-
    pub doc: Doc<V>,
-
    /// Local signed refs, if any.
-
    /// Repositories with this set to `None` are ones that are seeded but not forked.
-
    pub refs: Option<refs::SignedRefsAt>,
-
}
-

/// A parsed Git reference.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Ref {
@@ -148,6 +135,65 @@ impl ReadStorage for Storage {
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, RepositoryError> {
        Repository::open(paths::repository(self, &rid), rid)
    }
+

+
    /// List the repositories found in the storage directory `self.path`.
    ///
    /// Every directory entry that is a directory, is not hidden and does not have a
    /// `lock` extension is parsed as a [`RepoId`]. Repositories that fail to open, or
    /// whose identity document or head cannot be read, are logged and skipped.
    ///
    /// # Errors
    ///
    /// I/O errors while reading the directory, a directory name that is not a valid
    /// [`RepoId`], and a failure to load the local signed refs all abort the listing.
    fn repositories(&self) -> Result<Vec<RepositoryInfo<Verified>>, Error> {
+
        let mut repos = Vec::new();
+

+
        for result in fs::read_dir(&self.path)? {
+
            let path = result?;
+

+
            // Skip non-directories.
+
            if !path.file_type()?.is_dir() {
+
                continue;
+
            }
+
            // Skip hidden files.
+
            if path.file_name().to_string_lossy().starts_with('.') {
+
                continue;
+
            }
+
            // Skip lock files.
+
            if let Some(ext) = path.path().extension() {
+
                if ext == "lock" {
+
                    continue;
+
                }
+
            }
+
            // Any remaining directory name must be a valid repository id.
            let rid = RepoId::try_from(path.file_name())
+
                .map_err(|_| Error::InvalidId(path.file_name()))?;
+

+
            let repo = match self.repository(rid) {
+
                Ok(repo) => repo,
+
                Err(e) => {
+
                    log::warn!(target: "storage", "Repository {rid} is invalid: {e}");
+
                    continue;
+
                }
+
            };
+
            let doc = match repo.identity_doc() {
+
                Ok(doc) => doc.into(),
+
                Err(e) => {
+
                    log::warn!(target: "storage", "Repository {rid} is invalid: looking up doc: {e}");
+
                    continue;
+
                }
+
            };
+

+
            // For performance reasons, we don't do a full repository check here.
+
            let head = match repo.head() {
+
                Ok((_, head)) => head,
+
                Err(e) => {
+
                    log::warn!(target: "storage", "Repository {rid} is invalid: looking up head: {e}");
+
                    continue;
+
                }
+
            };
+
            // Nb. This will be `None` if they were not found.
+
            // NOTE(review): unlike the lookups above, an error here is propagated and
            // aborts the whole listing — confirm this is intended.
            let refs = refs::SignedRefsAt::load(self.info.key, &repo)?;
+

+
            repos.push(RepositoryInfo {
+
                rid,
+
                head,
+
                doc,
+
                refs,
+
            });
+
        }
+
        Ok(repos)
+
    }
}

impl WriteStorage for Storage {
@@ -218,65 +264,6 @@ impl Storage {
        self.path.as_path()
    }

-
    pub fn repositories(&self) -> Result<Vec<RepositoryInfo<Verified>>, Error> {
-
        let mut repos = Vec::new();
-

-
        for result in fs::read_dir(&self.path)? {
-
            let path = result?;
-

-
            // Skip non-directories.
-
            if !path.file_type()?.is_dir() {
-
                continue;
-
            }
-
            // Skip hidden files.
-
            if path.file_name().to_string_lossy().starts_with('.') {
-
                continue;
-
            }
-
            // Skip lock files.
-
            if let Some(ext) = path.path().extension() {
-
                if ext == "lock" {
-
                    continue;
-
                }
-
            }
-
            let rid = RepoId::try_from(path.file_name())
-
                .map_err(|_| Error::InvalidId(path.file_name()))?;
-

-
            let repo = match self.repository(rid) {
-
                Ok(repo) => repo,
-
                Err(e) => {
-
                    log::warn!(target: "storage", "Repository {rid} is invalid: {e}");
-
                    continue;
-
                }
-
            };
-
            let doc = match repo.identity_doc() {
-
                Ok(doc) => doc.into(),
-
                Err(e) => {
-
                    log::warn!(target: "storage", "Repository {rid} is invalid: looking up doc: {e}");
-
                    continue;
-
                }
-
            };
-

-
            // For performance reasons, we don't do a full repository check here.
-
            let head = match repo.head() {
-
                Ok((_, head)) => head,
-
                Err(e) => {
-
                    log::warn!(target: "storage", "Repository {rid} is invalid: looking up head: {e}");
-
                    continue;
-
                }
-
            };
-
            // Nb. This will be `None` if they were not found.
-
            let refs = refs::SignedRefsAt::load(self.info.key, &repo)?;
-

-
            repos.push(RepositoryInfo {
-
                rid,
-
                head,
-
                doc,
-
                refs,
-
            });
-
        }
-
        Ok(repos)
-
    }
-

    pub fn repositories_by_id<'a>(
        &self,
        mut rids: impl Iterator<Item = &'a RepoId>,
@@ -578,6 +565,18 @@ impl RemoteRepository for Repository {
        let refs = SignedRefs::load(*remote, self)?;
        Ok(Remote::<Verified>::new(refs))
    }
+

+
    /// Build a [`RefsAt`] for every remote id of this repository.
    ///
    /// Stops at the first remote id that cannot be read or whose [`RefsAt`] cannot be
    /// constructed.
    fn remote_refs_at(&self) -> Result<Vec<RefsAt>, refs::Error> {
+
        let mut all = Vec::new();
+

+
        for remote in self.remote_ids()? {
+
            // Each item yielded by `remote_ids` is itself fallible.
            let remote = remote?;
+
            let refs_at = RefsAt::new(self, remote)?;
+

+
            all.push(refs_at);
+
        }
+
        Ok(all)
+
    }
}

impl ValidateRepository for Repository {
modified radicle/src/storage/git/cob.rs
@@ -241,6 +241,10 @@ impl<'a, R: storage::RemoteRepository> RemoteRepository for DraftStore<'a, R> {
    fn remotes(&self) -> Result<Remotes<Verified>, storage::refs::Error> {
        RemoteRepository::remotes(self.repo)
    }
+

+
    /// Delegate to `remote_refs_at` of the wrapped repository `self.repo`.
    fn remote_refs_at(&self) -> Result<Vec<storage::refs::RefsAt>, storage::refs::Error> {
+
        RemoteRepository::remote_refs_at(self.repo)
+
    }
}

impl<'a, R: storage::ValidateRepository> ValidateRepository for DraftStore<'a, R> {
modified radicle/src/storage/refs.rs
@@ -387,6 +387,10 @@ impl RefsAt {
    pub fn load<S: ReadRepository>(&self, repo: &S) -> Result<SignedRefsAt, Error> {
        SignedRefsAt::load_at(self.at, self.remote, repo)
    }
+

+
    /// Return the qualified reference name [`SIGREFS_BRANCH`]; the value is the same for
    /// every `RefsAt`.
    pub fn path(&self) -> &git::Qualified {
+
        &SIGREFS_BRANCH
+
    }
}

/// Verified [`SignedRefs`] that keeps track of their content address
modified radicle/src/test/storage.rs
@@ -12,7 +12,7 @@ use crate::node::NodeId;

pub use crate::storage::*;

-
use super::fixtures;
+
use super::{arbitrary, fixtures};

#[derive(Clone, Debug)]
pub struct MockStorage {
@@ -89,6 +89,19 @@ impl ReadStorage for MockStorage {
            })
            .cloned()
    }
+

+
    /// Describe every entry of `self.repos` as a [`RepositoryInfo`].
    ///
    /// `refs` is always `None`. Panics if a repository's `head()` returns an error.
    fn repositories(&self) -> Result<Vec<RepositoryInfo<Verified>>, Error> {
+
        Ok(self
+
            .repos
+
            .iter()
+
            .map(|(rid, r)| RepositoryInfo {
+
                rid: *rid,
+
                // Only the object id of the `(name, oid)` pair is kept.
                head: r.head().unwrap().1,
+
                doc: r.doc.clone().into(),
+
                refs: None,
+
            })
+
            .collect())
+
    }
}

impl WriteStorage for MockStorage {
@@ -159,6 +172,17 @@ impl RemoteRepository for MockRepository {
            })
            .collect())
    }
+

+
    /// Build one [`refs::RefsAt`] per value in `self.remotes`, copying its `id` and `at`.
    fn remote_refs_at(&self) -> Result<Vec<refs::RefsAt>, refs::Error> {
+
        Ok(self
+
            .remotes
+
            .values()
+
            .map(|s| refs::RefsAt {
+
                remote: s.id,
+
                at: s.at,
+
            })
+
            .collect())
+
    }
}

impl ValidateRepository for MockRepository {
@@ -177,7 +201,7 @@ impl ReadRepository for MockRepository {
    }

    fn head(&self) -> Result<(fmt::Qualified, Oid), RepositoryError> {
-
        todo!()
+
        Ok((fmt::qualified!("refs/heads/master"), arbitrary::oid()))
    }

    fn canonical_head(&self) -> Result<(fmt::Qualified, Oid), RepositoryError> {
added scripts/clear-refs-db.sh
@@ -0,0 +1,18 @@
+
#!/bin/sh
+
# Delete every row of the `refs` table in the node database located under `rad path`.
# Requires the `sqlite3` command-line tool; exits with status 1 if the tool or the
# database file is missing.
set -e
+

+
DB="$(rad path)/node/node.db"
+

+
if command -v sqlite3 >/dev/null 2>&1; then
+
  if [ -f "$DB" ]; then
+
    # `printf` is used because `echo -n` is not portable across `/bin/sh` implementations.
    printf '%s' "Clearing 'refs' table from $DB.. "
+
    # Quote the path so that directories containing spaces work.
    sqlite3 "$DB" "DELETE FROM refs;"
+
    echo "done."
+
  else
+
    echo "fatal: database file does not exist" >&2
+
    exit 1
+
  fi
+
else
+
  echo "fatal: sqlite3 is not installed" >&2
+
  exit 1
+
fi