Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
radicle: introduce COB stream
Merged fintohaps opened 1 year ago

Introduce a new stream module to radicle::cob. The purpose of this module is to provide an API for iterating over a COB’s actions, given a range of commits.

The CobStream trait provides the generic API for implementing and using, while Stream provides a concrete implementation using the git2 crate.

20 files changed +1066 -59 8953ec4c 2a0f6fd3
modified CHANGELOG.md
@@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## New Features

+
- `rad cob log` now supports the arguments `--from` and `--to` which can be used
+
  to range over particular operations on a COB.
+

## Fixed Bugs

## 1.3.0 - 2025-08-12
modified crates/radicle-cli/examples/rad-cob-log.md
@@ -68,13 +68,13 @@ author z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
date     Thu, 15 Dec 2022 17:28:04 +0000

    {
-
      "body": "Flux capacitor power requirements exceed current supply",
-
      "type": "comment"
+
      "type": "comment",
+
      "body": "Flux capacitor power requirements exceed current supply"
    }

    {
-
      "title": "flux capacitor underpowered",
-
      "type": "edit"
+
      "type": "edit",
+
      "title": "flux capacitor underpowered"
    }

```
@@ -91,16 +91,16 @@ author z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
date     Thu, 15 Dec 2022 17:28:04 +0000

    {
-
      "base": "f2de534b5e81d7c6e2dcaf58c3dd91573c0a0354",
+
      "type": "revision",
      "description": "See details.",
-
      "oid": "3e674d1a1df90807e934f9ae5da2591dd6848a33",
-
      "type": "revision"
+
      "base": "f2de534b5e81d7c6e2dcaf58c3dd91573c0a0354",
+
      "oid": "3e674d1a1df90807e934f9ae5da2591dd6848a33"
    }

    {
-
      "target": "delegates",
+
      "type": "edit",
      "title": "Define power requirements",
-
      "type": "edit"
+
      "target": "delegates"
    }

```
@@ -117,10 +117,10 @@ author z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
date     Thu, 15 Dec 2022 17:28:04 +0000

    {
+
      "type": "label",
      "labels": [
        "bug"
-
      ],
-
      "type": "label"
+
      ]
    }

commit   d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
@@ -129,13 +129,13 @@ author z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
date     Thu, 15 Dec 2022 17:28:04 +0000

    {
-
      "body": "Flux capacitor power requirements exceed current supply",
-
      "type": "comment"
+
      "type": "comment",
+
      "body": "Flux capacitor power requirements exceed current supply"
    }

    {
-
      "title": "flux capacitor underpowered",
-
      "type": "edit"
+
      "type": "edit",
+
      "title": "flux capacitor underpowered"
    }

```
added crates/radicle-cli/examples/rad-cob-operations.md
@@ -0,0 +1,241 @@
+
The `rad cob` command provides a subcommand, `log`, for inspecting the
+
operations of a COB.
+

+
To demonstrate, we will first create an issue and interact with it:
+

+
```
+
$ rad issue open --title "flux capacitor underpowered" --description "Flux capacitor power requirements exceed current supply" --no-announce
+
╭─────────────────────────────────────────────────────────╮
+
│ Title   flux capacitor underpowered                     │
+
│ Issue   d87dcfe8c2b3200e78b128d9b959cfdf7063fefe        │
+
│ Author  alice (you)                                     │
+
│ Status  open                                            │
+
│                                                         │
+
│ Flux capacitor power requirements exceed current supply │
+
╰─────────────────────────────────────────────────────────╯
+
$ rad issue react d87dcfe8c2b3200e78b128d9b959cfdf7063fefe --to d87dcfe8c2b3200e78b128d9b959cfdf7063fefe --emoji ✨ --no-announce
+
$ rad issue comment d87dcfe8c2b3200e78b128d9b959cfdf7063fefe --message "Max power!" --no-announce
+
╭─────────────────────────╮
+
│ alice (you) now 3c849c9 │
+
│ Max power!              │
+
╰─────────────────────────╯
+
$ rad issue assign d87dcfe8c2b3200e78b128d9b959cfdf7063fefe --add did:key:z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk --no-announce
+
```
+

+
Now, let's see the list of operations using `rad cob log`:
+

+
```
+
$ rad cob log --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --type xyz.radicle.issue --object d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
+
commit   376ba71113603004eae3c1b125c58cdc41d36b73
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   3c849c9b555b18be9a1f6c71fb254ba000de8cfe
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "assign",
+
      "assignees": [
+
        "did:key:z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk"
+
      ]
+
    }
+

+
commit   3c849c9b555b18be9a1f6c71fb254ba000de8cfe
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   256908937f3cda8df522d5a3ba442eb935c3f11b
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment",
+
      "body": "Max power!",
+
      "replyTo": "d87dcfe8c2b3200e78b128d9b959cfdf7063fefe"
+
    }
+

+
commit   256908937f3cda8df522d5a3ba442eb935c3f11b
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment.react",
+
      "id": "d87dcfe8c2b3200e78b128d9b959cfdf7063fefe",
+
      "reaction": "✨",
+
      "active": true
+
    }
+

+
commit   d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment",
+
      "body": "Flux capacitor power requirements exceed current supply"
+
    }
+

+
    {
+
      "type": "edit",
+
      "title": "flux capacitor underpowered"
+
    }
+

+
```
+

+
We can also limit the range of operations, using the `--from` and `--until`
+
options. We will need some commit revisions to use for those options, so let's
+
look at what those revision are by using `rad cob log`:
+

+
```
+
$ rad cob log --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --type xyz.radicle.issue --object d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
+
commit   376ba71113603004eae3c1b125c58cdc41d36b73
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   3c849c9b555b18be9a1f6c71fb254ba000de8cfe
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "assign",
+
      "assignees": [
+
        "did:key:z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk"
+
      ]
+
    }
+

+
commit   3c849c9b555b18be9a1f6c71fb254ba000de8cfe
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   256908937f3cda8df522d5a3ba442eb935c3f11b
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment",
+
      "body": "Max power!",
+
      "replyTo": "d87dcfe8c2b3200e78b128d9b959cfdf7063fefe"
+
    }
+

