Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Add `radicle-crdt` crate
Alexis Sellier committed 3 years ago
commit 35f061746ce61e910219df6d5089a731c2ce4c36
parent 21e47c7a340a530e61fa3bebf024169a0f540845
10 files changed +1074 -2
modified Cargo.lock
@@ -20,6 +20,15 @@ dependencies = [
]

[[package]]
+
name = "aho-corasick"
+
version = "0.7.20"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac"
+
dependencies = [
+
 "memchr",
+
]
+

+
[[package]]
name = "amplify_derive"
version = "2.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -465,6 +474,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3b7eb4404b8195a9abb6356f4ac07d8ba267045c8d6d220ac4dc992e6cc75df"

[[package]]
+
name = "ctor"
+
version = "0.1.26"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
+
dependencies = [
+
 "quote",
+
 "syn",
+
]
+

+
[[package]]
name = "ctr"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -589,6 +608,12 @@ dependencies = [
]

[[package]]
+
name = "diff"
+
version = "0.1.13"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
+

+
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -686,6 +711,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"

[[package]]
+
name = "env_logger"
+
version = "0.8.4"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
+
dependencies = [
+
 "log",
+
 "regex",
+
]
+

+
[[package]]
name = "fastrand"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1466,6 +1501,15 @@ dependencies = [
]

[[package]]
+
name = "output_vt100"
+
version = "0.1.3"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "628223faebab4e3e40667ee0b2336d34a5b960ff60ea743ddfdbcf7770bcfb66"
+
dependencies = [
+
 "winapi",
+
]
+

+
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1603,6 +1647,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"

[[package]]
+
name = "pretty_assertions"
+
version = "1.3.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a25e9bcb20aa780fd0bb16b72403a9064d6b3f22f026946029acb941a50af755"
+
dependencies = [
+
 "ctor",
+
 "diff",
+
 "output_vt100",
+
 "yansi",
+
]
+

+
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1641,6 +1697,8 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6"
dependencies = [
+
 "env_logger",
+
 "log",
 "rand 0.8.5",
]

@@ -1747,6 +1805,23 @@ dependencies = [
]

[[package]]
+
name = "radicle-crdt"
+
version = "0.1.0"
+
dependencies = [
+
 "fastrand",
+
 "itertools",
+
 "num-traits",
+
 "olpc-cjson",
+
 "pretty_assertions",
+
 "quickcheck",
+
 "quickcheck_macros",
+
 "radicle",
+
 "radicle-crypto",
+
 "serde",
+
 "serde_json",
+
]
+

