Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Bundle patch CRDT operations
Alexis Sellier committed 3 years ago
commit e5e1ba583f41d9eb529132b8e00421a533bb7be1
parent 335fcbca89e4ffe0bd2c63690814429ab6ad9915
4 files changed +202 -71
modified radicle/src/cob/issue.rs
@@ -1,12 +1,14 @@
use std::ops::{ControlFlow, Deref};
use std::str::FromStr;

+
use nonempty::NonEmpty;
use once_cell::sync::Lazy;
-
use radicle_crdt::clock;
-
use radicle_crdt::{LWWReg, LWWSet, Max, Semilattice};
use serde::{Deserialize, Serialize};
use thiserror::Error;

+
use radicle_crdt::clock;
+
use radicle_crdt::{LWWReg, LWWSet, Max, Semilattice};
+

use crate::cob;
use crate::cob::common::{Author, Reaction, Tag};
use crate::cob::thread;
@@ -285,7 +287,7 @@ impl<'a, 'g> IssueMut<'a, 'g> {
    ) -> Result<OpId, Error> {
        let cob = self
            .store
-
            .update(self.id, msg, action.clone(), signer)
+
            .update(self.id, msg, NonEmpty::new(action.clone()), signer)
            .map_err(Error::Store)?;
        let clock = cob.history().clock().into();
        let timestamp = cob.history().timestamp().into();
@@ -363,7 +365,9 @@ impl<'a> Issues<'a> {
        let title = title.into();
        let description = description.into();
        let action = Action::Title { title };
-
        let (id, issue, clock) = self.raw.create("Create issue", action, signer)?;
+
        let (id, issue, clock) = self
+
            .raw
+
            .create("Create issue", NonEmpty::new(action), signer)?;
        let mut issue = IssueMut {
            id,
            clock,
modified radicle/src/cob/patch.rs
@@ -5,14 +5,17 @@ use std::ops::Deref;
use std::ops::Range;
use std::str::FromStr;

+
use nonempty::NonEmpty;
use once_cell::sync::Lazy;
-
use radicle_crdt::clock;
-
use radicle_crdt::{GMap, LWWReg, LWWSet, Max, Redactable, Semilattice};
use serde::{Deserialize, Serialize};
use thiserror::Error;

+
use radicle_crdt::clock;
+
use radicle_crdt::{GMap, LWWReg, LWWSet, Max, Redactable, Semilattice};
+

use crate::cob;
use crate::cob::common::{Author, Tag, Timestamp};
+
use crate::cob::op::Ops;
use crate::cob::thread;
use crate::cob::thread::CommentId;
use crate::cob::thread::Thread;
@@ -25,8 +28,6 @@ use crate::storage::git as storage;
/// The logical clock we use to order operations to patches.
pub use clock::Lamport as Clock;

-
use super::op::Ops;
-

/// Type name of a patch.
pub static TYPENAME: Lazy<TypeName> =
    Lazy::new(|| FromStr::from_str("xyz.radicle.patch").expect("type name is valid"));
@@ -525,6 +526,107 @@ impl Review {
    }
}

+
/// Allows operations to be batched atomically.
+
#[derive(Debug)]
+
pub struct Transaction {
+
    actor: ActorId,
+
    clock: clock::Lamport,
+
    actions: Vec<Action>,
+
}
+

+
impl Transaction {
+
    /// Create a new transaction.
+
    pub fn new(actor: ActorId, clock: clock::Lamport) -> Self {
+
        Self {
+
            actions: Vec::new(),
+
            clock,
+
            actor,
+
        }
+
    }
+

+
    /// Add an operation to this transaction.
+
    pub fn push(&mut self, action: Action) -> OpId {
+
        self.actions.push(action);
+
        self.clock.tick();
+

+
        (self.clock, self.actor)
+
    }
+

+
    /// Edit patch metadata.
+
    pub fn edit(&mut self, title: String, description: String, target: MergeTarget) -> OpId {
+
        self.push(Action::Edit {
+
            title,
+
            description,
+
            target,
+
        })
+
    }
+

+
    /// Comment on a patch revision.
+
    pub fn comment<S: ToString>(&mut self, revision: RevisionId, body: S) -> OpId {
+
        self.push(Action::Thread {
+
            revision,
+
            action: thread::Action::Comment {
+
                body: body.to_string(),
+
                reply_to: None,
+
            },
+
        })
+
    }
+

+
    /// Review a patch revision.
+
    pub fn review(
+
        &mut self,
+
        revision: RevisionId,
+
        verdict: Option<Verdict>,
+
        comment: Option<String>,
+
        inline: Vec<CodeComment>,
+
    ) -> OpId {
+
        self.push(Action::Review {
+
            revision,
+
            comment,
+
            verdict,
+
            inline,
+
        })
+
    }
+

+
    /// Merge a patch revision.
+
    pub fn merge(&mut self, revision: RevisionId, commit: git::Oid) -> OpId {
+
        self.push(Action::Merge { revision, commit })
+
    }
+

+
    /// Add a patch revision.
+
    pub fn revision(&mut self, base: impl Into<git::Oid>, oid: impl Into<git::Oid>) -> OpId {
+
        self.push(Action::Revision {
+
            base: base.into(),
+
            oid: oid.into(),
+
        })
+
    }
+

+
    /// Update a patch with a new revision.
+
    pub fn update(
+
        &mut self,
+
        description: impl ToString,
+
        base: impl Into<git::Oid>,
+
        oid: impl Into<git::Oid>,
+
    ) -> (OpId, OpId) {
+
        let revision = self.revision(base, oid);
+
        let comment = self.comment(revision, description);
+

+
        (revision, comment)
+
    }
+

+
    /// Tag a patch.
+
    pub fn tag(
+
        &mut self,
+
        add: impl IntoIterator<Item = Tag>,
+
        remove: impl IntoIterator<Item = Tag>,
+
    ) -> OpId {
+
        let add = add.into_iter().collect::<Vec<_>>();
+
        let remove = remove.into_iter().collect::<Vec<_>>();
+

+
        self.push(Action::Tag { add, remove })
+
    }
+
}
+

pub struct PatchMut<'a, 'g> {
    pub id: ObjectId,

@@ -548,6 +650,24 @@ impl<'a, 'g> PatchMut<'a, 'g> {
        }
    }

+
    pub fn transaction<G, F, T>(
+
        &mut self,
+
        message: &str,
+
        signer: &G,
+
        operations: F,
+
    ) -> Result<T, Error>
+
    where
+
        G: Signer,
+
        F: FnOnce(&mut Transaction) -> T,
+
    {
+
        let mut tx = Transaction::new(*signer.public_key(), self.clock);
+
        let output = operations(&mut tx);
+

+
        self.commit(message, tx, signer)?;
+

+
        Ok(output)
+
    }
+

    /// Get the internal logical clock.
    pub fn clock(&self) -> &clock::Lamport {
        &self.clock
@@ -561,30 +681,17 @@ impl<'a, 'g> PatchMut<'a, 'g> {
        target: MergeTarget,
        signer: &G,
    ) -> Result<OpId, Error> {
-
        let action = Action::Edit {
-
            title,
-
            description,
-
            target,
-
        };
-
        self.apply("Edit", action, signer)
+
        self.transaction("Edit", signer, |tx| tx.edit(title, description, target))
    }

    /// Comment on a patch revision.
-
    pub fn comment<G: Signer, S: Into<String>>(
+
    pub fn comment<G: Signer, S: ToString>(
        &mut self,
        revision: RevisionId,
        body: S,
        signer: &G,
    ) -> Result<CommentId, Error> {
-
        let body = body.into();
-
        let action = Action::Thread {
-
            revision,
-
            action: thread::Action::Comment {
-
                body,
-
                reply_to: None,
-
            },
-
        };
-
        self.apply("Comment", action, signer)
+
        self.transaction("Comment", signer, |tx| tx.comment(revision, body))
    }

    /// Review a patch revision.
@@ -596,13 +703,9 @@ impl<'a, 'g> PatchMut<'a, 'g> {
        inline: Vec<CodeComment>,
        signer: &G,
    ) -> Result<OpId, Error> {
-
        let action = Action::Review {
-
            revision,
-
            comment,
-
            verdict,
-
            inline,
-
        };
-
        self.apply("Review patch", action, signer)
+
        self.transaction("Review", signer, |tx| {
+
            tx.review(revision, verdict, comment, inline)
+
        })
    }

    /// Merge a patch revision.
@@ -612,29 +715,23 @@ impl<'a, 'g> PatchMut<'a, 'g> {
        commit: git::Oid,
        signer: &G,
    ) -> Result<OpId, Error> {
-
        let action = Action::Merge { revision, commit };
-
        self.apply("Merge revision", action, signer)
+
        self.transaction("Merge revision", signer, |tx| tx.merge(revision, commit))
    }

    /// Update a patch with a new revision.
    pub fn update<G: Signer>(
        &mut self,
-
        description: impl Into<String>,
+
        description: impl ToString,
        base: impl Into<git::Oid>,
        oid: impl Into<git::Oid>,
        signer: &G,
-
    ) -> Result<OpId, Error> {
-
        let description = description.into();
-
        let base = base.into();
-
        let oid = oid.into();
-
        let revision = self.apply(
-
            "Update patch with new revision",
-
            Action::Revision { base, oid },
-
            signer,
-
        )?;
-
        self.comment(revision, description, signer)?;
+
    ) -> Result<(OpId, OpId), Error> {
+
        self.transaction("Add revision", signer, |tx| {
+
            let r = tx.revision(base, oid);
+
            let c = tx.comment(r, description);

-
        Ok(revision)
+
            (r, c)
+
        })
    }

    /// Tag a patch.
@@ -644,35 +741,41 @@ impl<'a, 'g> PatchMut<'a, 'g> {
        remove: impl IntoIterator<Item = Tag>,
        signer: &G,
    ) -> Result<OpId, Error> {
-
        let add = add.into_iter().collect::<Vec<_>>();
-
        let remove = remove.into_iter().collect::<Vec<_>>();
-
        let action = Action::Tag { add, remove };
-

-
        self.apply("Tag", action, signer)
+
        self.transaction("Tag", signer, |tx| tx.tag(add, remove))
    }

-
    /// Apply an operation to the patch.
-
    pub fn apply<G: Signer>(
+
    /// Commit transaction.
+
    pub fn commit<G: Signer>(
        &mut self,
-
        msg: &'static str,
-
        action: Action,
+
        msg: &str,
+
        tx: Transaction,
        signer: &G,
-
    ) -> Result<OpId, Error> {
+
    ) -> Result<(), Error> {
+
        let actions = NonEmpty::from_vec(tx.actions)
+
            .expect("PatchMut::commit: transaction must not be empty");
        let cob = self
            .store
-
            .update(self.id, msg, action.clone(), signer)
+
            .update(self.id, msg, actions.clone(), signer)
            .map_err(Error::Store)?;
-
        let clock = cob.history().clock().into();
+
        let author = tx.actor;
        let timestamp = cob.history().timestamp().into();
-
        let op = Op {
-
            action,
-
            author: *signer.public_key(),
-
            clock,
-
            timestamp,
-
        };
-
        self.patch.apply_one(op)?;

-
        Ok((clock, *signer.public_key()))
+
        // The history clock should be in sync with the tx clock.
+
        assert_eq!(cob.history().clock(), tx.clock.get());
+

+
        for action in actions {
+
            let clock = self.clock.tick();
+
            self.patch.apply_one(Op {
+
                action,
+
                author,
+
                clock,
+
                timestamp,
+
            })?;
+
        }
+
        // After applying all ops, our clock should also be in sync with the tx clock.
+
        assert_eq!(self.clock, tx.clock);
+

+
        Ok(())
    }
}

@@ -724,11 +827,17 @@ impl<'a> Patches<'a> {
            base: base.into(),
            oid: oid.into(),
        };
-
        let (id, patch, clock) = self.raw.create("Create patch", action, signer)?;
+
        let (id, patch, clock) = self
+
            .raw
+
            .create("Create patch", NonEmpty::new(action), signer)?;
        let mut patch = PatchMut::new(id, patch, clock, self);
+
        assert_eq!(patch.clock, clock::Lamport::from(0));

        patch.edit(title, description, target, signer)?;
+
        assert_eq!(patch.clock, clock::Lamport::from(1));
+

        patch.tag(tags.to_owned(), [], signer)?;
+
        assert_eq!(patch.clock, clock::Lamport::from(2));

        Ok(patch)
    }
modified radicle/src/cob/store.rs
@@ -99,11 +99,17 @@ where
    pub fn update<G: Signer>(
        &self,
        object_id: ObjectId,
-
        message: &'static str,
-
        action: T::Action,
+
        message: &str,
+
        actions: impl Into<NonEmpty<T::Action>>,
        signer: &G,
    ) -> Result<CollaborativeObject, Error> {
-
        let changes = NonEmpty::new(encoding::encode(&action)?);
+
        let changes = actions
+
            .into()
+
            .iter()
+
            .map(encoding::encode)
+
            .collect::<Result<Vec<_>, _>>()?
+
            .try_into()
+
            .expect("the collection is always non-empty");

        cob::update(
            self.raw,
@@ -125,10 +131,16 @@ where
    pub fn create<G: Signer>(
        &self,
        message: &'static str,
-
        action: T::Action,
+
        actions: impl Into<NonEmpty<T::Action>>,
        signer: &G,
    ) -> Result<(ObjectId, T, Lamport), Error> {
-
        let contents = NonEmpty::new(encoding::encode(&action)?);
+
        let contents = actions
+
            .into()
+
            .iter()
+
            .map(encoding::encode)
+
            .collect::<Result<Vec<_>, _>>()?
+
            .try_into()
+
            .expect("the collection is always non-empty");
        let cob = cob::create(
            self.raw,
            signer,
modified radicle/src/cob/thread.rs
@@ -81,6 +81,12 @@ pub enum Action {
    },
}

+
impl From<Action> for nonempty::NonEmpty<Action> {
+
    fn from(action: Action) -> Self {
+
        Self::new(action)
+
    }
+
}
+

/// A discussion thread.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Thread {