Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle src cob store.rs
//! Generic COB storage.
use std::fmt::Debug;
use std::marker::PhantomData;

use nonempty::NonEmpty;
use radicle_cob::CollaborativeObject;
use serde::{Deserialize, Serialize};

use crate::cob::op::Op;
use crate::cob::{Create, Embed, EntryId, ObjectId, TypeName, Update, Updated, Uri, Version};
use crate::git;
use crate::prelude::*;
use crate::storage::git as storage;
use crate::storage::{HasRepoId, SignRepository};
use crate::{cob, identity};

pub mod access;
use access::WriteAs;

pub trait CobAction {
    /// Parent objects this action depends on. For example, patch revisions
    /// have the commit objects as their parent.
    fn parents(&self) -> Vec<git::Oid> {
        Vec::new()
    }

    /// The outcome of some actions are to be referred later.
    /// For example, one action may create a comment, followed by another
    /// action that may create a reply to the comment, referring to it.
    /// Since actions are stored as part of [`crate::cob::op::Op`],
    /// and operations are the smallest identifiable units,
    /// this may lead to ambiguity.
    /// It would not be possible to to, say, address one particular comment out
    /// of two, if the corresponding actions of creations were part of the
    /// same operation.
    /// To help avoid this, implementations signal whether specific actions
    /// require "their own" identifier.
    /// This allows checking for multiple such actions before creating an
    /// operation.
    fn produces_identifier(&self) -> bool {
        false
    }
}

/// A collaborative object. Can be materialized from an operation history.
pub trait Cob: Sized {
    /// The underlying action composing each operation.
    type Action: CobAction + for<'de> Deserialize<'de> + Serialize;
    /// Error returned by `apply` function.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Initialize a collaborative object from a root operation.
    fn from_root<R: ReadRepository>(op: Op<Self::Action>, repo: &R) -> Result<Self, Self::Error>;

    /// Apply an operation to the state.
    fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a cob::Entry>>(
        &mut self,
        op: Op<Self::Action>,
        concurrent: I,
        repo: &R,
    ) -> Result<(), <Self as Cob>::Error>;

    #[cfg(test)]
    /// Create an object from a history.
    fn from_history<R: ReadRepository>(
        history: &crate::cob::History,
        repo: &R,
    ) -> Result<Self, test::HistoryError<Self>>
    where
        Self: CobWithType,
    {
        test::from_history::<R, Self>(history, repo)
    }

    #[cfg(test)]
    /// Create an object from individual operations.
    /// Returns an error if any of the operations fails to apply.
    fn from_ops<R: ReadRepository>(
        ops: impl IntoIterator<Item = Op<Self::Action>>,
        repo: &R,
    ) -> Result<Self, Self::Error> {
        let mut ops = ops.into_iter();
        let Some(init) = ops.next() else {
            panic!("FromHistory::from_ops: operations list is empty");
        };
        let mut state = Self::from_root(init, repo)?;
        for op in ops {
            state.op(op, [].into_iter(), repo)?;
        }
        Ok(state)
    }
}

/// Implementations are statically associated with a particular
/// type name of a collaborative object.
///
/// In most cases, this trait should be used in tandem with [`Cob`].
pub trait CobWithType {
    /// The type name of the collaborative object type which backs this implementation.
    fn type_name() -> &'static TypeName;
}

