Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
httpd: Add notification inbox to `/node/inbox` route
Open did:key:z6MkkfM3...sVz5 opened 2 years ago

Adds all notifications, sorted by timestamp. Provides endpoint to clear the entire inbox, or change the read status on a single notification.

9 files changed +349 -7 9767b485 0fd3ecb8
modified Cargo.lock
@@ -2477,6 +2477,7 @@ dependencies = [
 "flate2",
 "hyper",
 "lexopt",
+
 "localtime",
 "lru",
 "nonempty 0.9.0",
 "pretty_assertions",
modified radicle-httpd/Cargo.toml
@@ -25,6 +25,7 @@ fastrand = { version = "2.0.0" }
flate2 = { version = "1" }
hyper = { version = "1.0.1", default-features = false }
lexopt = { version = "0.3.0" }
+
localtime = { version = "1.2.0" }
lru = { version = "0.12.0" }
nonempty = { version = "0.9.0", features = ["serialize"] }
radicle-surf = { version = "0.18.0", default-features = false, features = ["serde"] }
modified radicle-httpd/src/api/error.rs
@@ -46,6 +46,10 @@ pub enum Error {
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),

+
    /// Notification error.
+
    #[error(transparent)]
+
    Notification(#[from] radicle::node::notifications::Error),
+

    /// Routing error.
    #[error(transparent)]
    Routing(#[from] radicle::node::routing::Error),
modified radicle-httpd/src/api/json.rs
@@ -5,16 +5,17 @@ use std::path::Path;
use std::str;

use base64::prelude::{Engine, BASE64_STANDARD};
-
use radicle::cob::{CodeLocation, Reaction};
-
use radicle::patch::ReviewId;
use serde_json::{json, Value};

use radicle::cob::issue::{Issue, IssueId};
use radicle::cob::patch::{Merge, Patch, PatchId, Review};
use radicle::cob::thread::{Comment, CommentId, Edit};
use radicle::cob::{ActorId, Author};
+
use radicle::cob::{CodeLocation, Label, Reaction};
use radicle::git::RefString;
+
use radicle::node::notifications::{Notification, NotificationStatus};
use radicle::node::{Alias, AliasStore};
+
use radicle::patch::ReviewId;
use radicle::prelude::NodeId;
use radicle::storage::{git, refs, RemoteRepository};
use radicle_surf::blob::Blob;
@@ -167,6 +168,64 @@ fn author(author: &Author, alias: Option<Alias>) -> Value {
    }
}

+
/// Returns JSON for a cob notification.
+
pub fn notification_cob(
+
    n: &Notification,
+
    category: String,
+
    cob_id: String,
+
    title: String,
+
    labels: Vec<&Label>,
+
    state: String,
+
    aliases: &impl AliasStore,
+
) -> Value {
+
    json!({
+
        "id": n.id,
+
        "repo": n.repo,
+
        "remote": n.remote.map(|a| author(&Author::from(a), aliases.alias(&a))),
+
        "category": category,
+
        "cob_id": cob_id,
+
        "title": title,
+
        "labels": labels,
+
        "state": state,
+
        "status": notification_status(&n.status),
+
        "timestamp": n.timestamp.to_string(),
+
    })
+
}
+

+
/// Returns JSON for a branch notification.
+
pub fn notification_branch(
+
    n: &Notification,
+
    category: String,
+
    name: String,
+
    title: String,
+
    state: String,
+
    aliases: &impl AliasStore,
+
) -> Value {
+
    json!({
+
        "id": n.id,
+
        "repo": n.repo,
+
        "remote": n.remote.map(|a| author(&Author::from(a), aliases.alias(&a))),
+
        "category": category,
+
        "name": name,
+
        "title": title,
+
        "state": state,
+
        "status": notification_status(&n.status),
+
        "timestamp": n.timestamp.to_string(),
+
    })
+
}
+

+
/// Returns JSON for a `notification`.
+
fn notification_status(status: &NotificationStatus) -> Value {
+
    match status {
+
        NotificationStatus::ReadAt(time) => {
+
            json!({ "type": "readAt", "timestamp": time.to_string() })
+
        }
+
        NotificationStatus::Unread => {
+
            json!({ "type": "unread" })
+
        }
+
    }
+
}
+

/// Returns JSON for a patch `Merge` and fills in `alias` when present.
fn merge(nid: &NodeId, merge: &Merge, aliases: &impl AliasStore) -> Value {
    json!({
modified radicle-httpd/src/api/v1/node.rs
@@ -1,12 +1,16 @@
-
use axum::extract::State;
+
use std::net::SocketAddr;
+

+
use axum::extract::{ConnectInfo, State};
use axum::response::IntoResponse;
-
use axum::routing::{get, put};
+
use axum::routing::{delete, get, patch, put};
use axum::{Json, Router};
use axum_auth::AuthBearer;
use hyper::StatusCode;
+
use localtime::LocalTime;
use serde_json::json;

use radicle::identity::RepoId;
+
use radicle::node::notifications::{NotificationId, NotificationStatus};
use radicle::node::routing::Store;
use radicle::node::{policy, AliasStore, Handle, NodeId, DEFAULT_TIMEOUT};
use radicle::Node;
@@ -18,6 +22,11 @@ use crate::axum_extra::{Path, Query};
pub fn router(ctx: Context) -> Router {
    Router::new()
        .route("/node", get(node_handler))
+
        .route("/node/inbox", delete(node_inbox_clear_handler))
+
        .route(
+
            "/node/inbox/:id",
+
            patch(node_inbox_item_update_handler).delete(node_inbox_item_clear_handler),
+
        )
        .route("/node/policies/repos", get(node_policies_repos_handler))
        .route(
            "/node/policies/repos/:rid",
@@ -55,6 +64,77 @@ async fn node_handler(State(ctx): State<Context>) -> impl IntoResponse {
    Ok::<_, Error>(Json(response))
}

+
/// Toggle a local node inbox item read status.
+
/// `PATCH /node/inbox/:id`
+
async fn node_inbox_item_update_handler(
+
    State(ctx): State<Context>,
+
    AuthBearer(token): AuthBearer,
+
    Path(id): Path<NotificationId>,
+
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
+
) -> impl IntoResponse {
+
    if !addr.ip().is_loopback() {
+
        return Err(Error::Auth(
+
            "Node inbox data updates are only able for localhost",
+
        ));
+
    }
+
    api::auth::validate(&ctx, &token).await?;
+
    let mut notifs = ctx.profile.notifications_mut()?;
+
    let notification = notifs.get(id)?;
+
    let state = match notification.status {
+
        NotificationStatus::Unread => {
+
            notifs.set_status(NotificationStatus::ReadAt(LocalTime::now()), &[id])?
+
        }
+
        NotificationStatus::ReadAt(..) => notifs.set_status(NotificationStatus::Unread, &[id])?,
+
    };
+

+
    Ok::<_, Error>((StatusCode::OK, Json(json!({ "success": state }))))
+
}
+

+
/// Clear a local node inbox item.
+
/// `DELETE /node/inbox/:id`
+
async fn node_inbox_item_clear_handler(
+
    State(ctx): State<Context>,
+
    AuthBearer(token): AuthBearer,
+
    Path(id): Path<NotificationId>,
+
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
+
) -> impl IntoResponse {
+
    if !addr.ip().is_loopback() {
+
        return Err(Error::Auth(
+
            "Node inbox data updates are only able for localhost",
+
        ));
+
    }
+
    api::auth::validate(&ctx, &token).await?;
+
    let mut notifs = ctx.profile.notifications_mut()?;
+
    let cleared = notifs.clear(&[id])?;
+

+
    Ok::<_, Error>((
+
        StatusCode::OK,
+
        Json(json!({ "success": true, "count": cleared })),
+
    ))
+
}
+

+
/// Clear a local node inbox.
+
/// `DELETE /node/inbox`
+
async fn node_inbox_clear_handler(
+
    State(ctx): State<Context>,
+
    AuthBearer(token): AuthBearer,
+
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
+
) -> impl IntoResponse {
+
    if !addr.ip().is_loopback() {
+
        return Err(Error::Auth(
+
            "Node inbox data updates are only able for localhost",
+
        ));
+
    }
+
    api::auth::validate(&ctx, &token).await?;
+
    let mut notifs = ctx.profile.notifications_mut()?;
+
    let cleared = notifs.clear_all()?;
+

+
    Ok::<_, Error>((
+
        StatusCode::OK,
+
        Json(json!({ "success": true, "count": cleared })),
+
    ))
+
}
+

/// Return stored information about other nodes.
/// `GET /nodes/:nid`
async fn nodes_handler(State(ctx): State<Context>, Path(nid): Path<NodeId>) -> impl IntoResponse {
modified radicle-httpd/src/api/v1/projects.rs
@@ -9,17 +9,21 @@ use axum::routing::{get, patch, post};
use axum::{Json, Router};
use axum_auth::AuthBearer;
use hyper::StatusCode;
+
use localtime::LocalTime;
use radicle_surf::blob::BlobRef;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tower_http::set_header::SetResponseHeaderLayer;

-
use radicle::cob::{issue, patch, resolve_embed, Embed, Label, Uri};
+
use radicle::cob::{self, issue, patch, resolve_embed, Embed, Label, Uri};
use radicle::identity::{Did, RepoId, Visibility};
+
use radicle::issue::Issues;
+
use radicle::node::notifications::{NotificationKind, NotificationStatus};
use radicle::node::routing::Store;
use radicle::node::{AliasStore, Node, NodeId};
+
use radicle::patch::Patches;
use radicle::storage::git::paths;
-
use radicle::storage::{ReadRepository, ReadStorage, RemoteRepository, WriteRepository};
+
use radicle::storage::{ReadRepository, ReadStorage, RefUpdate, RemoteRepository, WriteRepository};
use radicle_surf::{diff, Glob, Oid, Repository};

use crate::api::error::Error;
@@ -33,11 +37,18 @@ const MAX_BODY_LIMIT: usize = 4_194_304;
pub fn router(ctx: Context) -> Router {
    Router::new()
        .route("/projects", get(project_root_handler))
+
        .route("/projects/inbox", get(project_inbox_handler))
        .route("/projects/:project", get(project_handler))
        .route("/projects/:project/commits", get(history_handler))
        .route("/projects/:project/commits/:sha", get(commit_handler))
        .route("/projects/:project/diff/:base/:oid", get(diff_handler))
        .route(
+
            "/projects/:project/inbox",
+
            get(inbox_handler)
+
                .patch(inbox_toggle_status_handler)
+
                .delete(inbox_clear_handler),
+
        )
+
        .route(
            "/projects/:project/activity",
            get(
                activity_handler.layer(SetResponseHeaderLayer::if_not_present(
@@ -152,6 +163,25 @@ async fn project_root_handler(
    Ok::<_, Error>(Json(infos))
}

+
/// GET projects that have notifications.
+
/// `GET /projects/inbox`
+
async fn project_inbox_handler(State(ctx): State<Context>) -> impl IntoResponse {
+
    let store = ctx.profile.notifications()?;
+
    let storage = &ctx.profile.storage;
+
    let repos = store
+
        .repos_with_notifications()?
+
        .collect::<Result<Vec<_>, _>>()?;
+

+
    let mut infos = Vec::new();
+
    for rid in repos {
+
        let repo = storage.repository(rid)?;
+
        let project = repo.project()?;
+
        infos.push(json!({ "rid": rid, "name": project.name() }));
+
    }
+

+
    Ok::<_, Error>(Json(infos))
+
}
+

/// Get project metadata.
/// `GET /projects/:project`
async fn project_handler(State(ctx): State<Context>, Path(id): Path<RepoId>) -> impl IntoResponse {
@@ -411,6 +441,151 @@ async fn activity_handler(
    Ok::<_, Error>((StatusCode::OK, Json(json!({ "activity": timestamps }))))
}

+
/// Get notifications by repo.
+
/// `GET /projects/:project/inbox`
+
async fn inbox_handler(
+
    State(ctx): State<Context>,
+
    Path(rid): Path<RepoId>,
+
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
+
) -> impl IntoResponse {
+
    if !addr.ip().is_loopback() {
+
        return Err(Error::Auth("Node inbox data is only shown for localhost"));
+
    }
+
    let storage = &ctx.profile.storage;
+
    let aliases = &ctx.profile.aliases();
+
    let repo = storage.repository(rid)?;
+
    let (_, head) = repo.head()?;
+
    let issues = Issues::open(&repo)?;
+
    let patches = Patches::open(&repo)?;
+
    let store = ctx.profile.notifications()?;
+
    let notifications: Vec<_> = store
+
        .by_repo(&rid, "timestamp")?
+
        .filter_map(|notification| {
+
            notification.ok().and_then(|n| {
+
                match &n.kind {
+
                    NotificationKind::Cob { id, type_name } => {
+
                        if type_name == &*cob::issue::TYPENAME {
+
                            let Some(issue) = issues.get(id).ok().flatten() else {
+
                                // Issue could have been deleted after notification was created.
+
                                return None;
+
                            };
+
                            Some(api::json::notification_cob(
+
                                &n,
+
                                "issue".to_string(),
+
                                id.to_string(),
+
                                issue.title().to_owned(),
+
                                issue.labels().collect(),
+
                                issue.state().to_string(),
+
                                aliases,
+
                            ))
+
                        } else if type_name == &*cob::patch::TYPENAME {
+
                            let Some(patch) = patches.get(id).ok().flatten() else {
+
                                // Patch could have been deleted after notification was created.
+
                                return None;
+
                            };
+
                            Some(api::json::notification_cob(
+
                                &n,
+
                                "patch".to_string(),
+
                                id.to_string(),
+
                                patch.title().to_owned(),
+
                                patch.labels().collect(),
+
                                patch.state().to_string(),
+
                                aliases,
+
                            ))
+
                        } else {
+
                            None
+
                        }
+
                    }
+
                    NotificationKind::Branch { name } => {
+
                        let commit = if let Some(head) = n.update.new() {
+
                            repo.commit(head)
+
                                .ok()?
+
                                .summary()
+
                                .unwrap_or_default()
+
                                .to_owned()
+
                        } else {
+
                            String::new()
+
                        };
+

+
                        let state = match n
+
                            .update
+
                            .new()
+
                            .map(|oid| repo.is_ancestor_of(oid, head))
+
                            .transpose()
+
                            .ok()
+
                            .flatten()
+
                        {
+
                            Some(true) => "merged",
+
                            Some(false) | None => match n.update {
+
                                RefUpdate::Created { .. } => "created",
+
                                RefUpdate::Deleted { .. } => "deleted",
+
                                RefUpdate::Skipped { .. } => "skipped",
+
                                RefUpdate::Updated { .. } => "updated",
+
                            },
+
                        }
+
                        .to_owned();
+

+
                        Some(api::json::notification_branch(
+
                            &n,
+
                            "branch".to_string(),
+
                            name.to_string(),
+
                            commit,
+
                            state,
+
                            aliases,
+
                        ))
+
                    }
+
                }
+
            })
+
        })
+
        .collect();
+

+
    Ok::<_, Error>(Json(notifications))
+
}
+

+
/// Mark as read all node inbox items by repo.
+
/// `PATCH /projects/:rid/inbox`
+
async fn inbox_toggle_status_handler(
+
    State(ctx): State<Context>,
+
    AuthBearer(token): AuthBearer,
+
    Path(rid): Path<RepoId>,
+
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
+
) -> impl IntoResponse {
+
    if !addr.ip().is_loopback() {
+
        return Err(Error::Auth(
+
            "Node inbox data updates are only able for localhost",
+
        ));
+
    }
+
    api::auth::validate(&ctx, &token).await?;
+
    let mut store = ctx.profile.notifications_mut()?;
+
    let ids = store
+
        .by_repo(&rid, "timestamp")?
+
        .filter_map(|n| n.ok().map(|x| x.id))
+
        .collect::<Vec<_>>();
+
    let success = store.set_status(NotificationStatus::ReadAt(LocalTime::now()), &ids)?;
+

+
    Ok::<_, Error>(Json(json!({ "success": success })))
+
}
+

+
/// Clear a local node inbox by repo.
+
/// `DELETE /projects/:rid/inbox`
+
async fn inbox_clear_handler(
+
    State(ctx): State<Context>,
+
    AuthBearer(token): AuthBearer,
+
    Path(rid): Path<RepoId>,
+
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
+
) -> impl IntoResponse {
+
    if !addr.ip().is_loopback() {
+
        return Err(Error::Auth(
+
            "Node inbox data updates are only able for localhost",
+
        ));
+
    }
+
    api::auth::validate(&ctx, &token).await?;
+
    let mut store = ctx.profile.notifications_mut()?;
+
    let cleared = store.clear_by_repo(&rid)?;
+

+
    Ok::<_, Error>(Json(json!({ "success": true, "count": cleared })))
+
}
+

/// Get project source tree for '/' path.
/// `GET /projects/:project/tree/:sha/`
async fn tree_handler_root(
modified radicle/src/node/notifications.rs
@@ -13,7 +13,7 @@ use crate::storage::{RefUpdate, RemoteId};
pub use store::{Error, Store};
/// Read and write to the store.
pub type StoreWriter = Store<store::Write>;
-
/// Write to the store.
+
/// Read from the store.
pub type StoreReader = Store<store::Read>;

/// Unique identifier for a notification.
modified radicle/src/node/notifications/store.rs
@@ -303,6 +303,20 @@ impl<T> Store<T> {
        }))
    }

+
    /// Get repos that have notifications.
+
    pub fn repos_with_notifications(
+
        &self,
+
    ) -> Result<impl Iterator<Item = Result<RepoId, Error>> + '_, Error> {
+
        let stmt = self
+
            .db
+
            .prepare("SELECT DISTINCT repo FROM `repository-notifications`")?;
+

+
        Ok(stmt.into_iter().map(move |row| {
+
            let row = row?;
+
            row.try_read::<RepoId, _>("repo").map_err(Error::from)
+
        }))
+
    }
+

    /// Get the total notification count.
    pub fn count(&self) -> Result<usize, Error> {
        let stmt = self
modified radicle/src/profile.rs
@@ -453,6 +453,14 @@ impl Home {
            .unwrap_or_else(|| self.node().join(node::DEFAULT_SOCKET_NAME))
    }

+
    /// Return a handle to a ready-only database of the notifications database.
+
    pub fn notifications(&self) -> Result<notifications::StoreReader, notifications::store::Error> {
+
        let path = self.node().join(node::NOTIFICATIONS_DB_FILE);
+
        let db = notifications::Store::reader(path)?;
+

+
        Ok(db)
+
    }
+

    /// Return a read-write handle to the notifications database.
    pub fn notifications_mut(
        &self,