+
commit   256908937f3cda8df522d5a3ba442eb935c3f11b
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment.react",
+
      "id": "d87dcfe8c2b3200e78b128d9b959cfdf7063fefe",
+
      "reaction": "✨",
+
      "active": true
+
    }
+

+
commit   d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment",
+
      "body": "Flux capacitor power requirements exceed current supply"
+
    }
+

+
    {
+
      "type": "edit",
+
      "title": "flux capacitor underpowered"
+
    }
+

+
```
+

+
If we provide only the `--from` option, the operations we get back start from that
+
revision and go until the end:
+

+
```
+
$ rad cob log --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --type xyz.radicle.issue --object d87dcfe8c2b3200e78b128d9b959cfdf7063fefe --from 3c849c9b555b18be9a1f6c71fb254ba000de8cfe
+
commit   376ba71113603004eae3c1b125c58cdc41d36b73
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   3c849c9b555b18be9a1f6c71fb254ba000de8cfe
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "assign",
+
      "assignees": [
+
        "did:key:z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk"
+
      ]
+
    }
+

+
commit   3c849c9b555b18be9a1f6c71fb254ba000de8cfe
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   256908937f3cda8df522d5a3ba442eb935c3f11b
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment",
+
      "body": "Max power!",
+
      "replyTo": "d87dcfe8c2b3200e78b128d9b959cfdf7063fefe"
+
    }
+

+
```
+

+
Conversely, if we provide only the `--until` option, the operations we get back
+
start from the beginning and stop at that revision:
+

+
```
+
$ rad cob log --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --type xyz.radicle.issue --object d87dcfe8c2b3200e78b128d9b959cfdf7063fefe --until 256908937f3cda8df522d5a3ba442eb935c3f11b
+
commit   256908937f3cda8df522d5a3ba442eb935c3f11b
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment.react",
+
      "id": "d87dcfe8c2b3200e78b128d9b959cfdf7063fefe",
+
      "reaction": "✨",
+
      "active": true
+
    }
+

+
commit   d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment",
+
      "body": "Flux capacitor power requirements exceed current supply"
+
    }
+

+
    {
+
      "type": "edit",
+
      "title": "flux capacitor underpowered"
+
    }
+

+
```
+

+
Finally, if we provide both, we get back that exact range:
+

+
```
+
$ rad cob log --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --type xyz.radicle.issue --object d87dcfe8c2b3200e78b128d9b959cfdf7063fefe --from 256908937f3cda8df522d5a3ba442eb935c3f11b --until 3c849c9b555b18be9a1f6c71fb254ba000de8cfe
+
commit   3c849c9b555b18be9a1f6c71fb254ba000de8cfe
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   256908937f3cda8df522d5a3ba442eb935c3f11b
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment",
+
      "body": "Max power!",
+
      "replyTo": "d87dcfe8c2b3200e78b128d9b959cfdf7063fefe"
+
    }
+

+
commit   256908937f3cda8df522d5a3ba442eb935c3f11b
+
resource 0656c217f917c3e06234771e9ecae53aba5e173e
+
parent   d87dcfe8c2b3200e78b128d9b959cfdf7063fefe
+
author   z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
+
date     Thu, 15 Dec 2022 17:28:04 +0000
+

+
    {
+
      "type": "comment.react",
+
      "id": "d87dcfe8c2b3200e78b128d9b959cfdf7063fefe",
+
      "reaction": "✨",
+
      "active": true
+
    }
+

+
```
modified crates/radicle-cli/examples/rad-id.md
@@ -100,12 +100,12 @@ author z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
date     Thu, 15 Dec 2022 17:28:04 +0000

    {
-
      "blob": "053541ba7b90534b35dd8718e0ceaa408979b02b",
+
      "type": "revision",
+
      "title": "Add Bob",
      "description": "Add Bob as a delegate",
+
      "blob": "053541ba7b90534b35dd8718e0ceaa408979b02b",
      "parent": "0656c217f917c3e06234771e9ecae53aba5e173e",
-
      "signature": "z3AyzixN2eWLtRfQWowtBXwWyRH3iJ8oJ25W6KFYFw5ANLntbzfavge15muNU6AVAUkxSxQvgg9yh2gupbUecavQY",
-
      "title": "Add Bob",
-
      "type": "revision"
+
      "signature": "z3AyzixN2eWLtRfQWowtBXwWyRH3iJ8oJ25W6KFYFw5ANLntbzfavge15muNU6AVAUkxSxQvgg9yh2gupbUecavQY"
    }

commit   0656c217f917c3e06234771e9ecae53aba5e173e
@@ -113,11 +113,11 @@ author z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
date     Thu, 15 Dec 2022 17:28:04 +0000

    {
+
      "type": "revision",
+
      "title": "Initial revision",
      "blob": "d96f425412c9f8ad5d9a9a05c9831d0728e2338d",
      "parent": null,
-
      "signature": "z5nGqUvrmfiSyLjNCHWTWYvVMcPUZcvo9TxPKzEKXYBdSgUzbrqf1cYsmpGgbQvYunnsrLSsubEmxZaRdKM4quqQR",
-
      "title": "Initial revision",
-
      "type": "revision"
+
      "signature": "z5nGqUvrmfiSyLjNCHWTWYvVMcPUZcvo9TxPKzEKXYBdSgUzbrqf1cYsmpGgbQvYunnsrLSsubEmxZaRdKM4quqQR"
    }

```
modified crates/radicle-cli/src/commands/cob.rs
@@ -11,10 +11,10 @@ use nonempty::NonEmpty;

use radicle::cob;
use radicle::cob::store::CobAction;
+
use radicle::cob::stream::CobStream as _;
+
use radicle::git;
use radicle::prelude::*;
-
use radicle::storage::git;
-

-
use serde_json::json;
+
use radicle::storage;

use crate::git::Rev;
use crate::terminal as term;
@@ -54,6 +54,10 @@ Create, Update options
Log options

    --format (pretty | json)    Desired output format (default: pretty)
+
    --from <oid>                Git object ID of the commit of the operation to
+
                                start iterating at.
+
    --until <oid>               Git object ID of the commit of the operation to
+
                                stop iterating at.

Show options

