Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle src cob stream.rs
pub mod error;

mod iter;
pub use iter::OpsIter;
use iter::Walk;

use std::marker::PhantomData;

use serde::Deserialize;

use crate::git;
use crate::git::Oid;

use super::{ObjectId, Op, TypeName};

/// Helper trait for anything can provide its initial commit. Generally, this is
/// the root of a COB object.
pub trait HasRoot {
    /// Return the root `Oid` of the COB.
    fn root(&self) -> Oid;
}

/// Provide the stream of operations that are related to a given COB.
///
/// The whole history of operations can be retrieved via [`CobStream::all`].
///
/// To constrain the history, use one of [`CobStream::since`],
/// [`CobStream::until`], or [`CobStream::range`].
pub trait CobStream: HasRoot {
    /// Any error that can occur when iterating over the operations.
    type IterError: std::error::Error + Send + Sync + 'static;

    /// The associated action to the COB's [`Op`].
    type Action;

    /// The iterator that walks over the operations.
    type Iter: Iterator<Item = Result<Op<Self::Action>, Self::IterError>>;

    /// Get an iterator of all operations from the inception of the collaborative
    /// object.
    fn all(&self) -> Result<Self::Iter, error::Stream>;

    /// Get an iterator of all operations from the given `oid`, in the
    /// collaborative object's history.
    fn since(&self, oid: Oid) -> Result<Self::Iter, error::Stream>;

    /// Get an iterator of all operations until the given `oid`, in the
    /// collaborative object's history.
    fn until(&self, oid: Oid) -> Result<Self::Iter, error::Stream>;

    /// Get an iterator of all operations `from` the given `Oid`, `until` the
    /// other `Oid`, in the collaborative object's history.
    fn range(&self, from: Oid, until: Oid) -> Result<Self::Iter, error::Stream>;
}

/// The range for iterating over a COB's action history.
///
/// Construct via [`CobRange::new`] to use for constructing a [`Stream`].
#[derive(Clone, Debug)]
pub struct CobRange {
    root: Oid,
    until: iter::Until,
}

impl CobRange {
    /// Construct a `CobRange` for a given COB [`TypeName`] and its
    /// [`ObjectId`] identifier.
    ///
    /// The range will be from the root, given by the [`ObjectId`], to the
    /// reference tips of all remote namespaces.
    pub fn new(typename: &TypeName, object_id: &ObjectId) -> Self {
        let glob = crate::storage::refs::cobs(typename, object_id);
        Self {
            root: **object_id,
            until: iter::Until::Glob(glob),
        }
    }
}

impl HasRoot for CobRange {
    fn root(&self) -> Oid {
        self.root
    }
}

/// A stream over a COB's operations.
///
/// The generic parameter `A` is filled by the COB's corresponding `Action`
/// type.
///
/// The `Stream` implements [`CobStream`], so iterators over the operations can be
/// constructed via the [`CobStream`] methods.
///
/// To construct a `Stream`, use [`Stream::new`].
pub struct Stream<'a, A> {
    repo: &'a git::raw::Repository,
    range: CobRange,
    typename: TypeName,
    marker: PhantomData<A>,
}

impl<'a, A> Stream<'a, A> {
    /// Construct a new stream providing the underlying `repo`, a [`CobRange`],
    /// and the [`TypeName`] of the COB that is being streamed.
    pub fn new(repo: &'a git::raw::Repository, range: CobRange, typename: TypeName) -> Self {
        Self {
            repo,
            range,
            typename,
            marker: PhantomData,
        }
    }
}

impl<A> HasRoot for Stream<'_, A> {
    fn root(&self) -> Oid {
        self.range.root()
    }
}

impl<'a, A> CobStream for Stream<'a, A>
where
    A: for<'de> Deserialize<'de>,
{
    type IterError = error::Ops;
    type Action = A;
    type Iter = OpsIter<'a, Self::Action>;

    fn all(&self) -> Result<Self::Iter, error::Stream> {
        Ok(OpsIter::new(
            Walk::from(self.range.clone())
                .iter(self.repo)
                .map_err(error::Stream::new)?,
            self.typename.clone(),
        ))
    }

    fn since(&self, oid: Oid) -> Result<Self::Iter, error::Stream> {
        Ok(OpsIter::new(
            Walk::from(self.range.clone())
                .since(oid)
                .iter(self.repo)
                .map_err(error::Stream::new)?,
            self.typename.clone(),
        ))
    }

    fn until(&self, oid: Oid) -> Result<Self::Iter, error::Stream> {
        Ok(OpsIter::new(
            Walk::from(self.range.clone())
                .until(oid)
                .iter(self.repo)
                .map_err(error::Stream::new)?,
            self.typename.clone(),
        ))
    }

    fn range(&self, from: Oid, until: Oid) -> Result<Self::Iter, error::Stream> {
        Ok(OpsIter::new(
            Walk::new(from, until.into())
                .iter(self.repo)
                .map_err(error::Stream::new)?,
            self.typename.clone(),
        ))
    }
}