+
[[package]]
name = "radicle-crypto"
version = "0.1.0"
dependencies = [
@@ -1975,6 +2050,8 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b"
dependencies = [
+
 "aho-corasick",
+
 "memchr",
 "regex-syntax",
]

modified Cargo.toml
@@ -3,6 +3,7 @@ members = [
  "radicle",
  "radicle-cob",
  "radicle-cli",
+
  "radicle-crdt",
  "radicle-crypto",
  "radicle-httpd",
  "radicle-node",
added radicle-crdt/Cargo.toml
@@ -0,0 +1,20 @@
+
[package]
+
name = "radicle-crdt"
+
version = "0.1.0"
+
edition = "2021"
+

+
[dependencies]
+
num-traits = { version = "0.2.15", default-features = false, features = ["std"] }
+
olpc-cjson = { version = "0.1.1" }
+
radicle = { path = "../radicle" }
+
serde = { version = "1" }
+
serde_json = { version = "1" }
+

+
[dev-dependencies]
+
itertools = { version = "0.10.5" }
+
fastrand = { version = "1.8.0" }
+
pretty_assertions = { version = "1.3.0" }
+
quickcheck = { version = "1" }
+
quickcheck_macros = { version = "1" }
+
radicle = { path = "../radicle", features = ["test"] }
+
radicle-crypto = { path = "../radicle-crypto", features = ["test"] }
added radicle-crdt/src/lib.rs
@@ -0,0 +1,26 @@
+
#![allow(clippy::collapsible_if)]
+
#![allow(clippy::collapsible_else_if)]
+
#![allow(clippy::type_complexity)]
+
pub mod lwwreg;
+
pub mod lwwset;
+
pub mod ord;
+
pub mod thread;
+

+
#[cfg(test)]
+
mod test;
+

+
////////////////////////////////////////////////////////////////////////////////
+

+
/// A join-semilattice.
+
pub trait Semilattice: Default {
+
    /// Join or "merge" two semilattices into one.
+
    fn join(self, other: Self) -> Self;
+
}
+

+
/// Reduce an iterator of semilattice values to its least upper bound.
+
pub fn fold<S>(i: impl IntoIterator<Item = S>) -> S
+
where
+
    S: Semilattice,
+
{
+
    i.into_iter().fold(S::default(), S::join)
+
}
added radicle-crdt/src/lwwreg.rs
@@ -0,0 +1,87 @@
+
use crate::Semilattice;
+

+
/// Last-Write-Wins Register.
+
#[derive(Debug, Clone, Default, PartialEq, Eq)]
+
pub struct LWWReg<T, C> {
+
    value: T,
+
    clock: C,
+
}
+

+
impl<T: PartialOrd, C: PartialOrd> LWWReg<T, C> {
+
    pub fn new(value: T, clock: C) -> Self {
+
        Self { value, clock }
+
    }
+

+
    pub fn set(&mut self, value: T, clock: C) {
+
        if clock > self.clock || (clock == self.clock && value > self.value) {
+
            self.value = value;
+
            self.clock = clock;
+
        }
+
    }
+

+
    pub fn get(&self) -> &T {
+
        &self.value
+
    }
+

+
    pub fn clock(&self) -> &C {
+
        &self.clock
+
    }
+

+
    pub fn into_inner(self) -> T {
+
        self.value
+
    }
+
}
+

+
impl<T, C> Semilattice for LWWReg<T, C>
+
where
+
    T: PartialOrd + Default,
+
    C: PartialOrd + Default,
+
{
+
    fn join(mut self, other: Self) -> Self {
+
        self.set(other.value, other.clock);
+
        self
+
    }
+
}
+

+
#[cfg(test)]
+
mod tests {
+
    use super::*;
+
    use quickcheck_macros::quickcheck;
+

+
    #[quickcheck]
+
    fn prop_semilattice(a: (u8, u16), b: (u8, u16), c: (u8, u16)) {
+
        let a = LWWReg::new(a.0, a.1);
+
        let b = LWWReg::new(b.0, b.1);
+
        let c = LWWReg::new(c.0, c.1);
+

+
        crate::test::assert_laws(&a, &b, &c);
+
    }
+

+
    #[test]
+
    fn test_set_get() {
+
        let mut reg = LWWReg::new(42, 1);
+
        assert_eq!(*reg.get(), 42);
+

+
        reg.set(84, 0);
+
        assert_eq!(*reg.get(), 42);
+

+
        reg.set(84, 2);
+
        assert_eq!(*reg.get(), 84);
+

+
        // Smaller value, same clock: smaller value loses.
+
        reg.set(42, 2);
+
        assert_eq!(*reg.get(), 84);
+

+
        // Bigger value, same clock: bigger value wins.
+
        reg.set(168, 2);
+
        assert_eq!(*reg.get(), 168);
+

+
        // Smaller value, newer clock: smaller value wins.
+
        reg.set(42, 3);
+
        assert_eq!(*reg.get(), 42);
+

+
        // Same value, newer clock: newer clock is set.
+
        reg.set(42, 4);
+
        assert_eq!(*reg.clock(), 4);
+
    }
+
}
added radicle-crdt/src/lwwset.rs
@@ -0,0 +1,179 @@
+
use std::collections::BTreeMap;
+

+
use crate::Semilattice;
+

+
/// Last-Write-Wins Set.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct LWWSet<T, C> {
+
    added: BTreeMap<T, C>,
+
    removed: BTreeMap<T, C>,
+
}
+

+
impl<T: Ord, C: Ord + Copy> LWWSet<T, C> {
+
    pub fn singleton(value: T, clock: C) -> Self {
+
        Self {
+
            added: BTreeMap::from_iter([(value, clock)]),
+
            removed: BTreeMap::default(),
+
        }
+
    }
+

+
    pub fn insert(&mut self, value: T, clock: C) {
+
        self.added
+
            .entry(value)
+
            .and_modify(|t| *t = C::max(*t, clock))
+
            .or_insert(clock);
+
    }
+

+
    pub fn remove(&mut self, value: T, clock: C) {
+
        // TODO: Should we remove from 'added' set if timestamp is newer?
+
        self.removed
+
            .entry(value)
+
            .and_modify(|t| *t = C::max(*t, clock))
+
            .or_insert(clock);
+
    }
+

+
    pub fn contains(&self, value: T) -> bool {
+
        let Some(added) = self.added.get(&value) else {
+
            // If the element was never added, return false.
+
            return false;
+
        };
+

+
        if let Some(removed) = self.removed.get(&value) {
+
            // If the element was added and also removed, whichever came last
+
            // is the winner, or if they came at the same time, we bias towards
+
            // it having been added last.
+
            return added >= removed;
+
        }
+
        // If it was only added and never removed, return true.
+
        true
+
    }
+

+
    pub fn iter(&self) -> impl Iterator<Item = &T> {
+
        self.added.iter().filter_map(|(value, added)| {
+
            if let Some(removed) = self.removed.get(value) {
+
                // Note, in case the element was added and removed at the same time,
+
                // we bias towards it being added, ie. this won't return `None`.
+
                if removed > added {
+
                    return None;
+
                }
+
            }
+
            Some(value)
+
        })
+
    }
+
}
+

+
impl<T, C> Default for LWWSet<T, C> {
+
    fn default() -> Self {
+
        Self {
+
            added: BTreeMap::default(),
+
            removed: BTreeMap::default(),
+
        }
+
    }
+
}
+

+
impl<T: Ord, C: Copy + Ord> FromIterator<(T, C)> for LWWSet<T, C> {
+
    fn from_iter<I: IntoIterator<Item = (T, C)>>(iter: I) -> Self {
+
        let mut set = LWWSet::default();
+
        for (v, c) in iter.into_iter() {
+
            set.insert(v, c);
+
        }
+
        set
+
    }
+
}
+

+
impl<T: Ord, C: Ord + Copy> Extend<(T, C)> for LWWSet<T, C> {
+
    fn extend<I: IntoIterator<Item = (T, C)>>(&mut self, iter: I) {
+
        for (v, c) in iter.into_iter() {
+
            self.insert(v, c);
+
        }
+
    }
+
}
+

+
impl<T, C> Semilattice for LWWSet<T, C>
+
where
+
    T: Ord,
+
    C: Ord + Copy,
+
{
+
    fn join(mut self, other: Self) -> Self {
+
        self.extend(other.added.into_iter());
+
        self
+
    }
+
}
+

+
#[cfg(test)]
+
mod tests {
+
    use super::*;
+
    use quickcheck_macros::quickcheck;
+

+
    #[quickcheck]
+
    fn prop_semilattice(
+
        a: Vec<(u8, u16)>,
+
        b: Vec<(u8, u16)>,
+
        c: Vec<(u8, u16)>,
+
        mix: Vec<(u8, u16)>,
+
    ) {
+
        let mut a = LWWSet::from_iter(a);
+
        let mut b = LWWSet::from_iter(b);
+
        let c = LWWSet::from_iter(c);
+

+
        a.extend(mix.clone());
+
        b.extend(mix);
+

+
        crate::test::assert_laws(&a, &b, &c);
+
    }
+

+
    #[test]
+
    fn test_insert() {
+
        let mut set = LWWSet::default();
+

+
        set.insert('a', 0);
+
        set.insert('b', 0);
+
        set.insert('c', 0);
+

+
        assert!(set.contains('a'));
+
        assert!(set.contains('b'));
+
        assert!(!set.contains('?'));
+

+
        let values = set.iter().cloned().collect::<Vec<_>>();
+
        assert!(values.contains(&'a'));
+
        assert!(values.contains(&'b'));
+
        assert!(values.contains(&'c'));
+
        assert_eq!(values.len(), 3);
+
    }
+

+
    #[test]
+
    fn test_insert_remove() {
+
        let mut set = LWWSet::default();
+

+
        set.insert('a', 1);
+
        assert!(set.contains('a'));
+

+
        set.remove('a', 0);
+
        assert!(set.contains('a'));
+

+
        set.remove('a', 1);
+
        assert!(set.contains('a')); // Add takes precedence over remove.
+
        assert!(set.iter().any(|c| *c == 'a'));
+

+
        set.remove('a', 2);
+
        assert!(!set.contains('a'));
+
        assert!(!set.iter().any(|c| *c == 'a'));
+
    }
+

+
    #[test]
+
    fn test_remove_insert() {
+
        let mut set = LWWSet::default();
+

+
        set.insert('a', 1);
+
        assert!(set.contains('a'));
+

+
        set.remove('a', 2);
+
        assert!(!set.contains('a'));
+

+
        set.insert('a', 1);
+
        assert!(!set.contains('a'));
+

+
        set.insert('a', 2);
+
        assert!(set.contains('a'));
+
    }
+
}
added radicle-crdt/src/ord.rs
@@ -0,0 +1,71 @@
+
use std::{cmp, ops};
+

+
use serde::{Deserialize, Serialize};
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
+
pub struct Max<T>(T);
+

+
impl<T: num_traits::SaturatingAdd + num_traits::One> Max<T> {
+
    pub fn incr(&mut self) {
+
        self.0 = self.0.saturating_add(&T::one());
+
    }
+
}
+

+
impl<T> Default for Max<T>
+
where
+
    T: num_traits::bounds::Bounded,
+
{
+
    fn default() -> Self {
+
        Self(T::min_value())
+
    }
+
}
+

+
impl<T> ops::Deref for Max<T> {
+
    type Target = T;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl<T> From<T> for Max<T> {
+
    fn from(t: T) -> Self {
+
        Self(t)
+
    }
+
}
+

+
#[allow(clippy::derive_ord_xor_partial_ord)]
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, Serialize, Deserialize)]
+
pub struct Min<T>(pub T);
+