/// Store error.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("create error: {0}")]
    Create(#[from] cob::error::Create),
    #[error("update error: {0}")]
    Update(#[from] cob::error::Update),
    #[error("retrieve error: {0}")]
    Retrieve(#[from] cob::error::Retrieve),
    #[error("remove error: {0}")]
    Remove(#[from] cob::error::Remove),
    #[error(transparent)]
    Identity(#[from] identity::doc::DocError),
    #[error(transparent)]
    Serialize(#[from] serde_json::Error),
    #[error("object `{1}` of type `{0}` was not found")]
    NotFound(TypeName, ObjectId),
    #[error("signed refs: {0}")]
    SignRefs(Box<storage::RepositoryError>),
    #[error("invalid or unknown embed URI: {0}")]
    EmbedUri(Uri),
    #[error(transparent)]
    Git(git::raw::Error),
    #[error("failed to find reference '{name}': {err}")]
    RefLookup {
        name: git::fmt::RefString,
        #[source]
        err: git::raw::Error,
    },
    #[error(
        "transaction already contains action {0} which produces an identifier, denying to add action {1} which also produces an identifier"
    )]
    ClashingIdentifiers(String, String),
}

/// The [`Scope`] of a [`Store`] keeps track of what collaborative object is
/// being accessed.
///
/// For example, a `Scope<Patch>` keeps track of `Patch` access.
///
/// The type parameter `T` is the collaborative object type.
pub struct Scope<'a, T> {
    witness: PhantomData<T>,
    type_name: &'a TypeName,
}

impl<'a, T: CobWithType> Scope<'a, T> {
    fn new() -> Self {
        Self {
            witness: PhantomData,
            type_name: T::type_name(),
        }
    }
}

/// Storage for collaborative objects of a specific type `T` in a single repository.
pub struct Store<'a, T, Repo, Access> {
    identity: Option<git::Oid>,
    repo: &'a Repo,
    scope: Scope<'a, T>,
    access: Access,
}

impl<T, Repo, Access> HasRepoId for Store<'_, T, Repo, Access>
where
    Repo: HasRepoId,
{
    fn rid(&self) -> RepoId {
        self.repo.rid()
    }
}

impl<T, Repo, Access> AsRef<Repo> for Store<'_, T, Repo, Access> {
    fn as_ref(&self) -> &Repo {
        self.repo
    }
}

impl<'a, T, Repo, Access> Store<'a, T, Repo, Access>
where
    Repo: ReadRepository + cob::Store<Namespace = NodeId>,
    Access: access::Access,
{
    pub fn open_for(
        type_name: &'a TypeName,
        repo: &'a Repo,
        access: Access,
    ) -> Result<Self, Error> {
        Ok(Self {
            repo,
            identity: None,
            scope: Scope {
                witness: PhantomData,
                type_name,
            },
            access,
        })
    }
}

impl<'a, T, Repo, Access> Store<'a, T, Repo, Access>
where
    Repo: ReadRepository + cob::Store<Namespace = NodeId>,
    Access: access::Access,
{
    /// Return a new store with the attached identity.
    pub fn identity(self, identity: git::Oid) -> Self {
        Self {
            repo: self.repo,
            identity: Some(identity),
            scope: self.scope,
            access: self.access,
        }
    }
}

impl<'a, T, Repo, Access> Store<'a, T, Repo, Access>
where
    T: CobWithType,
    Repo: ReadRepository + cob::Store<Namespace = NodeId>,
    Access: access::Access,
{
    /// Open a new generic store.
    pub fn open(repo: &'a Repo, access: Access) -> Result<Self, Error> {
        Ok(Self {
            repo,
            identity: None,
            scope: Scope::new(),
            access,
        })
    }
}

impl<'a, 'b, T, Repo, Signer> Store<'a, T, Repo, WriteAs<'b, Signer>>
where
    T: Cob + cob::Evaluate<Repo>,
    Repo: ReadRepository + cob::Store<Namespace = NodeId>,
{
    #[deprecated(note = "only exists to accommodate signatures in crate::cob::identity")]
    pub(super) fn signer(&self) -> &Signer {
        self.access.signer
    }

    #[deprecated(note = "only exists to accommodate signatures in crate::cob::identity")]
    pub(super) fn repo(&self) -> &Repo {
        self.repo
    }

    pub fn transaction(
        &self,
        actions: Vec<T::Action>,
        embeds: Vec<Embed<Uri>>,
    ) -> Transaction<T, Repo> {
        Transaction::new(self.scope.type_name.clone(), actions, embeds)
    }
}

