Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
Improve COB cache behavior
Merged did:key:z6MksFqX...wzpT opened 1 year ago

See commits.

10 files changed +190 -109 2b750456 a61affa9
modified radicle-cli/src/commands/inbox.rs
@@ -306,9 +306,13 @@ where
        } = match &n.kind {
            NotificationKind::Branch { name } => NotificationRow::branch(name, head, &n, &repo)?,
            NotificationKind::Cob { typed_id } => {
-
                match NotificationRow::cob(typed_id, &n, &issues, &patches, &repo)? {
-
                    Some(row) => row,
-
                    None => continue,
+
                match NotificationRow::cob(typed_id, &n, &issues, &patches, &repo) {
+
                    Ok(Some(row)) => row,
+
                    Ok(None) => continue,
+
                    Err(e) => {
+
                        log::error!(target: "cli", "Error loading notification for {typed_id}: {e}");
+
                        continue;
+
                    }
                }
            }
            NotificationKind::Unknown { refname } => {
modified radicle-cli/src/commands/issue.rs
@@ -632,9 +632,13 @@ where
    let mut all = Vec::new();
    let issues = cache.list()?;
    for result in issues {
-
        let Ok((id, issue)) = result else {
-
            // Skip issues that failed to load.
-
            continue;
+
        let (id, issue) = match result {
+
            Ok((id, issue)) => (id, issue),
+
            Err(e) => {
+
                // Skip issues that failed to load.
+
                log::error!(target: "cli", "Issue load error: {e}");
+
                continue;
+
            }
        };

        if let Some(a) = assignee {
modified radicle-cli/src/commands/patch/list.rs
@@ -29,9 +29,13 @@ pub fn run(
        None => patches.list()?,
    };
    for patch in iter {
-
        let Ok((id, patch)) = patch else {
-
            // Skip patches that failed to load.
-
            continue;
+
        let (id, patch) = match patch {
+
            Ok((id, patch)) => (id, patch),
+
            Err(e) => {
+
                // Skip patches that failed to load.
+
                log::error!(target: "cli", "Patch load error: {e}");
+
                continue;
+
            }
        };
        if !authors.is_empty() {
            if !authors.contains(patch.author().id()) {
modified radicle-node/src/worker/fetch.rs
@@ -5,6 +5,7 @@ use std::str::FromStr;

use localtime::LocalTime;

+
use radicle::cob::TypedId;
use radicle::crypto::PublicKey;
use radicle::identity::DocAt;
use radicle::prelude::RepoId;
@@ -281,8 +282,9 @@ where
    C: cob::cache::Update<cob::issue::Issue> + cob::cache::Update<cob::patch::Patch>,
    C: cob::cache::Remove<cob::issue::Issue> + cob::cache::Remove<cob::patch::Patch>,
{
-
    let issues = cob::issue::Issues::open(storage)?;
-
    let patches = cob::patch::Patches::open(storage)?;
+
    let mut issues = cob::store::Store::<cob::issue::Issue, _>::open(storage)?;
+
    let mut patches = cob::store::Store::<cob::patch::Patch, _>::open(storage)?;
+

    for update in refs {
        match update {
            RefUpdate::Updated { name, .. }
@@ -293,47 +295,12 @@ where
                        continue;
                    };
                    if identifier.is_issue() {
-
                        if let Some(issue) = issues.get(&identifier.id)? {
-
                            cache
-
                                .update(rid, &identifier.id, &issue)
-
                                .map(|_| ())
-
                                .map_err(|e| error::Cache::Update {
-
                                    id: identifier.id,
-
                                    type_name: identifier.type_name,
-
                                    err: e.into(),
-
                                })?;
-
                        } else {
-
                            // N.b. the issue has been removed entirely from the
-
                            // repository so we also remove it from the cache
-
                            cob::cache::Remove::<cob::issue::Issue>::remove(cache, &identifier.id)
-
                                .map(|_| ())
-
                                .map_err(|e| error::Cache::Remove {
-
                                    id: identifier.id,
-
                                    type_name: identifier.type_name,
-
                                    err: Box::new(e),
-
                                })?;
-
                        }
+
                        update_or_remove(&mut issues, cache, rid, identifier)?;
                    } else if identifier.is_patch() {
-
                        if let Some(patch) = patches.get(&identifier.id)? {
-
                            cache
-
                                .update(rid, &identifier.id, &patch)
-
                                .map(|_| ())
-
                                .map_err(|e| error::Cache::Update {
-
                                    id: identifier.id,
-
                                    type_name: identifier.type_name,
-
                                    err: e.into(),
-
                                })?;
-
                        } else {
-
                            // N.b. the patch has been removed entirely from the
-
                            // repository so we also remove it from the cache
-
                            cob::cache::Remove::<cob::patch::Patch>::remove(cache, &identifier.id)
-
                                .map(|_| ())
-
                                .map_err(|e| error::Cache::Remove {
-
                                    id: identifier.id,
-
                                    type_name: identifier.type_name,
-
                                    err: Box::new(e),
-
                                })?;
-
                        }
+
                        update_or_remove(&mut patches, cache, rid, identifier)?;
+
                    } else {
+
                        // Unknown COB, don't cache.
+
                        continue;
                    }
                }
                None => continue,
@@ -344,3 +311,47 @@ where

    Ok(())
}
+

+
/// Update or remove a cache entry.
+
fn update_or_remove<R, C, T>(
+
    store: &mut cob::store::Store<T, R>,
+
    cache: &mut C,
+
    rid: &RepoId,
+
    tid: TypedId,
+
) -> Result<(), error::Cache>
+
where
+
    R: cob::Store + ReadRepository,
+
    T: cob::Evaluate<R> + cob::store::Cob,
+
    C: cob::cache::Update<T> + cob::cache::Remove<T>,
+
{
+
    match store.get(&tid.id) {
+
        Ok(Some(obj)) => {
+
            // Object loaded correctly, update cache.
+
            return cache.update(rid, &tid.id, &obj).map(|_| ()).map_err(|e| {
+
                error::Cache::Update {
+
                    id: tid.id,
+
                    type_name: tid.type_name,
+
                    err: e.into(),
+
                }
+
            });
+
        }
+
        Ok(None) => {
+
            // Object was not found. Fall-through.
+
        }
+
        Err(e) => {
+
            // Object was found, but failed to load. Fall-through.
+
            log::error!(target: "fetch", "Error loading COB {tid} from storage: {e}");
+
        }
+
    }
+
    // The object has either been removed entirely from the repository,
+
    // or it failed to load. So we also remove it from the cache.
+
    cob::cache::Remove::<T>::remove(cache, &tid.id)
+
        .map(|_| ())
+
        .map_err(|e| error::Cache::Remove {
+
            id: tid.id,
+
            type_name: tid.type_name,
+
            err: Box::new(e),
+
        })?;
+

+
    Ok(())
+
}
modified radicle/src/cob.rs
@@ -29,6 +29,12 @@ pub struct TypedId {
    pub type_name: TypeName,
}

+
impl std::fmt::Display for TypedId {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        write!(f, "{}/{}", self.type_name, self.id)
+
    }
+
}
+

/// Errors that occur when parsing a Git refname into a [`TypedId`].
#[derive(Debug, thiserror::Error)]
pub enum ParseIdentifierError {
modified radicle/src/cob/cache.rs
@@ -203,6 +203,8 @@ pub trait Remove<T> {
    ///
    /// This assumes that the `id` is unique across repositories.
    fn remove(&mut self, id: &ObjectId) -> Result<Self::Out, Self::RemoveError>;
+
    /// Delete all entries from a repo.
+
    fn remove_all(&mut self, rid: &RepoId) -> Result<Self::Out, Self::RemoveError>;
}

/// An in-memory cache for storing COB objects.
@@ -270,6 +272,10 @@ impl<T> Remove<T> for NoCache {
    fn remove(&mut self, _id: &ObjectId) -> Result<Self::Out, Self::RemoveError> {
        Ok(())
    }
+

+
    fn remove_all(&mut self, _rid: &RepoId) -> Result<Self::Out, Self::RemoveError> {
+
        Ok(())
+
    }
}

/// Track the progress of cache writes when transferring the
modified radicle/src/cob/issue.rs
@@ -72,6 +72,11 @@ pub enum Error {
        #[source]
        err: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
+
    #[error("failed to remove issues from cache: {err}")]
+
    CacheRemoveAll {
+
        #[source]
+
        err: Box<dyn std::error::Error + Send + Sync + 'static>,
+
    },
}

/// Reason why an issue was closed.
modified radicle/src/cob/issue/cache.rs
@@ -11,7 +11,6 @@ use crate::cob::store;
use crate::cob::{Embed, Label, ObjectId, TypeName};
use crate::crypto::Signer;
use crate::prelude::{Did, RepoId};
-
use crate::sql::transaction;
use crate::storage::{HasRepoId, ReadRepository, RepositoryError, SignRepository, WriteRepository};

use super::{Issue, IssueCounts, IssueId, IssueMut, State};
@@ -151,8 +150,13 @@ impl<'a, R, C> Cache<super::Issues<'a, R>, C> {
    ) -> Result<(), super::Error>
    where
        R: ReadRepository + cob::Store,
-
        C: Update<Issue>,
+
        C: Update<Issue> + Remove<Issue>,
    {
+
        // Start by clearing the cache. This will get rid of issues that are cached but
+
        // no longer exist in storage.
+
        self.remove_all(&self.rid())
+
            .map_err(|e| super::Error::CacheRemoveAll { err: e.into() })?;
+

        let issues = self.store.all()?;
        let mut progress = cache::WriteAllProgress::new(issues.len());
        for issue in self.store.all()? {
@@ -266,6 +270,10 @@ where
    fn remove(&mut self, id: &ObjectId) -> Result<Self::Out, Self::RemoveError> {
        self.cache.remove(id)
    }
+

+
    fn remove_all(&mut self, rid: &RepoId) -> Result<Self::Out, Self::RemoveError> {
+
        self.cache.remove_all(rid)
+
    }
}

#[derive(Debug, Error)]
@@ -286,21 +294,19 @@ impl Update<Issue> for StoreWriter {
        id: &ObjectId,
        object: &Issue,
    ) -> Result<Self::Out, Self::UpdateError> {
-
        transaction::<_, UpdateError>(&self.db, move |db| {
-
            let mut stmt = db.prepare(
-
                "INSERT INTO issues (id, repo, issue)
-
                  VALUES (?1, ?2, ?3)
-
                  ON CONFLICT DO UPDATE
-
                  SET issue =  (?3)",
-
            )?;
-

-
            stmt.bind((1, sql::Value::String(id.to_string())))?;
-
            stmt.bind((2, rid))?;
-
            stmt.bind((3, sql::Value::String(serde_json::to_string(&object)?)))?;
-
            stmt.next()?;
-

-
            Ok(db.change_count() > 0)
-
        })
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO issues (id, repo, issue)
+
             VALUES (?1, ?2, ?3)
+
             ON CONFLICT DO UPDATE
+
             SET issue = (?3)",
+
        )?;
+

+
        stmt.bind((1, sql::Value::String(id.to_string())))?;
+
        stmt.bind((2, rid))?;
+
        stmt.bind((3, sql::Value::String(serde_json::to_string(&object)?)))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
    }
}

@@ -309,17 +315,27 @@ impl Remove<Issue> for StoreWriter {
    type RemoveError = sql::Error;

    fn remove(&mut self, id: &ObjectId) -> Result<Self::Out, Self::RemoveError> {
-
        transaction::<_, sql::Error>(&self.db, move |db| {
-
            let mut stmt = db.prepare(
-
                "DELETE FROM issues
-
                  WHERE id = ?1",
-
            )?;
+
        let mut stmt = self.db.prepare(
+
            "DELETE FROM issues
+
             WHERE id = ?1",
+
        )?;

-
            stmt.bind((1, sql::Value::String(id.to_string())))?;
-
            stmt.next()?;
+
        stmt.bind((1, sql::Value::String(id.to_string())))?;
+
        stmt.next()?;

-
            Ok(db.change_count() > 0)
-
        })
+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    fn remove_all(&mut self, rid: &RepoId) -> Result<Self::Out, Self::RemoveError> {
+
        let mut stmt = self.db.prepare(
+
            "DELETE FROM issues
+
             WHERE repo = ?1",
+
        )?;
+

+
        stmt.bind((1, rid))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
    }
}

modified radicle/src/cob/patch.rs
@@ -145,6 +145,11 @@ pub enum Error {
        #[source]
        err: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
+
    #[error("failed to remove patches from cache: {err}")]
+
    CacheRemoveAll {
+
        #[source]
+
        err: Box<dyn std::error::Error + Send + Sync + 'static>,
+
    },
}

/// Patch operation.
modified radicle/src/cob/patch/cache.rs
@@ -12,7 +12,6 @@ use crate::cob::{Label, ObjectId, TypeName};
use crate::crypto::Signer;
use crate::git;
use crate::prelude::RepoId;
-
use crate::sql::transaction;
use crate::storage::{HasRepoId, ReadRepository, RepositoryError, SignRepository, WriteRepository};

use super::{
@@ -218,8 +217,13 @@ impl<'a, R, C> Cache<super::Patches<'a, R>, C> {
    ) -> Result<(), super::Error>
    where
        R: ReadRepository + cob::Store,
-
        C: Update<Patch>,
+
        C: Update<Patch> + Remove<Patch>,
    {
+
        // Start by clearing the cache. This will get rid of patches that are cached but
+
        // no longer exist in storage.
+
        self.remove_all(&self.rid())
+
            .map_err(|e| super::Error::CacheRemoveAll { err: e.into() })?;
+

        let patches = self.store.all()?;
        let mut progress = cache::WriteAllProgress::new(patches.len());
        for patch in self.store.all()? {
@@ -333,6 +337,10 @@ where
    fn remove(&mut self, id: &ObjectId) -> Result<Self::Out, Self::RemoveError> {
        self.cache.remove(id)
    }
+

+
    fn remove_all(&mut self, rid: &RepoId) -> Result<Self::Out, Self::RemoveError> {
+
        self.cache.remove_all(rid)
+
    }
}

#[derive(Debug, Error)]
@@ -353,21 +361,19 @@ impl Update<Patch> for StoreWriter {
        id: &ObjectId,
        object: &Patch,
    ) -> Result<Self::Out, Self::UpdateError> {
-
        transaction::<_, UpdateError>(&self.db, move |db| {
-
            let mut stmt = db.prepare(
-
                "INSERT INTO patches (id, repo, patch)
-
                  VALUES (?1, ?2, ?3)
-
                  ON CONFLICT DO UPDATE
-
                  SET patch =  (?3)",
-
            )?;
-

-
            stmt.bind((1, sql::Value::String(id.to_string())))?;
-
            stmt.bind((2, rid))?;
-
            stmt.bind((3, sql::Value::String(serde_json::to_string(&object)?)))?;
-
            stmt.next()?;
-

-
            Ok(db.change_count() > 0)
-
        })
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO patches (id, repo, patch)
+
             VALUES (?1, ?2, ?3)
+
             ON CONFLICT DO UPDATE
+
             SET patch =  (?3)",
+
        )?;
+

+
        stmt.bind((1, sql::Value::String(id.to_string())))?;
+
        stmt.bind((2, rid))?;
+
        stmt.bind((3, sql::Value::String(serde_json::to_string(&object)?)))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
    }
}

@@ -376,17 +382,27 @@ impl Remove<Patch> for StoreWriter {
    type RemoveError = sql::Error;

    fn remove(&mut self, id: &ObjectId) -> Result<Self::Out, Self::RemoveError> {
-
        transaction::<_, sql::Error>(&self.db, move |db| {
-
            let mut stmt = db.prepare(
-
                "DELETE FROM patches
-
                  WHERE id = ?1",
-
            )?;
+
        let mut stmt = self.db.prepare(
+
            "DELETE FROM patches
+
             WHERE id = ?1",
+
        )?;

-
            stmt.bind((1, sql::Value::String(id.to_string())))?;
-
            stmt.next()?;
+
        stmt.bind((1, sql::Value::String(id.to_string())))?;
+
        stmt.next()?;

-
            Ok(db.change_count() > 0)
-
        })
+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    fn remove_all(&mut self, rid: &RepoId) -> Result<Self::Out, Self::RemoveError> {
+
        let mut stmt = self.db.prepare(
+
            "DELETE FROM patches
+
             WHERE repo = ?1",
+
        )?;
+

+
        stmt.bind((1, rid))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
    }
}

@@ -395,7 +411,9 @@ pub enum Error {
    #[error("object `{1}` of type `{0}` was not found")]
    NotFound(TypeName, ObjectId),
    #[error(transparent)]
-
    Object(#[from] cob::object::ParseObjectId),
+
    ObjectId(#[from] cob::object::ParseObjectId),
+
    #[error("object {0} failed to parse: {1}")]
+
    Object(ObjectId, serde_json::Error),
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    #[error(transparent)]
@@ -413,7 +431,8 @@ pub struct PatchesIter<'a> {
impl<'a> PatchesIter<'a> {
    fn parse_row(row: sql::Row) -> Result<(PatchId, Patch), Error> {
        let id = PatchId::from_str(row.read::<&str, _>("id"))?;
-
        let patch = serde_json::from_str::<Patch>(row.read::<&str, _>("patch"))?;
+
        let patch = serde_json::from_str::<Patch>(row.read::<&str, _>("patch"))
+
            .map_err(|e| Error::Object(id, e))?;
        Ok((id, patch))
    }
}
@@ -558,21 +577,21 @@ mod query {
        rid: &RepoId,
        id: &PatchId,
    ) -> Result<Option<Patch>, Error> {
-
        let id = sql::Value::String(id.to_string());
+
        let key = sql::Value::String(id.to_string());
        let mut stmt = db.prepare(
            "SELECT patch
             FROM patches
             WHERE id = ?1 AND repo = ?2",
        )?;

-
        stmt.bind((1, id))?;
+
        stmt.bind((1, key))?;
        stmt.bind((2, rid))?;

        match stmt.into_iter().next().transpose()? {
            None => Ok(None),
            Some(row) => {
                let patch = row.read::<&str, _>("patch");
-
                let patch = serde_json::from_str(patch)?;
+
                let patch = serde_json::from_str(patch).map_err(|e| Error::Object(*id, e))?;
                Ok(Some(patch))
            }
        }
@@ -598,7 +617,8 @@ mod query {
            None => Ok(None),
            Some(row) => {
                let id = PatchId::from_str(row.read::<&str, _>("id"))?;
-
                let patch = serde_json::from_str::<Patch>(row.read::<&str, _>("patch"))?;
+
                let patch = serde_json::from_str::<Patch>(row.read::<&str, _>("patch"))
+
                    .map_err(|e| Error::Object(id, e))?;
                let revision = serde_json::from_str::<Revision>(row.read::<&str, _>("revision"))?;
                Ok(Some(ByRevision {
                    id,