+
impl<T> Default for Min<T>
+
where
+
    T: num_traits::bounds::Bounded,
+
{
+
    fn default() -> Self {
+
        Self(T::max_value())
+
    }
+
}
+

+
impl<T> ops::Deref for Min<T> {
+
    type Target = T;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl<T> From<T> for Min<T> {
+
    fn from(t: T) -> Self {
+
        Self(t)
+
    }
+
}
+

+
impl<T> cmp::PartialOrd for Min<T>
+
where
+
    T: PartialOrd,
+
{
+
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
+
        other.0.partial_cmp(&self.0)
+
    }
+
}
added radicle-crdt/src/test.rs
@@ -0,0 +1,108 @@
+
use std::fmt::Debug;
+
use std::rc::Rc;
+

+
use super::*;
+

+
/// Generate test values following a weight distribution.
+
pub struct WeightedGenerator<'a, T, C> {
+
    cases: Vec<Rc<dyn Fn(&mut C, fastrand::Rng) -> Option<T> + 'a>>,
+
    rng: fastrand::Rng,
+
    ctx: C,
+
}
+

+
impl<'a, T, C> Iterator for WeightedGenerator<'a, T, C> {
+
    type Item = T;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        let cases = self.cases.len();
+

+
        loop {
+
            let r = self.rng.usize(0..cases);
+
            let g = &self.cases[r];
+

+
            if let Some(val) = g(&mut self.ctx, self.rng.clone()) {
+
                return Some(val);
+
            }
+
        }
+
    }