impl<'a, 'b, T, Repo, Signer> Store<'a, T, Repo, WriteAs<'b, Signer>>
where
    T: Cob + cob::Evaluate<Repo>,
    T::Action: Serialize,
    Repo: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
    Signer: crypto::signature::Keypair<VerifyingKey = crypto::PublicKey>,
    Signer: crypto::signature::Signer<crypto::Signature>,
    Signer: crypto::signature::Signer<crypto::ssh::ExtendedSignature>,
    Signer: crypto::signature::Verifier<crypto::Signature>,
{
    /// Update an object.
    pub fn update(
        &mut self,
        type_name: &TypeName,
        object_id: ObjectId,
        message: &str,
        actions: impl Into<NonEmpty<T::Action>>,
        embeds: Vec<Embed<Uri>>,
    ) -> Result<Updated<T>, Error> {
        let actions = actions.into();
        let related = actions.iter().flat_map(T::Action::parents).collect();
        let changes = actions.try_map(encoding::encode)?;
        let embeds = embeds
            .into_iter()
            .map(|e| {
                Ok::<_, Error>(Embed {
                    content: git::Oid::try_from(&e.content).map_err(Error::EmbedUri)?,
                    name: e.name.clone(),
                })
            })
            .collect::<Result<_, _>>()?;
        let namespace = self.access.signer.verifying_key();
        let updated = cob::update(
            self.repo,
            self.access.signer,
            self.identity,
            related,
            &namespace,
            Update {
                object_id,
                type_name: type_name.clone(),
                message: message.to_owned(),
                embeds,
                changes,
            },
        )?;
        self.repo
            .sign_refs(self.access.signer)
            .map_err(|e| Error::SignRefs(Box::new(e)))?;

        Ok(updated)
    }

    /// Create an object.
    pub fn create(
        &mut self,
        message: &str,
        actions: impl Into<NonEmpty<T::Action>>,
        embeds: Vec<Embed<Uri>>,
    ) -> Result<(ObjectId, T), Error> {
        let actions = actions.into();
        let parents = actions.iter().flat_map(T::Action::parents).collect();
        let contents = actions.try_map(encoding::encode)?;
        let embeds = embeds
            .into_iter()
            .map(|e| {
                Ok::<_, Error>(Embed {
                    content: git::Oid::try_from(&e.content).map_err(Error::EmbedUri)?,
                    name: e.name.clone(),
                })
            })
            .collect::<Result<_, _>>()?;
        let namespace = self.access.signer.verifying_key();
        let cob = cob::create::<T, _, _>(
            self.repo,
            self.access.signer,
            self.identity,
            parents,
            &namespace,
            Create {
                type_name: self.scope.type_name.clone(),
                version: Version::default(),
                message: message.to_owned(),
                embeds,
                contents,
            },
        )?;
        // Nb. We can't sign our refs before the identity refs exist, which are created after
        // the identity COB is created. Therefore we manually sign refs when creating identity
        // COBs.
        if self.scope.type_name != &*crate::cob::identity::TYPENAME {
            self.repo
                .sign_refs(self.access.signer)
                .map_err(|e| Error::SignRefs(Box::new(e)))?;
        }
        Ok((*cob.id(), cob.object))
    }

    /// Remove an object.
    pub fn remove(&mut self, id: &ObjectId) -> Result<(), Error> {
        let namespace = self.access.signer.verifying_key();
        let name = git::refs::storage::cob(&namespace, self.scope.type_name, id);
        match self.repo.reference_oid(&namespace, &name.strip_namespace()) {
            Ok(_) => {
                cob::remove(self.repo, &namespace, self.scope.type_name, id)?;
                self.repo
                    .sign_refs(self.access.signer)
                    .map_err(|e| Error::SignRefs(Box::new(e)))?;
                Ok(())
            }
            Err(err) if err.code() == git::raw::ErrorCode::NotFound => Ok(()),
            Err(err) => Err(Error::RefLookup {
                name: name.to_ref_string(),
                err,
            }),
        }
    }
}

