Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
cobs: allow for batched operations
Fintan Halpenny committed 3 years ago
commit 5f80ee96711ac922cc108a3e851d0fb6712d4d22
parent 7c9ee0f911324306d11c0f25d84ef79dfb730aae
12 files changed +107 -67
modified Cargo.lock
@@ -1659,9 +1659,9 @@ checksum = "9ff7ac1e5ea23db6d61ad103e91864675049644bf47c35912336352fa4e9c109"

[[package]]
name = "nonempty"
-
version = "0.8.0"
+
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "09f1f8e5676e1a1f2ee8b21f38238e1243c827531c9435624c7bfb305102cee4"
+
checksum = "aeaf4ad7403de93e699c191202f017118df734d3850b01e13a3a8b2e6953d3c9"
dependencies = [
 "serde",
]
@@ -2079,7 +2079,7 @@ dependencies = [
 "git2",
 "log",
 "multibase",
-
 "nonempty 0.8.0",
+
 "nonempty 0.8.1",
 "num-traits",
 "olpc-cjson",
 "once_cell",
@@ -2138,6 +2138,7 @@ dependencies = [
 "git-trailers",
 "git2",
 "log",
+
 "nonempty 0.8.1",
 "petgraph",
 "qcheck",
 "qcheck-macros",
@@ -2240,7 +2241,7 @@ dependencies = [
 "log",
 "nakamoto-net",
 "nakamoto-net-poll",
-
 "nonempty 0.8.0",
+
 "nonempty 0.8.1",
 "qcheck",
 "qcheck-macros",
 "radicle",
modified radicle-cob/Cargo.toml
@@ -17,6 +17,7 @@ git-commit = { version = "0.2" }
git-ref-format = { version = "0.1" }
git-trailers = { version = "0.1" }
log = { version = "0.4.17" }
+
nonempty = { version = "0.8.1", features = ["serialize"] }
petgraph = { version = "0.5" }
radicle-git-ext = { version = "0.2" }
serde_json = { version = "1.0" }
modified radicle-cob/src/backend/git/change.rs
@@ -3,11 +3,13 @@
// This file is part of radicle-link, distributed under the GPLv3 with Radicle
// Linking Exception. For full terms see the included LICENSE file.

+
use std::collections::BTreeMap;
use std::convert::TryFrom;

use git_commit::{self as commit, Commit};
use git_ext::Oid;
use git_trailers::OwnedTrailer;
+
use nonempty::NonEmpty;

use crate::history::entry::Timestamp;
use crate::{
@@ -18,7 +20,6 @@ use crate::{
};

const MANIFEST_BLOB_NAME: &str = "manifest";
-
const CHANGE_BLOB_NAME: &str = "change";

pub mod error {
    use std::str::Utf8Error;
@@ -195,14 +196,23 @@ fn load_contents(
    repo: &git2::Repository,
    tree: &git2::Tree,
) -> Result<entry::Contents, error::Load> {
-
    let contents_tree_entry = tree
-
        .get_name(CHANGE_BLOB_NAME)
-
        .ok_or_else(|| error::Load::NoChange(tree.id().into()))?;
-
    let contents_object = contents_tree_entry.to_object(repo)?;
-
    let contents_blob = contents_object
-
        .as_blob()
-
        .ok_or_else(|| error::Load::ChangeNotBlob(tree.id().into()))?;
-
    Ok(contents_blob.content().to_owned())
+
    let ops = tree
+
        .iter()
+
        .filter_map(|entry| {
+
            entry.kind().and_then(|kind| match kind {
+
                git2::ObjectType::Blob => {
+
                    let name = entry.name()?.parse::<i8>().ok()?;
+
                    let content = entry.to_object(repo).and_then(|object| {
+
                        object.peel_to_blob().map(|blob| blob.content().to_owned())
+
                    });
+
                    Some(content.map(|c| (name, c)))
+
                }
+
                _ => None,
+
            })
+
        })
+
        .collect::<Result<BTreeMap<_, _>, _>>()?;
+

+
    NonEmpty::collect(ops.into_values()).ok_or_else(|| error::Load::NoChange(tree.id().into()))
}

fn write_commit<O>(
@@ -263,8 +273,10 @@ fn write_manifest(
        git2::FileMode::Blob.into(),
    )?;

-
    let change_blob = repo.blob(contents.as_ref())?;
-
    tb.insert(CHANGE_BLOB_NAME, change_blob, git2::FileMode::Blob.into())?;
+
    for (ix, op) in contents.iter().enumerate() {
+
        let change_blob = repo.blob(op.as_ref())?;
+
        tb.insert(&ix.to_string(), change_blob, git2::FileMode::Blob.into())?;
+
    }

    tb.write()
}
modified radicle-cob/src/history/entry.rs
@@ -4,14 +4,14 @@
// Linking Exception. For full terms see the included LICENSE file.

use git_ext::Oid;
-

+
use nonempty::NonEmpty;
use radicle_crypto::PublicKey;

use crate::pruning_fold;

/// Entry contents.
/// This is the change payload.
-
pub type Contents = Vec<u8>;
+
pub type Contents = NonEmpty<Vec<u8>>;

/// Logical clock used to track causality in change graph.
pub type Clock = u64;
modified radicle-cob/src/tests.rs
@@ -2,6 +2,7 @@ use std::ops::ControlFlow;

use crypto::test::signer::MockSigner;
use git_ref_format::{refname, Component, RefString};
+
use nonempty::nonempty;
use qcheck::Arbitrary;
use radicle_crypto::Signer;

@@ -29,7 +30,7 @@ fn roundtrip() {
        &proj.identifier(),
        Create {
            history_type: "test".to_string(),
-
            contents: Vec::new(),
+
            contents: nonempty!(Vec::new()),
            typename: typename.clone(),
            message: "creating xyz.rad.issue".to_string(),
        },
@@ -61,7 +62,7 @@ fn list_cobs() {
        &proj.identifier(),
        Create {
            history_type: "test".to_string(),
-
            contents: b"issue 1".to_vec(),
+
            contents: nonempty!(b"issue 1".to_vec()),
            typename: typename.clone(),
            message: "creating xyz.rad.issue".to_string(),
        },
@@ -75,7 +76,7 @@ fn list_cobs() {
        &proj.identifier(),
        Create {
            history_type: "test".to_string(),
-
            contents: b"issue 2".to_vec(),
+
            contents: nonempty!(b"issue 2".to_vec()),
            typename: typename.clone(),
            message: "commenting xyz.rad.issue".to_string(),
        },
@@ -109,7 +110,7 @@ fn update_cob() {
        &proj.identifier(),
        Create {
            history_type: "test".to_string(),
-
            contents: Vec::new(),
+
            contents: nonempty!(Vec::new()),
            typename: typename.clone(),
            message: "creating xyz.rad.issue".to_string(),
        },
@@ -126,7 +127,7 @@ fn update_cob() {
        &proj,
        &proj.identifier(),
        Update {
-
            changes: b"issue 1".to_vec(),
+
            changes: nonempty!(b"issue 1".to_vec()),
            history_type: "test".to_string(),
            object_id: *cob.id(),
            typename: typename.clone(),
@@ -166,7 +167,7 @@ fn traverse_cobs() {
        &terry_proj,
        &terry_proj.identifier(),
        Create {
-
            contents: b"issue 1".to_vec(),
+
            contents: nonempty!(b"issue 1".to_vec()),
            history_type: "test".to_string(),
            typename: typename.clone(),
            message: "creating xyz.rad.issue".to_string(),
@@ -188,7 +189,7 @@ fn traverse_cobs() {
        &neil_proj,
        &neil_proj.identifier(),
        Update {
-
            changes: b"issue 2".to_vec(),
+
            changes: nonempty!(b"issue 2".to_vec()),
            history_type: "test".to_string(),
            object_id: *cob.id(),
            typename,
@@ -200,7 +201,7 @@ fn traverse_cobs() {
    // traverse over the history and filter by changes that were only authorized by terry
    let contents = updated.history().traverse(Vec::new(), |mut acc, entry| {
        if entry.actor() == terry_signer.public_key() {
-
            acc.push(entry.contents().to_vec());
+
            acc.push(entry.contents().head.to_vec());
        }
        ControlFlow::Continue(acc)
    });
@@ -209,7 +210,7 @@ fn traverse_cobs() {

    // traverse over the history and filter by changes that were only authorized by neil
    let contents = updated.history().traverse(Vec::new(), |mut acc, entry| {
-
        acc.push(entry.contents().to_vec());
+
        acc.push(entry.contents().head.to_vec());
        ControlFlow::Continue(acc)
    });

modified radicle-node/Cargo.toml
@@ -21,7 +21,7 @@ lexopt = { version = "0.2.1" }
log = { version = "0.4.17", features = ["std"] }
nakamoto-net = { version = "0.3.0" }
nakamoto-net-poll = { version = "0.3.0" }
-
nonempty = { version = "0.8.0", features = ["serialize"] }
+
nonempty = { version = "0.8.1", features = ["serialize"] }
qcheck = { version = "1", default-features = false, optional = true }
sqlite = { version = "0.30.3" }
sqlite3-src = { version = "0.4.0", features = ["bundled"] } # Ensures static linking
modified radicle/Cargo.toml
@@ -21,6 +21,7 @@ git-ref-format = { version = "0", features = ["serde", "macro"] }
multibase = { version = "0.9.1" }
num-traits = { version = "0.2.15", default-features = false, features = ["std"] }
log = { version = "0.4.17", features = ["std"] }
+
nonempty = { version = "0.8.1", features = ["serialize"] }
once_cell = { version = "1.13" }
olpc-cjson = { version = "0.1.1" }
serde = { version = "1", features = ["derive"] }
@@ -28,7 +29,6 @@ serde_json = { version = "1", features = ["preserve_order"] }
siphasher = { version = "0.3.10" }
radicle-git-ext = { version = "0", features = ["serde"] }
sqlite = { version = "0.30.3", optional = true }
-
nonempty = { version = "0.8.0", features = ["serialize"] }
tempfile = { version = "3.3.0" }
thiserror = { version = "1" }
zeroize = { version = "1.5.7" }
modified radicle/src/cob/issue.rs
@@ -15,6 +15,8 @@ use crate::cob::{store, ObjectId, OpId, TypeName};
use crate::crypto::{PublicKey, Signer};
use crate::storage::git as storage;

+
use super::op::Ops;
+

/// Issue operation.
pub type Op = crate::cob::Op<Action>;

@@ -110,8 +112,8 @@ impl store::FromHistory for Issue {
        history: &radicle_cob::History,
    ) -> Result<(Self, clock::Lamport), store::Error> {
        let obj = history.traverse(Self::default(), |mut acc, entry| {
-
            if let Ok(op) = Op::try_from(entry) {
-
                if let Err(err) = acc.apply(op) {
+
            if let Ok(Ops(changes)) = Ops::try_from(entry) {
+
                if let Err(err) = acc.apply(changes) {
                    log::warn!("Error applying op to issue state: {err}");
                    return ControlFlow::Break(acc);
                }
@@ -153,29 +155,31 @@ impl Issue {
        self.thread.comments().map(|(id, comment)| (id, comment))
    }

-
    pub fn apply(&mut self, op: Op) -> Result<(), Error> {
-
        match op.action {
-
            Action::Title { title } => {
-
                self.title.set(title, op.clock);
-
            }
-
            Action::Lifecycle { status } => {
-
                self.status.set(status, op.clock);
-
            }
-
            Action::Tag { add, remove } => {
-
                for tag in add {
-
                    self.tags.insert(tag, op.clock);
+
    pub fn apply(&mut self, changes: impl IntoIterator<Item = Op>) -> Result<(), Error> {
+
        for change in changes {
+
            match change.action {
+
                Action::Title { title } => {
+
                    self.title.set(title, change.clock);
                }
-
                for tag in remove {
-
                    self.tags.remove(tag, op.clock);
+
                Action::Lifecycle { status } => {
+
                    self.status.set(status, change.clock);
+
                }
+
                Action::Tag { add, remove } => {
+
                    for tag in add {
+
                        self.tags.insert(tag, change.clock);
+
                    }
+
                    for tag in remove {
+
                        self.tags.remove(tag, change.clock);
+
                    }
+
                }
+
                Action::Thread { action } => {
+
                    self.thread.apply([cob::Op {
+
                        action,
+
                        author: change.author,
+
                        clock: change.clock,
+
                        timestamp: change.timestamp,
+
                    }]);
                }
-
            }
-
            Action::Thread { action } => {
-
                self.thread.apply([cob::Op {
-
                    action,
-
                    author: op.author,
-
                    clock: op.clock,
-
                    timestamp: op.timestamp,
-
                }]);
            }
        }
        Ok(())
@@ -291,7 +295,7 @@ impl<'a, 'g> IssueMut<'a, 'g> {
            clock,
            timestamp,
        };
-
        self.issue.apply(op)?;
+
        self.issue.apply([op])?;

        Ok((clock, *signer.public_key()))
    }
modified radicle/src/cob/op.rs
@@ -1,5 +1,6 @@
use std::collections::BTreeMap;

+
use nonempty::NonEmpty;
use thiserror::Error;

use radicle_cob::history::EntryWithClock;
@@ -35,18 +36,32 @@ pub struct Op<A> {
    pub timestamp: clock::Physical,
}

-
impl<'a: 'de, 'de, A: serde::Deserialize<'de>> TryFrom<&'a EntryWithClock> for Op<A> {
+
pub struct Ops<A>(pub NonEmpty<Op<A>>);
+

+
impl<'a, A> TryFrom<&'a EntryWithClock> for Ops<A>
+
where
+
    for<'de> A: serde::Deserialize<'de>,
+
{
    type Error = OpDecodeError;

    fn try_from(entry: &'a EntryWithClock) -> Result<Self, Self::Error> {
-
        let action = serde_json::from_slice(entry.contents())?;
-

-
        Ok(Op {
-
            action,
-
            author: *entry.actor(),
-
            clock: entry.clock().into(),
-
            timestamp: entry.timestamp().into(),
-
        })
+
        let mut clock = entry.clock().into();
+
        entry
+
            .contents()
+
            .clone()
+
            .try_map(|op| {
+
                let action = serde_json::from_slice(&op)?;
+
                let op = Op {
+
                    action,
+
                    author: *entry.actor(),
+
                    clock,
+
                    timestamp: entry.timestamp().into(),
+
                };
+
                clock.tick();
+

+
                Ok(op)
+
            })
+
            .map(Self)
    }
}

modified radicle/src/cob/patch.rs
@@ -25,6 +25,8 @@ use crate::storage::git as storage;
/// The logical clock we use to order operations to patches.
pub use clock::Lamport as Clock;

+
use super::op::Ops;
+

/// Type name of a patch.
pub static TYPENAME: Lazy<TypeName> =
    Lazy::new(|| FromStr::from_str("xyz.radicle.patch").expect("type name is valid"));
@@ -327,8 +329,8 @@ impl store::FromHistory for Patch {
        history: &radicle_cob::History,
    ) -> Result<(Self, clock::Lamport), store::Error> {
        let obj = history.traverse(Self::default(), |mut acc, entry| {
-
            if let Ok(op) = Op::try_from(entry) {
-
                if let Err(err) = acc.apply([op]) {
+
            if let Ok(Ops(changes)) = Ops::try_from(entry) {
+
                if let Err(err) = acc.apply(changes) {
                    log::warn!("Error applying op to patch state: {err}");
                    return ControlFlow::Break(acc);
                }
modified radicle/src/cob/store.rs
@@ -2,6 +2,7 @@
#![allow(clippy::large_enum_variant)]
use std::marker::PhantomData;

+
use nonempty::NonEmpty;
use radicle_crdt::Lamport;
use serde::Serialize;

@@ -21,6 +22,7 @@ pub const HISTORY_TYPE: &str = "radicle";
/// A type that can be materialized from an event history.
/// All collaborative objects implement this trait.
pub trait FromHistory: Sized {
+
    // TODO(finto): Action not being used smells fishy to me
    type Action;

    /// The object type name.
@@ -100,7 +102,7 @@ where
        action: T::Action,
        signer: &G,
    ) -> Result<CollaborativeObject, Error> {
-
        let changes = encoding::encode(&action)?;
+
        let changes = NonEmpty::new(encoding::encode(&action)?);

        cob::update(
            self.raw,
@@ -125,7 +127,7 @@ where
        action: T::Action,
        signer: &G,
    ) -> Result<(ObjectId, T, Lamport), Error> {
-
        let contents = encoding::encode(&action)?;
+
        let contents = NonEmpty::new(encoding::encode(&action)?);
        let cob = cob::create(
            self.raw,
            signer,
modified radicle/src/cob/thread.rs
@@ -19,6 +19,8 @@ use crdt::lwwset::LWWSet;
use crdt::redactable::Redactable;
use crdt::Semilattice;

+
use super::op::Ops;
+

/// Type name of a thread.
pub static TYPENAME: Lazy<TypeName> =
    Lazy::new(|| FromStr::from_str("xyz.radicle.thread").expect("type name is valid"));
@@ -97,8 +99,8 @@ impl store::FromHistory for Thread {

    fn from_history(history: &History) -> Result<(Self, Lamport), store::Error> {
        let obj = history.traverse(Thread::default(), |mut acc, entry| {
-
            if let Ok(change) = Op::try_from(entry) {
-
                acc.apply([change]);
+
            if let Ok(Ops(changes)) = Ops::try_from(entry) {
+
                acc.apply(changes);
                ControlFlow::Continue(acc)
            } else {
                ControlFlow::Break(acc)