+
}
+

+
impl<'a, T, C: Default> WeightedGenerator<'a, T, C> {
+
    /// Create a new distribution.
+
    pub fn new(rng: fastrand::Rng) -> Self {
+
        Self {
+
            cases: Vec::new(),
+
            rng,
+
            ctx: C::default(),
+
        }
+
    }
+

+
    /// Add a new variant with a given weight and generator function.
+
    pub fn variant(
+
        mut self,
+
        weight: usize,
+
        generator: impl Fn(&mut C, fastrand::Rng) -> Option<T> + 'a,
+
    ) -> Self {
+
        let gen = Rc::new(generator);
+
        for _ in 0..weight {
+
            self.cases.push(gen.clone());
+
        }
+
        self
+
    }
+
}
+

+
/// Assert semilattice ACI laws.
+
pub fn assert_laws<S: Debug + Semilattice + PartialEq + Clone>(a: &S, b: &S, c: &S) {
+
    assert_associative(a, b, c);
+
    assert_commutative(a, b);
+
    assert_idempotent(a);
+
}
+

+
pub fn assert_associative<S: Debug + Semilattice + PartialEq + Clone>(a: &S, b: &S, c: &S) {
+
    // (a ^ b) ^ c
+
    let s1 = a.clone().join(b.clone()).join(c.clone());
+
    // a ^ (b ^ c)
+
    let s2 = a.clone().join(b.clone().join(c.clone()));
+
    // (a ^ b) ^ c = a ^ (b ^ c)
+
    assert_eq!(s1, s2, "associativity");
+
}
+

+
pub fn assert_commutative<S: Debug + Semilattice + PartialEq + Clone>(a: &S, b: &S) {
+
    // a ^ b
+
    let s1 = a.clone().join(b.clone());
+
    // b ^ a
+
    let s2 = b.clone().join(a.clone());
+
    // a ^ b = b ^ a
+
    assert_eq!(s1, s2, "commutativity");
+
}
+