impl<'a, T, Repo, Access> Store<'a, T, Repo, Access>
where
    T: Cob + cob::Evaluate<Repo> + 'a,
    T::Action: Serialize,
    Repo: ReadRepository + cob::Store<Namespace = NodeId>,
    Access: access::Access,
{
    /// Get an object.
    pub fn get(&self, id: &ObjectId) -> Result<Option<T>, Error> {
        cob::get::<T, _>(self.repo, self.scope.type_name, id)
            .map(|r| r.map(|cob| cob.object))
            .map_err(Error::from)
    }

    /// Return all objects.
    pub fn all(
        &self,
    ) -> Result<
        impl ExactSizeIterator<Item = Result<(ObjectId, T), Error>> + use<'a, T, Repo, Access>,
        Error,
    > {
        let raw = cob::list::<T, _>(self.repo, self.scope.type_name)?;

        Ok(raw.into_iter().map(|o| Ok((*o.id(), o.object))))
    }

    /// Return true if the list of issues is empty.
    pub fn is_empty(&self) -> Result<bool, Error> {
        Ok(self.count()? == 0)
    }

    /// Return objects count.
    pub fn count(&self) -> Result<usize, Error> {
        let raw = cob::list::<T, _>(self.repo, self.scope.type_name)?;

        Ok(raw.len())
    }
}

/// Allows operations to be batched atomically.
#[derive(Debug)]
pub struct Transaction<T: Cob + cob::Evaluate<R>, R> {
    actions: Vec<T::Action>,
    embeds: Vec<Embed<Uri>>,

    // Internal state kept for validation of the transaction.
    // If an action that produces an identifier is added to
    // the transaction, then this will track its index,
    // so that adding a second action that produces an identifier
    // can fail with a useful error.
    produces_identifier: Option<usize>,

    repo: PhantomData<R>,
    type_name: TypeName,
}

impl<T: Cob + CobWithType + cob::Evaluate<R>, R> Default for Transaction<T, R> {
    fn default() -> Self {
        Self {
            actions: Vec::new(),
            embeds: Vec::new(),
            produces_identifier: None,
            repo: PhantomData,
            type_name: T::type_name().clone(),
        }
    }
}

impl<T, R> Transaction<T, R>
where
    T: Cob + cob::Evaluate<R>,
{
    pub fn new(type_name: TypeName, actions: Vec<T::Action>, embeds: Vec<Embed<Uri>>) -> Self {
        Self {
            actions,
            embeds,
            produces_identifier: None,
            repo: PhantomData,
            type_name,
        }
    }

    #[deprecated(note = "only exists to accommodate signatures in crate::cob::identity")]
    pub(super) fn into_inner(self) -> (Vec<T::Action>, Vec<Embed<Uri>>) {
        (self.actions, self.embeds)
    }
}

impl<T, Repo> Transaction<T, Repo>
where
    T: Cob + CobWithType + cob::Evaluate<Repo>,
{
    /// Create a new transaction to be used as the initial set of operations for a COB.
    pub fn initial<'a, 'b, Signer, Tx, F>(
        message: &str,
        store: &mut Store<'a, T, Repo, WriteAs<'b, Signer>>,
        operations: F,
    ) -> Result<(ObjectId, T), Error>
    where
        T::Action: Serialize + Clone,
        Repo: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
        Signer: crypto::signature::Keypair<VerifyingKey = crypto::PublicKey>,
        Signer: crypto::signature::Signer<crypto::Signature>,
        Signer: crypto::signature::Signer<crypto::ssh::ExtendedSignature>,
        Signer: crypto::signature::Verifier<crypto::Signature>,
        Tx: From<Self>,
        Self: From<Tx>,
        F: FnOnce(&mut Tx, &Repo) -> Result<(), Error>,
    {
        let mut tx = Tx::from(Transaction::default());
        operations(&mut tx, store.as_ref())?;
        let tx = Self::from(tx);

        let actions = NonEmpty::from_vec(tx.actions)
            .expect("Transaction::initial: transaction must contain at least one action");

        store.create(message, actions, tx.embeds)
    }
}

