Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
crdt: Rely on causal ordering of events
Alexis Sellier committed 3 years ago
commit 919e36ab1fe33e563f87d94075cd11ba3b5683d7
parent 3af2c53bb220a312c0869d292fbbbdb2e472ce37
1 file changed +62 -61
modified radicle/src/cob/patch.rs
@@ -1,6 +1,5 @@
#![allow(clippy::too_many_arguments)]
use std::collections::BTreeMap;
-
use std::collections::VecDeque;
use std::fmt;
use std::ops::ControlFlow;
use std::ops::Deref;
@@ -43,11 +42,25 @@ pub type RevisionId = ChangeId;
/// Index of a revision in the revisions list.
pub type RevisionIx = usize;

+
/// Error applying an operation onto a state.
+
#[derive(Error, Debug)]
+
pub enum ApplyError {
+
    /// Causal dependency missing.
+
    ///
+
    /// This error indicates that the operations are not being applied
+
    /// in causal order, which is a requirement for this CRDT.
+
    ///
+
    /// For example, this can occur if a change references another change
+
    /// that hasn't happened yet.
+
    #[error("causal dependency {0:?} missing")]
+
    Missing(ChangeId),
+
}
+

/// Error updating or creating patches.
#[derive(Error, Debug)]
pub enum Error {
-
    #[error("apply failed")]
-
    Apply,
+
    #[error("apply failed: {0}")]
+
    Apply(#[from] ApplyError),
    #[error("store: {0}")]
    Store(#[from] store::Error),
}
@@ -213,31 +226,15 @@ impl Patch {
    }

    /// Apply a list of changes to the state.
-
    pub fn apply(
-
        &mut self,
-
        changes: impl IntoIterator<Item = Change>,
-
        waiting: &mut BTreeMap<ChangeId, Vec<Change>>,
-
    ) -> Result<(), Error> {
-
        let mut queue = changes.into_iter().collect::<VecDeque<_>>();
-

-
        while let Some(change) = queue.pop_front() {
-
            let id = change.id();
-
            self.apply_one(change, waiting)?;
-

-
            // If we have changes waiting for the change we just applied, we can now process them.
-
            if let Some(waiting) = waiting.remove(&id) {
-
                queue.extend(waiting);
-
            }
+
    pub fn apply(&mut self, changes: impl IntoIterator<Item = Change>) -> Result<(), ApplyError> {
+
        for change in changes {
+
            self.apply_one(change)?;
        }
        Ok(())
    }

    /// Apply a single change to the state.
-
    pub fn apply_one(
-
        &mut self,
-
        change: Change,
-
        waiting: &mut BTreeMap<ChangeId, Vec<Change>>,
-
    ) -> Result<(), Error> {
+
    pub fn apply_one(&mut self, change: Change) -> Result<(), ApplyError> {
        let id = change.id();
        let author = Author::new(change.author);
        // FIXME(cloudhead): Use commit timestamp.
@@ -283,7 +280,7 @@ impl Patch {
                        change.clock,
                    );
                } else {
-
                    waiting.entry(revision).or_default().push(change);
+
                    return Err(ApplyError::Missing(revision));
                }
            }
            Action::Merge { revision, commit } => {
@@ -298,7 +295,7 @@ impl Patch {
                        change.clock,
                    );
                } else {
-
                    waiting.entry(revision).or_default().push(change);
+
                    return Err(ApplyError::Missing(revision));
                }
            }
            Action::Thread { revision, action } => {
@@ -310,6 +307,8 @@ impl Patch {
                        author: change.author,
                        clock: change.clock,
                    }]);
+
                } else {
+
                    return Err(ApplyError::Missing(revision));
                }
            }
        }