+
pub fn assert_idempotent<S: Debug + Semilattice + PartialEq + Clone>(a: &S) {
+
    // a ^ a
+
    let s1 = a.clone().join(a.clone());
+
    // a
+
    let s2 = a.clone();
+
    // a ^ a = a
+
    assert_eq!(s1, s2, "idempotence");
+
}
+

+
#[test]
+
fn test_generator() {
+
    let rng = fastrand::Rng::with_seed(0);
+
    let dist = WeightedGenerator::<char, ()>::new(rng)
+
        .variant(1, |_, _| Some('a'))
+
        .variant(2, |_, _| Some('b'))
+
        .variant(4, |_, _| Some('c'))
+
        .variant(8, |_, _| Some('d'));
+

+
    let values = dist.take(1000).collect::<Vec<_>>();
+

+
    let a = values.iter().filter(|c| **c == 'a').count();
+
    let b = values.iter().filter(|c| **c == 'b').count();
+
    let c = values.iter().filter(|c| **c == 'c').count();
+
    let d = values.iter().filter(|c| **c == 'd').count();
+

+
    assert_eq!(a, 79);
+
    assert_eq!(b, 122);
+
    assert_eq!(c, 285);
+
    assert_eq!(d, 514);
+
}
added radicle-crdt/src/thread.rs
@@ -0,0 +1,493 @@
+
use std::collections::BTreeMap;
+
use std::ops::Deref;
+

+
use serde::{Deserialize, Serialize};
+

+
use radicle::cob::shared::Reaction;
+
use radicle::cob::Timestamp;
+
use radicle::crypto::{PublicKey, Signature, Signer};
+
use radicle::hash;
+

+
use crate::lwwreg::LWWReg;
+
use crate::lwwset::LWWSet;
+

+
/// Identifies a change.
+
pub type ChangeId = radicle::hash::Digest;
+
/// Identifies a tag.
+
pub type TagId = String;
+
/// The author of a change.
+
pub type Author = PublicKey;
+
/// Alias for `Author`.
+
pub type ActorId = PublicKey;
+

+
/// The `Change` is the unit of replication.
+
/// Everything that can be done in the system is represented by a `Change` object.
+
/// Changes are applied to an accumulator to yield a final state.
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
pub struct Change {
+
    /// The action carried out by this change.
+
    action: Action,
+
    /// The author of the change.
+
    author: Author,
+
    /// The time at which this change was authored.
+
    timestamp: Timestamp,
+
}
+

+
impl Change {
+
    /// Get the change id.
+
    pub fn id(&self) -> ChangeId {
+
        hash::Digest::new(self.encode())
+
    }
+

+
    /// Serialize the change into a byte string.
+
    pub fn encode(&self) -> Vec<u8> {
+
        let mut buf = Vec::new();
+
        let mut serializer =
+
            serde_json::Serializer::with_formatter(&mut buf, olpc_cjson::CanonicalFormatter::new());
+

+
        self.serialize(&mut serializer).unwrap();
+

+
        buf
+
    }
+
}
+

+
/// Change envelope. Carries signed changes.
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
pub struct Envelope {
+
    /// Changes included in this envelope, serialized as JSON.
+
    pub changes: Vec<u8>,
+
    /// Signature over the change, by the change author.
+
    pub signature: Signature,
+
}
+

+
/// An object that can be either present or removed.
+
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
+
pub enum Redactable<T> {
+
    /// When the object is present.
+
    Present(T),
+
    /// When the object has been removed.
+
    #[default]
+
    Redacted,
+
}
+

+
/// A comment on a discussion thread.
+
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
pub struct Comment {
+
    /// The comment body.
+
    body: String,
+
    /// Thread or comment this is a reply to.
+
    reply_to: ChangeId,
+
}
+

+
impl Comment {
+
    /// Create a new comment.
+
    pub fn new(body: String, reply_to: ChangeId) -> Self {
+
        Self { body, reply_to }
+
    }
+
}
+

+
/// An action that can be carried out in a change.
+
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+
pub enum Action {
+
    /// Initialize a new thread.
+
    Thread { title: String },
+
    /// Comment on a thread.
+
    Comment { comment: Comment },
+
    /// Redact a change. Not all changes can be redacted.
+
    Redact { id: ChangeId },
+
    /// Add a tag to the thread.
+
    Tag { tag: TagId },
+
    /// Remove a tag from the thread.
+
    Untag { tag: TagId },
+
    /// React to a change.
+
    React {
+
        to: ChangeId,
+
        reaction: Reaction,
+
        active: bool,
+
    },
+
}
+

+
/// A discussion thread.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct Thread {
+
    /// The id of the thread.
