Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
cli: Add new `rad inbox` command
cloudhead committed 2 years ago
commit fe55de181d4320a0cd7a6ebd2820764280ae9adc
parent 98c94de5e3db8acb55433ffd95f137396cde6fe4
21 files changed +1466 -44
added radicle-cli/examples/rad-inbox.md
@@ -0,0 +1,108 @@
+
``` ~alice
+
$ cd heartwood
+
$ rad inbox
+
Your inbox is empty.
+
```
+

+
``` ~bob
+
$ cd heartwood
+
$ rad issue open --title "No license file" --description "..." -q
+
✓ Synced with 1 node(s)
+
$ git commit -m "Change copyright" --allow-empty -q
+
$ git push rad HEAD:bob/copy
+
$ cd ..
+
$ cd radicle-git
+
$ git commit -m "Change copyright" --allow-empty -q
+
$ git push rad -o patch.message="Copyright fixes" HEAD:refs/patches
+
```
+

+
``` ~alice
+
$ rad inbox --sort-by id
+
╭──────────────────────────────────────────────────────────────╮
+
│ heartwood                                                    │
+
├──────────────────────────────────────────────────────────────┤
+
│ 1   ●   issue    No license file    [  ..  ]   opened    now │
+
│ 2   ●   branch   Change copyright   bob/copy   created   now │
+
╰──────────────────────────────────────────────────────────────╯
+
```
+

+
``` ~alice
+
$ rad inbox --all --sort-by id
+
╭──────────────────────────────────────────────────────────────╮
+
│ heartwood                                                    │
+
├──────────────────────────────────────────────────────────────┤
+
│ 1   ●   issue    No license file    [  ..  ]   opened    now │
+
│ 2   ●   branch   Change copyright   bob/copy   created   now │
+
╰──────────────────────────────────────────────────────────────╯
+
╭──────────────────────────────────────────────────────────╮
+
│ radicle-git                                              │
+
├──────────────────────────────────────────────────────────┤
+
│ 3   ●   patch   Copyright fixes   [ ... ]   opened   now │
+
╰──────────────────────────────────────────────────────────╯
+
```
+

+
``` ~alice
+
$ rad inbox show 2
+
commit 141c9073066e3910f1dfe356904a0120542e1cc9
+
Author: radicle <radicle@localhost>
+
Date:   Thu Dec 15 17:28:04 2022 +0000
+

+
    Change copyright
+

+
commit f2de534b5e81d7c6e2dcaf58c3dd91573c0a0354
+
Author: anonymous <anonymous@radicle.xyz>
+
Date:   Mon Jan 1 14:39:16 2018 +0000
+

+
    Second commit
+

+
commit 08c788dd1be6315de09e3fe09b5b1b7a2b8711d9
+
Author: anonymous <anonymous@radicle.xyz>
+
Date:   Mon Jan 1 14:39:16 2018 +0000
+

+
    Initial commit
+
```
+

+
``` ~alice
+
$ rad inbox list --sort-by id
+
╭──────────────────────────────────────────────────────────────╮
+
│ heartwood                                                    │
+
├──────────────────────────────────────────────────────────────┤
+
│ 1   ●   issue    No license file    [ ... ]    opened    now │
+
│ 2       branch   Change copyright   bob/copy   created   now │
+
╰──────────────────────────────────────────────────────────────╯
+
```
+

+
``` ~alice
+
$ rad inbox show 1
+
╭──────────────────────────────────────────────────╮
+
│ Title   No license file                          │
+
│ Issue   [ ...                                  ] │
+
│ Author  bob z6Mkt67…v4N1tRk                      │
+
│ Status  open                                     │
+
│                                                  │
+
│ ...                                              │
+
╰──────────────────────────────────────────────────╯
+
```
+

+
``` ~alice
+
$ rad inbox clear
+
✓ Cleared 2 item(s) from your inbox
+
$ rad inbox
+
Your inbox is empty.
+
$ rad inbox --all
+
╭──────────────────────────────────────────────────────────╮
+
│ radicle-git                                              │
+
├──────────────────────────────────────────────────────────┤
+
│ 3   ●   patch   Copyright fixes   [ ... ]   opened   now │
+
╰──────────────────────────────────────────────────────────╯
+
```
+

+
``` ~alice
+
$ rad inbox clear --all
+
✓ Cleared 1 item(s) from your inbox
+
```
+

+
``` ~alice
+
$ rad inbox clear --all
+
Your inbox is empty.
+
```
modified radicle-cli/examples/workflow/5-patching-maintainer.md
@@ -16,6 +16,13 @@ $ rad remote add z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk --name bob --s
The contributor's changes are now visible to us.

