Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
cob: Update issues and threads
Alexis Sellier committed 3 years ago
commit b82e1864b5e788b0a06c39d1b170bdcda2f1eed3
parent 252b6cd7bc4d3667f336c2c478c319485bd8715e
3 files changed +164 -111
modified radicle/src/cob/issue.rs
@@ -98,6 +98,8 @@ impl Default for Issue {
}

impl store::FromHistory for Issue {
+
    type Action = Action;
+

    fn type_name() -> &'static TypeName {
        &*TYPENAME
    }
@@ -105,10 +107,13 @@ impl store::FromHistory for Issue {
    fn from_history(
        history: &radicle_cob::History,
    ) -> Result<(Self, clock::Lamport), store::Error> {
-
        let mut clock = clock::Lamport::default();
        let obj = history.traverse(Self::default(), |mut acc, entry| {
-
            if let Ok(change) = Change::decode(entry.contents()) {
-
                if let Err(err) = acc.apply(change, &mut clock) {
+
            if let Ok(action) = Action::decode(entry.contents()) {
+
                if let Err(err) = acc.apply(Change {
+
                    action,
+
                    author: *entry.actor(),
+
                    clock: entry.clock().into(),
+
                }) {
                    log::warn!("Error applying change to issue state: {err}");
                    return ControlFlow::Break(acc);
                }
@@ -118,7 +123,7 @@ impl store::FromHistory for Issue {
            ControlFlow::Continue(acc)
        });

-
        Ok((obj, clock))
+
        Ok((obj, history.clock().into()))
    }
}

@@ -150,29 +155,20 @@ impl Issue {
        self.thread.comments().map(|(id, comment)| (id, comment))
    }

-
    pub fn timeline(&self) -> Vec<Action> {
-
        todo!();
-
    }
-

-
    pub fn apply(&mut self, change: Change, clock: &mut clock::Lamport) -> Result<(), Error> {
+
    pub fn apply(&mut self, change: Change) -> Result<(), Error> {
        match change.action {
            Action::Title { title } => {
                self.title.set(title, change.clock);
-
                clock.merge(change.clock);
            }
            Action::Lifecycle { status } => {
                self.status.set(status, change.clock);
-
                clock.merge(change.clock);
            }
            Action::Thread { action } => {
-
                self.thread.apply(
-
                    [crdt::Change {
-
                        action,
-
                        author: change.author,
-
                        clock: change.clock,
-
                    }],
-
                    clock,
-
                );
+
                self.thread.apply([crdt::Change {
+
                    action,
+
                    author: change.author,
+
                    clock: change.clock,
+
                }]);
            }
        }
        Ok(())
@@ -202,13 +198,8 @@ impl<'a, 'g> IssueMut<'a, 'g> {

    /// Lifecycle an issue.
    pub fn lifecycle<G: Signer>(&mut self, status: Status, signer: &G) -> Result<ChangeId, Error> {
-
        let clock = self.clock.tick();
-
        let change = Change {
-
            action: Action::Lifecycle { status },
-
            author: *signer.public_key(),
-
            clock,
-
        };
-
        self.apply("Lifecycle", change, signer)
+
        let action = Action::Lifecycle { status };
+
        self.apply("Lifecycle", action, signer)
    }

    /// Comment on an issue.
@@ -217,18 +208,12 @@ impl<'a, 'g> IssueMut<'a, 'g> {
        body: S,
        signer: &G,
    ) -> Result<CommentId, Error> {
-
        let author = *signer.public_key();
-
        let clock = self.clock.tick();
        let body = body.into();
-
        let change = Change {
-
            action: Action::from(thread::Action::Comment {
-
                body,
-
                reply_to: None,
-
            }),
-
            author,
-
            clock,
-
        };
-
        self.apply("Comment", change, signer)
+
        let action = Action::from(thread::Action::Comment {
+
            body,
+
            reply_to: None,
+
        });
+
        self.apply("Comment", action, signer)
    }

    /// Tag an issue.
@@ -237,17 +222,11 @@ impl<'a, 'g> IssueMut<'a, 'g> {
        tags: impl IntoIterator<Item = Tag>,
        signer: &G,
    ) -> Result<ChangeId, Error> {
-
        let author = *signer.public_key();
-
        let clock = self.clock.tick();
        let tags = tags.into_iter().collect::<Vec<_>>();
-
        let change = Change {
-
            author,
-
            action: Action::Thread {
-
                action: thread::Action::Tag { tags },
-
            },
-
            clock,
+
        let action = Action::Thread {
+
            action: thread::Action::Tag { tags },
        };
-
        self.apply("Tag", change, signer)
+
        self.apply("Tag", action, signer)
    }

    /// Reply to on an issue comment.
@@ -257,21 +236,15 @@ impl<'a, 'g> IssueMut<'a, 'g> {
        body: S,
        signer: &G,
    ) -> Result<ChangeId, Error> {
-
        let author = *signer.public_key();
-
        let clock = self.clock.tick();
        let body = body.into();

        assert!(self.thread.comment(&parent).is_some());

-
        let change = Change {
-
            action: Action::from(thread::Action::Comment {
-
                body,
-
                reply_to: Some(parent),
-
            }),
-
            author,
-
            clock,
-
        };
-
        self.apply("Reply", change, signer)
+
        let action = Action::from(thread::Action::Comment {
+
            body,
+
            reply_to: Some(parent),
+
        });
+
        self.apply("Reply", action, signer)
    }

    /// React to an issue comment.
@@ -281,37 +254,37 @@ impl<'a, 'g> IssueMut<'a, 'g> {
        reaction: Reaction,
        signer: &G,
    ) -> Result<ChangeId, Error> {
-
        let author = *signer.public_key();
-
        let clock = self.clock.tick();
-
        let change = Change {
-
            action: Action::Thread {
-
                action: thread::Action::React {
-
                    to,
-
                    reaction,
-
                    active: true,
-
                },
+
        let action = Action::Thread {
+
            action: thread::Action::React {
+
                to,
+
                reaction,
+
                active: true,
            },
-
            author,
-
            clock,
        };
-
        self.apply("React", change, signer)
+
        self.apply("React", action, signer)
    }

    /// Apply a change to the issue.
    pub fn apply<G: Signer>(
        &mut self,
        msg: &'static str,
-
        change: Change,
+
        action: Action,
        signer: &G,
    ) -> Result<ChangeId, Error> {
-
        let id = change.id();
+
        let cob = self
+
            .store
+
            .update(self.id, msg, action.clone(), signer)
+
            .map_err(Error::Store)?;
+
        let clock = cob.history().clock();

-
        self.issue.apply(change.clone(), &mut self.clock)?;
-
        self.store
-
            .update(self.id, msg, change, signer)
-
            .map_err(|e| Error::Store(store::Error::from(e)))?;
+
        let change = Change {
+
            author: *signer.public_key(),
+
            action,
+
            clock: clock.into(),
+
        };
+
        self.issue.apply(change)?;

-
        Ok(id)
+
        Ok((clock.into(), *signer.public_key()))
    }
}

@@ -376,12 +349,8 @@ impl<'a> Issues<'a> {
    ) -> Result<IssueMut<'a, 'g>, Error> {
        let title = title.into();
        let description = description.into();
-
        let change = Change {
-
            author: self.author().id,
-
            action: Action::Title { title },
-
            clock: clock::Lamport::default(),
-
        };
-
        let (id, issue, clock) = self.raw.create("Create issue", change, signer)?;
+
        let action = Action::Title { title };
+
        let (id, issue, clock) = self.raw.create("Create issue", action, signer)?;
        let mut issue = IssueMut {
            id,
            clock,
@@ -409,6 +378,12 @@ pub enum Action {
    Thread { action: thread::Action },
}

+
impl Action {
+
    pub fn decode(bytes: &[u8]) -> Result<Self, serde_json::Error> {
+
        serde_json::from_slice(bytes)
+
    }
+
}
+

impl From<thread::Action> for Action {
    fn from(action: thread::Action) -> Self {
        Self::Thread { action }
@@ -417,6 +392,8 @@ impl From<thread::Action> for Action {

#[cfg(test)]
mod test {
+
    use pretty_assertions::assert_eq;
+

    use super::*;
    use crate::cob::Reaction;
    use crate::test;
@@ -443,7 +420,7 @@ mod test {
        let (id, created) = (created.id, created.issue);
        let issue = issues.get(&id).unwrap().unwrap();

-
        assert_eq!(issue, created);
+
        assert_eq!(created, issue);
        assert_eq!(issue.title(), "My first issue");
        assert_eq!(issue.author(), Some(issues.author()));
        assert_eq!(issue.description(), Some("Blah blah blah."));
modified radicle/src/cob/store.rs
@@ -2,7 +2,7 @@
#![allow(clippy::large_enum_variant)]
use std::marker::PhantomData;

-
use radicle_crdt::{Change, Lamport};
+
use radicle_crdt::Lamport;
use serde::Serialize;

use crate::cob;
@@ -18,6 +18,8 @@ use crate::storage::git as storage;
/// A type that can be materialized from an event history.
/// All collaborative objects implement this trait.
pub trait FromHistory: Sized {
+
    type Action;
+

    /// The object type name.
    fn type_name() -> &'static TypeName;
    /// Create an object from a history.
@@ -35,6 +37,8 @@ pub enum Error {
    Retrieve(#[from] cob::error::Retrieve),
    #[error(transparent)]
    Identity(#[from] project::IdentityError),
+
    #[error(transparent)]
+
    Serialize(#[from] serde_json::Error),
    #[error("object `{1}` of type `{0}` was not found")]
    NotFound(TypeName, ObjectId),
}
@@ -77,15 +81,20 @@ impl<'a, T> Store<'a, T> {
    }
}

-
impl<'a, T: FromHistory> Store<'a, T> {
+
impl<'a, T: FromHistory> Store<'a, T>
+
where
+
    T::Action: Serialize,
+
{
    /// Update an object.
-
    pub fn update<A: Serialize, G: Signer>(
+
    pub fn update<G: Signer>(
        &self,
        object_id: ObjectId,
        message: &'static str,
-
        change: Change<A>,
+
        action: T::Action,
        signer: &G,
-
    ) -> Result<CollaborativeObject, cob::error::Update> {
+
    ) -> Result<CollaborativeObject, Error> {
+
        let changes = encoding::encode(&action)?;
+

        cob::update(
            self.raw,
            signer,
@@ -96,18 +105,20 @@ impl<'a, T: FromHistory> Store<'a, T> {
                history_type: HistoryType::default(),
                typename: T::type_name().clone(),
                message: message.to_owned(),
-
                changes: change.encode(),
+
                changes,
            },
        )
+
        .map_err(Error::from)
    }

    /// Create an object.
-
    pub fn create<A: Serialize, G: Signer>(
+
    pub fn create<G: Signer>(
        &self,
        message: &'static str,
-
        change: Change<A>,
+
        action: T::Action,
        signer: &G,
    ) -> Result<(ObjectId, T, Lamport), Error> {
+
        let contents = encoding::encode(&action)?;
        let cob = cob::create(
            self.raw,
            signer,
@@ -117,7 +128,7 @@ impl<'a, T: FromHistory> Store<'a, T> {
                history_type: HistoryType::default(),
                typename: T::type_name().clone(),
                message: message.to_owned(),
-
                contents: change.encode(),
+
                contents,
            },
        )?;
        let (object, clock) = T::from_history(cob.history())?;
@@ -160,3 +171,18 @@ impl<'a, T: FromHistory> Store<'a, T> {
        todo!();
    }
}
+

+
mod encoding {
+
    use serde::Serialize;
+

+
    /// Serialize the change into a byte string.
+
    pub fn encode<T: Serialize>(obj: &T) -> Result<Vec<u8>, serde_json::Error> {
+
        let mut buf = Vec::new();
+
        let mut serializer =
+
            serde_json::Serializer::with_formatter(&mut buf, olpc_cjson::CanonicalFormatter::new());
+

+
        obj.serialize(&mut serializer)?;
+

+
        Ok(buf)
+
    }
+
}
modified radicle/src/cob/thread.rs
@@ -82,6 +82,13 @@ pub enum Action {
    },
}

+
impl Action {
+
    /// Deserialize an action from a byte string.
+
    pub fn decode(bytes: &[u8]) -> Result<Self, serde_json::Error> {
+
        serde_json::from_slice(bytes)
+
    }
+
}
+

/// A discussion thread.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Thread {
@@ -94,22 +101,26 @@ pub struct Thread {
}

impl store::FromHistory for Thread {
+
    type Action = Action;
+

    fn type_name() -> &'static radicle_cob::TypeName {
        &*TYPENAME
    }

    fn from_history(history: &History) -> Result<(Self, Lamport), store::Error> {
-
        let mut clock = Lamport::default();
        let obj = history.traverse(Thread::default(), |mut acc, entry| {
-
            if let Ok(change) = Change::decode(entry.contents()) {
-
                acc.apply([change], &mut clock);
+
            if let Ok(action) = Action::decode(entry.contents()) {
+
                acc.apply([Change {
+
                    action,
+
                    author: *entry.actor(),
+
                    clock: entry.clock().into(),
+
                }]);
                ControlFlow::Continue(acc)
            } else {
                ControlFlow::Break(acc)
            }
        });
-

-
        Ok((obj, clock))
+
        Ok((obj, history.clock().into()))
    }
}

@@ -142,6 +153,14 @@ impl Thread {
        }
    }

+
    pub fn first(&self) -> Option<&str> {
+
        self.comments
+
            .values()
+
            .filter_map(|r| r.get())
+
            .map(|c| c.body.as_str())
+
            .next()
+
    }
+

    pub fn replies<'a>(
        &'a self,
        to: &'a CommentId,
@@ -167,11 +186,7 @@ impl Thread {
            .map(|(a, r)| (a, r))
    }

-
    pub fn apply(
-
        &mut self,
-
        changes: impl IntoIterator<Item = Change<Action>>,
-
        clock: &mut Lamport,
-
    ) {
+
    pub fn apply(&mut self, changes: impl IntoIterator<Item = Change<Action>>) {
        // FIXME(cloudhead): Use commit timestamp.
        let timestamp = Timestamp::default();

@@ -238,7 +253,6 @@ impl Thread {
                        });
                }
            }
-
            clock.merge(change.clock);
        }
    }

@@ -441,12 +455,47 @@ mod tests {
    }

    #[test]
+
    fn test_redact_comment() {
+
        let tmp = tempfile::tempdir().unwrap();
+
        let (_, signer, repository) = radicle::test::setup::context(&tmp);
+
        let store =
+
            radicle::cob::store::Store::<Thread>::open(*signer.public_key(), &repository).unwrap();
+
        let mut alice = Actor::new(signer);
+

+
        let a1 = alice.comment("First comment", None);
+
        let a2 = alice.comment("Second comment", None);
+
        let a3 = alice.comment("Third comment", None);
+

+
        let (id, _, _) = store
+
            .create("Thread created", a1.action, &alice.signer)
+
            .unwrap();
+
        let second = store
+
            .update(id, "Thread updated", a2.action, &alice.signer)
+
            .unwrap();
+
        store
+
            .update(id, "Thread updated", a3.action, &alice.signer)
+
            .unwrap();
+

+
        let a4 = alice.redact((second.history().clock().into(), *alice.signer.public_key()));
+
        store
+
            .update(id, "Comment redacted", a4.action, &alice.signer)
+
            .unwrap();
+

+
        let (thread, _) = store.get(&id).unwrap().unwrap();
+
        let (_, comment0) = thread.comments().nth(0).unwrap();
+
        let (_, comment1) = thread.comments().nth(1).unwrap();
+

+
        assert_eq!(thread.comments().count(), 2);
+
        assert_eq!(comment0.body, "First comment");
+
        assert_eq!(comment1.body, "Third comment"); // Second comment was redacted.
+
    }
+

+
    #[test]
    fn test_storage() {
        let tmp = tempfile::tempdir().unwrap();
        let (_, signer, repository) = radicle::test::setup::context(&tmp);
        let store =
            radicle::cob::store::Store::<Thread>::open(*signer.public_key(), &repository).unwrap();
-
        let mut clock = Lamport::default();

        let mut alice = Actor::new(signer);

@@ -454,11 +503,13 @@ mod tests {
        let a2 = alice.comment("Second comment", None);

        let mut expected = Thread::default();
-
        expected.apply([a1.clone(), a2.clone()], &mut clock);
+
        expected.apply([a1.clone(), a2.clone()]);

-
        let (id, _, _) = store.create("Thread created", a1, &alice.signer).unwrap();
+
        let (id, _, _) = store
+
            .create("Thread created", a1.action, &alice.signer)
+
            .unwrap();
        store
-
            .update(id, "Thread updated", a2, &alice.signer)
+
            .update(id, "Thread updated", a2.action, &alice.signer)
            .unwrap();

        let (actual, _) = store.get(&id).unwrap().unwrap();
@@ -540,16 +591,15 @@ mod tests {
    fn prop_invariants(log: Changes<3>) {
        let t = Thread::default();
        let [p1, p2, p3] = log.permutations;
-
        let mut clock = Lamport::default();

        let mut t1 = t.clone();
-
        t1.apply(p1, &mut clock);
+
        t1.apply(p1);

        let mut t2 = t.clone();
-
        t2.apply(p2, &mut clock);
+
        t2.apply(p2);

        let mut t3 = t;
-
        t3.apply(p3, &mut clock);
+
        t3.apply(p3);

        assert_eq!(t1, t2);
        assert_eq!(t2, t3);