+
    id: ChangeId,
+
    /// The thread title.
+
    title: String,
+
    /// The thread author.
+
    author: Author,
+
    /// The thread timestamp.
+
    timestamp: Timestamp,
+
    /// The comments under the thread.
+
    comments: BTreeMap<ChangeId, Redactable<Comment>>,
+
    /// Associated tags.
+
    tags: BTreeMap<TagId, LWWReg<bool, Timestamp>>,
+
    /// Reactions to changes.
+
    reactions: BTreeMap<ChangeId, LWWSet<(ActorId, Reaction), Timestamp>>,
+
}
+

+
impl Deref for Thread {
+
    type Target = BTreeMap<ChangeId, Redactable<Comment>>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.comments
+
    }
+
}
+

+
impl Thread {
+
    pub fn new(root: Change) -> Self {
+
        let id = root.id();
+

+
        let Action::Thread { title } = root.action else {
+
            panic!("Threads need to be initialized with a `Thread` message");
+
        };
+

+
        Self {
+
            id,
+
            title,
+
            author: root.author,
+
            timestamp: root.timestamp,
+
            comments: BTreeMap::default(),
+
            tags: BTreeMap::default(),
+
            reactions: BTreeMap::default(),
+
        }
+
    }
+

+
    pub fn clear(&mut self) {
+
        self.comments.clear();
+
    }
+

+
    pub fn apply(&mut self, changes: impl IntoIterator<Item = Change>) {
+
        for change in changes.into_iter() {
+
            let id = change.id();
+

+
            match change.action {
+
                Action::Comment { comment } => {
+
                    match self.comments.get(&id) {
+
                        Some(Redactable::Present(_)) => {
+
                            // Do nothing, the action was already processed,
+
                            // since a change with the same content-id as this
+
                            // one exists already.
+
                        }
+
                        Some(Redactable::Redacted) => {
+
                            // Do nothing, the action was redacted.
+
                        }
+
                        None => {
+
                            self.comments.insert(id, Redactable::Present(comment));
+
                        }
+
                    }
+
                }
+
                Action::Redact { id } => {
+
                    self.comments.insert(id, Redactable::Redacted);
+
                }
+
                Action::Tag { tag } => {
+
                    self.tags
+
                        .entry(tag)
+
                        .and_modify(|r| r.set(true, change.timestamp))
+
                        .or_insert_with(|| LWWReg::new(true, change.timestamp));
+
                }
+
                Action::Untag { tag } => {
+
                    self.tags
+
                        .entry(tag)
+
                        .and_modify(|r| r.set(false, change.timestamp))
+
                        .or_insert_with(|| LWWReg::new(false, change.timestamp));
+
                }
+
                Action::React {
+
                    to,
+
                    reaction,
+
                    active,
+
                } => {
+
                    self.reactions
+
                        .entry(to)
+
                        .and_modify(|reactions| {
+
                            if active {
+
                                reactions.insert((change.author, reaction), change.timestamp);
+
                            } else {
+
                                reactions.remove((change.author, reaction), change.timestamp);
+
                            }
+
                        })
+
                        .or_insert_with(|| {
+
                            if active {
+
                                LWWSet::singleton((change.author, reaction), change.timestamp)
+
                            } else {
+
                                let mut set = LWWSet::default();
+
                                set.remove((change.author, reaction), change.timestamp);
+
                                set
+
                            }
+
                        });
+
                }
+
                Action::Thread { .. } => {
+
                    // Ignored
+
                }
+
            }
+
        }
+
    }
+

+
    pub fn comments(&self) -> impl Iterator<Item = (&ChangeId, &Comment)> + '_ {
+
        self.comments.iter().filter_map(|(id, comment)| {
+
            if let Redactable::Present(c) = comment {
+
                Some((id, c))
+
            } else {
+
                None
+
            }
+
        })
+
    }
+

+
    pub fn tags(&self) -> impl Iterator<Item = &TagId> + '_ {
+
        self.tags
+
            .iter()
+
            .filter_map(|(tag, r)| if *r.get() { Some(tag) } else { None })
+
    }
+
}
+

+
/// An object that can be used to create and sign changes.
+
#[derive(Default)]
+
pub struct Actor<G> {
+
    signer: G,
+
}
+

