Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
cob: provide stable ordering of concurrent values
Merged fintohaps opened 1 year ago

The underlying Dag for a COB will use the Ord implementation of whatever is provided as the key. This means that the lexicographical ordering of the Oid SHA will be used, which in turn means that if the SHAs change in our tests, then the ordering will be broken for concurrent updates.

To prevent this from happening, a new key is used EntryKey, which is the combination of an Oid and a Timestamp – the commit timestamp. The key is used in place of EntryId as the Dag key.

This introduction meant that in some places it was necessary to have a way of getting the timestamp of an EntryId. The TimestampOf trait is used in these cases, which means that some infallible functions became fallible.

2 files changed +130 -17 de1958fa b4f2614d
modified radicle-cob/src/change_graph.rs
@@ -1,7 +1,7 @@
// Copyright © 2021 The Radicle Link Contributors

-
use std::collections::BTreeSet;
use std::ops::ControlFlow;
+
use std::{cmp::Ordering, collections::BTreeSet};

use git_ext::Oid;
use radicle_dag::Dag;
@@ -113,20 +113,24 @@ impl ChangeGraph {
        let manifest = root.manifest.clone();
        let root = root.id;

-
        self.graph.prune(&children, |_, entry, siblings| {
-
            // Check the entry signatures are valid.
-
            if !entry.valid_signatures() {
-
                return ControlFlow::Break(());
-
            }
-
            // Apply the entry to the state, and if there's an error, prune that branch.
-
            if object
-
                .apply(entry, siblings.map(|(k, n)| (k, &n.value)), store)
-
                .is_err()
-
            {
-
                return ControlFlow::Break(());
-
            }
-
            ControlFlow::Continue(())
-
        });
+
        self.graph.prune_by(
+
            &children,
+
            |_, entry, siblings| {
+
                // Check the entry signatures are valid.
+
                if !entry.valid_signatures() {
+
                    return ControlFlow::Break(());
+
                }
+
                // Apply the entry to the state, and if there's an error, prune that branch.
+
                if object
+
                    .apply(entry, siblings.map(|(k, n)| (k, &n.value)), store)
+
                    .is_err()
+
                {
+
                    return ControlFlow::Break(());
+
                }
+
                ControlFlow::Continue(())
+
            },
+
            Self::chronological,
+
        );

        Ok(CollaborativeObject {
            manifest,
@@ -144,6 +148,10 @@ impl ChangeGraph {
    pub(crate) fn number_of_nodes(&self) -> usize {
        self.graph.len()
    }
+

+
    fn chronological(x: (&Oid, &Entry), y: (&Oid, &Entry)) -> Ordering {
+
        x.1.timestamp.cmp(&y.1.timestamp).then(x.0.cmp(y.0))
+
    }
}

struct GraphBuilder {
modified radicle-dag/src/lib.rs
@@ -201,7 +201,7 @@ impl<K: Ord + Copy, V> Dag<K, V> {
    /// To continue traversing a branch, return [`ControlFlow::Continue`] from the
    /// filter function. To stop traversal of a branch and prune it,
    /// return [`ControlFlow::Break`].
-
    pub fn prune<F>(&mut self, roots: &[K], mut filter: F)
+
    pub fn prune<F>(&mut self, roots: &[K], filter: F)
    where
        F: for<'r> FnMut(
            &'r K,
@@ -209,11 +209,30 @@ impl<K: Ord + Copy, V> Dag<K, V> {
            Box<dyn Iterator<Item = (&'r K, &'r Node<K, V>)> + 'r>,
        ) -> ControlFlow<()>,
    {
+
        self.prune_by(roots, filter, |(k1, _), (k2, _)| k1.cmp(k2))
+
    }
+

+
    /// Fold over the graph in the order provided by `order`, pruning branches
+
    /// along the way.
+
    /// This is a depth-first traversal.
+
    ///
+
    /// To continue traversing a branch, return [`ControlFlow::Continue`] from the
+
    /// filter function. To stop traversal of a branch and prune it,
+
    /// return [`ControlFlow::Break`].
+
    pub fn prune_by<F, P>(&mut self, roots: &[K], mut filter: F, ordering: P)
+
    where
+
        F: for<'r> FnMut(
+
            &'r K,
+
            &'r Node<K, V>,
+
            Box<dyn Iterator<Item = (&'r K, &'r Node<K, V>)> + 'r>,
+
        ) -> ControlFlow<()>,
+
        P: Fn((&K, &V), (&K, &V)) -> Ordering,
+
    {
        let mut visited = BTreeSet::new();
        let mut result = VecDeque::new();

        for root in roots {
-
            self.visit(root, &mut visited, &mut result);
+
            self.visit_by(root, &mut visited, &mut result, &ordering);
        }

        for next in result {
@@ -369,6 +388,32 @@ impl<K: Ord + Copy, V> Dag<K, V> {
            order.push_front(*key);
        }
    }
+

+
    /// Add nodes recursively to the provided ordering, starting from the given node.
+
    fn visit_by(
+
        &self,
+
        key: &K,
+
        visited: &mut BTreeSet<K>,
+
        order: &mut VecDeque<K>,
+
        ordering: &impl Fn((&K, &V), (&K, &V)) -> Ordering,
+
    ) {
+
        if visited.insert(*key) {
+
            // Recursively visit all of the node's dependents.
+
            if let Some(node) = self.graph.get(key) {
+
                let mut dependents: Vec<&Node<K, V>> = node
+
                    .dependents
+
                    .iter()
+
                    .filter_map(|k| self.get(k))
+
                    .collect::<Vec<_>>();
+
                dependents.sort_by(|x, y| ordering((&x.key, &x.value), (&y.key, &y.value)));
+
                for dependent in dependents.iter().rev() {
+
                    self.visit_by(&dependent.key, visited, order, ordering);
+
                }
+
            }
+
            // Add the node to the topological order.
+
            order.push_front(*key);
+
        }
+
    }
}

impl<K: Ord + Copy + fmt::Display, V> Dag<K, V> {
@@ -915,4 +960,64 @@ mod tests {
        });
        assert_eq!(order, dag.sorted());
    }
+

+
    #[test]
+
    fn test_prune_by_sorting() {
+
        let mut dag = Dag::new();
+

+
        dag.node("R", 0);
+
        dag.node("A1", 1);
+
        dag.node("A2", 2);
+
        dag.node("A3", 3);
+
        dag.node("B1", 1);
+
        dag.node("B2", 2);
+
        dag.node("B3", 3);
+
        dag.node("C1", 1);
+

+
        dag.dependency("A1", "R");
+
        dag.dependency("A2", "R");
+
        dag.dependency("A3", "R");
+

+
        dag.dependency("B1", "A1");
+
        dag.dependency("B2", "A1");
+
        dag.dependency("B3", "A2");
+
        dag.dependency("B3", "A3");
+

+
        dag.dependency("C1", "B1");
+
        dag.dependency("C1", "B2");
+
        dag.dependency("C1", "B3");
+

+
        let mut order = Vec::new();
+
        dag.prune_by(
+
            &["R"],
+
            |key, _, _| {
+
                order.push(*key);
+
                ControlFlow::Continue(())
+
            },
+
            |(a, _), (b, _)| a.cmp(b),
+
        );
+
        assert_eq!(order, vec!["R", "A1", "B1", "B2", "A2", "A3", "B3", "C1"]);
+

+
        let mut order = Vec::new();
+
        dag.prune_by(
+
            &["R"],
+
            |key, _, _| {
+
                order.push(*key);
+
                ControlFlow::Continue(())
+
            },
+
            |(a, _), (b, _)| a.cmp(b).reverse(),
+
        );
+
        assert_eq!(order, vec!["R", "A3", "A2", "B3", "A1", "B2", "B1", "C1"]);
+

+
        let mut order = Vec::new();
+
        dag.prune_by(
+
            &["R"],
+
            |key, _, _| {
+
                order.push(*key);
+
                ControlFlow::Continue(())
+
            },
+
            |(_, a), (_, b)| a.cmp(b).reverse(),
+
        );
+
        assert_eq!(order, vec!["R", "A3", "A2", "B3", "A1", "B2", "B1", "C1"]);
+
    }
}