Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
cob: Simplify graph evaluation
Alexis Sellier committed 2 years ago
commit 0e14dacb995191be3eef42054d393d78dc9c4a9c
parent 0361ca38187bd43dd74396c8ae6df437d7444f37
11 files changed +391 -416
modified radicle-cob/src/change_graph.rs
@@ -1,17 +1,16 @@
// Copyright © 2021 The Radicle Link Contributors

+
use std::ops::ControlFlow;
use std::{collections::BTreeSet, convert::TryInto};

use git_ext::Oid;
-
use radicle_dag::{Dag, Node};
+
use radicle_dag::Dag;

use crate::{
-
    change, object, signatures::ExtendedSignature, Change, CollaborativeObject, ObjectId, TypeName,
+
    change, history::EntryId, object, signatures::ExtendedSignature, Change, CollaborativeObject,
+
    Entry, History, ObjectId, TypeName,
};

-
mod evaluation;
-
use evaluation::evaluate;
-

/// The graph of changes for a particular collaborative object
pub(super) struct ChangeGraph {
    object_id: ObjectId,
@@ -82,19 +81,46 @@ impl ChangeGraph {

    /// Given a graph evaluate it to produce a collaborative object. This will
    /// filter out branches of the graph which do not have valid signatures.
-
    pub(crate) fn evaluate(&self) -> CollaborativeObject {
-
        let mut roots: Vec<(&Oid, &Node<_, _>)> = self.graph.roots().collect();
-
        roots.sort_by_key(|(k, _)| *k);
-
        // This is okay because we check that the graph has a root node in
-
        // GraphBuilder::build
-
        let (root, root_node) = roots.first().unwrap();
+
    pub(crate) fn evaluate(self) -> CollaborativeObject {
+
        let root = *self.object_id;
+
        let root_node = self
+
            .graph
+
            .get(&root)
+
            .expect("ChangeGraph::evaluate: root must be part of change graph");
+

        let manifest = root_node.manifest.clone();
-
        let rng = fastrand::Rng::new();
-
        let history = evaluate(*self.graph[*root].id(), &self.graph, rng);
+
        let graph = self
+
            .graph
+
            .fold(&root, Dag::new(), |mut graph, _, change, depth| {
+
                // Check the change signatures are valid.
+
                if !change.valid_signatures() {
+
                    return ControlFlow::Break(graph);
+
                }
+
                let clock = depth as u64 + 1;
+
                let entry = Entry::new(
+
                    *change.id(),
+
                    change.signature.key,
+
                    change.resource,
+
                    change.contents().clone(),
+
                    change.timestamp,
+
                    clock,
+
                );
+
                let id = *entry.id();
+

+
                graph.node(id, entry);
+

+
                for k in &change.dependents {
+
                    graph.dependency(EntryId::from(*k), id);
+
                }
+
                for k in &change.dependencies {
+
                    graph.dependency(id, EntryId::from(*k));
+
                }
+
                ControlFlow::Continue(graph)
+
            });

        CollaborativeObject {
            manifest,
-
            history,
+
            history: History::new((*root).into(), graph),
            id: self.object_id,
        }
    }
deleted radicle-cob/src/change_graph/evaluation.rs
@@ -1,96 +0,0 @@
-
// Copyright © 2021 The Radicle Link Contributors
-

-
use std::{collections::HashMap, ops::ControlFlow};
-

-
use git_ext::Oid;
-
use radicle_dag::Dag;
-

-
use crate::history::entry::{EntryId, EntryWithClock};
-
use crate::{change::Change, history, pruning_fold};
-

-
/// # Panics
-
///
-
/// If the change corresponding to the root OID is not in `items`
-
pub fn evaluate(root: Oid, graph: &Dag<Oid, Change>, rng: fastrand::Rng) -> history::History {
-
    let entries = pruning_fold::pruning_fold(
-
        HashMap::<EntryId, EntryWithClock>::new(),
-
        graph.sorted(rng).into_iter().map(|oid| {
-
            let node = &graph[&oid];
-
            let child_commits = node.dependents.iter().copied().collect();
-

-
            ChangeWithChildren {
-
                oid,
-
                change: &node.value,
-
                child_commits,
-
            }
-
        }),
-
        |mut entries, c| match evaluate_change(c.change, &c.child_commits) {
-
            Err(RejectionReason::InvalidSignatures) => {
-
                log::warn!(
-
                    "rejecting change '{}' because its signatures were invalid",
-
                    c.change.id(),
-
                );
-
                ControlFlow::Break(entries)
-
            }
-
            Ok(entry) => {
-
                // Get parent commits and calculate this node's clock based on theirs.
-
                let clock = graph[&c.oid]
-
                    .dependencies
-
                    .iter()
-
                    .map(|e| entries[&EntryId::from(*e)].clock())
-
                    .max()
-
                    .unwrap_or_default() // When there are no operations, the clock is zero.
-
                    + 1;
-
                log::trace!("change '{}' accepted", c.change.id());
-

-
                entries.insert(*entry.id(), EntryWithClock { entry, clock });
-

-
                ControlFlow::Continue(entries)
-
            }
-
        },
-
    );
-
    // SAFETY: The caller must guarantee that `root` is in `items`
-
    history::History::new(root, entries).unwrap()
-
}
-

-
fn evaluate_change(
-
    change: &Change,
-
    child_commits: &[Oid],
-
) -> Result<history::Entry, RejectionReason> {
-
    // Check the change signatures are valid
-
    if !change.valid_signatures() {
-
        return Err(RejectionReason::InvalidSignatures);
-
    };
-

-
    Ok(history::Entry::new(
-
        *change.id(),
-
        change.signature.key,
-
        change.resource,
-
        child_commits.iter().cloned(),
-
        change.contents().clone(),
-
        change.timestamp,
-
    ))
-
}
-

-
struct ChangeWithChildren<'a> {
-
    oid: Oid,
-
    change: &'a Change,
-
    child_commits: Vec<Oid>,
-
}
-

-
impl<'a> pruning_fold::GraphNode for ChangeWithChildren<'a> {
-
    type Id = Oid;
-

-
    fn id(&self) -> &Self::Id {
-
        self.change.id()
-
    }
-

-
    fn child_ids(&self) -> &[Self::Id] {
-
        &self.child_commits
-
    }
-
}
-

-
#[derive(Debug)]
-
enum RejectionReason {
-
    InvalidSignatures,
-
}
modified radicle-cob/src/history.rs
@@ -1,23 +1,19 @@
// Copyright © 2021 The Radicle Link Contributors

-
use std::{
-
    collections::{BTreeSet, HashMap},
-
    ops::ControlFlow,
-
};
+
use std::{cmp::Ordering, collections::BTreeSet, ops::ControlFlow};

use git_ext::Oid;
use radicle_crypto::PublicKey;
use radicle_dag::Dag;

-
use crate::pruning_fold;
-

pub mod entry;
-
pub use entry::{Clock, Contents, Entry, EntryId, EntryWithClock, Timestamp};
+
pub use entry::{Clock, Contents, Entry, EntryId, Timestamp};

/// The DAG of changes making up the history of a collaborative object.
#[derive(Clone, Debug)]
pub struct History {
-
    graph: Dag<EntryId, EntryWithClock>,
+
    graph: Dag<EntryId, Entry>,
+
    root: EntryId,
}

impl PartialEq for History {
@@ -28,13 +24,16 @@ impl PartialEq for History {

impl Eq for History {}

-
#[derive(Debug, thiserror::Error)]
-
pub enum CreateError {
-
    #[error("no entry for the root ID in the entries")]
-
    MissingRoot,
-
}
-

impl History {
+
    /// Create a new history from a DAG. Panics if the root is not part of the graph.
+
    pub fn new(root: EntryId, graph: Dag<EntryId, Entry>) -> Self {
+
        assert!(
+
            graph.contains(&root),
+
            "History::new: root must be present in graph"
+
        );
+
        Self { root, graph }
+
    }
+

    pub fn new_from_root<Id>(
        id: Id,
        actor: PublicKey,
@@ -46,29 +45,18 @@ impl History {
        Id: Into<EntryId>,
    {
        let id = id.into();
-
        let root_entry = Entry {
+
        let root = Entry {
            id,
            actor,
            resource,
-
            children: vec![],
            contents,
            timestamp,
+
            clock: 1,
        };
-
        let mut entries = HashMap::new();
-
        entries.insert(id, EntryWithClock::root(root_entry));
-

-
        create_dag(&id, &entries)
-
    }

-
    pub fn new<Id>(root: Id, entries: HashMap<EntryId, EntryWithClock>) -> Result<Self, CreateError>
-
    where
-
        Id: Into<EntryId>,
-
    {
-
        let root = root.into();
-
        if !entries.contains_key(&root) {
-
            Err(CreateError::MissingRoot)
-
        } else {
-
            Ok(create_dag(&root, &entries))
+
        Self {
+
            root: id,
+
            graph: Dag::root(id, root),
        }
    }

@@ -92,30 +80,37 @@ impl History {
            .unwrap_or_default()
    }

+
    /// Get all the tips of the graph.
+
    pub fn tips(&self) -> BTreeSet<Oid> {
+
        self.graph
+
            .tips()
+
            .map(|(_, entry)| (*entry.id()).into())
+
            .collect()
+
    }
+

    /// A topological (parents before children) traversal of the dependency
    /// graph of this history. This is analagous to
    /// [`std::iter::Iterator::fold`] in that it folds every change into an
    /// accumulator value of type `A`. However, unlike `fold` the function `f`
    /// may prune branches from the dependency graph by returning
    /// `ControlFlow::Break`.
-
    pub fn traverse<F, A>(&self, init: A, f: F) -> A
+
    pub fn traverse<F, A>(&self, init: A, mut f: F) -> A
    where
-
        F: for<'r> FnMut(A, &'r EntryWithClock) -> ControlFlow<A, A>,
+
        F: for<'r> FnMut(A, &'r EntryId, &'r Entry) -> ControlFlow<A, A>,
    {
-
        let items = self
-
            .graph
-
            .sorted(fastrand::Rng::new())
-
            .into_iter()
-
            .map(|idx| &self.graph[&idx]);
-

-
        pruning_fold::pruning_fold(init, items, f)
+
        self.graph
+
            .fold(&self.root, init, |acc, k, v, _| f(acc, k, v))
    }

-
    pub fn tips(&self) -> BTreeSet<Oid> {
+
    pub fn sorted<F>(&self, compare: F) -> impl Iterator<Item = &Entry>
+
    where
+
        F: FnMut(&EntryId, &EntryId) -> Ordering,
+
    {
        self.graph
-
            .tips()
-
            .map(|(_, entry)| (*entry.id()).into())
-
            .collect()
+
            .sorted(compare)
+
            .into_iter()
+
            .filter_map(|k| self.graph.get(&k))
+
            .map(|node| &node.value)
    }

    pub fn extend<Id>(
@@ -134,17 +129,12 @@ impl History {
            new_id,
            new_actor,
            new_resource,
-
            std::iter::empty::<git2::Oid>(),
            new_contents,
            new_timestamp,
+
            self.clock() + 1,
        );
-
        self.graph.node(
-
            new_id,
-
            EntryWithClock {
-
                entry: new_entry,
-
                clock: self.clock() + 1,
-
            },
-
        );
+
        self.graph.node(new_id, new_entry);
+

        for tip in tips {
            self.graph.dependency(new_id, (*tip).into());
        }
@@ -154,19 +144,3 @@ impl History {
        self.graph.merge(other.graph);
    }
}
-

-
fn create_dag<'a>(root: &'a EntryId, entries: &'a HashMap<EntryId, EntryWithClock>) -> History {
-
    let root_entry = entries.get(root).unwrap().clone();
-
    let mut graph: Dag<EntryId, EntryWithClock> = Dag::root(*root, root_entry.clone());
-
    let mut to_process = vec![root_entry];
-

-
    while let Some(entry) = to_process.pop() {
-
        for child_id in entry.children() {
-
            let child = entries[child_id].clone();
-
            graph.node(*child_id, child.clone());
-
            graph.dependency(*child_id, entry.id);
-
            to_process.push(child.clone());
-
        }
-
    }
-
    History { graph }
-
}
modified radicle-cob/src/history/entry.rs
@@ -9,7 +9,6 @@ use nonempty::NonEmpty;
use radicle_crypto::PublicKey;
use serde::{Deserialize, Serialize};

-
use crate::pruning_fold;
use crate::{object, ObjectId};

/// Entry contents.
@@ -90,43 +89,36 @@ pub struct Entry {
    /// The content-address for the resource this entry lives under.
    /// If the resource was updated, this should point to its latest version.
    pub(super) resource: Oid,
-
    /// The child entries for this entry.
-
    pub(super) children: Vec<EntryId>,
    /// The contents of this entry.
    pub(super) contents: Contents,
    /// The entry timestamp, as seconds since epoch.
    pub(super) timestamp: Timestamp,
+
    /// Logical clock.
+
    pub(super) clock: Clock,
}

impl Entry {
-
    pub fn new<Id1, Id2, ChildIds>(
-
        id: Id1,
+
    pub fn new<Id>(
+
        id: Id,
        actor: PublicKey,
        resource: Oid,
-
        children: ChildIds,
        contents: Contents,
        timestamp: Timestamp,
+
        clock: Clock,
    ) -> Self
    where
-
        Id1: Into<EntryId>,
-
        Id2: Into<EntryId>,
-
        ChildIds: IntoIterator<Item = Id2>,
+
        Id: Into<EntryId>,
    {
        Self {
            id: id.into(),
            actor,
            resource,
-
            children: children.into_iter().map(|id| id.into()).collect(),
            contents,
            timestamp,
+
            clock,
        }
    }

-
    /// The ids of the changes this change depends on
-
    pub fn children(&self) -> impl Iterator<Item = &EntryId> {
-
        self.children.iter()
-
    }
-

    /// The current `Oid` of the resource this change lives under.
    pub fn resource(&self) -> Oid {
        self.resource
@@ -147,67 +139,13 @@ impl Entry {
        &self.contents
    }

+
    /// Entry ID.
    pub fn id(&self) -> &EntryId {
        &self.id
    }
-
}
-

-
impl pruning_fold::GraphNode for Entry {
-
    type Id = EntryId;
-

-
    fn id(&self) -> &Self::Id {
-
        &self.id
-
    }
-

-
    fn child_ids(&self) -> &[Self::Id] {
-
        &self.children
-
    }
-
}
-

-
/// Wraps an [`Entry`], adding a logical clock to it.
-
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
-
pub struct EntryWithClock {
-
    pub entry: Entry,
-
    pub clock: Clock,
-
}

-
impl EntryWithClock {
-
    pub fn root(entry: Entry) -> Self {
-
        Self {
-
            entry,
-
            clock: 1 as Clock, // The root entry has a clock value of `1`.
-
        }
-
    }
-
}
-

-
impl EntryWithClock {
-
    /// Get the clock value.
+
    /// Logical clock.
    pub fn clock(&self) -> Clock {
        self.clock
    }
-

-
    /// Iterator over the changes, including the clock.
-
    pub fn changes(&self) -> impl Iterator<Item = &[u8]> {
-
        self.contents.iter().map(|blob| blob.as_slice())
-
    }
-
}
-

-
impl pruning_fold::GraphNode for EntryWithClock {
-
    type Id = EntryId;
-

-
    fn id(&self) -> &Self::Id {
-
        &self.entry.id
-
    }
-

-
    fn child_ids(&self) -> &[Self::Id] {
-
        &self.entry.children
-
    }
-
}
-

-
impl std::ops::Deref for EntryWithClock {
-
    type Target = Entry;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.entry
-
    }
}
modified radicle-cob/src/lib.rs
@@ -85,8 +85,6 @@ pub use change::Change;
pub mod history;
pub use history::{Contents, Entry, History};

-
mod pruning_fold;
-

pub mod signatures;
use signatures::ExtendedSignature;

deleted radicle-cob/src/pruning_fold.rs
@@ -1,97 +0,0 @@
-
// Copyright © 2021 The Radicle Link Contributors
-

-
use std::{
-
    borrow::Borrow,
-
    collections::{BTreeSet, HashMap},
-
    ops::ControlFlow,
-
};
-

-
pub(crate) trait GraphNode {
-
    type Id: Clone + Eq + Ord + std::hash::Hash + std::fmt::Debug;
-

-
    fn id(&self) -> &Self::Id;
-
    fn child_ids(&self) -> &[Self::Id];
-
}
-

-
/// Fold a topological sort of a directed acyclic graph, pruning some branches.
-
///
-
/// `items` must be an iterator over the nodes of the graph in topological
-
/// order. Assuming this is the case `fold` will only be called with nodes whose
-
/// ancestors have already been evaluated. Returning `ControlFlow::Break(..)`
-
/// from `fold` will omit evaluation of the current node and consequently omit
-
/// processing of any nodes who have the current node as an ancestor.
-
pub(crate) fn pruning_fold<'a, BN, Node, It, F, O>(init: O, items: It, mut f: F) -> O
-
where
-
    BN: Borrow<Node> + 'a,
-
    Node: 'a + GraphNode,
-
    It: Iterator<Item = BN>,
-
    F: for<'r> FnMut(O, &'r Node) -> std::ops::ControlFlow<O, O>,
-
{
-
    let mut rejected = RejectedNodes::new();
-
    let mut state = init;
-
    for node in items {
-
        // There can be multiple paths to a change so in a topological traversal we
-
        // might encounter a change which we have already rejected
-
        // previously
-
        if rejected.is_rejected(node.borrow().id()) {
-
            continue;
-
        }
-
        if let Some(rejected_ancestor) = rejected.rejected_ancestor(node.borrow().id()) {
-
            let ancestor = rejected_ancestor.clone();
-
            log::warn!(
-
                "rejecting node because an ancestor change was rejected id='{:?}', ancestor='{:?}'",
-
                node.borrow().id(),
-
                rejected_ancestor
-
            );
-
            for child in node.borrow().child_ids() {
-
                rejected.transitively_reject(child, &ancestor);
-
            }
-
            continue;
-
        }
-
        state = match f(state, node.borrow()) {
-
            ControlFlow::Continue(state) => state,
-
            ControlFlow::Break(state) => {
-
                rejected.directly_reject(node.borrow().id(), node.borrow().child_ids());
-
                state
-
            }
-
        };
-
    }
-
    state
-
}
-

-
struct RejectedNodes<NodeId> {
-
    /// Changes which are directly rejected by the fold function
-
    direct: BTreeSet<NodeId>,
-
    /// A map from node IDs to the IDs of ancestor nodes which are
-
    /// direct rejections
-
    transitive: HashMap<NodeId, NodeId>,
-
}
-

-
impl<NodeId: Clone + Eq + Ord + std::hash::Hash> RejectedNodes<NodeId> {
-
    fn new() -> RejectedNodes<NodeId> {
-
        RejectedNodes {
-
            direct: BTreeSet::new(),
-
            transitive: HashMap::new(),
-
        }
-
    }
-

-
    fn rejected_ancestor(&self, node: &NodeId) -> Option<&NodeId> {
-
        self.transitive.get(node)
-
    }
-

-
    fn is_rejected(&self, node: &NodeId) -> bool {
-
        self.direct.contains(node)
-
    }
-

-
    fn directly_reject(&mut self, node: &NodeId, children: &[NodeId]) {
-
        self.direct.insert(node.clone());
-
        for child in children {
-
            self.transitive.insert(child.clone(), node.clone());
-
        }
-
    }
-

-
    fn transitively_reject(&mut self, child: &NodeId, rejected_ancestor: &NodeId) {
-
        self.transitive
-
            .insert(child.clone(), rejected_ancestor.clone());
-
    }
-
}
modified radicle-cob/src/tests.rs
@@ -207,7 +207,7 @@ fn traverse_cobs() {
    .unwrap();

    // traverse over the history and filter by changes that were only authorized by terry
-
    let contents = object.history().traverse(Vec::new(), |mut acc, entry| {
+
    let contents = object.history().traverse(Vec::new(), |mut acc, _, entry| {
        if entry.actor() == terry_signer.public_key() {
            acc.push(entry.contents().head.clone());
        }
@@ -217,7 +217,7 @@ fn traverse_cobs() {
    assert_eq!(contents, vec![b"issue 1".to_vec()]);

    // traverse over the history and filter by changes that were only authorized by neil
-
    let contents = object.history().traverse(Vec::new(), |mut acc, entry| {
+
    let contents = object.history().traverse(Vec::new(), |mut acc, _, entry| {
        acc.push(entry.contents().head.clone());
        ControlFlow::Continue(acc)
    });
modified radicle-dag/src/lib.rs
@@ -1,28 +1,31 @@
+
//! Directed-acyclic graph implementation.
+
#![warn(missing_docs)]
use std::{
    borrow::Borrow,
-
    collections::{HashMap, HashSet},
+
    cmp::Ordering,
+
    collections::{BTreeMap, BTreeSet, VecDeque},
    fmt,
    hash::Hash,
-
    ops::{Deref, Index},
+
    ops::{ControlFlow, Deref, Index},
};

/// A node in the graph.
#[derive(Clone, Debug, PartialEq, Eq)]
-
pub struct Node<K: Eq + Hash, V> {
+
pub struct Node<K: Eq, V> {
    /// The node value, stored by the user.
    pub value: V,
    /// Nodes depended on.
-
    pub dependencies: HashSet<K>,
+
    pub dependencies: BTreeSet<K>,
    /// Nodes depending on this node.
-
    pub dependents: HashSet<K>,
+
    pub dependents: BTreeSet<K>,
}

impl<K: Eq + Hash, V> Node<K, V> {
    fn new(value: V) -> Self {
        Self {
            value,
-
            dependencies: HashSet::new(),
-
            dependents: HashSet::new(),
+
            dependencies: BTreeSet::new(),
+
            dependents: BTreeSet::new(),
        }
    }
}
@@ -44,26 +47,27 @@ impl<K: Eq + Hash, V> Deref for Node<K, V> {
/// A directed acyclic graph.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Dag<K: Eq + Hash, V> {
-
    graph: HashMap<K, Node<K, V>>,
-
    tips: HashSet<K>,
-
    roots: HashSet<K>,
+
    graph: BTreeMap<K, Node<K, V>>,
+
    tips: BTreeSet<K>,
+
    roots: BTreeSet<K>,
}

-
impl<K: Eq + Copy + Hash, V> Dag<K, V> {
+
impl<K: PartialOrd + Ord + Eq + Copy + Hash, V> Dag<K, V> {
    /// Create a new empty DAG.
    pub fn new() -> Self {
        Self {
-
            graph: HashMap::new(),
-
            tips: HashSet::new(),
-
            roots: HashSet::new(),
+
            graph: BTreeMap::new(),
+
            tips: BTreeSet::new(),
+
            roots: BTreeSet::new(),
        }
    }

+
    /// Create a DAG with a root node.
    pub fn root(key: K, value: V) -> Self {
        Self {
-
            graph: HashMap::from_iter([(key, Node::new(value))]),
-
            tips: HashSet::from_iter([key]),
-
            roots: HashSet::from_iter([key]),
+
            graph: BTreeMap::from_iter([(key, Node::new(value))]),
+
            tips: BTreeSet::from_iter([key]),
+
            roots: BTreeSet::from_iter([key]),
        }
    }

@@ -85,8 +89,8 @@ impl<K: Eq + Copy + Hash, V> Dag<K, V> {
            key,
            Node {
                value,
-
                dependencies: HashSet::new(),
-
                dependents: HashSet::new(),
+
                dependencies: BTreeSet::new(),
+
                dependents: BTreeSet::new(),
            },
        )
    }
@@ -136,30 +140,43 @@ impl<K: Eq + Copy + Hash, V> Dag<K, V> {
    }

    /// Merge a DAG into this one.
-
    ///
-
    /// If a key exists in both graphs, its value is set to that of the other graph.
-
    pub fn merge(&mut self, other: Self) {
-
        for k in other.tips.into_iter() {
-
            self.tips.insert(k);
-
        }
-
        for k in other.roots.into_iter() {
-
            self.roots.insert(k);
-
        }
-
        for (k, v) in other.graph.into_iter() {
-
            self.graph.insert(k, v);
+
    pub fn merge(&mut self, mut other: Self) {
+
        let Some((root, _)) = other.roots().next() else {
+
            return;
+
        };
+
        let mut visited = BTreeSet::new();
+
        let mut queue = VecDeque::<K>::from([*root]);
+

+
        while let Some(next) = queue.pop_front() {
+
            if !visited.insert(next) {
+
                continue;
+
            }
+
            if let Some(node) = other.graph.remove(&next) {
+
                if !self.contains(&next) {
+
                    self.node(next, node.value);
+
                }
+
                for k in &node.dependents {
+
                    self.dependency(*k, next);
+
                }
+
                for k in &node.dependencies {
+
                    self.dependency(next, *k);
+
                }
+
                queue.extend(node.dependents.iter());
+
            }
        }
    }

-
    /// Return a topological ordering of the graph's nodes, using the given RNG.
-
    /// Graphs with more than one partial order will return an arbitrary topological ordering.
-
    ///
-
    /// Calling this function over and over will eventually yield all possible orderings.
-
    pub fn sorted(&self, rng: fastrand::Rng) -> Vec<K> {
+
    /// Return a topological ordering of the graph's nodes.
+
    /// Uses a comparison function to sort partially ordered nodes.
+
    pub fn sorted<F>(&self, mut compare: F) -> Vec<K>
+
    where
+
        F: FnMut(&K, &K) -> Ordering,
+
    {
        let mut order = Vec::new(); // Stores the topological order.
-
        let mut visited = HashSet::new(); // Nodes that have been visited.
+
        let mut visited = BTreeSet::new(); // Nodes that have been visited.
        let mut keys = self.graph.keys().collect::<Vec<_>>();

-
        rng.shuffle(&mut keys);
+
        keys.sort_by(|a, b| compare(a, b));

        for node in keys {
            self.visit(node, &mut visited, &mut order);
@@ -167,8 +184,62 @@ impl<K: Eq + Copy + Hash, V> Dag<K, V> {
        order
    }

+
    /// Fold over the graph in topological order, pruning branches along the way.
+
    ///
+
    /// To continue traversing a branch, return [`ControlFlow::Continue`] from the
+
    /// filter function. To stop traversal of a branch, return [`ControlFlow::Break`].
+
    pub fn fold<A, F>(&self, root: &K, mut acc: A, mut filter: F) -> A
+
    where
+
        F: for<'r> FnMut(A, &'r K, &'r Node<K, V>, usize) -> ControlFlow<A, A>,
+
    {
+
        let mut visited = BTreeSet::new();
+
        let mut queue = VecDeque::<(K, usize)>::from([(*root, 0)]);
+

+
        while let Some((next, depth)) = queue.pop_front() {
+
            if !visited.insert(next) {
+
                continue;
+
            }
+
            if let Some(node) = self.graph.get(&next) {
+
                match filter(acc, &next, node, depth) {
+
                    ControlFlow::Continue(a) => {
+
                        queue.extend(node.dependents.iter().map(|k| (*k, depth + 1)));
+
                        acc = a;
+
                    }
+
                    ControlFlow::Break(a) => {
+
                        // When filtering out a node, we filter out all transitive dependents on
+
                        // that node by adding them to the already visited list.
+
                        visited.extend(self.descendants_of(node));
+
                        acc = a;
+
                    }
+
                }
+
            }
+
        }
+
        acc
+
    }
+

+
    fn descendants_of(&self, from: &Node<K, V>) -> Vec<K> {
+
        let mut visited = BTreeSet::new();
+
        let mut stack = VecDeque::new();
+
        let mut nodes = Vec::new();
+

+
        stack.extend(from.dependents.iter());
+

+
        while let Some(key) = stack.pop_front() {
+
            if let Some(node) = self.graph.get(&key) {
+
                if visited.insert(key) {
+
                    nodes.push(key);
+

+
                    for &neighbour in &node.dependents {
+
                        stack.push_back(neighbour);
+
                    }
+
                }
+
            }
+
        }
+
        nodes
+
    }
+

    /// Add nodes recursively to the topological order, starting from the given node.
-
    fn visit(&self, key: &K, visited: &mut HashSet<K>, order: &mut Vec<K>) {
+
    fn visit(&self, key: &K, visited: &mut BTreeSet<K>, order: &mut Vec<K>) {
        if visited.contains(key) {
            return;
        }
@@ -185,7 +256,7 @@ impl<K: Eq + Copy + Hash, V> Dag<K, V> {
    }
}

-
impl<K: Eq + Copy + Hash + fmt::Debug, V> Index<&K> for Dag<K, V> {
+
impl<K: PartialOrd + Ord + Eq + Copy + Hash + fmt::Debug, V> Index<&K> for Dag<K, V> {
    type Output = Node<K, V>;

    fn index(&self, key: &K) -> &Self::Output {
@@ -252,14 +323,14 @@ mod tests {
        dag.dependency(0, 1);
        dag.dependency(1, 0);

-
        let sorted = dag.sorted(fastrand::Rng::new());
+
        let sorted = dag.sorted(|a, b| a.cmp(b));
        let expected: &[&[i32]] = &[&[0, 1], &[1, 0]];

        assert!(expected.contains(&sorted.as_slice()));
    }

    #[test]
-
    fn test_merge() {
+
    fn test_merge_1() {
        let mut a = Dag::new();
        let mut b = Dag::new();
        let mut c = Dag::new();
@@ -283,6 +354,46 @@ mod tests {
    }

    #[test]
+
    fn test_merge_2() {
+
        let mut a = Dag::new();
+
        let mut b = Dag::new();
+

+
        a.node(0, ());
+
        a.node(1, ());
+
        a.node(2, ());
+
        a.dependency(1, 0);
+
        a.dependency(2, 0);
+

+
        b.node(0, ());
+
        b.node(1, ());
+
        b.node(2, ());
+
        b.node(3, ());
+
        b.node(4, ());
+
        b.dependency(1, 0);
+
        b.dependency(2, 0);
+
        b.dependency(3, 0);
+
        b.dependency(4, 2);
+

+
        assert!(a.tips.contains(&2));
+

+
        a.merge(b);
+

+
        assert!(a.get(&0).is_some());
+
        assert!(a.get(&1).is_some());
+
        assert!(a.get(&2).is_some());
+
        assert!(a.get(&3).is_some());
+
        assert!(a.get(&4).is_some());
+
        assert!(a.has_dependency(&4, &2));
+
        assert!(a.get(&2).unwrap().dependents.contains(&4));
+
        assert!(a.get(&0).unwrap().dependents.contains(&3));
+
        assert!(a.tips.contains(&1));
+
        assert!(!a.tips.contains(&2));
+
        assert!(a.tips.contains(&3));
+
        assert!(a.tips.contains(&4));
+
        assert!(a.roots.contains(&0));
+
    }
+

+
    #[test]
    fn test_diamond() {
        let mut dag = Dag::new();

@@ -301,7 +412,7 @@ mod tests {

        // All of the possible sort orders for the above graph.
        let expected: &[&[i32]] = &[&[0, 1, 2, 3], &[0, 2, 1, 3]];
-
        let actual = dag.sorted(fastrand::Rng::new());
+
        let actual = dag.sorted(|a, b| a.cmp(b));

        assert!(expected.contains(&actual.as_slice()), "{actual:?}");
    }
@@ -325,12 +436,12 @@ mod tests {
        dag.dependency(1, 4);

        assert_eq!(
-
            dag.tips().map(|(k, _)| *k).collect::<HashSet<_>>(),
-
            HashSet::from_iter([1, 0])
+
            dag.tips().map(|(k, _)| *k).collect::<BTreeSet<_>>(),
+
            BTreeSet::from_iter([1, 0])
        );
        assert_eq!(
-
            dag.roots().map(|(k, _)| *k).collect::<HashSet<_>>(),
-
            HashSet::from_iter([4, 5])
+
            dag.roots().map(|(k, _)| *k).collect::<BTreeSet<_>>(),
+
            BTreeSet::from_iter([4, 5])
        );

        // All of the possible sort orders for the above graph.
@@ -349,15 +460,126 @@ mod tests {
            [5, 4, 2, 3, 0, 1],
            [5, 4, 2, 3, 1, 0],
        ];
+
        let mut sorts = BTreeSet::new();
        let rng = fastrand::Rng::new();
-
        let mut sorts = HashSet::new();

        while sorts.len() < expected.len() {
-
            sorts.insert(dag.sorted(rng.clone()));
+
            sorts.insert(dag.sorted(|a, b| if rng.bool() { a.cmp(b) } else { b.cmp(a) }));
        }
        for e in expected {
            assert!(sorts.remove(e.to_vec().as_slice()));
        }
        assert!(sorts.is_empty());
    }
+

+
    #[test]
+
    fn test_fold_sorting() {
+
        let mut dag = Dag::new();
+

+
        dag.node("R", ());
+
        dag.node("A1", ());
+
        dag.node("A2", ());
+
        dag.node("A3", ());
+
        dag.node("B1", ());
+
        dag.node("B2", ());
+
        dag.node("B3", ());
+
        dag.node("C1", ());
+

+
        dag.dependency("A1", "R");
+
        dag.dependency("A2", "R");
+
        dag.dependency("A3", "R");
+

+
        dag.dependency("B1", "A1");
+
        dag.dependency("B2", "A1");
+
        dag.dependency("B3", "A2");
+
        dag.dependency("B3", "A3");
+

+
        dag.dependency("C1", "B1");
+
        dag.dependency("C1", "B2");
+
        dag.dependency("C1", "B3");
+

+
        let acc = dag.fold(&"R", Vec::new(), |mut acc, key, _, _| {
+
            acc.push(*key);
+
            ControlFlow::Continue(acc)
+
        });
+
        assert_eq!(acc, vec!["R", "A1", "A2", "A3", "B1", "B2", "B3", "C1"]);
+
    }
+

+
    #[test]
+
    fn test_fold_depth() {
+
        let mut dag = Dag::new();
+

+
        dag.node("R", ());
+
        dag.node("A1", ());
+
        dag.node("A2", ());
+
        dag.node("A3", ());
+
        dag.node("B1", ());
+
        dag.node("B2", ());
+
        dag.node("B3", ());
+
        dag.node("C1", ());
+

+
        dag.dependency("A1", "R");
+
        dag.dependency("A2", "R");
+
        dag.dependency("A3", "R");
+

+
        dag.dependency("B1", "A1");
+
        dag.dependency("B2", "A1");
+
        dag.dependency("B3", "A2");
+
        dag.dependency("B3", "A3");
+

+
        dag.dependency("C1", "B1");
+
        dag.dependency("C1", "B2");
+
        dag.dependency("C1", "B3");
+

+
        let acc = dag.fold(&"R", Vec::new(), |mut acc, key, _, depth| {
+
            acc.push((*key, depth));
+
            ControlFlow::Continue(acc)
+
        });
+

+
        assert_eq!(
+
            acc,
+
            vec![
+
                ("R", 0),
+
                ("A1", 1),
+
                ("A2", 1),
+
                ("A3", 1),
+
                ("B1", 2),
+
                ("B2", 2),
+
                ("B3", 2),
+
                ("C1", 3)
+
            ]
+
        );
+
    }
+

+
    #[test]
+
    fn test_fold_reject() {
+
        let mut dag = Dag::new();
+

+
        dag.node("R", true);
+
        dag.node("A1", false); // Reject.
+
        dag.node("A2", true);
+
        dag.node("B1", true);
+
        dag.node("C1", true);
+
        dag.node("D1", true);
+

+
        dag.dependency("A1", "R");
+
        dag.dependency("A2", "R");
+
        dag.dependency("B1", "A1");
+
        dag.dependency("C1", "B1");
+
        dag.dependency("D1", "C1");
+
        dag.dependency("D1", "A2");
+

+
        let a1 = dag.get(&"A1").unwrap();
+
        assert_eq!(dag.descendants_of(a1), vec!["B1", "C1", "D1"]);
+

+
        let acc = dag.fold(&"R", Vec::new(), |mut acc, key, accept, _| {
+
            if !accept.value {
+
                ControlFlow::Break(acc)
+
            } else {
+
                acc.push(*key);
+
                ControlFlow::Continue(acc)
+
            }
+
        });
+
        assert_eq!(acc, vec!["R", "A2"]);
+
    }
}
modified radicle/src/cob/op.rs
@@ -1,7 +1,7 @@
use nonempty::NonEmpty;
use thiserror::Error;

-
use radicle_cob::history::{EntryId, EntryWithClock};
+
use radicle_cob::history::{Entry, EntryId};
use radicle_crdt::clock;
use radicle_crdt::clock::Lamport;
use radicle_crypto::PublicKey;
@@ -78,19 +78,20 @@ impl<A> Op<A> {

pub struct Ops<A>(pub NonEmpty<Op<A>>);

-
impl<'a, A> TryFrom<&'a EntryWithClock> for Ops<A>
+
impl<'a, A> TryFrom<&'a Entry> for Ops<A>
where
    for<'de> A: serde::Deserialize<'de>,
{
    type Error = OpEncodingError;

-
    fn try_from(entry: &'a EntryWithClock) -> Result<Self, Self::Error> {
+
    fn try_from(entry: &'a Entry) -> Result<Self, Self::Error> {
        let id = *entry.id();
        let identity = entry.resource();
        let ops = entry
-
            .changes()
+
            .contents()
+
            .iter()
            .map(|blob| {
-
                let action = serde_json::from_slice(blob)?;
+
                let action = serde_json::from_slice(blob.as_slice())?;
                let op = Op {
                    id,
                    action,
@@ -101,10 +102,19 @@ where
                };
                Ok::<_, Self::Error>(op)
            })
-
            .collect::<Result<Vec<_>, _>>()?;
+
            .collect::<Result<_, _>>()?;

        // SAFETY: Entry is guaranteed to have at least one operation.
        #[allow(clippy::unwrap_used)]
        Ok(Self(NonEmpty::from_vec(ops).unwrap()))
    }
}
+

+
impl<A: 'static> IntoIterator for Ops<A> {
+
    type Item = Op<A>;
+
    type IntoIter = <NonEmpty<Op<A>> as IntoIterator>::IntoIter;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.0.into_iter()
+
    }
+
}
modified radicle/src/cob/store.rs
@@ -53,7 +53,6 @@ pub trait FromHistory: Sized + Default + PartialEq {
        history: &History,
        repo: &R,
    ) -> Result<(Self, Lamport), Self::Error> {
-
        let obj = history.traverse(Self::default(), |mut acc, entry| {
            match Ops::try_from(entry) {
                Ok(Ops(ops)) => {
                    if let Err(err) = acc.apply(ops, repo) {
modified radicle/src/cob/test.rs
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use std::marker::PhantomData;
-
use std::ops::{ControlFlow, Deref};
+
use std::ops::Deref;

use nonempty::NonEmpty;
use serde::Serialize;
@@ -30,7 +30,7 @@ pub struct HistoryBuilder<T> {

impl<T: FromHistory> HistoryBuilder<T>
where
-
    T::Action: Serialize + Eq,
+
    T::Action: Serialize + Eq + 'static,
{
    pub fn new(op: &Op<T::Action>) -> HistoryBuilder<T> {
        let entry = arbitrary::oid();
@@ -68,22 +68,23 @@ where
    }

    /// Return a sorted list of operations by traversing the history in topological order.
-
    pub fn sorted(&self) -> Vec<Op<T::Action>> {
-
        self.history.traverse(Vec::new(), |mut acc, entry| {
-
            let Ops(ops) =
-
                Ops::try_from(entry).expect("HistoryBuilder::sorted: operations must be valid");
-
            acc.extend(ops);
-

-
            ControlFlow::Continue(acc)
-
        })
+
    pub fn sorted(&self, rng: &mut fastrand::Rng) -> Vec<Op<T::Action>> {
+
        self.history
+
            .sorted(|a, b| if rng.bool() { a.cmp(b) } else { b.cmp(a) })
+
            .flat_map(|entry| {
+
                Ops::try_from(entry).expect("HistoryBuilder::sorted: operations must be valid")
+
            })
+
            .collect()
    }

    /// Return `n` permutations of the topological ordering of operations.
    /// *This function will never return if less than `n` permutations exist.*
    pub fn permutations(&self, n: usize) -> impl IntoIterator<Item = Vec<Op<T::Action>>> {
        let mut permutations = BTreeSet::new();
+
        let mut rng = fastrand::Rng::new();
+

        while permutations.len() < n {
-
            permutations.insert(self.sorted());
+
            permutations.insert(self.sorted(&mut rng));
        }
        permutations.into_iter()
    }
@@ -100,7 +101,7 @@ impl<A> Deref for HistoryBuilder<A> {
/// Create a new test history.
pub fn history<T: FromHistory>(op: &Op<T::Action>) -> HistoryBuilder<T>
where
-
    T::Action: Serialize + Eq,
+
    T::Action: Serialize + Eq + 'static,
{
    HistoryBuilder::new(op)
}