+
impl<G: Signer> Actor<G> {
+
    /// Create a new thread.
+
    pub fn thread(&self, title: &str, timestamp: Timestamp) -> Change {
+
        self.change(
+
            Action::Thread {
+
                title: title.to_owned(),
+
            },
+
            timestamp,
+
        )
+
    }
+

+
    /// Create a new comment.
+
    pub fn comment(&self, body: &str, timestamp: Timestamp, parent: ChangeId) -> Change {
+
        self.change(
+
            Action::Comment {
+
                comment: Comment::new(String::from(body), parent),
+
            },
+
            timestamp,
+
        )
+
    }
+

+
    /// Add a tag.
+
    pub fn tag(&self, tag: TagId, timestamp: Timestamp) -> Change {
+
        self.change(Action::Tag { tag }, timestamp)
+
    }
+

+
    /// Remove a tag.
+
    pub fn untag(&self, tag: TagId, timestamp: Timestamp) -> Change {
+
        self.change(Action::Untag { tag }, timestamp)
+
    }
+

+
    /// Create a new redaction.
+
    pub fn redact(&self, id: ChangeId, timestamp: Timestamp) -> Change {
+
        self.change(Action::Redact { id }, timestamp)
+
    }
+

+
    /// Create a new change.
+
    pub fn change(&self, action: Action, timestamp: Timestamp) -> Change {
+
        let author = *self.signer.public_key();
+

+
        Change {
+
            action,
+
            author,
+
            timestamp,
+
        }
+
    }
+

+
    pub fn sign(&self, changes: impl IntoIterator<Item = Change>) -> Envelope {
+
        let changes = changes.into_iter().collect::<Vec<_>>();
+
        let json = serde_json::to_value(changes).unwrap();
+

+
        let mut buffer = Vec::new();
+
        let mut serializer = serde_json::Serializer::with_formatter(
+
            &mut buffer,
+
            olpc_cjson::CanonicalFormatter::new(),
+
        );
+
        json.serialize(&mut serializer).unwrap();
+

+
        let signature = self.signer.sign(&buffer);
+

+
        Envelope {
+
            changes: buffer,
+
            signature,
+
        }
+
    }
+
}
+

+
#[cfg(test)]
+
mod tests {
+
    use std::{array, iter};
+

+
    use itertools::Itertools;
+
    use pretty_assertions::assert_eq;
+
    use quickcheck::Arbitrary;
+
    use quickcheck_macros::quickcheck;
+
    use radicle::crypto::test::signer::MockSigner;
+

+
    use super::*;
+
    use crate::test::WeightedGenerator;
+

+
    #[derive(Clone)]
+
    struct Changes<const N: usize> {
+
        permutations: [Vec<Change>; N],
+
    }
+

+
    impl<const N: usize> std::fmt::Debug for Changes<N> {
+
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
            for (i, p) in self.permutations.iter().enumerate() {
+
                writeln!(
+
                    f,
+
                    "{i}: {:#?}",
+
                    p.iter().map(|c| &c.action).collect::<Vec<_>>()
+
                )?;
+
            }
+
            Ok(())
+
        }
+
    }
+

+
    impl<const N: usize> Arbitrary for Changes<N> {
+
        fn arbitrary(g: &mut quickcheck::Gen) -> Self {
+
            let rng = fastrand::Rng::with_seed(u64::arbitrary(g));
+
            let gen = WeightedGenerator::<Action, (Vec<TagId>, Vec<Change>)>::new(rng.clone())
+
                .variant(2, |_, rng| {
+
                    Some(Action::Comment {
+
                        comment: Comment {
+
                            body: iter::repeat_with(|| rng.alphabetic()).take(16).collect(),
+
                            reply_to: Default::default(),
+
                        },
+
                    })
+
                })
+
                .variant(2, |(_, changes), rng| {
+
                    if changes.is_empty() {
+
                        return None;
+
                    }
+
                    let to = changes[rng.usize(..changes.len())].id();
+

+
                    Some(Action::React {
+
                        to,
+
                        reaction: Reaction::new('✨').unwrap(),
+
                        active: rng.bool(),
+
                    })
+
                })
+
                .variant(2, |(_, changes), rng| {
+
                    if changes.is_empty() {
+
                        return None;
+
                    }
+
                    let id = changes[rng.usize(..changes.len())].id();
+
                    Some(Action::Redact { id })
+
                })
+
                .variant(2, |(tags, _), rng| {
+
                    let tag = if tags.is_empty() || rng.bool() {
+
                        let tag = iter::repeat_with(|| rng.alphabetic())
+
                            .take(8)
+
                            .collect::<String>();
+
                        tags.push(tag.clone());
+
                        tag
+
                    } else {
+
                        tags[rng.usize(..tags.len())].clone()
+
                    };
+
                    Some(Action::Tag { tag })
+
                })
+
                .variant(2, |(tags, _), rng| {
+
                    if tags.is_empty() {
+
                        return None;
+
                    }
+
                    let tag = tags[rng.usize(..tags.len())].clone();
+
                    Some(Action::Untag { tag })
+
                });
+

+
            let mut changes = Vec::new();
+
            let mut permutations: [Vec<Change>; N] = array::from_fn(|_| Vec::new());
+
            let author = PublicKey::from([0; 32]);
+

+
            for action in gen.take(g.size().min(8)) {
+
                let timestamp = Timestamp::now() + rng.u64(0..3);
+

+
                changes.push(Change {
+
                    action,
+
                    author,
+
                    timestamp,
+
                });
+
            }
+

+
            for p in &mut permutations {
+
                *p = changes.clone();
+
                rng.shuffle(&mut changes);
+
            }
+

+
            Changes { permutations }
+
        }
+
    }