@@ -92,6 +96,8 @@ enum Operation {
        type_name: FilteredTypeName,
        oid: Rev,
        format: Format,
+
        from: Option<Rev>,
+
        until: Option<Rev>,
    },
    Migrate,
    Show {
@@ -173,7 +179,10 @@ impl std::fmt::Display for FilteredTypeName {
}

impl Embed {
-
    fn try_into_bytes(self, repo: &git::Repository) -> anyhow::Result<cob::Embed<cob::Uri>> {
+
    fn try_into_bytes(
+
        self,
+
        repo: &storage::git::Repository,
+
    ) -> anyhow::Result<cob::Embed<cob::Uri>> {
        Ok(match self.content {
            EmbedContent::Hash(hash) => cob::Embed {
                name: self.name,
@@ -217,6 +226,8 @@ impl Args for Options {
        let mut message: Option<String> = None;
        let mut embeds: Vec<Embed> = vec![];
        let mut actions: Option<PathBuf> = None;
+
        let mut from: Option<Rev> = None;
+
        let mut until: Option<Rev> = None;

        while let Some(arg) = parser.next()? {
            match (&op, &arg) {
@@ -279,6 +290,14 @@ impl Args for Options {
                (Update | Create, Value(val)) => {
                    actions = Some(PathBuf::from(term::args::string(val)));
                }
+
                (Log, Long("from")) => {
+
                    let v = parser.value()?;
+
                    from = Some(term::args::rev(&v)?);
+
                }
+
                (Log, Long("until")) => {
+
                    let v = parser.value()?;
+
                    until = Some(term::args::rev(&v)?);
+
                }
                _ => return Err(anyhow!(arg.unexpected())),
            }
        }
@@ -317,6 +336,8 @@ impl Args for Options {
                        type_name,
                        oid: oids.pop().ok_or_else(missing_oid)?,
                        format,
+
                        from,
+
                        until,
                    },
                    Migrate => Operation::Migrate,
                    Show => {
@@ -422,15 +443,44 @@ pub fn run(Options { op }: Options, ctx: impl term::Context) -> anyhow::Result<(
            type_name,
            oid,
            format,
+
            from,
+
            until,
        } => {
            let repo = storage.repository(rid)?;
            let oid = oid.resolve(&repo.backend)?;
-
            let ops = cob::store::ops(&oid, type_name.as_ref(), &repo)?;

-
            for op in ops.into_iter().rev() {
-
                match format {
-
                    Format::Json => print_op_json(op)?,
-
                    Format::Pretty => print_op_pretty(op)?,
+
            let from = from.map(|from| from.resolve(&repo.backend)).transpose()?;
+
            let until = until
+
                .map(|until| until.resolve(&repo.backend))
+
                .transpose()?;
+

+
            match type_name {
+
                Issue => operations::<cob::issue::Action>(
+
                    &cob::issue::TYPENAME,
+
                    oid,
+
                    from,
+
                    until,
+
                    &repo,
+
                    format,
+
                )?,
+
                Patch => operations::<cob::patch::Action>(
+
                    &cob::patch::TYPENAME,
+
                    oid,
+
                    from,
+
                    until,
+
                    &repo,
+
                    format,
+
                )?,
+
                Identity => operations::<cob::identity::Action>(
+
                    &cob::identity::TYPENAME,
+
                    oid,
+
                    from,
+
                    until,
+
                    &repo,
+
                    format,
+
                )?,
+
                Other(type_name) => {
+
                    operations::<serde_json::Value>(&type_name, oid, from, until, &repo, format)?
                }
            }
        }
@@ -509,7 +559,7 @@ pub fn run(Options { op }: Options, ctx: impl term::Context) -> anyhow::Result<(

fn show(
    oids: Vec<Rev>,
-
    repo: &git::Repository,
+
    repo: &storage::git::Repository,
    type_name: FilteredTypeName,
    profile: &Profile,
) -> Result<(), anyhow::Error> {
@@ -578,7 +628,10 @@ fn show(
    Ok(())
}

-
fn print_op_pretty(op: cob::Op<Vec<u8>>) -> anyhow::Result<()> {
+
fn print_op_pretty<A>(op: cob::Op<A>) -> anyhow::Result<()>
+
where
+
    A: serde::Serialize,
+
{
    let time = DateTime::<Utc>::from(
        std::time::UNIX_EPOCH + std::time::Duration::from_secs(op.timestamp.as_secs()),
    )
@@ -597,8 +650,7 @@ fn print_op_pretty(op: cob::Op<Vec<u8>>) -> anyhow::Result<()> {
    term::print(format!("date     {time}"));
    term::blank();
    for action in op.actions {
-
        let obj: serde_json::Value = serde_json::from_slice(&action)?;
-
        let val = serde_json::to_string_pretty(&obj)?;
+
        let val = serde_json::to_string_pretty(&action)?;
        for line in val.lines() {
            term::indented(term::format::dim(line));
        }
@@ -607,21 +659,11 @@ fn print_op_pretty(op: cob::Op<Vec<u8>>) -> anyhow::Result<()> {
    Ok(())
}

-
fn print_op_json(op: cob::Op<Vec<u8>>) -> anyhow::Result<()> {
-
    let mut ser = json!(op);
-
    ser.as_object_mut()
-
        .expect("ops must serialize to objects")
-
        .insert(
-
            "actions".to_string(),
-
            json!(op
-
                .actions
-
                .iter()
-
                .map(|action: &Vec<u8>| -> Result<serde_json::Value, _> {
-
                    serde_json::from_slice(action)
-
                })
-
                .collect::<Result<Vec<serde_json::Value>, _>>()?),
-
        );
-
    term::print(ser);
+
fn print_op_json<A>(op: cob::Op<A>) -> anyhow::Result<()>
+
where
+
    A: serde::Serialize,
+
{
+
    term::print(serde_json::to_value(&op)?);
    Ok(())
}

@@ -650,3 +692,38 @@ where
    NonEmpty::from_vec(read_jsonl(reader)?)
        .ok_or_else(|| anyhow!("at least one action is required"))
}
+

+
fn operations<A>(
+
    typename: &cob::TypeName,
+
    oid: cob::ObjectId,
+
    from: Option<git::Oid>,
+
    until: Option<git::Oid>,
+
    repo: &storage::git::Repository,
+
    format: Format,
+
) -> anyhow::Result<()>
+
where
+
    A: serde::Serialize,
+
    A: for<'de> serde::Deserialize<'de>,
+
{
+
    let history = cob::stream::CobRange::new(typename, &oid);
+
    let stream = cob::stream::Stream::<A>::new(&repo.backend, history, typename.clone());
+
    let iter = match (from, until) {
+
        (None, None) => stream.all()?,
+
        (None, Some(until)) => stream.until(until)?,
+
        (Some(from), None) => stream.since(from)?,
+
        (Some(from), Some(until)) => stream.range(from, until)?,
+
    };
+

+
    // Reverse
+
    let iter = iter.collect::<Vec<_>>().into_iter().rev();
+

+
    for op in iter {
+
        let op = op?;
+
        match format {
+
            Format::Json => print_op_json(op)?,
+
            Format::Pretty => print_op_pretty(op)?,
+
        }
+
    }
+

+
    Ok(())
+
}
modified crates/radicle-cli/tests/commands.rs
@@ -165,6 +165,11 @@ fn rad_cob_migrate() {
}

#[test]
+
fn rad_cob_operations() {
+
    Environment::alice(["rad-init", "rad-cob-operations"]);
+
}
+

+
#[test]
#[ignore = "part of many other tests"]
fn rad_init() {
    Environment::alice(["rad-init"]);
modified crates/radicle-cob/src/backend/git/change.rs
@@ -159,6 +159,12 @@ impl change::Storage for git2::Repository {
            .collect::<Vec<_>>())
    }

+
    fn manifest_of(&self, id: &Oid) -> Result<crate::Manifest, Self::LoadError> {
+
        let commit = self.find_commit(**id)?;
+
        let tree = commit.tree()?;
+
        load_manifest(self, &tree)
+
    }
+

    fn load(&self, id: Self::ObjectId) -> Result<Entry, Self::LoadError> {
        let commit = Commit::read(self, id.into())?;
        let timestamp = git2::Time::from(commit.committer().time).seconds() as u64;
modified crates/radicle-cob/src/change/store.rs
@@ -38,6 +38,9 @@ pub trait Storage {

    /// Returns the parents of the object with the specified ID.
    fn parents_of(&self, id: &Oid) -> Result<Vec<Oid>, Self::LoadError>;
+

+
    /// Load only the manifest of the change entry.
+
    fn manifest_of(&self, id: &Oid) -> Result<Manifest, Self::LoadError>;
}

/// Change template, used to create a new change.
modified crates/radicle-cob/src/test/storage.rs
@@ -101,6 +101,10 @@ impl change::Storage for Storage {
            .map(git_ext::Oid::from)
            .collect::<Vec<_>>())
    }
+

+
    fn manifest_of(&self, id: &git_ext::Oid) -> Result<crate::Manifest, Self::LoadError> {
+
        self.as_raw().manifest_of(id)
+
    }
}

impl object::Storage for Storage {
modified crates/radicle/src/cob.rs
@@ -7,6 +7,7 @@ pub mod issue;
pub mod op;
pub mod patch;
pub mod store;
+
pub mod stream;
pub mod thread;

#[cfg(test)]
modified crates/radicle/src/cob/identity.rs
@@ -12,6 +12,7 @@ use thiserror::Error;
use crate::identity::doc::Doc;
use crate::node::device::Device;
use crate::node::NodeId;
+
use crate::storage;
use crate::{
    cob,
    cob::{
@@ -38,6 +39,15 @@ pub type Op = cob::Op<Action>;
/// Identifier for an identity revision.
pub type RevisionId = EntryId;

+
pub type IdentityStream<'a> = cob::stream::Stream<'a, Action>;
+

+
impl<'a> IdentityStream<'a> {
+
    pub fn init(identity: ObjectId, store: &'a storage::git::Repository) -> Self {
+
        let history = cob::stream::CobRange::new(&TYPENAME, &identity);
+
        Self::new(&store.backend, history, TYPENAME.clone())
+
    }
+
}
+

/// Proposal operation.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
modified crates/radicle/src/cob/issue.rs
@@ -19,6 +19,7 @@ use crate::identity::doc::DocError;
use crate::node::device::Device;
use crate::node::NodeId;
use crate::prelude::{Did, Doc, ReadRepository, RepoId};
+
use crate::storage;
use crate::storage::{HasRepoId, RepositoryError, WriteRepository};

pub use cache::Cache;
@@ -33,6 +34,15 @@ pub static TYPENAME: LazyLock<TypeName> =
/// Identifier for an issue.
pub type IssueId = ObjectId;

+
pub type IssueStream<'a> = cob::stream::Stream<'a, Action>;
+

+
impl<'a> IssueStream<'a> {
+
    pub fn init(issue: IssueId, store: &'a storage::git::Repository) -> Self {
+
        let history = cob::stream::CobRange::new(&TYPENAME, &issue);
+
        Self::new(&store.backend, history, TYPENAME.clone())
+
    }
+
}
+

/// Error updating or creating issues.
#[derive(Error, Debug)]
pub enum Error {
modified crates/radicle/src/cob/op.rs
@@ -3,10 +3,10 @@ use radicle_cob::Manifest;
use serde::Serialize;
use thiserror::Error;

-
use radicle_cob as cob;
use radicle_cob::history::{Entry, EntryId};
use radicle_crypto::PublicKey;

+
use crate::cob;
use crate::cob::Timestamp;
use crate::identity::DocAt;
use crate::storage::ReadRepository;
@@ -24,20 +24,26 @@ pub enum OpEncodingError {
    Git(#[from] git2::Error),
}

+
#[derive(Error, Debug)]
+
#[error("failed to load manifest of '{object}': {err}")]
+
pub struct ManifestError {
+
    object: git::Oid,
+
    #[source]
+
    err: Box<dyn std::error::Error + Send + Sync + 'static>,
+
}
+

/// Error loading an `Op` from storage.
#[derive(Error, Debug)]
pub enum LoadError {
-
    #[error("failed to load Op at '{object}': {err}")]
+
    #[error("failed to load Op at '{object}': {source}")]
    Load {
        object: git::Oid,
-
        #[source]
-
        err: Box<dyn std::error::Error + Send + Sync + 'static>,
+
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
-
    #[error("failed to decode Op at '{object}': {err}")]
+
    #[error("failed to decode Op at '{object}': {source}")]
    Encoding {
        object: git::Oid,
-
        #[source]
-
        err: OpEncodingError,
+
        source: OpEncodingError,
    },
}

@@ -112,6 +118,20 @@ impl<A> Op<A> {
        }
    }

+
    pub fn manifest_of<S>(store: &S, id: &git::Oid) -> Result<Manifest, ManifestError>
+
    where
+
        S: cob::change::Storage<
+
            ObjectId = git::Oid,
+
            Parent = git::Oid,
+
            Signatures = crypto::ssh::ExtendedSignature,
+
        >,
+
    {
+
        store.manifest_of(id).map_err(|err| ManifestError {
+
            object: *id,
+
            err: Box::new(err),
+
        })
+
    }
+

    /// Get the `Op` identified by the `id` in the provided `store`.
    pub fn load<S>(store: &S, id: git::Oid) -> Result<Self, LoadError>
    where
@@ -124,9 +144,12 @@ impl<A> Op<A> {
    {
        let entry = store.load(id).map_err(|err| LoadError::Load {
            object: id,
-
            err: Box::new(err),
+
            source: Box::new(err),
        })?;
-
        Op::try_from(&entry).map_err(|err| LoadError::Encoding { object: id, err })
+
        Op::try_from(&entry).map_err(|err| LoadError::Encoding {
+
            object: id,
+
            source: err,
+
        })
    }
}

modified crates/radicle/src/cob/patch.rs
@@ -46,6 +46,15 @@ pub type Op = cob::Op<Action>;
/// Identifier for a patch.
pub type PatchId = ObjectId;

+
pub type PatchStream<'a> = cob::stream::Stream<'a, Action>;
+

+
impl<'a> PatchStream<'a> {
+
    pub fn init(patch: PatchId, store: &'a storage::git::Repository) -> Self {
+
        let history = cob::stream::CobRange::new(&TYPENAME, &patch);
+
        Self::new(&store.backend, history, TYPENAME.clone())
+
    }
+
}
+

/// Unique identifier for a patch revision.
#[derive(
    Wrapper,
added crates/radicle/src/cob/stream.rs
@@ -0,0 +1,382 @@
+
pub mod error;
+

+
mod iter;
+
pub use iter::OpsIter;
+
use iter::Walk;
+

+
use std::marker::PhantomData;
+

+
use serde::Deserialize;
+

+
use crate::git::Oid;
+

+
use super::{ObjectId, Op, TypeName};
+

+
/// Helper trait for anything can provide its initial commit. Generally, this is
+
/// the root of a COB object.
+
pub trait HasRoot {
+
    /// Return the root `Oid` of the COB.
+
    fn root(&self) -> Oid;
+
}
+

+
/// Provide the stream of operations that are related to a given COB.
+
///
+
/// The whole history of operations can be retrieved via [`CobStream::all`].
+
///
+
/// To constrain the history, use one of [`CobStream::since`],
+
/// [`CobStream::until`], or [`CobStream::range`].
+
pub trait CobStream: HasRoot {
+
    /// Any error that can occur when iterating over the operations.
+
    type IterError: std::error::Error + Send + Sync + 'static;
+

+
    /// The associated action to the COB's [`Op`].
+
    type Action;
+

+
    /// The iterator that walks over the operations.
+
    type Iter: Iterator<Item = Result<Op<Self::Action>, Self::IterError>>;
+

+
    /// Get an iterator of all operations from the inception of the collaborative
+
    /// object.
+
    fn all(&self) -> Result<Self::Iter, error::Stream>;
+

+
    /// Get an iterator of all operations from the given `oid`, in the
+
    /// collaborative object's history.
+
    fn since(&self, oid: Oid) -> Result<Self::Iter, error::Stream>;
+

+
    /// Get an iterator of all operations until the given `oid`, in the
+
    /// collaborative object's history.
+
    fn until(&self, oid: Oid) -> Result<Self::Iter, error::Stream>;
+

+
    /// Get an iterator of all operations `from` the given `Oid`, `until` the
+
    /// other `Oid`, in the collaborative object's history.
+
    fn range(&self, from: Oid, until: Oid) -> Result<Self::Iter, error::Stream>;
+
}
+

+
/// The range for iterating over a COB's action history.
+
///
+
/// Construct via [`CobRange::new`] to use for constructing a [`Stream`].
+
#[derive(Clone, Debug)]
+
pub struct CobRange {
+
    root: Oid,
+
    until: iter::Until,
+
}
+

+
impl CobRange {
+
    /// Construct a `CobRange` for a given COB [`TypeName`] and its
+
    /// [`ObjectId`] identifier.
+
    ///
+
    /// The range will be from the root, given by the [`ObjectId`], to the
+
    /// reference tips of all remote namespaces.
+
    pub fn new(typename: &TypeName, object_id: &ObjectId) -> Self {
+
        let glob = crate::storage::refs::cobs(typename, object_id);
+
        Self {
+
            root: **object_id,
+
            until: iter::Until::Glob(glob),
+
        }
+
    }
+
}
+

+
impl HasRoot for CobRange {
+
    fn root(&self) -> Oid {
+
        self.root
+
    }
+
}
+

+
/// A stream over a COB's operations.
+
///
+
/// The generic parameter `A` is filled by the COB's corresponding `Action`
+
/// type.
+
///
+
/// The `Stream` implements [`CobStream`], so iterators over the operations can be
+
/// constructed via the [`CobStream`] methods.
+
///
+
/// To construct a `Stream`, use [`Stream::new`].
+
pub struct Stream<'a, A> {
+
    repo: &'a git2::Repository,
+
    range: CobRange,
+
    typename: TypeName,
+
    marker: PhantomData<A>,
+
}
+

+
impl<'a, A> Stream<'a, A> {
+
    /// Construct a new stream providing the underlying `repo`, a [`CobRange`],
+
    /// and the [`TypeName`] of the COB that is being streamed.
+
    pub fn new(repo: &'a git2::Repository, range: CobRange, typename: TypeName) -> Self {
+
        Self {
+
            repo,
+
            range,
+
            typename,
+
            marker: PhantomData,
+
        }
+
    }
+
}
+

+
impl<A> HasRoot for Stream<'_, A> {
+
    fn root(&self) -> Oid {
+
        self.range.root()
+
    }
+
}
+

+
impl<'a, A> CobStream for Stream<'a, A>
+
where
+
    A: for<'de> Deserialize<'de>,
+
{
+
    type IterError = error::Ops;
+
    type Action = A;
+
    type Iter = OpsIter<'a, Self::Action>;
+

+
    fn all(&self) -> Result<Self::Iter, error::Stream> {
+
        Ok(OpsIter::new(
+
            Walk::from(self.range.clone())
+
                .iter(self.repo)
+
                .map_err(error::Stream::new)?,
+
            self.typename.clone(),
+
        ))
+
    }
+

+
    fn since(&self, oid: Oid) -> Result<Self::Iter, error::Stream> {
+
        Ok(OpsIter::new(
+
            Walk::from(self.range.clone())
+
                .since(oid)
+
                .iter(self.repo)
+
                .map_err(error::Stream::new)?,
+
            self.typename.clone(),
+
        ))
+
    }
+

+
    fn until(&self, oid: Oid) -> Result<Self::Iter, error::Stream> {
+
        Ok(OpsIter::new(
+
            Walk::from(self.range.clone())
+
                .until(oid)
+
                .iter(self.repo)
+
                .map_err(error::Stream::new)?,
+
            self.typename.clone(),
+
        ))
+
    }
+

+
    fn range(&self, from: Oid, until: Oid) -> Result<Self::Iter, error::Stream> {
+
        Ok(OpsIter::new(
+
            Walk::new(from, until.into())
+
                .iter(self.repo)
+
                .map_err(error::Stream::new)?,
+
            self.typename.clone(),
+
        ))
+
    }
+
}
+

+
#[allow(clippy::unwrap_used)]
+
#[cfg(test)]
+
mod tests {
+
    use std::{collections::BTreeSet, fmt};
+

+
    use json::json;
+
    use nonempty::NonEmpty;
+
    use serde_json as json;
+

+
    use crate::cob;
+
    use crate::cob::change::Storage as _;
+
    use crate::crypto::test::signer::MockSigner;
+
    use crate::test::arbitrary;
+
    use crate::test::arbitrary::gen;
+

+
    use super::*;
+

+
    fn typename() -> TypeName {
+
        "xyz.radicle.test".parse::<TypeName>().unwrap()
+
    }
+

+
    fn gen_ops(repo: &git2::Repository, signer: &MockSigner) -> Vec<cob::Entry> {
+
        // Number of ops
+
        let n = gen::<u8>(1).clamp(1, 10);
+
        let mut entries = Vec::with_capacity(n.into());
+

+
        for _ in 0..n {
+
            // Number of actions in this bop
+
            let m = gen::<u8>(1).clamp(1, 3);
+
            let contents = NonEmpty::collect((0..m).map(|_| {
+
                json::to_vec(&json!({
+
                    "test": arbitrary::alphanumeric(1),
+
                }))
+
                .unwrap()
+
            }))
+
            .unwrap();
+
            let entry = repo
+
                .store(
+
                    None,
+
                    vec![],
+
                    signer,
+
                    cob::change::Template {
+
                        type_name: typename(),
+
                        tips: vec![],
+
                        message: "Test Op Stream".to_string(),
+
                        embeds: vec![],
+
                        contents,
+
                    },
+
                )
+
                .unwrap();
+
            entries.push(entry);
+
        }
+
        entries
+
    }
+

+
    /// all === from(root)
+
    fn prop_all_from<S>(stream: &S)
+
    where
+
        S: CobStream,
+
        S::Action: fmt::Debug + Eq,
+
    {
+
        assert_eq!(
+
            stream
+
                .all()
+
                .expect("failed to get 'all' stream")
+
                .collect::<Result<Vec<_>, _>>()
+
                .unwrap(),
+
            stream
+
                .since(stream.root())
+
                .expect("failed to get 'from' stream")
+
                .collect::<Result<Vec<_>, _>>()
+
                .unwrap()
+
        )
+
    }
+

+
    /// all === until(tip)
+
    fn prop_all_until<S>(stream: &S, tip: Oid)
+
    where
+
        S: CobStream,
+
        S::Action: fmt::Debug + Eq,
+
    {
+
        assert_eq!(
+
            stream
+
                .all()
+
                .expect("failed to get 'all' stream")
+
                .collect::<Result<Vec<_>, _>>()
+
                .unwrap(),
+
            stream
+
                .until(tip)
+
                .expect("failed to get 'until' stream")
+
                .collect::<Result<Vec<_>, _>>()
+
                .unwrap()
+
        )
+
    }
+

+
    /// all === from_until(root, tip)
+
    fn prop_all_from_until<S>(stream: &S, tip: Oid)
+
    where
+
        S: CobStream,
+
        S::Action: fmt::Debug + Eq,
+
    {
+
        let root = stream.root();
+
        assert_eq!(
+
            stream
+
                .all()
+
                .expect("failed to get 'all' stream")
+
                .collect::<Result<Vec<_>, _>>()
+
                .unwrap(),
+
            stream
+
                .range(root, tip)
+
                .expect("failed to get 'from_until' stream")
+
                .collect::<Result<Vec<_>, _>>()
+
                .unwrap(),
+
            "from: {root}, until: {tip}"
+
        )
+
    }
+

+
    /// from_until(a, b) === from(a).intersect(until(b))
+
    fn prop_from_until<S>(stream: &S, from: Oid, until: Oid)
+
    where
+
        S: CobStream,
+
        S::Action: fmt::Debug + Clone,
+
    {
+
        let from_s = stream
+
            .since(from)
+
            .expect("failed to get 'from' stream")
+
            .map(|op| op.expect("Op failed in stream").id)
+
            .collect::<BTreeSet<_>>();
+

+
        let until_s = stream
+
            .until(until)
+
            .expect("failed to get 'until' stream")
+
            .map(|op| op.expect("Op failed in stream").id)
+
            .collect::<BTreeSet<_>>();
+
        let from_until_s = stream
+
            .range(from, until)
+
            .expect("failed to get 'from_until' stream")
+
            .map(|op| op.unwrap().id)
+
            .collect::<BTreeSet<_>>();
+
        assert_eq!(
+
            from_s
+
                .intersection(&until_s)
+
                .cloned()
+
                .collect::<BTreeSet<_>>(),
+
            from_until_s,
+
            "from: {from_s:?}\nuntil: {until_s:?}"
+
        )
+
    }
+

+
    #[test]
+
    fn test_all_from() {
+
        let tmp = tempfile::tempdir().unwrap();
+
        let repo = git2::Repository::init(tmp.path()).unwrap();
+
        let signer = MockSigner::default();
+
        let ops = gen_ops(&repo, &signer);
+
        let history = CobRange {
+
            root: ops.first().unwrap().id,
+
            until: ops.last().unwrap().id.into(),
+
        };
+
        let stream = Stream::<json::Value>::new(&repo, history, typename());
+
        prop_all_from(&stream)
+
    }
+

+
    #[test]
+
    fn test_all_until() {
+
        let tmp = tempfile::tempdir().unwrap();
+
        let repo = git2::Repository::init(tmp.path()).unwrap();
+
        let signer = MockSigner::default();
+
        let ops = gen_ops(&repo, &signer);
+
        let tip = ops.last().unwrap().id;
+
        let history = CobRange {
+
            root: ops.first().unwrap().id,
+
            until: tip.into(),
+
        };
+
        let stream = Stream::<json::Value>::new(&repo, history, typename());
+
        prop_all_until(&stream, tip)
+
    }
+

+
    #[test]
+
    fn test_all_from_until() {
+
        let tmp = tempfile::tempdir().unwrap();
+
        let repo = git2::Repository::init(tmp.path()).unwrap();
+
        let signer = MockSigner::default();
+
        let ops = gen_ops(&repo, &signer);
+
        let tip = ops.last().unwrap().id;
+
        let history = CobRange {
+
            root: ops.first().unwrap().id,
+
            until: tip.into(),
+
        };
+
        let stream = Stream::<json::Value>::new(&repo, history, typename());
+
        prop_all_from_until(&stream, tip)
+
    }
+

+
    #[test]
+
    fn test_from_until() {
+
        let tmp = tempfile::tempdir().unwrap();
+
        let repo = git2::Repository::init(tmp.path()).unwrap();
+
        let signer = MockSigner::default();
+
        let ops = gen_ops(&repo, &signer);
+
        let history = CobRange {
+
            root: ops.first().unwrap().id,
+
            until: ops.last().unwrap().id.into(),
+
        };
+
        let n = ops.len() - 1;
+
        let (x, y) = gen::<(usize, usize)>(1);
+
        let x = x.clamp(0, n);
+
        let y = y.clamp(0, n);
+
        let (from, until) = if x <= y {
+
            (ops[x].id, ops[y].id)
+
        } else {
+
            (ops[y].id, ops[x].id)
+
        };
+
        let stream = Stream::<json::Value>::new(&repo, history, typename());
+
        prop_from_until(&stream, from, until)
+
    }
+
}
added crates/radicle/src/cob/stream/error.rs
@@ -0,0 +1,30 @@
+
use thiserror::Error;
+

+
use crate::cob::op;
+

+
#[derive(Debug, Error)]
+
#[error("failed to construct stream: {source}")]
+
pub struct Stream {
+
    source: Box<dyn std::error::Error + Send + Sync + 'static>,
+
}
+

+
impl Stream {
+
    pub fn new<E>(source: E) -> Self
+
    where
+
        E: std::error::Error + Send + Sync + 'static,
+
    {
+
        Stream {
+
            source: source.into(),
+
        }
+
    }
+
}
+

+
#[derive(Debug, Error)]
+
pub enum Ops {
+
    #[error("failed to get a commit while iterating over stream: {source}")]
+
    Commit { source: git2::Error },
+
    #[error("failed to load COB operation: {source}")]
+
    Load { source: op::LoadError },
+
    #[error("failed to load COB manifest: {source}")]
+
    Manifest { source: op::ManifestError },
+
}
added crates/radicle/src/cob/stream/iter.rs
@@ -0,0 +1,172 @@
+
use std::marker::PhantomData;
+

+
use serde::Deserialize;
+

+
use crate::cob::{Op, TypeName};
+
use crate::git::{self, Oid, PatternString};
+

+
use super::error;
+
use super::CobRange;
+

+
/// A `Walk` specifies a range to construct a [`WalkIter`].
+
#[derive(Clone, Debug)]
+
pub(super) struct Walk {
+
    from: Oid,
+
    until: Until,
+
}
+

+
/// Specify the end of a range by either providing an [`Oid`] tip, or a
+
/// reference glob via a [`PatternString`].
+
#[derive(Clone, Debug)]
+
pub enum Until {
+
    Tip(Oid),
+
    Glob(PatternString),
+
}
+

+
impl From<Oid> for Until {
+
    fn from(tip: Oid) -> Self {
+
        Self::Tip(tip)
+
    }
+
}
+

+
impl From<PatternString> for Until {
+
    fn from(glob: PatternString) -> Self {
+
        Self::Glob(glob)
+
    }
+
}
+

+
/// A revwalk over a set of commits, including the commit that is being walked
+
/// from.
+
pub(super) struct WalkIter<'a> {
+
    /// Git repository for looking up the commit object during the revwalk.
+
    repo: &'a git2::Repository,
+
    /// The root commit that is being walked from.
+
    ///
+
    /// N.b. This is required since ranges are non-inclusive in Git, and if the
+
    /// `^` notation is used with a root commit, then it will result in an
+
    /// error.
+
    from: Option<Oid>,
+
    /// The revwalk that is being iterated over.
+
    inner: git2::Revwalk<'a>,
+
}
+

+
impl From<CobRange> for Walk {
+
    fn from(history: CobRange) -> Self {
+
        Self::new(history.root, history.until)
+
    }
+
}
+

+
impl Walk {
+
    /// Construct a new `Walk`, `from` the given commit, `until` the end of a
+
    /// given range.
+
    pub(super) fn new(from: Oid, until: Until) -> Self {
+
        Self { from, until }
+
    }
+

+
    /// Change the `Oid` that the walk starts from.
+
    pub(super) fn since(mut self, from: Oid) -> Self {
+
        self.from = from;
+
        self
+
    }
+

+
    /// Change the `Until` that the walk finishes on.
+
    pub(super) fn until(mut self, until: impl Into<Until>) -> Self {
+
        self.until = until.into();
+
        self
+
    }
+

+
    /// Get the iterator for the walk.
+
    pub(super) fn iter(self, repo: &git2::Repository) -> Result<WalkIter<'_>, git2::Error> {
+
        let mut walk = repo.revwalk()?;
+
        // N.b. ensure that we start from the `self.from` commit.
+
        walk.set_sorting(git2::Sort::TOPOLOGICAL.union(git2::Sort::REVERSE))?;
+
        match self.until {
+
            Until::Tip(tip) => walk.push_range(&format!("{}..{}", self.from, tip))?,
+
            Until::Glob(glob) => {
+
                walk.push(*self.from)?;
+
                walk.push_glob(glob.as_str())?
+
            }
+
        }
+

+
        Ok(WalkIter {
+
            repo,
+
            from: Some(self.from),
+
            inner: walk,
+
        })
+
    }
+
}
+

+
impl<'a> Iterator for WalkIter<'a> {
+
    type Item = Result<git2::Commit<'a>, git2::Error>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        // N.b. ensure that we start using the `from` commit and use the revwalk
+
        // after that.
+
        if let Some(from) = self.from.take() {
+
            return Some(self.repo.find_commit(*from));
+
        }
+
        let oid = self.inner.next()?;
+
        Some(oid.and_then(|oid| self.repo.find_commit(oid)))
+
    }
+
}
+

+
/// Iterate over all operations for a given range of commits.
+
pub struct OpsIter<'a, A> {
+
    /// The [`WalkIter`] provides each commit that it is being walked over for a
+
    /// given range.
+
    walk: WalkIter<'a>,
+
    /// The walk can iterate over other COBs, e.g. an Identity COB, so this is
+
    /// used to filter for the correct type.
+
    typename: TypeName,
+
    /// Marker for the type of action that is associated with the Op
+
    action: PhantomData<A>,
+
}
+

+
impl<A> Iterator for OpsIter<'_, A>
+
where
+
    A: for<'de> Deserialize<'de>,
+
{
+
    type Item = Result<Op<A>, error::Ops>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        let commit = self.walk.next()?;
+
        match commit {
+
            Ok(commit) => {
+
                let entry = git::Oid::from(commit.id());
+
                // N.b. mark this commit as seen, so that it is not walked again
+
                self.walk.inner.hide(commit.id()).ok();
+
                // Skip any Op that do not match the manifest
+
                self.load(entry).transpose().or_else(|| self.next())
+
            }
+
            // Something was wrong with the commit
+
            Err(err) => Some(Err(error::Ops::Commit { source: err })),
+
        }
+
    }
+
}
+

+
impl<'a, A> OpsIter<'a, A> {
+
    pub(super) fn new(walk: WalkIter<'a>, typename: TypeName) -> Self {
+
        Self {
+
            walk,
+
            typename,
+
            action: PhantomData,
+
        }
+
    }
+

+
    /// Load the `Op` for the given `entry`, ensuring that manifest matches with
+
    /// the expected manifest.
+
    fn load(&self, entry: git::Oid) -> Result<Option<Op<A>>, error::Ops>
+
    where
+
        A: for<'de> Deserialize<'de>,
+
    {
+
        let manifest = Op::<A>::manifest_of(self.walk.repo, &entry)
+
            .map_err(|err| error::Ops::Manifest { source: err })?;
+
        if manifest.type_name == self.typename {
+
            let op =
+
                Op::load(self.walk.repo, entry).map_err(|err| error::Ops::Load { source: err })?;
+
            Ok(Some(op))
+
        } else {
+
            Ok(None)
+
        }
+
    }
+
}
modified crates/radicle/src/storage/git/cob.rs
@@ -82,6 +82,10 @@ impl change::Storage for Repository {
    fn parents_of(&self, id: &Oid) -> Result<Vec<Oid>, Self::LoadError> {
        self.backend.parents_of(id)
    }
+

+
    fn manifest_of(&self, id: &Oid) -> Result<cob::Manifest, Self::LoadError> {
+
        self.backend.manifest_of(id)
+
    }
}

impl cob::object::Storage for Repository {
@@ -223,6 +227,10 @@ impl<R: storage::WriteRepository> change::Storage for DraftStore<'_, R> {
    fn parents_of(&self, id: &Oid) -> Result<Vec<Oid>, Self::LoadError> {
        self.repo.raw().parents_of(id)
    }
+

+
    fn manifest_of(&self, id: &Oid) -> Result<cob::Manifest, Self::LoadError> {
+
        self.repo.raw().manifest_of(id)
+
    }
}

impl<R> SignRepository for DraftStore<'_, R>
modified crates/radicle/src/test/arbitrary.rs
@@ -84,6 +84,25 @@ pub fn nonempty_storage(size: usize) -> MockStorage {
    storage
}

+
/// Generate a `String` of length `size`, only containing alphanumeric
+
/// characters, i.e. [A-Za-z0-9]
+
pub fn alphanumeric(size: usize) -> String {
+
    let mut s = String::with_capacity(size);
+
    for _ in 0..size {
+
        let choice = gen::<u8>(size).clamp(0, 3);
+
        let c = match choice {
+
            // Generate A-Z
+
            0 => gen::<u8>(size).clamp(0x41, 0x5A),
+
            // Generate a-z
+
            1 => gen::<u8>(size).clamp(0x61, 0x7A),
+
            // Generate 0-9
+
            _ => gen::<u8>(size).clamp(0x30, 0x39),
+
        };
+
        s.push(char::from(c));
+
    }
+
    s
+
}
+

pub fn gen<T: Arbitrary>(size: usize) -> T {
    let mut gen = qcheck::Gen::new(size);

modified crates/radicle/src/test/storage.rs
@@ -440,4 +440,8 @@ impl radicle_cob::change::Storage for MockRepository {
    fn parents_of(&self, _id: &Oid) -> Result<Vec<Oid>, Self::LoadError> {
        todo!()
    }
+

+
    fn manifest_of(&self, _id: &Oid) -> Result<radicle_cob::Manifest, Self::LoadError> {
+
        todo!()
+
    }
}