@@ -327,17 +326,13 @@ impl store::FromHistory for Patch {
    fn from_history(
        history: &radicle_cob::History,
    ) -> Result<(Self, clock::Lamport), store::Error> {
-
        let mut waiting = BTreeMap::default();
        let obj = history.traverse(Self::default(), |mut acc, entry| {
            if let Ok(action) = Action::decode(entry.contents()) {
-
                if let Err(err) = acc.apply(
-
                    [Change {
-
                        action,
-
                        author: *entry.actor(),
-
                        clock: entry.clock().into(),
-
                    }],
-
                    &mut waiting,
-
                ) {
+
                if let Err(err) = acc.apply([Change {
+
                    action,
+
                    author: *entry.actor(),
+
                    clock: entry.clock().into(),
+
                }]) {
                    log::warn!("Error applying change to patch state: {err}");
                    return ControlFlow::Break(acc);
                }
@@ -662,18 +657,13 @@ impl<'a, 'g> PatchMut<'a, 'g> {
        action: Action,
        signer: &G,
    ) -> Result<ChangeId, Error> {
-
        let mut waiting = BTreeMap::default();
        let change = Change {
            author: *signer.public_key(),
            action: action.clone(),
            clock: self.clock.tick(),
        };
+
        self.patch.apply([change])?;

-
        self.patch.apply([change], &mut waiting)?;
-

-
        if !waiting.is_empty() {
-
            return Err(Error::Apply);
-
        }
        let cob = self
            .store
            .update(self.id, msg, action, signer)
@@ -793,8 +783,7 @@ mod test {
    use crdt::ActorId;

    use pretty_assertions::assert_eq;
-
    use quickcheck::Arbitrary;
-
    use quickcheck_macros::quickcheck;
+
    use quickcheck::{Arbitrary, TestResult};

    use super::*;
    use crate::test;
@@ -867,7 +856,7 @@ mod test {
            let mut changes = Vec::new();
            let mut permutations: [Vec<Change>; N] = array::from_fn(|_| Vec::new());

-
            for (clock, action) in gen.take(g.size().min(8)) {
+
            for (clock, action) in gen.take(g.size()) {
                changes.push(Change {
                    action,
                    author,
@@ -886,29 +875,41 @@ mod test {

    // TODO: Test merging of redacted revision

-
    #[quickcheck]
-
    fn prop_invariants(log: Changes<3>) {
-
        let t = Patch::default();
-
        let [p1, p2, p3] = log.permutations;
-
        let mut waiting = BTreeMap::default();
-

-
        let mut t1 = t.clone();
-
        t1.apply(p1, &mut waiting).unwrap();
+
    #[test]
+
    fn prop_invariants() {
+
        fn property(log: Changes<3>) -> TestResult {
+
            let t = Patch::default();
+
            let [p1, p2, p3] = log.permutations;
+

+
            let mut t1 = t.clone();
+
            match t1.apply(p1) {
+
                Ok(()) => {}
+
                Err(ApplyError::Missing(_)) => return TestResult::discard(),
+
            }

-
        waiting.clear();
+
            let mut t2 = t.clone();
+
            match t2.apply(p2) {
+
                Ok(()) => {}
+
                Err(ApplyError::Missing(_)) => return TestResult::discard(),
+
            }

-
        let mut t2 = t.clone();
-
        t2.apply(p2, &mut waiting).unwrap();
+
            let mut t3 = t;
+
            match t3.apply(p3) {
+
                Ok(()) => {}
+
                Err(ApplyError::Missing(_)) => return TestResult::discard(),
+
            }

-
        waiting.clear();
+
            assert_eq!(t1, t2);
+
            assert_eq!(t2, t3);
+
            assert_laws(&t1, &t2, &t3);

-
        let mut t3 = t;
-
        t3.apply(p3, &mut waiting).unwrap();
+
            TestResult::passed()
+
        }

-
        assert_eq!(t1, t2);
-
        assert_eq!(t2, t3);
-
        assert_laws(&t1, &t2, &t3);
-
        assert!(waiting.is_empty());
+
        quickcheck::QuickCheck::new()
+
            .min_tests_passed(100)
+
            .gen(quickcheck::Gen::new(8))
+
            .quickcheck(property as fn(Changes<3>) -> TestResult);
    }

    #[test]