+

+
    #[quickcheck]
+
    fn prop_invariants(log: Changes<3>) {
+
        let bob = Actor::<MockSigner>::default();
+
        let b0 = bob.thread("The Thread", Timestamp::now());
+
        let t = Thread::new(b0);
+
        let [p1, p2, p3] = log.permutations;
+

+
        let mut t1 = t.clone();
+
        t1.apply(p1);
+

+
        let mut t2 = t.clone();
+
        t2.apply(p2);
+

+
        let mut t3 = t;
+
        t3.apply(p3);
+

+
        assert_eq!(t1, t2);
+
        assert_eq!(t2, t3);
+
    }
+

+
    #[test]
+
    fn test_invariants() {
+
        let alice = Actor::<MockSigner>::default();
+
        let bob = Actor::<MockSigner>::default();
+
        let time = Timestamp::now();
+

+
        let b0 = bob.thread("Dinner Ingredients", time);
+
        let a0 = alice.comment("Ham", time, b0.id());
+
        let a1 = alice.comment("Rye", time, a0.id());
+
        let a2 = alice.comment("Dough", time, a1.id());
+
        let a3 = alice.redact(a1.id(), time);
+
        let a4 = alice.comment("Bread", time, b0.id());
+

+
        let t = Thread::new(b0);
+

+
        assert_order_invariance(&t, [&a0, &a1, &a2, &a3, &a4]);
+
        assert_idempotence(&t, [&a0, &a1, &a2, &a3, &a4]);
+
    }
+

+
    fn assert_order_invariance<'a>(t: &Thread, changes: impl IntoIterator<Item = &'a Change>) {
+
        let changes = changes.into_iter().cloned().collect::<Vec<_>>();
+
        let count = changes.len();
+

+
        let mut actual = t.clone();
+
        let mut expected = t.clone();
+
        expected.clear();
+
        expected.apply(changes.clone());
+

+
        for permutation in changes.into_iter().permutations(count) {
+
            actual.clear();
+
            actual.apply(permutation);
+

+
            assert_eq!(actual, expected);
+
        }
+
    }
+

+
    fn assert_idempotence<'a>(t: &Thread, changes: impl IntoIterator<Item = &'a Change>) {
+
        let changes = changes.into_iter().cloned().collect::<Vec<_>>();
+

+
        let mut actual = t.clone();
+
        let mut expected = t.clone();
+

+
        expected.clear();
+
        expected.apply(changes.clone());
+

+
        actual.clear();
+
        actual.apply(changes.clone());
+
        actual.apply(changes.clone());
+
        actual.apply(changes);
+

+
        assert_eq!(actual, expected);
+
    }
+
}
modified radicle/src/cob/shared.rs
@@ -23,7 +23,7 @@ pub enum ReactionError {
    InvalidReaction,
}

-
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
+
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Reaction {
    pub emoji: char,
@@ -278,7 +278,7 @@ impl Comment<Replies> {
    }
}

-
#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Ord, Eq, Serialize, Deserialize)]
+
#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Ord, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Timestamp {
    seconds: u64,
@@ -303,6 +303,16 @@ impl Timestamp {
    }
}

+
impl std::ops::Add<u64> for Timestamp {
+
    type Output = Self;
+

+
    fn add(self, rhs: u64) -> Self::Output {
+
        Self {
+
            seconds: self.seconds + rhs,
+
        }
+
    }
+
}
+

impl From<Timestamp> for ScalarValue {
    fn from(ts: Timestamp) -> Self {
        ScalarValue::Timestamp(ts.seconds as i64)