Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Make `Transaction` generic, move to `store`
Alexis Sellier committed 3 years ago
commit 90f20f447a43647a94888eab410aa550fc808642
parent 036442af878b220ce2bcad4b986198759a14fbd8
3 files changed +134 -90
modified radicle-crdt/src/clock.rs
@@ -20,6 +20,11 @@ impl Lamport {
        *self.counter.get()
    }

+
    /// The initial value of the clock.
+
    pub fn initial() -> Self {
+
        Self::default()
+
    }
+

    /// Increment clock and return new value.
    /// Must be called before sending a message.
    pub fn tick(&mut self) -> Self {
modified radicle/src/cob/patch.rs
@@ -5,7 +5,6 @@ use std::ops::Deref;
use std::ops::Range;
use std::str::FromStr;

-
use nonempty::NonEmpty;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -16,6 +15,7 @@ use radicle_crdt::{GMap, LWWReg, LWWSet, Max, Redactable, Semilattice};
use crate::cob;
use crate::cob::common::{Author, Tag, Timestamp};
use crate::cob::op::Ops;
+
use crate::cob::store::Transaction;
use crate::cob::thread;
use crate::cob::thread::CommentId;
use crate::cob::thread::Thread;
@@ -526,47 +526,7 @@ impl Review {
    }
}

-
/// Allows operations to be batched atomically.
-
#[derive(Debug)]
-
pub struct Transaction {
-
    actor: ActorId,
-
    clock: clock::Lamport,
-
    actions: Vec<Action>,
-
}
-

-
impl Transaction {
-
    /// Create a new transaction.
-
    pub fn new(actor: ActorId, clock: clock::Lamport) -> Self {
-
        Self {
-
            actions: Vec::new(),
-
            clock,
-
            actor,
-
        }
-
    }
-

-
    /// Create a new transaction to be used as the initial set of operations for a COB.
-
    pub fn initial(actor: ActorId) -> Self {
-
        Self {
-
            actions: Vec::new(),
-
            clock: clock::Lamport::default(),
-
            actor,
-
        }
-
    }
-

-
    /// Consume this transaction, returning the underlying actions.
-
    pub fn actions(self) -> Vec<Action> {
-
        self.actions
-
    }
-

-
    /// Add an operation to this transaction.
-
    pub fn push(&mut self, action: Action) -> OpId {
-
        self.actions.push(action);
-
        self.clock.tick();
-

-
        (self.clock, self.actor)
-
    }
-

-
    /// Edit patch metadata.
+
impl store::Transaction<Patch> {
    pub fn edit(
        &mut self,
        title: impl ToString,
@@ -677,12 +637,14 @@ impl<'a, 'g> PatchMut<'a, 'g> {
    ) -> Result<T, Error>
    where
        G: Signer,
-
        F: FnOnce(&mut Transaction) -> T,
+
        F: FnOnce(&mut Transaction<Patch>) -> T,
    {
        let mut tx = Transaction::new(*signer.public_key(), self.clock);
        let output = operations(&mut tx);
+
        let (ops, clock) = tx.commit(message, self.id, &mut self.store.raw, signer)?;

-
        self.commit(message, tx, signer)?;
+
        self.patch.apply(ops)?;
+
        self.clock = clock;

        Ok(output)
    }
@@ -762,40 +724,6 @@ impl<'a, 'g> PatchMut<'a, 'g> {
    ) -> Result<OpId, Error> {
        self.transaction("Tag", signer, |tx| tx.tag(add, remove))
    }
-

-
    /// Commit transaction.
-
    pub fn commit<G: Signer>(
-
        &mut self,
-
        msg: &str,
-
        tx: Transaction,
-
        signer: &G,
-
    ) -> Result<(), Error> {
-
        let actions = NonEmpty::from_vec(tx.actions)
-
            .expect("PatchMut::commit: transaction must not be empty");
-
        let cob = self
-
            .store
-
            .update(self.id, msg, actions.clone(), signer)
-
            .map_err(Error::Store)?;
-
        let author = tx.actor;
-
        let timestamp = cob.history().timestamp().into();
-

-
        // The history clock should be in sync with the tx clock.
-
        assert_eq!(cob.history().clock(), tx.clock.get());
-

-
        for action in actions {
-
            let clock = self.clock.tick();
-
            self.patch.apply_one(Op {
-
                action,
-
                author,
-
                clock,
-
                timestamp,
-
            })?;
-
        }
-
        // After applying all ops, our clock should also be in sync with the tx clock.
-
        assert_eq!(self.clock, tx.clock);
-

-
        Ok(())
-
    }
}

impl<'a, 'g> Deref for PatchMut<'a, 'g> {
@@ -840,15 +768,14 @@ impl<'a> Patches<'a> {
        tags: &[Tag],
        signer: &G,
    ) -> Result<PatchMut<'a, 'g>, Error> {
-
        let mut tx = Transaction::initial(*self.public_key());
-

-
        tx.revision(base, oid);
-
        tx.edit(title, description, target);
-
        tx.tag(tags.to_owned(), []);
-

-
        #[allow(clippy::unwrap_used)]
-
        let actions: NonEmpty<_> = tx.actions().try_into().unwrap(); // SAFETY: The transaction is not empty.
-
        let (id, patch, clock) = self.raw.create("Create patch", actions, signer)?;
+
        let (id, patch, clock) =
+
            Transaction::initial("Create patch", &mut self.raw, signer, |tx| {
+
                tx.revision(base, oid);
+
                tx.edit(title, description, target);
+
                tx.tag(tags.to_owned(), []);
+
            })?;
+
        // Just a sanity check that our clock is advancing as expected.
+
        assert_eq!(clock.get(), 2);

        Ok(PatchMut::new(id, patch, clock, self))
    }
@@ -1307,12 +1234,15 @@ mod test {
            )
            .unwrap();

+
        assert_eq!(patch.clock.get(), 2);
        assert_eq!(patch.description(), Some("Blah blah blah."));
        assert_eq!(patch.version(), 0);

-
        let _rev1_id = patch
+
        let ((c1, _), (c2, _)) = patch
            .update("I've made changes.", base, rev1_oid, &signer)
            .unwrap();
+
        assert_eq!(c1.get(), 3);
+
        assert_eq!(c2.get(), 4);