```
+
$ rad inbox --sort-by id
+
╭──────────────────────────────────────────────────────────────────────╮
+
│ heartwood                                                            │
+
├──────────────────────────────────────────────────────────────────────┤
+
│ 1   ●   issue   flux capacitor underpowered   d060989   opened   now │
+
│ 2   ●   patch   Define power requirements     a99d55e   opened   now │
+
╰──────────────────────────────────────────────────────────────────────╯
$ git branch -r
  bob/patches/a99d55e5958a8c52ff7efbc8ff000d9bbdac79c7
  rad/master
modified radicle-cli/src/commands.rs
@@ -20,6 +20,8 @@ pub mod rad_fork;
pub mod rad_help;
#[path = "commands/id.rs"]
pub mod rad_id;
+
#[path = "commands/inbox.rs"]
+
pub mod rad_inbox;
#[path = "commands/init.rs"]
pub mod rad_init;
#[path = "commands/inspect.rs"]
added radicle-cli/src/commands/inbox.rs
@@ -0,0 +1,379 @@
+
use std::ffi::OsString;
+
use std::process;
+

+
use anyhow::anyhow;
+

+
use localtime::LocalTime;
+
use radicle::issue::Issues;
+
use radicle::node::notifications;
+
use radicle::node::notifications::*;
+
use radicle::patch::Patches;
+
use radicle::prelude::{Profile, RepoId};
+
use radicle::storage::RefUpdate;
+
use radicle::storage::{ReadRepository, ReadStorage};
+
use radicle::{cob, Storage};
+

+
use term::Element as _;
+

+
use crate::terminal as term;
+
use crate::terminal::args;
+
use crate::terminal::args::{Args, Error, Help};
+

+
pub const HELP: Help = Help {
+
    name: "inbox",
+
    description: "Manage your Radicle notifications inbox",
+
    version: env!("CARGO_PKG_VERSION"),
+
    usage: r#"
+
Usage
+

+
    rad inbox [<option>...]
+
    rad inbox list [<option>...]
+
    rad inbox clear [<option>...]
+

+
    By default, this command lists all items in your inbox.
+
    If your working directory is a Radicle repository, it only shows item
+
    belonging to this repository, unless `--all` is used.
+

+
Options
+

+
    --all                Operate on all repositories
+
    --repo <rid>         Operate on the given repository (default: rad .)
+
    --sort-by <field>    Sort by `id` or `timestamp` (default: timestamp)
+
    --reverse, -r        Reverse the list
+
    --help               Print help
+
"#,
+
};
+

+
#[derive(Debug, Default, PartialEq, Eq)]
+
enum Operation {
+
    #[default]
+
    List,
+
    Show,
+
    Clear,
+
}
+

+
#[derive(Default, Debug)]
+
enum Mode {
+
    #[default]
+
    Contextual,
+
    All,
+
    ById(Vec<NotificationId>),
+
    ByRepo(RepoId),
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
struct SortBy {
+
    reverse: bool,
+
    field: &'static str,
+
}
+

+
pub struct Options {
+
    op: Operation,
+
    mode: Mode,
+
    sort_by: SortBy,
+
}
+

+
impl Args for Options {
+
    fn from_args(args: Vec<OsString>) -> anyhow::Result<(Self, Vec<OsString>)> {
+
        use lexopt::prelude::*;
+

+
        let mut parser = lexopt::Parser::from_args(args);
+
        let mut op: Option<Operation> = None;
+
        let mut mode = None;
+
        let mut ids = Vec::new();
+
        let mut reverse = None;
+
        let mut field = None;
+

+
        while let Some(arg) = parser.next()? {
+
            match arg {
+
                Long("help") | Short('h') => {
+
                    return Err(Error::Help.into());
+
                }
+
                Long("all") | Short('a') if mode.is_none() => {
+
                    mode = Some(Mode::All);
+
                }
+
                Long("reverse") | Short('r') => {
+
                    reverse = Some(true);
+
                }
+
                Long("sort-by") => {
+
                    let val = parser.value()?;
+

+
                    match term::args::string(&val).as_str() {
+
                        "timestamp" => field = Some("timestamp"),
+
                        "id" => field = Some("rowid"),
+
                        other => {
+
                            return Err(anyhow!(
+
                                "unknown sorting field `{other}`, see `rad inbox --help`"
+
                            ))
+
                        }
+
                    }
+
                }
+
                Long("repo") if mode.is_none() && op.is_some() => {
+
                    let val = parser.value()?;
+
                    let repo = args::rid(&val)?;
+

+
                    mode = Some(Mode::ByRepo(repo));
+
                }
+
                Value(val) if op.is_none() => match val.to_string_lossy().as_ref() {
+
                    "list" => op = Some(Operation::List),
+
                    "show" => op = Some(Operation::Show),
+
                    "clear" => op = Some(Operation::Clear),
+
                    cmd => return Err(anyhow!("unknown command `{cmd}`, see `rad inbox --help`")),
+
                },
+
                Value(val) if op.is_some() && mode.is_none() => {
+
                    let id = term::args::number(&val)? as NotificationId;
+
                    ids.push(id);
+
                }
+
                _ => return Err(anyhow::anyhow!(arg.unexpected())),
+
            }
+
        }
+
        let mode = if ids.is_empty() {
+
            mode.unwrap_or_default()
+
        } else {
+
            Mode::ById(ids)
+
        };
+
        let op = op.unwrap_or_default();
+

+
        let sort_by = if let Some(field) = field {
+
            SortBy {
+
                field,
+
                reverse: reverse.unwrap_or(false),
+
            }
+
        } else {
+
            SortBy {
+
                field: "timestamp",
+
                reverse: true,
+
            }
+
        };
+

+
        Ok((Options { op, mode, sort_by }, vec![]))
+
    }
+
}
+

+
pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
+
    let profile = ctx.profile()?;
+
    let storage = &profile.storage;
+
    let mut notifs = profile.notifications_mut()?;
+
    let Options { op, mode, sort_by } = options;
+

+
    match op {
+
        Operation::List => list(mode, sort_by, &notifs.read_only(), storage),
+
        Operation::Clear => clear(mode, &mut notifs),
+
        Operation::Show => show(mode, &mut notifs, storage, &profile),
+
    }
+
}
+

+
fn list(
+
    mode: Mode,
+
    sort_by: SortBy,
+
    notifs: &notifications::StoreReader,
+
    storage: &Storage,
+
) -> anyhow::Result<()> {
+
    let repos: Vec<term::VStack<'_>> = match mode {
+
        Mode::Contextual => {
+
            if let Ok((_, rid)) = radicle::rad::cwd() {
+
                list_repo(rid, sort_by, notifs, storage)?
+
                    .into_iter()
+
                    .collect()
+
            } else {
+
                list_all(sort_by, notifs, storage)?
+
            }
+
        }
+
        Mode::ByRepo(rid) => list_repo(rid, sort_by, notifs, storage)?
+
            .into_iter()
+
            .collect(),
+
        Mode::All => list_all(sort_by, notifs, storage)?,
+
        Mode::ById(_) => anyhow::bail!("the `list` command does not take IDs"),
+
    };
+

+
    if repos.is_empty() {
+
        term::print(term::format::italic("Your inbox is empty."));
+
    } else {
+
        for repo in repos {
+
            repo.print();
+
        }
+
    }
+
    Ok(())
+
}
+

+
fn list_all<'a>(
+
    sort_by: SortBy,
+
    notifs: &notifications::StoreReader,
+
    storage: &Storage,
+
) -> anyhow::Result<Vec<term::VStack<'a>>> {
+
    let mut repos = Vec::new();
+
    for repo in storage.repositories()? {
+
        let repo = list_repo(repo.rid, sort_by, notifs, storage)?;
+
        repos.extend(repo.into_iter());
+
    }
+
    Ok(repos)
+
}
+

+
fn list_repo<'a, R: ReadStorage>(
+
    rid: RepoId,
+
    sort_by: SortBy,
+
    notifs: &notifications::StoreReader,
+
    storage: &R,
+
) -> anyhow::Result<Option<term::VStack<'a>>>
+
where
+
    <R as ReadStorage>::Repository: cob::Store,
+
{
+
    let mut table = term::Table::new(term::TableOptions {
+
        spacing: 3,
+
        ..term::TableOptions::default()
+
    });
+
    let repo = storage.repository(rid)?;
+
    let doc = repo.identity_doc()?;
+
    let proj = doc.project()?;
+
    let issues = Issues::open(&repo)?;
+
    let patches = Patches::open(&repo)?;
+

+
    let mut notifs = notifs.by_repo(&rid, sort_by.field)?.collect::<Vec<_>>();
+
    if !sort_by.reverse {
+
        // Notifications are returned in descendant order by default.
+
        notifs.reverse();
+
    }
+

+
    for n in notifs {
+
        let n: Notification = n?;
+

+
        let seen = if n.status.is_read() {
+
            term::Label::blank()
+
        } else {
+
            term::format::tertiary(String::from("●")).into()
+
        };
+
        let (category, summary, status, name) = match n.kind {
+
            NotificationKind::Branch { name } => {
+
                let commit = if let Some(head) = n.update.new() {
+
                    repo.commit(head)?.summary().unwrap_or_default().to_owned()
+
                } else {
+
                    String::new()
+
                };
+
                let status = match n.update {
+
                    RefUpdate::Updated { .. } => "updated",
+
                    RefUpdate::Created { .. } => "created",
+
                    RefUpdate::Deleted { .. } => "deleted",
+
                    RefUpdate::Skipped { .. } => "skipped",
+
                };
+
                ("branch".to_string(), commit, status, name.to_string())
+
            }
+
            NotificationKind::Cob { type_name, id } => {
+
                let (category, summary) = if type_name == *cob::issue::TYPENAME {
+
                    let issue = issues.get(&id)?.ok_or(anyhow!("missing"))?;
+
                    (String::from("issue"), issue.title().to_owned())
+
                } else if type_name == *cob::patch::TYPENAME {
+
                    let patch = patches.get(&id)?.ok_or(anyhow!("missing"))?;
+
                    (String::from("patch"), patch.title().to_owned())
+
                } else {
+
                    (type_name.to_string(), "".to_owned())
+
                };
+
                let status = match n.update {
+
                    RefUpdate::Updated { .. } => "updated",
+
                    RefUpdate::Created { .. } => "opened",
+
                    RefUpdate::Deleted { .. } => "deleted",
+
                    RefUpdate::Skipped { .. } => "skipped",
+
                };
+
                (category, summary, status, term::format::cob(&id))
+
            }
+
        };
+
        table.push([
+
            n.id.to_string().into(),
+
            seen,
+
            category.into(),
+
            summary.into(),
+
            name.into(),
+
            status.into(),
+
            term::format::timestamp(n.timestamp).into(),
+
        ]);
+
    }
+

+
    if table.is_empty() {
+
        Ok(None)
+
    } else {
+
        Ok(Some(
+
            term::VStack::default()
+
                .border(Some(term::colors::FAINT))
+
                .child(term::label(proj.name()))
+
                .divider()
+
                .child(table),
+
        ))
+
    }
+
}
+

+
fn clear(mode: Mode, notifs: &mut notifications::StoreWriter) -> anyhow::Result<()> {
+
    let cleared = match mode {
+
        Mode::All => notifs.clear_all()?,
+
        Mode::ById(ids) => notifs.clear(&ids)?,
+
        Mode::ByRepo(rid) => notifs.clear_by_repo(&rid)?,
+
        Mode::Contextual => {
+
            if let Ok((_, rid)) = radicle::rad::cwd() {
+
                notifs.clear_by_repo(&rid)?
+
            } else {
+
                return Err(Error::WithHint {
+
                    err: anyhow!("not a radicle repository"),
+
                    hint: "to clear all repository notifications, use the `--all` flag",
+
                }
+
                .into());
+
            }
+
        }
+
    };
+
    if cleared > 0 {
+
        term::success!("Cleared {cleared} item(s) from your inbox");
+
    } else {
+
        term::print(term::format::italic("Your inbox is empty."));
+
    }
+
    Ok(())
+
}
+

+
fn show(
+
    mode: Mode,
+
    notifs: &mut notifications::StoreWriter,
+
    storage: &Storage,
+
    profile: &Profile,
+
) -> anyhow::Result<()> {
+
    let id = match mode {
+
        Mode::ById(ids) => match ids.as_slice() {
+
            [id] => *id,
+
            [] => anyhow::bail!("a Notification ID must be given"),
+
            _ => anyhow::bail!("too many Notification IDs given"),
+
        },
+
        _ => anyhow::bail!("a Notification ID must be given"),
+
    };
+
    let n = notifs.get(id)?;
+
    let repo = storage.repository(n.repo)?;
+

+
    match n.kind {
+
        NotificationKind::Cob { type_name, id } if type_name == *cob::issue::TYPENAME => {
+
            let issues = Issues::open(&repo)?;
+
            let issue = issues.get(&id)?.unwrap();
+

+
            term::issue::show(&issue, &id, term::issue::Format::default(), profile)?;
+
        }
+
        NotificationKind::Cob { type_name, id } if type_name == *cob::patch::TYPENAME => {
+
            let patches = Patches::open(&repo)?;
+
            let patch = patches.get(&id)?.unwrap();
+

+
            term::patch::show(&patch, &id, false, &repo, None, profile)?;
+
        }
+
        NotificationKind::Branch { .. } => {
+
            let refstr = if let Some(remote) = n.remote {
+
                n.qualified
+
                    .with_namespace(remote.to_component())
+
                    .to_string()
+
            } else {
+
                n.qualified.to_string()
+
            };
+
            process::Command::new("git")
+
                .current_dir(repo.path())
+
                .args(["log", refstr.as_str()])
+
                .spawn()?
+
                .wait()?;
+
        }
+
        _ => {
+
            todo!();
+
        }
+
    }
+
    notifs.set_status(NotificationStatus::ReadAt(LocalTime::now()), &[id])?;
+

+
    Ok(())
+
}
modified radicle-cli/src/main.rs
@@ -169,6 +169,11 @@ fn run_other(exe: &str, args: &[OsString]) -> Result<(), Option<anyhow::Error>>
        "id" => {
            term::run_command_args::<rad_id::Options, _>(rad_id::HELP, rad_id::run, args.to_vec());
        }
+
        "inbox" => term::run_command_args::<rad_inbox::Options, _>(
+
            rad_inbox::HELP,
+
            rad_inbox::run,
+
            args.to_vec(),
+
        ),
        "init" => {
            term::run_command_args::<rad_init::Options, _>(
                rad_init::HELP,
modified radicle-cli/tests/commands.rs
@@ -2134,6 +2134,40 @@ fn rad_watch() {
}

#[test]
+
fn rad_inbox() {
+
    let mut environment = Environment::new();
+
    let mut alice = environment.node(Config::test(Alias::new("alice")));
+
    let bob = environment.node(Config::test(Alias::new("bob")));
+
    let working = environment.tmp().join("working");
+
    let (repo1, _) = fixtures::repository(working.join("alice").join("heartwood"));
+
    let (repo2, _) = fixtures::repository(working.join("alice").join("radicle-git"));
+
    let rid1 = alice.project_from("heartwood", "Radicle Heartwood Protocol & Stack", &repo1);
+
    let rid2 = alice.project_from("radicle-git", "Radicle Git", &repo2);
+

+
    let alice = alice.spawn();
+
    let mut bob = bob.spawn();
+

+
    bob.connect(&alice).converge([&alice]);
+
    bob.clone(rid1, working.join("bob")).unwrap();
+
    bob.clone(rid2, working.join("bob")).unwrap();
+

+
    formula(&environment.tmp(), "examples/rad-inbox.md")
+
        .unwrap()
+
        .home(
+
            "alice",
+
            working.join("alice"),
+
            [("RAD_HOME", alice.home.path().display())],
+
        )
+
        .home(
+
            "bob",
+
            working.join("bob"),
+
            [("RAD_HOME", bob.home.path().display())],
+
        )
+
        .run()
+
        .unwrap();
+
}
+

+
#[test]
fn rad_patch_fetch_2() {
    let mut environment = Environment::new();
    let alice = environment.node(Config::test(Alias::new("alice")));
modified radicle-node/src/runtime.rs
@@ -18,6 +18,7 @@ use radicle::git;
use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
+
use radicle::node::notifications;
use radicle::node::Handle as _;
use radicle::profile::Home;
use radicle::Storage;
@@ -27,8 +28,8 @@ use crate::crypto::Signer;
use crate::node::{routing, NodeId};
use crate::service::message::NodeAnnouncement;
use crate::service::{gossip, policy, Event};
-
use crate::wire::Wire;
-
use crate::wire::{self, Decode};
+
use crate::wire;
+
use crate::wire::{Decode, Wire};
use crate::worker;
use crate::{service, LocalTime};

@@ -47,6 +48,9 @@ pub enum Error {
    /// A policies database error.
    #[error("policies database error: {0}")]
    Policy(#[from] policy::Error),
+
    /// A notifications database error.
+
    #[error("notifications database error: {0}")]
+
    Notifications(#[from] notifications::Error),
    /// A gossip database error.
    #[error("gossip database error: {0}")]
    Gossip(#[from] gossip::Error),
@@ -151,6 +155,7 @@ impl Runtime {
        log::info!(target: "node", "Opening policy database..");
        let policies = home.policies_mut()?;
        let policies = policy::Config::new(policy, scope, policies);
+
        let notifications = home.notifications_mut()?;

        log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
        log::info!(target: "node", "Initializing service ({:?})..", network);
@@ -251,6 +256,7 @@ impl Runtime {
            worker_recv,
            nid,
            handle.clone(),
+
            notifications,
            worker::Config {
                capacity: 8,
                timeout: time::Duration::from_secs(9),
modified radicle-node/src/worker.rs
@@ -11,6 +11,7 @@ use std::{io, time};
use crossbeam_channel as chan;

use radicle::identity::RepoId;
+
use radicle::node::notifications;
use radicle::prelude::NodeId;
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadRepository, ReadStorage};
@@ -177,6 +178,7 @@ struct Worker {
    tasks: chan::Receiver<Task>,
    handle: Handle,
    policies: policy::Config<policy::store::Read>,
+
    notifications: notifications::StoreWriter,
}

impl Worker {
@@ -197,7 +199,7 @@ impl Worker {
        } = task;
        let remote = fetch.remote();
        let channels = channels::ChannelsFlush::new(self.handle.clone(), channels, remote, stream);
-
        let result = self._process(fetch, stream, channels);
+
        let result = self._process(fetch, stream, channels, self.notifications.clone());

        log::trace!(target: "worker", "Sending response back to service..");

@@ -219,6 +221,7 @@ impl Worker {
        fetch: FetchRequest,
        stream: StreamId,
        mut channels: channels::ChannelsFlush,
+
        notifs: notifications::StoreWriter,
    ) -> FetchResult {
        match fetch {
            FetchRequest::Initiator {
@@ -229,7 +232,7 @@ impl Worker {
                timeout: _timeout,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
-
                let result = self.fetch(rid, remote, refs_at, channels);
+
                let result = self.fetch(rid, remote, refs_at, channels, notifs);
                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote } => {
@@ -278,6 +281,7 @@ impl Worker {
        remote: NodeId,
        refs_at: Option<Vec<RefsAt>>,
        channels: channels::ChannelsFlush,
+
        notifs: notifications::StoreWriter,
    ) -> Result<fetch::FetchResult, FetchError> {
        let FetchConfig {
            limit,
@@ -289,7 +293,15 @@ impl Worker {
        let allowed = radicle_fetch::Allowed::from_config(rid, &self.policies)?;
        let blocked = radicle_fetch::BlockList::from_config(&self.policies)?;

-
        let handle = fetch::Handle::new(rid, *local, &self.storage, allowed, blocked, channels)?;
+
        let handle = fetch::Handle::new(
+
            rid,
+
            *local,
+
            &self.storage,
+
            allowed,
+
            blocked,
+
            channels,
+
            notifs,
+
        )?;
        let result = handle.fetch(rid, &self.storage, *limit, remote, refs_at)?;

        if let Err(e) = garbage::collect(&self.storage, rid, *expiry) {
@@ -313,6 +325,7 @@ impl Pool {
        tasks: chan::Receiver<Task>,
        nid: NodeId,
        handle: Handle,
+
        notifications: notifications::StoreWriter,
        config: Config,
    ) -> Result<Self, policy::Error> {
        let mut pool = Vec::with_capacity(config.capacity);
@@ -329,6 +342,7 @@ impl Pool {
                storage: config.storage.clone(),
                fetch_config: config.fetch.clone(),
                policies,
+
                notifications: notifications.clone(),
            };
            let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());

modified radicle-node/src/worker/fetch.rs
@@ -1,12 +1,15 @@
pub mod error;

use std::collections::HashSet;
+
use std::str::FromStr;
+

+
use localtime::LocalTime;

use radicle::crypto::PublicKey;
use radicle::prelude::RepoId;
use radicle::storage::refs::RefsAt;
-
use radicle::storage::{ReadStorage as _, RefUpdate, WriteRepository as _};
-
use radicle::Storage;
+
use radicle::storage::{ReadStorage as _, RefUpdate, RemoteRepository, WriteRepository as _};
+
use radicle::{git, node, Storage};
use radicle_fetch::{Allowed, BlockList, FetchLimit};

use super::channels::ChannelsFlush;
@@ -26,6 +29,7 @@ pub enum Handle {
    },
    Pull {
        handle: radicle_fetch::Handle<ChannelsFlush>,
+
        notifications: node::notifications::StoreWriter,
    },
}

@@ -37,12 +41,16 @@ impl Handle {
        follow: Allowed,
        blocked: BlockList,
        channels: ChannelsFlush,
+
        notifications: node::notifications::StoreWriter,
    ) -> Result<Self, error::Handle> {
        let exists = storage.contains(&rid)?;
        if exists {
            let repo = storage.repository(rid)?;
            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
-
            Ok(Handle::Pull { handle })
+
            Ok(Handle::Pull {
+
                handle,
+
                notifications,
+
            })
        } else {
            let (repo, tmp) = storage.lock_repository(rid)?;
            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
@@ -58,16 +66,20 @@ impl Handle {
        remote: PublicKey,
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Fetch> {
-
        let result = match self {
+
        let (result, notifs) = match self {
            Self::Clone { mut handle, tmp } => {
                log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
                let result = radicle_fetch::clone(&mut handle, limit, remote)?;
                mv(tmp, storage, &rid)?;
-
                result
+
                (result, None)
            }
-
            Self::Pull { mut handle } => {
+
            Self::Pull {
+
                mut handle,
+
                notifications,
+
            } => {
                log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
-
                radicle_fetch::pull(&mut handle, limit, remote, refs_at)?
+
                let result = radicle_fetch::pull(&mut handle, limit, remote, refs_at)?;
+
                (result, Some(notifications))
            }
        };

@@ -95,6 +107,16 @@ impl Handle {
                repo.set_identity_head()?;
                repo.set_head()?;

+
                // Notifications are only posted for pulls, not clones.
+
                if let Some(mut store) = notifs {
+
                    // Only create notifications for repos that we have
+
                    // contributed to in some way, otherwise our inbox will
+
                    // be flooded by all the repos we are seeding.
+
                    if repo.remote(&storage.info().key).is_ok() {
+
                        notify(&rid, &applied, &mut store)?;
+
                    }
+
                }
+

                Ok(FetchResult {
                    updated: applied.updated,
                    namespaces: remotes.into_iter().collect(),
@@ -132,3 +154,38 @@ fn mv(tmp: tempfile::TempDir, storage: &Storage, rid: &RepoId) -> Result<(), err

    Ok(())
}
+

+
// Post notifications for the given refs.
+
fn notify(
+
    rid: &RepoId,
+
    refs: &radicle_fetch::git::refs::Applied<'static>,
+
    store: &mut node::notifications::StoreWriter,
+
) -> Result<(), error::Fetch> {
+
    let now = LocalTime::now();
+

+
    for update in refs.updated.iter() {
+
        if let Some(r) = update.name().to_namespaced() {
+
            let r = r.strip_namespace();
+
            if r == *git::refs::storage::SIGREFS_BRANCH {
+
                // Don't notify about signed refs.
+
                continue;
+
            }
+
            if let Some(rest) = r.strip_prefix(git::refname!("refs/heads/patches")) {
+
                if radicle::cob::ObjectId::from_str(rest.as_str()).is_ok() {
+
                    // Don't notify about patch branches, since we already get
+
                    // notifications about patch updates.
+
                    continue;
+
                }
+
            }
+
        }
+
        if let RefUpdate::Skipped { .. } = update {
+
            // Don't notify about skipped refs.
+
        } else if let Err(e) = store.insert(&rid, update, now) {
+
            log::error!(
+
                target: "worker",
+
                "Failed to update notification store for {rid}: {e}"
+
            );
+
        }
+
    }
+
    Ok(())
+
}
modified radicle-remote-helper/src/list.rs
@@ -84,7 +84,7 @@ fn patch_refs<R: ReadRepository + cob::Store + 'static>(stored: &R) -> Result<()
        let head = patch.head();

        if patch.is_open() && stored.commit(*head).is_ok() {
-
            println!("{} {}", patch.head(), git::refs::storage::patch(&id));
+
            println!("{} {}", patch.head(), git::refs::patch(&id));
        }
    }
    Ok(())
modified radicle-remote-helper/src/push.rs
@@ -394,7 +394,7 @@ fn patch_open<G: Signer>(
            //
            //  refs/namespaces/<nid>/refs/heads/patches/<patch-id>
            //
-
            let refname = git::refs::storage::patch(&patch).with_namespace(nid.into());
+
            let refname = git::refs::patch(&patch).with_namespace(nid.into());
            let _ = stored.raw().reference(
                refname.as_str(),
                commit.id(),
modified radicle-term/src/table.rs
@@ -184,6 +184,10 @@ impl<const W: usize, T: Cell> Table<W, T> {
        }
    }

+
    pub fn is_empty(&self) -> bool {
+
        !self.rows.iter().any(|r| matches!(r, Row::Data { .. }))
+
    }
+

    fn inner(&self, c: Constraint) -> Size {
        let mut outer = self.outer(c);

modified radicle/src/cob.rs
@@ -13,8 +13,8 @@ pub mod test;
pub use common::*;
pub use op::{ActorId, Op};
pub use radicle_cob::{
-
    change, history::EntryId, object, object::collaboration::error, CollaborativeObject, Contents,
-
    Create, Embed, Entry, Evaluate, History, Manifest, ObjectId, Store, TypeName, Update, Updated,
-
    Version,
+
    change, history::EntryId, object, object::collaboration::error, type_name::TypeNameParse,
+
    CollaborativeObject, Contents, Create, Embed, Entry, Evaluate, History, Manifest, ObjectId,
+
    Store, TypeName, Update, Updated, Version,
};
pub use radicle_cob::{create, get, git, list, remove, update};
modified radicle/src/cob/patch.rs
@@ -315,7 +315,7 @@ impl<'a, R: WriteRepository> Merged<'a, R> {
        signer: &G,
    ) -> Result<(), storage::Error> {
        let nid = signer.public_key();
-
        let stored_ref = git::refs::storage::patch(&self.patch).with_namespace(nid.into());
+
        let stored_ref = git::refs::patch(&self.patch).with_namespace(nid.into());
        let working_ref = git::refs::workdir::patch_upstream(&self.patch);

        working
modified radicle/src/git.rs
@@ -160,6 +160,18 @@ pub mod refs {
        Qualified::from(lit::refs_heads(branch))
    }

+
    /// A patch reference.
+
    ///
+
    /// `refs/heads/patches/<object_id>`
+
    ///
+
    pub fn patch<'a>(object_id: &cob::ObjectId) -> Qualified<'a> {
+
        Qualified::from_components(
+
            name::component!("heads"),
+
            name::component!("patches"),
+
            Some(object_id.into()),
+
        )
+
    }
+

    pub mod storage {
        use format::{
            lit,
@@ -272,18 +284,6 @@ pub mod refs {
                .join(Component::from(object_id))
        }

-
        /// A patch reference.
-
        ///
-
        /// `refs/heads/patches/<object_id>`
-
        ///
-
        pub fn patch<'a>(object_id: &cob::ObjectId) -> Qualified<'a> {
-
            Qualified::from_components(
-
                component!("heads"),
-
                component!("patches"),
-
                Some(object_id.into()),
-
            )
-
        }
-

        /// Draft references.
        ///
        /// These references are not replicated or signed.
@@ -386,18 +386,6 @@ pub mod refs {

        /// A patch head.
        ///
-
        /// `refs/heads/patches/<patch-id>`
-
        ///
-
        pub fn patch<'a>(patch_id: &cob::ObjectId) -> Qualified<'a> {
-
            Qualified::from_components(
-
                component!("heads"),
-
                component!("patches"),
-
                Some(patch_id.into()),
-
            )
-
        }
-

-
        /// A patch head.
-
        ///
        /// `refs/remotes/rad/patches/<patch-id>`
        ///
        pub fn patch_upstream<'a>(patch_id: &cob::ObjectId) -> Qualified<'a> {
modified radicle/src/node.rs
@@ -6,6 +6,7 @@ pub mod address;
pub mod config;
pub mod db;
pub mod events;
+
pub mod notifications;
pub mod policy;
pub mod routing;
pub mod seed;
@@ -54,6 +55,8 @@ pub const PENALTY_THRESHOLD: u8 = 32;
pub const NODE_DB_FILE: &str = "node.db";
/// Filename of policies database under the node directory.
pub const POLICIES_DB_FILE: &str = "policies.db";
+
/// Filename of notifications database under the node directory.
+
pub const NOTIFICATIONS_DB_FILE: &str = "notifications.db";
/// Filename of last node announcement, when running in debug mode.
#[cfg(debug_assertions)]
pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire.debug";
added radicle/src/node/notifications.rs
@@ -0,0 +1,131 @@
+
pub mod store;
+

+
use localtime::LocalTime;
+
use sqlite as sql;
+
use thiserror::Error;
+

+
use crate::cob::object::ParseObjectId;
+
use crate::cob::{ObjectId, TypeName, TypeNameParse};
+
use crate::git::{BranchName, Qualified};
+
use crate::prelude::RepoId;
+
use crate::storage::{RefUpdate, RemoteId};
+

+
pub use store::{Error, Store};
+
/// Read and write to the store.
+
pub type StoreWriter = Store<store::Write>;
+
/// Write to the store.
+
pub type StoreReader = Store<store::Read>;
+

+
/// Unique identifier for a notification.
+
pub type NotificationId = u32;
+

+
#[derive(Debug, PartialEq, Eq, Clone)]
+
pub enum NotificationStatus {
+
    ReadAt(LocalTime),
+
    Unread,
+
}
+

+
impl NotificationStatus {
+
    pub fn is_read(&self) -> bool {
+
        matches!(self, Self::ReadAt(_))
+
    }
+
}
+

+
/// A notification for an updated ref.
+
#[derive(Debug, PartialEq, Eq, Clone)]
+
pub struct Notification {
+
    /// Unique notification ID.
+
    pub id: NotificationId,
+
    /// Source repository for this notification.
+
    pub repo: RepoId,
+
    /// Remote, if any.
+
    pub remote: Option<RemoteId>,
+
    /// Qualified ref name that was updated.
+
    pub qualified: Qualified<'static>,
+
    /// The underlying ref update.
+
    pub update: RefUpdate,
+
    /// Notification kind.
+
    pub kind: NotificationKind,
+
    /// Read status.
+
    pub status: NotificationStatus,
+
    /// Timestamp of the update.
+
    pub timestamp: LocalTime,
+
}
+

+
/// Type of notification.
+
#[derive(Debug, PartialEq, Eq, Clone)]
+
pub enum NotificationKind {
+
    /// A COB changed.
+
    Cob { type_name: TypeName, id: ObjectId },
+
    /// A source branch changed.
+
    Branch { name: BranchName },
+
}
+

+
#[derive(Error, Debug)]
+
pub enum NotificationKindError {
+
    /// Invalid COB type name.
+
    #[error("invalid type name: {0}")]
+
    TypeName(#[from] TypeNameParse),
+
    /// Invalid COB object id.
+
    #[error("invalid object id: {0}")]
+
    ObjectId(#[from] ParseObjectId),
+
    /// Invalid Git ref format.
+
    #[error("invalid ref format: {0}")]
+
    RefFormat(#[from] radicle_git_ext::ref_format::Error),
+
    /// Unknown notification kind.
+
    #[error("unknown notification kind {0:?}")]
+
    Unknown(Qualified<'static>),
+
}
+

+
impl<'a> TryFrom<Qualified<'a>> for NotificationKind {
+
    type Error = NotificationKindError;
+

+
    fn try_from(value: Qualified) -> Result<Self, Self::Error> {
+
        let kind = match value.non_empty_iter() {
+
            ("refs", "heads", head, rest) => NotificationKind::Branch {
+
                name: [head]
+
                    .into_iter()
+
                    .chain(rest)
+
                    .collect::<Vec<_>>()
+
                    .join("/")
+
                    .try_into()?,
+
            },
+
            ("refs", "cobs", type_name, id) => NotificationKind::Cob {
+
                type_name: type_name.parse()?,
+
                id: id.collect::<String>().parse()?,
+
            },
+
            _ => {
+
                return Err(NotificationKindError::Unknown(value.to_owned()));
+
            }
+
        };
+
        Ok(kind)
+
    }
+
}
+

+
impl TryFrom<&sql::Value> for NotificationStatus {
+
    type Error = sql::Error;
+

+
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        match value {
+
            sql::Value::Null => Ok(NotificationStatus::Unread),
+
            sql::Value::Integer(i) => Ok(NotificationStatus::ReadAt(LocalTime::from_millis(
+
                *i as u128,
+
            ))),
+
            _ => Err(sql::Error {
+
                code: None,
+
                message: Some("sql: invalid type for notification status".to_owned()),
+
            }),
+
        }
+
    }
+
}
+

+
impl sql::BindableWithIndex for &NotificationStatus {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        match self {
+
            NotificationStatus::Unread => sql::Value::Null.bind(stmt, i),
+
            NotificationStatus::ReadAt(t) => {
+
                sql::Value::Integer(t.as_millis() as i64).bind(stmt, i)
+
            }
+
        }
+
    }
+
}
added radicle/src/node/notifications/schema.sql
@@ -0,0 +1,38 @@
+
-- Repository updates.
+
create table if not exists "repository-notifications" (
+
  -- Repository ID.
+
  "repo"               text      not null,
+
  -- Git reference name related to this update.
+
  "ref"                text      not null,
+
  -- Notification read status. Null if unread, otherwise the time it was read.
+
  "status"             integer   default null,
+
  -- Old head of the branch before update (OID or `null`).
+
  "old"                text,
+
  -- New head of the branch after update (OID or `null`).
+
  "new"                text,
+
  -- Update commit timestamp.
+
  "timestamp"          integer   not null,
+
  -- We only allow one notification per ref in a given repo. Newer
+
  -- notifications should replace older ones.
+
  unique ("repo", "ref")
+
) strict;
+

+
-- What updates are we subscribed to.
+
create table if not exists "repository-notification-interests" (
+
  -- Repository ID.
+
  "repo"               text      not null,
+
  -- Git reference glob to set interest on.
+
  -- To set interest on issues for eg., use "refs/cobs/xyz.radicle.issue/*"
+
  -- To set interest on all refs, use "refs/*"
+
  -- This can also be used to set interest on a specific COB or branch.
+
  "glob"               text      not null,
+
  -- Notification interest.
+
  --
+
  -- "all" - get all updates
+
  -- "none" - get no updates
+
  -- "relevant" - get updates if relevant to you
+
  "interest"           text      not null,
+
  --
+
  unique ("repo", "glob", "interest")
+
  --
+
) strict;
added radicle/src/node/notifications/store.rs
@@ -0,0 +1,633 @@
+
#![allow(clippy::type_complexity)]
+
use std::marker::PhantomData;
+
use std::num::TryFromIntError;
+
use std::path::Path;
+
use std::sync::Arc;
+
use std::{fmt, io, str::FromStr, time};
+

+
use localtime::LocalTime;
+
use sqlite as sql;
+
use thiserror::Error;
+

+
use crate::git;
+
use crate::git::{Oid, RefError, RefString};
+
use crate::prelude::RepoId;
+
use crate::sql::transaction;
+
use crate::storage::RefUpdate;
+

+
use super::{
+
    Notification, NotificationId, NotificationKind, NotificationKindError, NotificationStatus,
+
};
+

+
/// How long to wait for the database lock to be released before failing a read.
+
const DB_READ_TIMEOUT: time::Duration = time::Duration::from_secs(3);
+
/// How long to wait for the database lock to be released before failing a write.
+
const DB_WRITE_TIMEOUT: time::Duration = time::Duration::from_secs(6);
+

+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// I/O error.
+
    #[error("i/o error: {0}")]
+
    Io(#[from] io::Error),
+
    /// An Internal error.
+
    #[error("internal error: {0}")]
+
    Internal(#[from] sql::Error),
+
    /// Timestamp error.
+
    #[error("invalid timestamp: {0}")]
+
    Timestamp(#[from] TryFromIntError),
+
    /// Invalid Git ref name.
+
    #[error("invalid ref name: {0}")]
+
    RefName(#[from] RefError),
+
    /// Invalid Git ref format.
+
    #[error("invalid ref format: {0}")]
+
    RefFormat(#[from] git_ext::ref_format::Error),
+
    /// Invalid notification kind.
+
    #[error("invalid notification kind: {0}")]
+
    NotificationKind(#[from] NotificationKindError),
+
    /// Not found.
+
    #[error("notification {0} not found")]
+
    NotificationNotFound(NotificationId),
+
    /// Internal unit overflow.
+
    #[error("the unit overflowed")]
+
    UnitOverflow,
+
}
+

+
/// Read-only type witness.
+
#[derive(Clone)]
+
pub struct Read;
+
/// Read-write type witness.
+
#[derive(Clone)]
+
pub struct Write;
+

+
/// Notifications store.
+
#[derive(Clone)]
+
pub struct Store<T> {
+
    db: Arc<sql::ConnectionThreadSafe>,
+
    marker: PhantomData<T>,
+
}
+

+
impl<T> fmt::Debug for Store<T> {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        write!(f, "Store(..)")
+
    }
+
}
+

+
impl Store<Read> {
+
    const SCHEMA: &'static str = include_str!("schema.sql");
+

+
    /// Same as [`Self::open`], but in read-only mode. This is useful to have multiple
+
    /// open databases, as no locking is required.
+
    pub fn reader<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+
        let mut db = sql::Connection::open_thread_safe_with_flags(
+
            path,
+
            sqlite::OpenFlags::new().with_read_only(),
+
        )?;
+
        db.set_busy_timeout(DB_READ_TIMEOUT.as_millis() as usize)?;
+
        db.execute(Self::SCHEMA)?;
+

+
        Ok(Self {
+
            db: Arc::new(db),
+
            marker: PhantomData,
+
        })
+
    }
+

+
    /// Create a new in-memory address book.
+
    pub fn memory() -> Result<Self, Error> {
+
        let db = sql::Connection::open_thread_safe_with_flags(
+
            ":memory:",
+
            sqlite::OpenFlags::new().with_read_only(),
+
        )?;
+
        db.execute(Self::SCHEMA)?;
+

+
        Ok(Self {
+
            db: Arc::new(db),
+
            marker: PhantomData,
+
        })
+
    }
+
}
+

+
impl Store<Write> {
+
    const SCHEMA: &'static str = include_str!("schema.sql");
+

+
    /// Open a policy store at the given path. Creates a new store if it
+
    /// doesn't exist.
+
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+
        let mut db = sql::Connection::open_thread_safe(path)?;
+
        db.set_busy_timeout(DB_WRITE_TIMEOUT.as_millis() as usize)?;
+
        db.execute(Self::SCHEMA)?;
+

+
        Ok(Self {
+
            db: Arc::new(db),
+
            marker: PhantomData,
+
        })
+
    }
+

+
    /// Create a new in-memory address book.
+
    pub fn memory() -> Result<Self, Error> {
+
        let db = sql::Connection::open_thread_safe(":memory:")?;
+
        db.execute(Self::SCHEMA)?;
+

+
        Ok(Self {
+
            db: Arc::new(db),
+
            marker: PhantomData,
+
        })
+
    }
+

+
    /// Get a read-only version of this store.
+
    pub fn read_only(self) -> Store<Read> {
+
        Store {
+
            db: self.db,
+
            marker: PhantomData,
+
        }
+
    }
+

+
    /// Set notification read status for the given notifications.
+
    pub fn set_status(
+
        &mut self,
+
        status: NotificationStatus,
+
        ids: &[NotificationId],
+
    ) -> Result<bool, Error> {
+
        transaction(&self.db, |_| {
+
            let mut stmt = self.db.prepare(
+
                "UPDATE `repository-notifications`
+
                 SET status = ?1
+
                 WHERE rowid = ?2",
+
            )?;
+
            for id in ids {
+
                stmt.bind((1, &status))?;
+
                stmt.bind((2, *id as i64))?;
+
                stmt.next()?;
+
                stmt.reset()?;
+
            }
+
            Ok(self.db.change_count() > 0)
+
        })
+
    }
+

+
    /// Insert a notification. Resets the status to *unread* if it already exists.
+
    pub fn insert(
+
        &mut self,
+
        repo: &RepoId,
+
        update: &RefUpdate,
+
        timestamp: LocalTime,
+
    ) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO `repository-notifications` (repo, ref, old, new, timestamp)
+
             VALUES (?1, ?2, ?3, ?4, ?5)
+
             ON CONFLICT DO UPDATE
+
             SET old = ?3, new = ?4, timestamp = ?5, status = null",
+
        )?;
+
        let old = update.old().map(|o| o.to_string());
+
        let new = update.new().map(|o| o.to_string());
+

+
        stmt.bind((1, repo))?;
+
        stmt.bind((2, update.name().as_str()))?;
+
        stmt.bind((3, old.as_deref()))?;
+
        stmt.bind((4, new.as_deref()))?;
+
        stmt.bind((5, i64::try_from(timestamp.as_millis())?))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    /// Delete the given notifications.
+
    pub fn clear(&mut self, ids: &[NotificationId]) -> Result<usize, Error> {
+
        transaction(&self.db, |_| {
+
            let mut stmt = self
+
                .db
+
                .prepare("DELETE FROM `repository-notifications` WHERE rowid = ?")?;
+

+
            for id in ids {
+
                stmt.bind((1, *id as i64))?;
+
                stmt.next()?;
+
                stmt.reset()?;
+
            }
+
            Ok(self.db.change_count())
+
        })
+
    }
+

+
    /// Delete all notifications of a repo.
+
    pub fn clear_by_repo(&mut self, repo: &RepoId) -> Result<usize, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("DELETE FROM `repository-notifications` WHERE repo = ?")?;
+

+
        stmt.bind((1, repo))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count())
+
    }
+

+
    /// Delete all notifications from all repos.
+
    pub fn clear_all(&mut self) -> Result<usize, Error> {
+
        self.db
+
            .prepare("DELETE FROM `repository-notifications`")?
+
            .next()?;
+

+
        Ok(self.db.change_count())
+
    }
+
}
+

+
/// `Read` methods for `Store`. This implies that a
+
/// `Store<Write>` can access these functions as well.
+
impl<T> Store<T> {
+
    /// Get a specific notification.
+
    pub fn get(&self, id: NotificationId) -> Result<Notification, Error> {
+
        let mut stmt = self.db.prepare(
+
            "SELECT rowid, repo, ref, old, new, status, timestamp
+
             FROM `repository-notifications`
+
             WHERE rowid = ?",
+
        )?;
+
        stmt.bind((1, id as i64))?;
+

+
        if let Some(Ok(row)) = stmt.into_iter().next() {
+
            return parse::notification(row);
+
        }
+
        Err(Error::NotificationNotFound(id))
+
    }
+

+
    /// Get all notifications.
+
    pub fn all(&self) -> Result<impl Iterator<Item = Result<Notification, Error>> + '_, Error> {
+
        let stmt = self.db.prepare(
+
            "SELECT rowid, repo, ref, old, new, status, timestamp
+
             FROM `repository-notifications`
+
             ORDER BY timestamp DESC",
+
        )?;
+

+
        Ok(stmt.into_iter().map(move |row| {
+
            let row = row?;
+
            parse::notification(row)
+
        }))
+
    }
+

+
    // Get notifications that were created between the given times: `since <= t < until`.
+
    pub fn by_timestamp(
+
        &self,
+
        since: LocalTime,
+
        until: LocalTime,
+
    ) -> Result<impl Iterator<Item = Result<Notification, Error>> + '_, Error> {
+
        let mut stmt = self.db.prepare(
+
            "SELECT rowid, repo, ref, old, new, status, timestamp
+
             FROM `repository-notifications`
+
             WHERE timestamp >= ?1 AND timestamp < ?2
+
             ORDER BY timestamp",
+
        )?;
+
        let since = i64::try_from(since.as_millis())?;
+
        let until = i64::try_from(until.as_millis())?;
+

+
        stmt.bind((1, since))?;
+
        stmt.bind((2, until))?;
+

+
        Ok(stmt.into_iter().map(move |row| {
+
            let row = row?;
+
            parse::notification(row)
+
        }))
+
    }
+

+
    /// Get notifications by repo.
+
    pub fn by_repo(
+
        &self,
+
        repo: &RepoId,
+
        order_by: &str,
+
    ) -> Result<impl Iterator<Item = Result<Notification, Error>> + '_, Error> {
+
        let mut stmt = self.db.prepare(format!(
+
            "SELECT rowid, repo, ref, old, new, status, timestamp
+
             FROM `repository-notifications`
+
             WHERE repo = ?
+
             ORDER BY {order_by} DESC",
+
        ))?;
+
        stmt.bind((1, repo))?;
+

+
        Ok(stmt.into_iter().map(move |row| {
+
            let row = row?;
+
            parse::notification(row)
+
        }))
+
    }
+

+
    /// Get the total notification count.
+
    pub fn count(&self) -> Result<usize, Error> {
+
        let stmt = self
+
            .db
+
            .prepare("SELECT COUNT(*) FROM `repository-notifications`")?;
+

+
        let count: i64 = stmt
+
            .into_iter()
+
            .next()
+
            .expect("COUNT will always return a single row")?
+
            .read(0);
+
        let count: usize = count.try_into().map_err(|_| Error::UnitOverflow)?;
+

+
        Ok(count)
+
    }
+

+
    /// Get the notification for the given repo.
+
    pub fn count_by_repo(&self, repo: &RepoId) -> Result<usize, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT COUNT(*) FROM `repository-notifications` WHERE repo = ?")?;
+

+
        stmt.bind((1, repo))?;
+

+
        let count: i64 = stmt
+
            .into_iter()
+
            .next()
+
            .expect("COUNT will always return a single row")?
+
            .read(0);
+
        let count: usize = count.try_into().map_err(|_| Error::UnitOverflow)?;
+

+
        Ok(count)
+
    }
+
}
+

+
mod parse {
+
    use super::*;
+

+
    pub fn notification(row: sql::Row) -> Result<Notification, Error> {
+
        let id = row.try_read::<i64, _>("rowid")? as NotificationId;
+
        let repo = row.try_read::<RepoId, _>("repo")?;
+
        let refstr = row.try_read::<&str, _>("ref")?;
+
        let status = row.try_read::<NotificationStatus, _>("status")?;
+
        let old = row
+
            .try_read::<Option<&str>, _>("old")?
+
            .map(|oid| {
+
                Oid::from_str(oid).map_err(|e| {
+
                    Error::Internal(sql::Error {
+
                        code: None,
+
                        message: Some(format!("sql: invalid oid in `old` column: {oid:?}: {e}")),
+
                    })
+
                })
+
            })
+
            .unwrap_or(Ok(git::raw::Oid::zero().into()))?;
+
        let new = row
+
            .try_read::<Option<&str>, _>("new")?
+
            .map(|oid| {
+
                Oid::from_str(oid).map_err(|e| {
+
                    Error::Internal(sql::Error {
+
                        code: None,
+
                        message: Some(format!("sql: invalid oid in `new` column: {oid:?}: {e}")),
+
                    })
+
                })
+
            })
+
            .unwrap_or(Ok(git::raw::Oid::zero().into()))?;
+
        let update = RefUpdate::from(RefString::try_from(refstr)?, old, new);
+
        let (namespace, qualified) = git::parse_ref(refstr)?;
+
        let timestamp = row.try_read::<i64, _>("timestamp")?;
+
        let timestamp = LocalTime::from_millis(timestamp as u128);
+
        let qualified = qualified.to_owned();
+
        let kind = NotificationKind::try_from(qualified.clone())?;
+

+
        Ok(Notification {
+
            id,
+
            repo,
+
            update,
+
            remote: namespace,
+
            qualified,
+
            status,
+
            kind,
+
            timestamp,
+
        })
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use radicle_git_ext::ref_format::{qualified, refname};
+

+
    use super::*;
+
    use crate::{cob, node::NodeId, test::arbitrary};
+

+
    #[test]
+
    fn test_clear() {
+
        let mut db = Store::open(":memory:").unwrap();
+
        let repo = arbitrary::gen::<RepoId>(1);
+
        let old = arbitrary::oid();
+
        let time = LocalTime::from_millis(32188142);
+
        let master = arbitrary::oid();
+

+
        for i in 0..3 {
+
            let update = RefUpdate::Updated {
+
                name: format!("refs/heads/feature/{i}").try_into().unwrap(),
+
                old,
+
                new: master,
+
            };
+
            assert!(db.insert(&repo, &update, time).unwrap());
+
        }
+
        assert_eq!(db.count().unwrap(), 3);
+
        assert_eq!(db.count_by_repo(&repo).unwrap(), 3);
+
        db.clear_by_repo(&repo).unwrap();
+
        assert_eq!(db.count().unwrap(), 0);
+
        assert_eq!(db.count_by_repo(&repo).unwrap(), 0);
+
    }
+

+
    #[test]
+
    fn test_branch_notifications() {
+
        let repo = arbitrary::gen::<RepoId>(1);
+
        let old = arbitrary::oid();
+
        let master = arbitrary::oid();
+
        let other = arbitrary::oid();
+
        let time1 = LocalTime::from_millis(32188142);
+
        let time2 = LocalTime::from_millis(32189874);
+
        let time3 = LocalTime::from_millis(32189879);
+
        let mut db = Store::open(":memory:").unwrap();
+

+
        let update1 = RefUpdate::Updated {
+
            name: refname!("refs/heads/master"),
+
            old,
+
            new: master,
+
        };
+
        let update2 = RefUpdate::Created {
+
            name: refname!("refs/heads/other"),
+
            oid: other,
+
        };
+
        let update3 = RefUpdate::Deleted {
+
            name: refname!("refs/heads/dev"),
+
            oid: other,
+
        };
+
        assert!(db.insert(&repo, &update1, time1).unwrap());
+
        assert!(db.insert(&repo, &update2, time2).unwrap());
+
        assert!(db.insert(&repo, &update3, time3).unwrap());
+

+
        let mut notifs = db.by_repo(&repo, "timestamp").unwrap();
+

+
        assert_eq!(
+
            notifs.next().unwrap().unwrap(),
+
            Notification {
+
                id: 3,
+
                repo,
+
                remote: None,
+
                qualified: qualified!("refs/heads/dev"),
+
                update: update3,
+
                kind: NotificationKind::Branch {
+
                    name: refname!("dev")
+
                },
+
                status: NotificationStatus::Unread,
+
                timestamp: time3,
+
            }
+
        );
+
        assert_eq!(
+
            notifs.next().unwrap().unwrap(),
+
            Notification {
+
                id: 2,
+
                repo,
+
                remote: None,
+
                qualified: qualified!("refs/heads/other"),
+
                update: update2,
+
                kind: NotificationKind::Branch {
+
                    name: refname!("other")
+
                },
+
                status: NotificationStatus::Unread,
+
                timestamp: time2,
+
            }
+
        );
+
        assert_eq!(
+
            notifs.next().unwrap().unwrap(),
+
            Notification {
+
                id: 1,
+
                repo,
+
                remote: None,
+
                qualified: qualified!("refs/heads/master"),
+
                update: update1,
+
                kind: NotificationKind::Branch {
+
                    name: refname!("master")
+
                },
+
                status: NotificationStatus::Unread,
+
                timestamp: time1,
+
            }
+
        );
+
        assert!(notifs.next().is_none());
+
    }
+

+
    #[test]
+
    fn test_notification_status() {
+
        let repo = arbitrary::gen::<RepoId>(1);
+
        let oid = arbitrary::oid();
+
        let time = LocalTime::from_millis(32188142);
+
        let mut db = Store::open(":memory:").unwrap();
+

+
        let update1 = RefUpdate::Created {
+
            name: refname!("refs/heads/feature/1"),
+
            oid,
+
        };
+
        let update2 = RefUpdate::Created {
+
            name: refname!("refs/heads/feature/2"),
+
            oid,
+
        };
+
        let update3 = RefUpdate::Created {
+
            name: refname!("refs/heads/feature/3"),
+
            oid,
+
        };
+
        assert!(db.insert(&repo, &update1, time).unwrap());
+
        assert!(db.insert(&repo, &update2, time).unwrap());
+
        assert!(db.insert(&repo, &update3, time).unwrap());
+
        assert!(db
+
            .set_status(NotificationStatus::ReadAt(time), &[1, 2, 3])
+
            .unwrap());
+

+
        let mut notifs = db.by_repo(&repo, "timestamp").unwrap();
+

+
        assert_eq!(
+
            notifs.next().unwrap().unwrap().status,
+
            NotificationStatus::ReadAt(time),
+
        );
+
        assert_eq!(
+
            notifs.next().unwrap().unwrap().status,
+
            NotificationStatus::ReadAt(time),
+
        );
+
        assert_eq!(
+
            notifs.next().unwrap().unwrap().status,
+
            NotificationStatus::ReadAt(time),
+
        );
+
    }
+

+
    #[test]
+
    fn test_duplicate_notifications() {
+
        let repo = arbitrary::gen::<RepoId>(1);
+
        let old = arbitrary::oid();
+
        let master1 = arbitrary::oid();
+
        let master2 = arbitrary::oid();
+
        let time1 = LocalTime::from_millis(32188142);
+
        let time2 = LocalTime::from_millis(32189874);
+
        let mut db = Store::open(":memory:").unwrap();
+

+
        let update1 = RefUpdate::Updated {
+
            name: refname!("refs/heads/master"),
+
            old,
+
            new: master1,
+
        };
+
        let update2 = RefUpdate::Updated {
+
            name: refname!("refs/heads/master"),
+
            old: master1,
+
            new: master2,
+
        };
+
        assert!(db.insert(&repo, &update1, time1).unwrap());
+
        assert!(db
+
            .set_status(NotificationStatus::ReadAt(time1), &[1])
+
            .unwrap());
+
        assert!(db.insert(&repo, &update2, time2).unwrap());
+

+
        let mut notifs = db.by_repo(&repo, "timestamp").unwrap();
+

+
        assert_eq!(
+
            notifs.next().unwrap().unwrap(),
+
            Notification {
+
                id: 1,
+
                repo,
+
                remote: None,
+
                qualified: qualified!("refs/heads/master"),
+
                update: update2,
+
                kind: NotificationKind::Branch {
+
                    name: refname!("master")
+
                },
+
                // Status is reset to "unread".
+
                status: NotificationStatus::Unread,
+
                timestamp: time2,
+
            }
+
        );
+
        assert!(notifs.next().is_none());
+
    }
+

+
    #[test]
+
    fn test_cob_notifications() {
+
        let repo = arbitrary::gen::<RepoId>(1);
+
        let old = arbitrary::oid();
+
        let new = arbitrary::oid();
+
        let timestamp = LocalTime::from_millis(32189874);
+
        let nid: NodeId = "z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi"
+
            .parse()
+
            .unwrap();
+
        let mut db = Store::open(":memory:").unwrap();
+
        let qualified =
+
            qualified!("refs/cobs/xyz.radicle.issue/d185ee16a00bac874c0bcbc2a8ad80fdce5e1e61");
+
        let namespaced = qualified.with_namespace((&nid).into());
+
        let update = RefUpdate::Updated {
+
            name: namespaced.to_ref_string(),
+
            old,
+
            new,
+
        };
+

+
        assert!(db.insert(&repo, &update, timestamp).unwrap());
+

+
        let mut notifs = db.by_repo(&repo, "timestamp").unwrap();
+

+
        assert_eq!(
+
            notifs.next().unwrap().unwrap(),
+
            Notification {
+
                id: 1,
+
                repo,
+
                remote: Some(
+
                    "z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi"
+
                        .parse()
+
                        .unwrap()
+
                ),
+
                qualified,
+
                update,
+
                kind: NotificationKind::Cob {
+
                    type_name: cob::issue::TYPENAME.clone(),
+
                    id: "d185ee16a00bac874c0bcbc2a8ad80fdce5e1e61".parse().unwrap(),
+
                },
+
                status: NotificationStatus::Unread,
+
                timestamp,
+
            }
+
        );
+
        assert!(notifs.next().is_none());
+
    }
+
}
modified radicle/src/profile.rs
@@ -22,7 +22,7 @@ use crate::crypto::ssh::{keystore, Keystore, Passphrase};
use crate::crypto::{PublicKey, Signer};
use crate::explorer::Explorer;
use crate::node::policy::config::store::Read;
-
use crate::node::{policy, Alias, AliasStore};
+
use crate::node::{notifications, policy, Alias, AliasStore};
use crate::prelude::Did;
use crate::prelude::NodeId;
use crate::storage::git::transport;
@@ -101,6 +101,8 @@ pub enum Error {
    #[error(transparent)]
    PolicyStore(#[from] node::policy::store::Error),
    #[error(transparent)]
+
    NotificationsStore(#[from] node::notifications::store::Error),
+
    #[error(transparent)]
    DatabaseStore(#[from] node::db::Error),
}

@@ -219,6 +221,7 @@ impl Profile {
        // Create DBs.
        home.policies_mut()?;
        home.database_mut()?;
+
        home.notifications_mut()?;

        transport::local::register(storage.clone());

@@ -450,6 +453,16 @@ impl Home {
            .unwrap_or_else(|| self.node().join(node::DEFAULT_SOCKET_NAME))
    }

+
    /// Return a read-write handle to the notifications database.
+
    pub fn notifications_mut(
+
        &self,
+
    ) -> Result<notifications::StoreWriter, notifications::store::Error> {
+
        let path = self.node().join(node::NOTIFICATIONS_DB_FILE);
+
        let db = notifications::Store::open(path)?;
+

+
        Ok(db)
+
    }
+

    /// Return a read-write handle to the policies store of the node.
    pub fn policies_mut(&self) -> Result<policy::store::StoreWriter, policy::store::Error> {
        let path = self.node().join(node::POLICIES_DB_FILE);
modified radicle/src/rad.rs
@@ -374,7 +374,7 @@ pub fn setup_patch_upstream<'a>(
                working,
                &*REMOTE_NAME,
                name.as_str(),
-
                git::refs::workdir::patch(patch),
+
                git::refs::patch(patch),
            )?;
        }
    }