#[allow(clippy::unwrap_used)]
#[cfg(test)]
mod tests {
    use std::{collections::BTreeSet, fmt};

    use json::json;
    use nonempty::NonEmpty;
    use serde_json as json;

    use crate::cob::change::Storage as _;
    use crate::crypto::test::signer::MockSigner;
    use crate::test::arbitrary;
    use crate::test::arbitrary::r#gen;
    use crate::{cob, test};

    use super::*;

    fn typename() -> TypeName {
        "xyz.radicle.test".parse::<TypeName>().unwrap()
    }

    fn gen_ops(repo: &git::raw::Repository, signer: &MockSigner) -> Vec<cob::Entry> {
        // Number of ops
        let n = r#gen::<u8>(1).clamp(1, 10);
        let mut entries = Vec::with_capacity(n.into());

        let mut parent = None;
        for _ in 0..n {
            // Number of actions in this op
            let m = r#gen::<u8>(1).clamp(1, 3);
            let contents = create_contents((0..m).map(|_| arbitrary::alphanumeric(1)));
            let entry = create_entry(repo, signer, contents, parent);
            parent = Some(entry.id);
            entries.push(entry);
        }
        entries
    }

    fn create_contents(values: impl Iterator<Item = String>) -> NonEmpty<Vec<u8>> {
        NonEmpty::collect(values.map(|value| {
            json::to_vec(&json!({
                "test": value,
            }))
            .unwrap()
        }))
        .unwrap()
    }

    fn create_entry(
        repo: &git::raw::Repository,
        signer: &MockSigner,
        contents: NonEmpty<Vec<u8>>,
        parent: Option<Oid>,
    ) -> cob::Entry {
        repo.store(
            None,
            parent.into_iter().collect(),
            signer,
            cob::change::Template {
                type_name: typename(),
                tips: vec![],
                message: "Test Op Stream".to_string(),
                embeds: vec![],
                contents,
            },
        )
        .unwrap()
    }

    /// all === from(root)
    fn prop_all_from<S>(stream: &S)
    where
        S: CobStream,
        S::Action: fmt::Debug + Eq,
    {
        assert_eq!(
            stream
                .all()
                .expect("failed to get 'all' stream")
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            stream
                .since(stream.root())
                .expect("failed to get 'from' stream")
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        )
    }

    /// all === until(tip)
    fn prop_all_until<S>(stream: &S, tip: Oid)
    where
        S: CobStream,
        S::Action: fmt::Debug + Eq,
    {
        assert_eq!(
            stream
                .all()
                .expect("failed to get 'all' stream")
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            stream
                .until(tip)
                .expect("failed to get 'until' stream")
                .collect::<Result<Vec<_>, _>>()
                .unwrap()
        )
    }

    /// all === from_until(root, tip)
    fn prop_all_from_until<S>(stream: &S, tip: Oid)
    where
        S: CobStream,
        S::Action: fmt::Debug + Eq,
    {
        let root = stream.root();
        assert_eq!(
            stream
                .all()
                .expect("failed to get 'all' stream")
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            stream
                .range(root, tip)
                .expect("failed to get 'from_until' stream")
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            "from: {root}, until: {tip}"
        )
    }

    /// from_until(a, b) === from(a).intersect(until(b))
    fn prop_from_until<S>(stream: &S, from: Oid, until: Oid)
    where
        S: CobStream,
        S::Action: fmt::Debug + Clone,
    {
        let from_s = stream
            .since(from)
            .expect("failed to get 'from' stream")
            .map(|op| op.expect("Op failed in stream").id)
            .collect::<BTreeSet<_>>();

        let until_s = stream
            .until(until)
            .expect("failed to get 'until' stream")
            .map(|op| op.expect("Op failed in stream").id)
            .collect::<BTreeSet<_>>();
        let from_until_s = stream
            .range(from, until)
            .expect("failed to get 'from_until' stream")
            .map(|op| op.unwrap().id)
            .collect::<BTreeSet<_>>();
        assert_eq!(
            from_s
                .intersection(&until_s)
                .cloned()
                .collect::<BTreeSet<_>>(),
            from_until_s,
            "\nfrom_until: {from_until_s:?}\nfrom: {from_s:?}\nuntil: {until_s:?}"
        )
    }