impl<T, Repo> Transaction<T, Repo>
where
    T: Cob + cob::Evaluate<Repo>,
{
    /// Add an action to this transaction.
    pub fn push(&mut self, action: T::Action) -> Result<(), Error> {
        if action.produces_identifier() {
            if let Some(index) = self.produces_identifier {
                return Err(Error::ClashingIdentifiers(
                    serde_json::to_string(&self.actions[index])?,
                    serde_json::to_string(&action)?,
                ));
            } else {
                self.produces_identifier = Some(self.actions.len())
            }
        }

        self.actions.push(action);

        Ok(())
    }

    /// Add actions to this transaction.
    /// Note that we cannot implement [`std::iter::Extend`] because [`Self::push`]
    /// validates the action being pushed, and therefore is falliable.
    pub fn extend<I: IntoIterator<Item = T::Action>>(&mut self, actions: I) -> Result<(), Error> {
        for action in actions {
            self.push(action)?;
        }
        Ok(())
    }

    /// Embed media into the transaction.
    pub fn embed(&mut self, embeds: impl IntoIterator<Item = Embed<Uri>>) -> Result<(), Error> {
        self.embeds.extend(embeds);

        Ok(())
    }

    /// Commit transaction.
    ///
    /// Returns an operation that can be applied onto an in-memory state.
    pub fn commit<'a, Signer>(
        self,
        msg: &str,
        id: ObjectId,
        store: &mut Store<T, Repo, WriteAs<'a, Signer>>,
    ) -> Result<(T, EntryId), Error>
    where
        T::Action: Serialize + Clone,
        Repo: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
        Signer: crypto::signature::Keypair<VerifyingKey = crypto::PublicKey>,
        Signer: crypto::signature::Signer<crypto::Signature>,
        Signer: crypto::signature::Signer<crypto::ssh::ExtendedSignature>,
        Signer: crypto::signature::Verifier<crypto::Signature>,
    {
        let actions = NonEmpty::from_vec(self.actions)
            .expect("Transaction::commit: transaction must not be empty");

        let Updated {
            head,
            object: CollaborativeObject { object, .. },
            ..
        } = store.update(&self.type_name, id, msg, actions, self.embeds)?;

        Ok((object, head))
    }
}

/// Get an object's operations without decoding them.
pub fn ops<R: cob::Store>(
    id: &ObjectId,
    type_name: &TypeName,
    repo: &R,
) -> Result<NonEmpty<Op<Vec<u8>>>, Error> {
    let cob = cob::get::<NonEmpty<cob::Entry>, _>(repo, type_name, id)?;

    if let Some(cob) = cob {
        Ok(cob.object.map(Op::from))
    } else {
        Err(Error::NotFound(type_name.clone(), *id))
    }
}

pub mod encoding {
    use serde::Serialize;

    use crate::canonical::formatter::CanonicalFormatter;

    /// Serialize the change into a byte string.
    pub fn encode<A: Serialize>(action: A) -> Result<Vec<u8>, serde_json::Error> {
        let mut buf = Vec::new();
        let mut serializer =
            serde_json::Serializer::with_formatter(&mut buf, CanonicalFormatter::new());

        action.serialize(&mut serializer)?;

        Ok(buf)
    }
}

#[cfg(test)]
pub mod test {
    use super::*;

    #[derive(Debug, thiserror::Error)]
    pub enum HistoryError<T: Cob> {
        #[error("apply: {0}")]
        Apply(T::Error),
        #[error("operation decoding failed: {0}")]
        Op(#[from] cob::op::OpEncodingError),
    }

    /// Turn a history into a concrete type, by traversing the history and applying each operation
    /// to the state, skipping branches that return errors.
    pub fn from_history<R: ReadRepository, T: Cob + CobWithType>(
        history: &crate::cob::History,
        repo: &R,
    ) -> Result<T, HistoryError<T>> {
        use std::ops::ControlFlow;

        let root = history.root();
        let children = history.children_of(root.id());
        let op = Op::try_from(root)?;
        let initial = T::from_root(op, repo).map_err(HistoryError::Apply)?;
        let obj = history.traverse(initial, &children, |mut acc, _, entry| {
            match Op::try_from(entry) {
                Ok(op) => {
                    if let Err(err) = acc.op(op, [], repo) {
                        log::warn!("Error applying op to `{}` state: {err}", T::type_name());
                        return ControlFlow::Break(acc);
                    }
                }
                Err(err) => {
                    log::warn!("Error decoding ops for `{}` state: {err}", T::type_name());
                    return ControlFlow::Break(acc);
                }
            }
            ControlFlow::Continue(acc)
        });

        Ok(obj)
    }
}