        let id = patch.id;
        let patch = patches.get(&id).unwrap().unwrap();
modified radicle/src/cob/store.rs
@@ -1,5 +1,6 @@
//! Generic COB storage.
#![allow(clippy::large_enum_variant)]
+
#![allow(clippy::type_complexity)]
use std::marker::PhantomData;

use nonempty::NonEmpty;
@@ -9,7 +10,7 @@ use serde::Serialize;
use crate::cob;
use crate::cob::common::Author;
use crate::cob::CollaborativeObject;
-
use crate::cob::{Create, History, ObjectId, TypeName, Update};
+
use crate::cob::{ActorId, Create, History, ObjectId, TypeName, Update};
use crate::crypto::PublicKey;
use crate::git;
use crate::identity;
@@ -130,7 +131,7 @@ where
    /// Create an object.
    pub fn create<G: Signer>(
        &self,
-
        message: &'static str,
+
        message: &str,
        actions: impl Into<NonEmpty<T::Action>>,
        signer: &G,
    ) -> Result<(ObjectId, T, Lamport), Error> {
@@ -199,6 +200,114 @@ where
    }
}

+
/// Allows operations to be batched atomically.
+
#[derive(Debug)]
+
pub struct Transaction<T: FromHistory> {
+
    actor: ActorId,
+
    start: Lamport,
+
    clock: Option<Lamport>,
+
    actions: Vec<T::Action>,
+
}
+

+
impl<T: FromHistory> Transaction<T> {
+
    /// Create a new transaction.
+
    pub fn new(actor: ActorId, clock: Lamport) -> Self {
+
        Self {
+
            actor,
+
            start: clock,
+
            clock: Some(clock),
+
            actions: Vec::new(),
+
        }
+
    }
+

+
    /// Create a new transaction to be used as the initial set of operations for a COB.
+
    pub fn initial<G, F>(
+
        message: &str,
+
        store: &mut Store<T>,
+
        signer: &G,
+
        operations: F,
+
    ) -> Result<(ObjectId, T, Lamport), Error>
+
    where
+
        G: Signer,
+
        F: FnOnce(&mut Self),
+
        T::Action: Serialize + Clone,
+
    {
+
        let actor = *signer.public_key();
+
        let mut tx = Transaction {
+
            actor,
+
            start: Lamport::initial(),
+
            clock: None,
+
            actions: Vec::new(),
+
        };
+
        operations(&mut tx);
+

+
        let actions = NonEmpty::from_vec(tx.actions)
+
            .expect("Transaction::initial: transaction must contain at least one operation");
+
        let (id, cob, clock) = store.create(message, actions, signer)?;
+

+
        // The history clock should be in sync with the tx clock.
+
        assert_eq!(Some(clock), tx.clock);
+

+
        Ok((id, cob, clock))
+
    }
+

+
    /// Add an operation to this transaction.
+
    pub fn push(&mut self, action: T::Action) -> cob::OpId {
+
        self.actions.push(action);
+

+
        // If our clock already had a value, it means this isn't the first operation
+
        // of this COB. In that case we 'tick' the clock and return the new clock
+
        // value.
+
        //
+
        // Otherwise, it means it was the first operation of our COB. In that case
+
        // we set our clock to the initial clock value (0), and return that.
+
        if let Some(ref mut clock) = self.clock {
+
            (clock.tick(), self.actor)
+
        } else {
+
            self.clock = Some(Lamport::initial());
+

+
            (Lamport::initial(), self.actor)
+
        }
+
    }
+

+
    /// Commit transaction.
+
    ///
+
    /// Returns a list of operations that can be applied onto an in-memory CRDT.
+
    pub fn commit<G: Signer>(
+
        self,
+
        msg: &str,
+
        id: ObjectId,
+
        store: &mut Store<T>,
+
        signer: &G,
+
    ) -> Result<(Vec<cob::Op<T::Action>>, Lamport), Error>
+
    where
+
        T::Action: Serialize + Clone,
+
    {
+
        let actions = NonEmpty::from_vec(self.actions)
+
            .expect("Transaction::commit: transaction must not be empty");
+
        let cob = store.update(id, msg, actions.clone(), signer)?;
+
        let author = self.actor;
+
        let timestamp = cob.history().timestamp().into();
+

+
        // The history clock should be in sync with the tx clock.
+
        assert_eq!(Some(cob.history().clock()), self.clock.map(|c| c.get()));
+

+
        // Start the clock from where the transcation clock started.
+
        let mut clock = self.start;
+
        let ops = actions
+
            .into_iter()
+
            .map(|action| cob::Op {
+
                action,
+
                author,
+
                clock: clock.tick(),
+
                timestamp,
+
            })
+
            .collect();
+

+
        Ok((ops, clock))
+
    }
+
}
+

mod encoding {
    use serde::Serialize;