Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
cob: Expose operation concurrency
cloudhead committed 2 years ago
commit eee304e0b70ad98403d8779fa4b7103c0f06e6a7
parent a46bcb224708402e9a3042a126a8154f965dc71e
9 files changed +221 -41
modified radicle-cob/src/change_graph.rs
@@ -113,13 +113,16 @@ impl ChangeGraph {
        let manifest = root.manifest.clone();
        let root = root.id;

-
        self.graph.prune(&children, |_, entry| {
+
        self.graph.prune(&children, |_, entry, siblings| {
            // Check the entry signatures are valid.
            if !entry.valid_signatures() {
                return ControlFlow::Break(());
            }
            // Apply the entry to the state, and if there's an error, prune that branch.
-
            if object.apply(entry, store).is_err() {
+
            if object
+
                .apply(entry, siblings.map(|(k, n)| (k, &n.value)), store)
+
                .is_err()
+
            {
                return ControlFlow::Break(());
            }
            ControlFlow::Continue(())
modified radicle-cob/src/object/collaboration.rs
@@ -70,7 +70,12 @@ pub trait Evaluate<R>: Sized + Debug + 'static {
    fn init(entry: &Entry, store: &R) -> Result<Self, Self::Error>;

    /// Apply a history entry to the evaluated state.
-
    fn apply(&mut self, entry: &Entry, store: &R) -> Result<(), Self::Error>;
+
    fn apply<'a, I: Iterator<Item = (&'a Oid, &'a Entry)>>(
+
        &mut self,
+
        entry: &Entry,
+
        concurrent: I,
+
        store: &R,
+
    ) -> Result<(), Self::Error>;
}

impl<R> Evaluate<R> for NonEmpty<Entry> {
@@ -80,7 +85,12 @@ impl<R> Evaluate<R> for NonEmpty<Entry> {
        Ok(Self::new(entry.clone()))
    }

-
    fn apply(&mut self, entry: &Entry, _store: &R) -> Result<(), Self::Error> {
+
    fn apply<'a, I: Iterator<Item = (&'a Oid, &'a Entry)>>(
+
        &mut self,
+
        entry: &Entry,
+
        _concurrent: I,
+
        _store: &R,
+
    ) -> Result<(), Self::Error> {
        self.push(entry.clone());

        Ok(())
modified radicle-cob/src/object/collaboration/update.rs
@@ -1,4 +1,5 @@
// Copyright © 2022 The Radicle Link Contributors
+
use std::iter;

use git_ext::Oid;
use nonempty::NonEmpty;
@@ -107,7 +108,7 @@ where
    // garbage-collected by Git.
    object
        .object
-
        .apply(&entry, storage)
+
        .apply(&entry, iter::empty(), storage)
        .map_err(error::Update::evaluate)?;
    object.history.extend(entry);

modified radicle-dag/src/lib.rs
@@ -12,6 +12,8 @@ use std::{
/// A node in the graph.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Node<K, V> {
+
    /// The node key.
+
    pub key: K,
    /// The node value, stored by the user.
    pub value: V,
    /// Nodes depended on.
@@ -21,8 +23,9 @@ pub struct Node<K, V> {
}

impl<K, V> Node<K, V> {
-
    fn new(value: V) -> Self {
+
    fn new(key: K, value: V) -> Self {
        Self {
+
            key,
            value,
            dependencies: BTreeSet::new(),
            dependents: BTreeSet::new(),
@@ -65,7 +68,7 @@ impl<K: Ord + Copy, V> Dag<K, V> {
    /// Create a DAG with a root node.
    pub fn root(key: K, value: V) -> Self {
        Self {
-
            graph: BTreeMap::from_iter([(key, Node::new(value))]),
+
            graph: BTreeMap::from_iter([(key, Node::new(key, value))]),
            tips: BTreeSet::from_iter([key]),
            roots: BTreeSet::from_iter([key]),
        }
@@ -88,6 +91,7 @@ impl<K: Ord + Copy, V> Dag<K, V> {
        self.graph.insert(
            key,
            Node {
+
                key,
                value,
                dependencies: BTreeSet::new(),
                dependents: BTreeSet::new(),
@@ -199,7 +203,11 @@ impl<K: Ord + Copy, V> Dag<K, V> {
    /// return [`ControlFlow::Break`].
    pub fn prune<F>(&mut self, roots: &[K], mut filter: F)
    where
-
        F: for<'r> FnMut(&'r K, &'r Node<K, V>) -> ControlFlow<()>,
+
        F: for<'r> FnMut(
+
            &'r K,
+
            &'r Node<K, V>,
+
            Box<dyn Iterator<Item = (&'r K, &'r Node<K, V>)> + 'r>,
+
        ) -> ControlFlow<()>,
    {
        let mut visited = BTreeSet::new();
        let mut result = VecDeque::new();
@@ -210,7 +218,12 @@ impl<K: Ord + Copy, V> Dag<K, V> {

        for next in result {
            if let Some(node) = self.graph.get(&next) {
-
                match filter(&next, node) {
+
                let siblings = self
+
                    .siblings_of(node)
+
                    .filter_map(|k| self.graph.get(k))
+
                    .map(|node| (&node.key, node));
+

+
                match filter(&next, node, Box::new(siblings)) {
                    ControlFlow::Continue(()) => {}
                    ControlFlow::Break(()) => {
                        // When pruning a node, we remove all transitive dependents on
@@ -311,6 +324,38 @@ impl<K: Ord + Copy, V> Dag<K, V> {
        nodes
    }

+
    fn ancestors_of(&self, from: &Node<K, V>) -> Vec<K> {
+
        let mut visited = BTreeSet::new();
+
        let mut stack = VecDeque::new();
+
        let mut nodes = Vec::new();
+

+
        stack.extend(from.dependencies.iter());
+

+
        while let Some(key) = stack.pop_front() {
+
            if let Some(node) = self.graph.get(&key) {
+
                if visited.insert(key) {
+
                    nodes.push(key);
+

+
                    for &neighbour in &node.dependencies {
+
                        stack.push_back(neighbour);
+
                    }
+
                }
+
            }
+
        }
+
        nodes
+
    }
+

+
    /// Get the nodes that are neither an ancestor nor a descendant of the given node.
+
    fn siblings_of(&self, node: &Node<K, V>) -> impl Iterator<Item = &K> {
+
        let ancestors = self.ancestors_of(node);
+
        let descendants = self.descendants_of(node);
+
        let key = node.key;
+

+
        self.graph
+
            .keys()
+
            .filter(move |k| !ancestors.contains(k) && !descendants.contains(k) && **k != key)
+
    }
+

    /// Add nodes recursively to the topological order, starting from the given node.
    fn visit(&self, key: &K, visited: &mut BTreeSet<K>, order: &mut VecDeque<K>) {
        if visited.insert(*key) {
@@ -779,7 +824,7 @@ mod tests {
        let a1 = dag.get(&"A1").unwrap();
        assert_eq!(dag.descendants_of(a1), vec!["B1", "C1", "D1"]);

-
        dag.prune(&["R"], |key, _| {
+
        dag.prune(&["R"], |key, _, _| {
            if key == &"B1" {
                ControlFlow::Break(())
            } else {
@@ -790,6 +835,53 @@ mod tests {
    }

    #[test]
+
    fn test_siblings() {
+
        let mut dag = Dag::new();
+

+
        dag.node("R", ());
+
        dag.node("A1", ());
+
        dag.node("A2", ());
+
        dag.node("A3", ());
+
        dag.node("A4", ());
+
        dag.node("B1", ());
+
        dag.node("C1", ());
+
        dag.node("C2", ());
+
        dag.node("C3", ());
+

+
        dag.dependency("A1", "R");
+
        dag.dependency("A2", "A1");
+
        dag.dependency("A3", "A2");
+

+
        dag.dependency("B1", "A2");
+

+
        dag.dependency("C1", "R");
+
        dag.dependency("C2", "C1");
+
        dag.dependency("C3", "C2");
+

+
        dag.dependency("A4", "B1");
+
        dag.dependency("A4", "C3");
+
        dag.dependency("A4", "A3");
+

+
        let siblings: Vec<_> = dag.siblings_of(dag.get(&"A3").unwrap()).copied().collect();
+
        assert_eq!(siblings, vec!["B1", "C1", "C2", "C3"]);
+

+
        let siblings: Vec<_> = dag.siblings_of(dag.get(&"A4").unwrap()).copied().collect();
+
        assert_eq!(siblings, Vec::<&str>::new());
+

+
        let siblings: Vec<_> = dag.siblings_of(dag.get(&"C1").unwrap()).copied().collect();
+
        assert_eq!(siblings, vec!["A1", "A2", "A3", "B1"]);
+

+
        let siblings: Vec<_> = dag.siblings_of(dag.get(&"C2").unwrap()).copied().collect();
+
        assert_eq!(siblings, vec!["A1", "A2", "A3", "B1"]);
+

+
        let siblings: Vec<_> = dag.siblings_of(dag.get(&"B1").unwrap()).copied().collect();
+
        assert_eq!(siblings, vec!["A3", "C1", "C2", "C3"]);
+

+
        let siblings: Vec<_> = dag.siblings_of(dag.get(&"R").unwrap()).copied().collect();
+
        assert_eq!(siblings, Vec::<&str>::new());
+
    }
+

+
    #[test]
    fn test_prune_2() {
        let mut dag = Dag::new();

@@ -817,7 +909,7 @@ mod tests {

        let mut order = VecDeque::new();

-
        dag.prune(&["R"], |key, _| {
+
        dag.prune(&["R"], |key, _, _| {
            order.push_back(*key);
            ControlFlow::Continue(())
        });
modified radicle/src/cob/identity.rs
@@ -355,11 +355,17 @@ impl store::Cob for Identity {
        Ok(Identity::new(revision))
    }

-
    fn op<R: ReadRepository>(&mut self, op: Op, repo: &R) -> Result<(), ApplyError> {
+
    fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a cob::Entry>>(
+
        &mut self,
+
        op: Op,
+
        concurrent: I,
+
        repo: &R,
+
    ) -> Result<(), ApplyError> {
        let id = op.id;
+
        let concurrent = concurrent.into_iter().collect::<Vec<_>>();

        for action in op.actions {
-
            match self.action(action, id, op.author, op.timestamp, repo) {
+
            match self.action(action, id, op.author, op.timestamp, &concurrent, repo) {
                Ok(()) => {}
                // This particular error is returned when there is a mismatch between the expected
                // and the actual state of a revision, which can happen concurrently. Therefore
@@ -393,6 +399,7 @@ impl Identity {
        entry: EntryId,
        author: ActorId,
        timestamp: Timestamp,
+
        _concurrent: &[&cob::Entry],
        repo: &R,
    ) -> Result<(), ApplyError> {
        let current = self.current().clone();
@@ -588,10 +595,16 @@ impl<R: ReadRepository> cob::Evaluate<R> for Identity {
        Ok(object)
    }

-
    fn apply(&mut self, entry: &cob::Entry, repo: &R) -> Result<(), Self::Error> {
+
    fn apply<'a, I: Iterator<Item = (&'a EntryId, &'a cob::Entry)>>(
+
        &mut self,
+
        entry: &cob::Entry,
+
        concurrent: I,
+
        repo: &R,
+
    ) -> Result<(), Self::Error> {
        let op = Op::try_from(entry)?;

-
        self.op(op, repo).map_err(Error::Apply)
+
        self.op(op, concurrent.map(|(_, e)| e), repo)
+
            .map_err(Error::Apply)
    }
}

modified radicle/src/cob/issue.rs
@@ -144,17 +144,31 @@ impl store::Cob for Issue {
        let mut issue = Issue::new(thread);

        for action in actions {
-
            issue.action(action, op.id, op.author, op.timestamp, &doc, repo)?;
+
            issue.action(action, op.id, op.author, op.timestamp, &[], &doc, repo)?;
        }
        Ok(issue)
    }

-
    fn op<R: ReadRepository>(&mut self, op: Op, repo: &R) -> Result<(), Error> {
+
    fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a cob::Entry>>(
+
        &mut self,
+
        op: Op,
+
        concurrent: I,
+
        repo: &R,
+
    ) -> Result<(), Error> {
        let doc = op.identity_doc(repo)?.ok_or(Error::MissingIdentity)?;
+
        let concurrent = concurrent.into_iter().collect::<Vec<_>>();
        for action in op.actions {
            match self.authorization(&action, &op.author, &doc)? {
                Authorization::Allow => {
-
                    self.action(action, op.id, op.author, op.timestamp, &doc, repo)?;
+
                    self.action(
+
                        action,
+
                        op.id,
+
                        op.author,
+
                        op.timestamp,
+
                        &concurrent,
+
                        &doc,
+
                        repo,
+
                    )?;
                }
                Authorization::Deny => {
                    return Err(Error::NotAuthorized(op.author, action));
@@ -178,10 +192,15 @@ impl<R: ReadRepository> cob::Evaluate<R> for Issue {
        Ok(object)
    }

-
    fn apply(&mut self, entry: &cob::Entry, repo: &R) -> Result<(), Self::Error> {
+
    fn apply<'a, I: Iterator<Item = (&'a EntryId, &'a cob::Entry)>>(
+
        &mut self,
+
        entry: &cob::Entry,
+
        concurrent: I,
+
        repo: &R,
+
    ) -> Result<(), Self::Error> {
        let op = Op::try_from(entry)?;

-
        self.op(op, repo)
+
        self.op(op, concurrent.map(|(_, e)| e), repo)
    }
}

@@ -314,12 +333,13 @@ impl Issue {

impl Issue {
    /// Apply a single action to the issue.
-
    fn action<R: ReadRepository>(
+
    fn action<'a, R: ReadRepository>(
        &mut self,
        action: Action,
        entry: EntryId,
        author: ActorId,
        timestamp: Timestamp,
+
        _concurrent: &[&'a cob::Entry],
        _doc: &Doc<Verified>,
        _repo: &R,
    ) -> Result<(), Error> {
modified radicle/src/cob/patch.rs
@@ -696,12 +696,13 @@ impl Patch {

impl Patch {
    /// Apply a single action to the patch.
-
    fn action<R: ReadRepository>(
+
    fn action<'a, R: ReadRepository>(
        &mut self,
        action: Action,
        entry: EntryId,
        author: ActorId,
        timestamp: Timestamp,
+
        _concurrent: &[&'a cob::Entry],
        identity: &Doc<Verified>,
        repo: &R,
    ) -> Result<(), Error> {
@@ -1116,21 +1117,35 @@ impl store::Cob for Patch {
        let mut patch = Patch::new(title, target, (RevisionId(op.id), revision));

        for action in actions {
-
            patch.action(action, op.id, op.author, op.timestamp, &doc, repo)?;
+
            patch.action(action, op.id, op.author, op.timestamp, &[], &doc, repo)?;
        }
        Ok(patch)
    }

-
    fn op<R: ReadRepository>(&mut self, op: Op, repo: &R) -> Result<(), Error> {
+
    fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a cob::Entry>>(
+
        &mut self,
+
        op: Op,
+
        concurrent: I,
+
        repo: &R,
+
    ) -> Result<(), Error> {
        debug_assert!(!self.timeline.contains(&op.id));
        self.timeline.push(op.id);

        let doc = op.identity_doc(repo)?.ok_or(Error::MissingIdentity)?;
+
        let concurrent = concurrent.into_iter().collect::<Vec<_>>();

        for action in op.actions {
            match self.authorization(&action, &op.author, &doc)? {
                Authorization::Allow => {
-
                    self.action(action, op.id, op.author, op.timestamp, &doc, repo)?;
+
                    self.action(
+
                        action,
+
                        op.id,
+
                        op.author,
+
                        op.timestamp,
+
                        &concurrent,
+
                        &doc,
+
                        repo,
+
                    )?;
                }
                Authorization::Deny => {
                    return Err(Error::NotAuthorized(op.author, action));
@@ -1158,10 +1173,15 @@ impl<R: ReadRepository> cob::Evaluate<R> for Patch {
        Ok(object)
    }

-
    fn apply(&mut self, entry: &cob::Entry, repo: &R) -> Result<(), Self::Error> {
+
    fn apply<'a, I: Iterator<Item = (&'a EntryId, &'a cob::Entry)>>(
+
        &mut self,
+
        entry: &cob::Entry,
+
        concurrent: I,
+
        repo: &R,
+
    ) -> Result<(), Self::Error> {
        let op = Op::try_from(entry)?;

-
        self.op(op, repo)
+
        self.op(op, concurrent.map(|(_, e)| e), repo)
    }
}

@@ -2603,11 +2623,11 @@ mod test {
        let mut patch = Patch::from_ops([a1, a2], &repo).unwrap();
        assert_eq!(patch.revisions().count(), 2);

-
        patch.op(a3, &repo).unwrap();
+
        patch.op(a3, [], &repo).unwrap();
        assert_eq!(patch.revisions().count(), 1);

-
        patch.op(a4, &repo).unwrap();
-
        patch.op(a5, &repo).unwrap();
+
        patch.op(a4, [], &repo).unwrap();
+
        patch.op(a5, [], &repo).unwrap();
    }

    #[test]
modified radicle/src/cob/store.rs
@@ -37,9 +37,10 @@ pub trait Cob: Sized + PartialEq + Debug {
    fn from_root<R: ReadRepository>(op: Op<Self::Action>, repo: &R) -> Result<Self, Self::Error>;

    /// Apply an operation to the state.
-
    fn op<R: ReadRepository>(
+
    fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a cob::Entry>>(
        &mut self,
        op: Op<Self::Action>,
+
        concurrent: I,
        repo: &R,
    ) -> Result<(), <Self as Cob>::Error>;

@@ -65,7 +66,7 @@ pub trait Cob: Sized + PartialEq + Debug {
        };
        let mut state = Self::from_root(init, repo)?;
        for op in ops {
-
            state.op(op, repo)?;
+
            state.op(op, [].into_iter(), repo)?;
        }
        Ok(state)
    }
@@ -388,7 +389,7 @@ pub mod test {
        let obj = history.traverse(initial, &children, |mut acc, _, entry| {
            match Op::try_from(entry) {
                Ok(op) => {
-
                    if let Err(err) = acc.op(op, repo) {
+
                    if let Err(err) = acc.op(op, [].into_iter(), repo) {
                        log::warn!("Error applying op to `{}` state: {err}", T::type_name());
                        return ControlFlow::Break(acc);
                    }
modified radicle/src/cob/thread.rs
@@ -347,6 +347,7 @@ impl Thread {
        entry: EntryId,
        author: ActorId,
        timestamp: Timestamp,
+
        _concurrent: &[&cob::Entry],
        _identity: git::Oid,
        _repo: &R,
    ) -> Result<(), Error> {
@@ -423,15 +424,29 @@ impl cob::store::Cob for Thread {
        )?;

        for action in actions {
-
            thread.action(action, entry, author, timestamp, identity, repo)?;
+
            thread.action(action, entry, author, timestamp, &[], identity, repo)?;
        }
        Ok(thread)
    }

-
    fn op<R: ReadRepository>(&mut self, op: Op<Action>, repo: &R) -> Result<(), Error> {
+
    fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a cob::Entry>>(
+
        &mut self,
+
        op: Op<Action>,
+
        concurrent: I,
+
        repo: &R,
+
    ) -> Result<(), Error> {
        let identity = op.identity.ok_or(Error::MissingIdentity)?;
+
        let concurrent = concurrent.into_iter().collect::<Vec<_>>();
        for action in op.actions {
-
            self.action(action, op.id, op.author, op.timestamp, identity, repo)?;
+
            self.action(
+
                action,
+
                op.id,
+
                op.author,
+
                op.timestamp,
+
                &concurrent,
+
                identity,
+
                repo,
+
            )?;
        }
        Ok(())
    }
@@ -447,10 +462,15 @@ impl<R: ReadRepository> cob::Evaluate<R> for Thread {
        Ok(object)
    }

-
    fn apply(&mut self, entry: &cob::Entry, repo: &R) -> Result<(), Self::Error> {
+
    fn apply<'a, I: Iterator<Item = (&'a EntryId, &'a cob::Entry)>>(
+
        &mut self,
+
        entry: &cob::Entry,
+
        concurrent: I,
+
        repo: &R,
+
    ) -> Result<(), Self::Error> {
        let op = Op::try_from(entry)?;

-
        self.op(op, repo)
+
        self.op(op, concurrent.map(|(_, e)| e), repo)
    }
}

@@ -677,7 +697,7 @@ mod tests {

        // Redact the second comment.
        let a3 = alice.redact(a1.id());
-
        thread.op(a3, &repo).unwrap();
+
        thread.op(a3, [], &repo).unwrap();

        let (_, comment0) = thread.comments().nth(0).unwrap();
        let (_, comment1) = thread.comments().nth(1).unwrap();
@@ -885,7 +905,7 @@ mod tests {
        let mut t = Thread::default();
        let id = arbitrary::entry_id();

-
        t.op(alice.redact(id), &repo).unwrap_err();
+
        t.op(alice.redact(id), [], &repo).unwrap_err();
    }

    #[test]
@@ -895,7 +915,7 @@ mod tests {
        let mut t = Thread::default();
        let id = arbitrary::entry_id();

-
        t.op(alice.edit(id, "Edited"), &repo).unwrap_err();
+
        t.op(alice.edit(id, "Edited"), [], &repo).unwrap_err();
    }

    #[test]