    #[test]
    fn test_all_from() {
        let tmp = tempfile::tempdir().unwrap();
        let (repo, _) = test::fixtures::repository(tmp.path());
        let signer = MockSigner::default();
        let ops = gen_ops(&repo, &signer);
        let history = CobRange {
            root: ops.first().unwrap().id,
            until: ops.last().unwrap().id.into(),
        };
        let stream = Stream::<json::Value>::new(&repo, history, typename());
        prop_all_from(&stream)
    }

    #[test]
    fn test_all_until() {
        let tmp = tempfile::tempdir().unwrap();
        let (repo, _) = test::fixtures::repository(tmp.path());
        let signer = MockSigner::default();
        let ops = gen_ops(&repo, &signer);
        let tip = ops.last().unwrap().id;
        let history = CobRange {
            root: ops.first().unwrap().id,
            until: tip.into(),
        };
        let stream = Stream::<json::Value>::new(&repo, history, typename());
        prop_all_until(&stream, tip)
    }

    #[test]
    fn test_all_from_until() {
        let tmp = tempfile::tempdir().unwrap();
        let (repo, _) = test::fixtures::repository(tmp.path());
        let signer = MockSigner::default();
        let ops = gen_ops(&repo, &signer);
        let tip = ops.last().unwrap().id;
        let history = CobRange {
            root: ops.first().unwrap().id,
            until: tip.into(),
        };
        let stream = Stream::<json::Value>::new(&repo, history, typename());
        prop_all_from_until(&stream, tip)
    }

    #[test]
    fn test_from_until() {
        let tmp = tempfile::tempdir().unwrap();
        let (repo, _) = test::fixtures::repository(tmp.path());
        let signer = MockSigner::default();
        let ops = gen_ops(&repo, &signer);
        let history = CobRange {
            root: ops.first().unwrap().id,
            until: ops.last().unwrap().id.into(),
        };
        let n = ops.len() - 1;
        let (x, y) = r#gen::<(usize, usize)>(1);
        let x = x.clamp(0, n);
        let y = y.clamp(0, n);
        let (from, until) = if x <= y {
            (ops[x].id, ops[y].id)
        } else {
            (ops[y].id, ops[x].id)
        };
        let stream = Stream::<json::Value>::new(&repo, history, typename());
        prop_from_until(&stream, from, until)
    }

    #[test]
    fn test_regression_from_until() {
        let tmp = tempfile::tempdir().unwrap();
        let (repo, _) = test::fixtures::repository(tmp.path());
        let signer = MockSigner::default();
        // Set up 3 entries that make up the COB history
        let op1 = create_entry(
            &repo,
            &signer,
            create_contents(std::iter::once("hello".to_string())),
            None,
        );
        let op2 = create_entry(
            &repo,
            &signer,
            create_contents(std::iter::once("radicle".to_string())),
            Some(op1.id),
        );
        let op3 = create_entry(
            &repo,
            &signer,
            create_contents(std::iter::once("world".to_string())),
            Some(op2.id),
        );

        // The history spans from the 1st op to the last
        let history = CobRange {
            root: op1.id,
            until: op3.id.into(),
        };
        let stream = Stream::<json::Value>::new(&repo, history, typename());
        eprintln!("Op 1: {}", op1.id);
        eprintln!("Op 2: {}", op2.id);
        eprintln!("Op 3: {}", op3.id);

        // "since" the root operation should include all operations
        assert_eq!(
            stream
                .since(op1.id)
                .unwrap()
                .map(|op| op.unwrap().id)
                .collect::<BTreeSet<_>>(),
            [op1.id, op2.id, op3.id]
                .into_iter()
                .collect::<BTreeSet<_>>()
        );
        // "until" the second operation should only include up to the second operation
        assert_eq!(
            stream
                .until(op2.id)
                .unwrap()
                .map(|op| op.unwrap().id)
                .collect::<BTreeSet<_>>(),
            [op1.id, op2.id].into_iter().collect::<BTreeSet<_>>()
        );
        prop_from_until(&stream, op1.id, op2.id);
    }
}