Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
radicle-metrics: support incremental metric collection
✓ CI success shishqa committed 5 months ago
commit 874b4712b71ab08d278815d63bbf9a901869b4cb
parent 45abb881a3389bea913ae10af32fb837060787ab
2 passed (2 total) View logs
6 files changed +485 -43
modified Cargo.lock
@@ -345,7 +345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4"
dependencies = [
 "memchr",
-
 "regex-automata 0.4.9",
+
 "regex-automata",
 "serde",
]

@@ -974,8 +974,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e24cb5a94bcae1e5408b0effca5cd7172ea3c5755049c5f3af4cd283a165298"
dependencies = [
 "bit-set",
-
 "regex-automata 0.4.9",
-
 "regex-syntax 0.8.5",
+
 "regex-automata",
+
 "regex-syntax",
]

[[package]]
@@ -1045,6 +1045,12 @@ dependencies = [
]

[[package]]
+
name = "fnv"
+
version = "1.0.7"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+

+
[[package]]
name = "form_urlencoded"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2069,7 +2075,7 @@ dependencies = [
 "percent-encoding",
 "referencing",
 "regex",
-
 "regex-syntax 0.8.5",
+
 "regex-syntax",
 "serde",
 "serde_json",
 "uuid-simd",
@@ -2198,11 +2204,11 @@ dependencies = [

[[package]]
name = "matchers"
-
version = "0.1.0"
+
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
+
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
-
 "regex-automata 0.1.10",
+
 "regex-automata",
]

[[package]]
@@ -2312,12 +2318,11 @@ checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"

[[package]]
name = "nu-ansi-term"
-
version = "0.46.0"
+
version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
+
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
-
 "overload",
-
 "winapi",
+
 "windows-sys 0.60.2",
]

[[package]]
@@ -2463,12 +2468,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e"

[[package]]
-
name = "overload"
-
version = "0.1.1"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
-

-
[[package]]
name = "p256"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2734,6 +2733,41 @@ dependencies = [
]

[[package]]
+
name = "prometheus"
+
version = "0.14.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a"
+
dependencies = [
+
 "cfg-if",
+
 "fnv",
+
 "lazy_static",
+
 "memchr",
+
 "parking_lot",
+
 "protobuf",
+
 "thiserror 2.0.12",
+
]
+

+
[[package]]
+
name = "protobuf"
+
version = "3.7.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
+
dependencies = [
+
 "once_cell",
+
 "protobuf-support",
+
 "thiserror 1.0.69",
+
]
+

+
[[package]]
+
name = "protobuf-support"
+
version = "3.7.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
+
dependencies = [
+
 "thiserror 1.0.69",
+
]
+

+
[[package]]
name = "qcheck"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2982,6 +3016,18 @@ dependencies = [
]

[[package]]
+
name = "radicle-metrics"
+
version = "0.1.0"
+
dependencies = [
+
 "anyhow",
+
 "clap",
+
 "prometheus",
+
 "radicle",
+
 "serde",
+
 "serde_json",
+
]
+

+
[[package]]
name = "radicle-node"
version = "0.16.0"
dependencies = [
@@ -3241,17 +3287,8 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
 "aho-corasick",
 "memchr",
-
 "regex-automata 0.4.9",
-
 "regex-syntax 0.8.5",
-
]
-

-
[[package]]
-
name = "regex-automata"
-
version = "0.1.10"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
-
dependencies = [
-
 "regex-syntax 0.6.29",
+
 "regex-automata",
+
 "regex-syntax",
]

[[package]]
@@ -3262,17 +3299,11 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
 "aho-corasick",
 "memchr",
-
 "regex-syntax 0.8.5",
+
 "regex-syntax",
]

[[package]]
name = "regex-syntax"
-
version = "0.6.29"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
-

-
[[package]]
-
name = "regex-syntax"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
@@ -4166,9 +4197,9 @@ checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64"

[[package]]
name = "tracing"
-
version = "0.1.41"
+
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
+
checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647"
dependencies = [
 "pin-project-lite",
 "tracing-core",
@@ -4176,9 +4207,9 @@ dependencies = [

[[package]]
name = "tracing-core"
-
version = "0.1.34"
+
version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
+
checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c"
dependencies = [
 "once_cell",
 "valuable",
@@ -4197,14 +4228,14 @@ dependencies = [

[[package]]
name = "tracing-subscriber"
-
version = "0.3.19"
+
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
+
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [
 "matchers",
 "nu-ansi-term",
 "once_cell",
-
 "regex",
+
 "regex-automata",
 "sharded-slab",
 "thread_local",
 "tracing",
@@ -4220,7 +4251,7 @@ checksum = "b67baf55e7e1b6806063b1e51041069c90afff16afcbbccd278d899f9d84bca4"
dependencies = [
 "cc",
 "regex",
-
 "regex-syntax 0.8.5",
+
 "regex-syntax",
 "streaming-iterator",
 "tree-sitter-language",
]
added crates/radicle-metrics/Cargo.toml
@@ -0,0 +1,24 @@
+
[package]
+
name = "radicle-metrics"
+
description = "Radicle Metrics"
+
homepage.workspace = true
+
license.workspace = true
+
version = "0.1.0"
+
authors = ["shishqa <shishqa.main@gmail.com>"]
+
edition.workspace = true
+
rust-version.workspace = true
+

+
[[bin]]
+
name = "rad-metrics"
+
path = "src/main.rs"
+

+
[dependencies]
+
anyhow = "1"
+
clap = { version = "4.5.44", features = ["derive"] }
+
prometheus = "0.14.0"
+
radicle = { workspace = true }
+
serde = { workspace = true }
+
serde_json = { workspace = true }
+

+
[lints]
+
workspace = true
added crates/radicle-metrics/src/lib.rs
@@ -0,0 +1,2 @@
+
pub mod metrics;
+
pub mod walk;
added crates/radicle-metrics/src/main.rs
@@ -0,0 +1,60 @@
+
use anyhow::Context;
+
use clap::Parser;
+

+
use radicle::cob;
+

+
use radicle_metrics::metrics::MetricsWindow;
+
use radicle_metrics::walk::CobHistoryWalk;
+

+
pub const NAME: &str = "rad-metrics";
+

+
#[derive(Parser, Debug)]
+
#[command(name = NAME)]
+
struct CliArgs {
+
    #[arg(long = "type", short, value_name = "TYPENAME")]
+
    type_names: Vec<cob::TypeName>,
+

+
    #[arg(long, short, value_name = "MAX_WINDOW_SIZE")]
+
    max_window_size: Option<usize>,
+

+
    #[arg(long, short, value_name = "POLL_INTERVAL")]
+
    poll_interval: Option<u64>,
+
}
+

+
fn main() -> anyhow::Result<()> {
+
    let CliArgs {
+
        type_names,
+
        max_window_size,
+
        poll_interval,
+
    } = CliArgs::parse();
+

+
    let profile = radicle::profile::Profile::load().context("profile::load")?;
+
    let storage = &profile.storage;
+

+
    let mut window = MetricsWindow::new(max_window_size)?;
+

+
    let mut walk = CobHistoryWalk::new(storage, type_names);
+

+
    if let Some(poll_interval) = poll_interval {
+
        loop {
+
            walk.scan_new_records(|record| {
+
                window.add(record);
+
            })?;
+

+
            window.print_aggregated()?;
+
            if max_window_size.is_some() {
+
                window.print_window()?;
+
            }
+

+
            std::thread::sleep(std::time::Duration::from_secs(poll_interval));
+
        }
+
    } else {
+
        walk.scan_new_records(|record| {
+
            window.add(record);
+
        })?;
+

+
        window.print_window()?;
+
    }
+

+
    Ok(())
+
}
added crates/radicle-metrics/src/metrics.rs
@@ -0,0 +1,159 @@
+
use prometheus::{Encoder, IntGaugeVec, Opts, Registry, TextEncoder};
+
use serde::Serialize;
+

+
use std::collections::{BinaryHeap, HashMap};
+

+
use radicle::cob;
+

+
use crate::walk::{Event, Record};
+

+
#[derive(Serialize, PartialEq, Eq)]
+
struct Point {
+
    value: i64,
+
    name: String,
+
    timestamp: cob::common::Timestamp,
+
    labels: HashMap<String, String>,
+
}
+

+
impl std::cmp::PartialOrd for Point {
+
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+
        Some(self.cmp(other))
+
    }
+
}
+

+
impl std::cmp::Ord for Point {
+
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+
        self.timestamp.cmp(&other.timestamp).reverse()
+
    }
+
}
+

+
pub struct MetricsWindow {
+
    registry: Registry,
+
    open_issues: IntGaugeVec,
+

+
    max_points: Option<usize>,
+
    points: BinaryHeap<Point>,
+
}
+

+
impl MetricsWindow {
+
    pub fn new(max_points: Option<usize>) -> anyhow::Result<Self> {
+
        let open_issues = IntGaugeVec::new(
+
            Opts::new(
+
                "radicle_open_issues",
+
                "counter for open issues across all repositories",
+
            ),
+
            &["rid"],
+
        )?;
+

+
        let registry = Registry::new();
+
        registry.register(Box::new(open_issues.clone()))?;
+

+
        Ok(Self {
+
            registry,
+
            open_issues,
+
            max_points,
+
            points: Default::default(),
+
        })
+
    }
+

+
    pub fn add(&mut self, record: Record) {
+
        if record.op.manifest.type_name != *cob::issue::TYPENAME {
+
            // TODO: process other COB types
+
            return;
+
        }
+

+
        let mut new_points = Vec::<Point>::new();
+

+
        match record.event {
+
            Event::Created => {
+
                new_points.push(Point {
+
                    timestamp: record.op.timestamp,
+
                    name: "radicle_open_issues".to_string(),
+
                    labels: HashMap::from([("rid".to_string(), record.rid.to_string())]),
+
                    value: 1,
+
                });
+
            }
+
            Event::Deleted => {
+
                new_points.push(Point {
+
                    timestamp: record.op.timestamp,
+
                    name: "radicle_open_issues".to_string(),
+
                    labels: HashMap::from([("rid".to_string(), record.rid.to_string())]),
+
                    value: -1,
+
                });
+
            }
+
            _ => {}
+
        }
+

+
        for action in record.op.actions {
+
            let action: cob::issue::Action = serde_json::from_value(action).unwrap();
+

+
            if let cob::issue::Action::Lifecycle { state } = action {
+
                match state {
+
                    cob::issue::State::Open => {
+
                        new_points.push(Point {
+
                            timestamp: record.op.timestamp,
+
                            name: "radicle_open_issues".to_string(),
+
                            labels: HashMap::from([("rid".to_string(), record.rid.to_string())]),
+
                            value: 1,
+
                        });
+
                    }
+
                    cob::issue::State::Closed { .. } => {
+
                        new_points.push(Point {
+
                            timestamp: record.op.timestamp,
+
                            name: "radicle_open_issues".to_string(),
+
                            labels: HashMap::from([("rid".to_string(), record.rid.to_string())]),
+
                            value: -1,
+
                        });
+
                    }
+
                }
+
            }
+
        }
+

+
        for point in new_points {
+
            if self
+
                .max_points
+
                .map(|m| self.points.len() >= m)
+
                .unwrap_or(false)
+
            {
+
                match self.points.peek() {
+
                    Some(oldest_point) if oldest_point < &point => {
+
                        let rid = oldest_point
+
                            .labels
+
                            .get("rid")
+
                            .cloned()
+
                            .unwrap_or("-".to_string());
+
                        self.open_issues
+
                            .with_label_values(&[rid])
+
                            .add(oldest_point.value);
+
                        self.points.pop();
+
                        self.points.push(point);
+
                    }
+
                    _ => {
+
                        let rid = point.labels.get("rid").cloned().unwrap_or("-".to_string());
+
                        self.open_issues.with_label_values(&[rid]).add(point.value);
+
                    }
+
                }
+
            } else {
+
                self.points.push(point);
+
            }
+
        }
+
    }
+

+
    pub fn print_window(&self) -> anyhow::Result<()> {
+
        for point in &self.points {
+
            println!("{}", serde_json::to_string(&point)?);
+
        }
+
        Ok(())
+
    }
+

+
    pub fn print_aggregated(&self) -> anyhow::Result<()> {
+
        let mut buffer = vec![];
+
        let encoder = TextEncoder::new();
+
        let metric_families = self.registry.gather();
+
        encoder.encode(&metric_families, &mut buffer).unwrap();
+

+
        println!("{}", String::from_utf8(buffer).unwrap());
+

+
        Ok(())
+
    }
+
}
added crates/radicle-metrics/src/walk.rs
@@ -0,0 +1,166 @@
+
use anyhow;
+
use anyhow::Context;
+

+
use std::collections::{HashMap, HashSet};
+

+
use radicle::cob;
+
use radicle::cob::object::Storage as _;
+
use radicle::cob::stream::CobStream as _;
+
use radicle::identity::RepoId;
+
use radicle::storage::git::Storage;
+
use radicle::storage::ReadStorage;
+

+
/// Represents the type of change that occurred to a COB.
+
pub enum Event {
+
    Created,
+
    Changed,
+
    Deleted,
+
}
+

+
/// A single record of a COB event with associated metadata.
+
pub struct Record {
+
    /// The type of event that occurred.
+
    pub event: Event,
+
    /// The repository ID where the COB exists.
+
    pub rid: RepoId,
+
    /// The unique identifier of the COB object.
+
    pub object_id: cob::ObjectId,
+
    /// The operation that caused the event, with its JSON payload.
+
    pub op: cob::op::Op<serde_json::Value>,
+
}
+

+
/// A checkpoint that tracks the last known state of COBs for incremental updates.
+
#[derive(Default)]
+
pub struct Checkpoint {
+
    /// The COB types that are being tracked.
+
    type_names: Vec<cob::TypeName>,
+

+
    /// Last known operation for each COB object, keyed by object ID.
+
    /// The value is a tuple of (repository ID, last operation).
+
    last_ops: HashMap<cob::ObjectId, (RepoId, cob::op::Op<serde_json::Value>)>,
+
}
+

+
/// A walker that traverses the history of COBs across repositories.
+
///
+
/// This structure provides incremental scanning of COB history, allowing
+
/// callers to process only new or changed records since the last scan.
+
pub struct CobHistoryWalk<'a> {
+
    storage: &'a Storage,
+
    checkpoint: Checkpoint,
+
}
+

+
impl<'a> CobHistoryWalk<'a> {
+
    pub fn new(storage: &'a Storage, type_names: Vec<cob::TypeName>) -> Self {
+
        Self {
+
            storage,
+
            checkpoint: Checkpoint {
+
                type_names,
+
                last_ops: Default::default(),
+
            },
+
        }
+
    }
+

+
    pub fn new_with_checkpoint(storage: &'a Storage, checkpoint: Checkpoint) -> Self {
+
        Self {
+
            storage,
+
            checkpoint,
+
        }
+
    }
+

+
    pub fn get_checkpoint(&self) -> &Checkpoint {
+
        &self.checkpoint
+
    }
+

+
    /// Scans for new or changed COB records since the last checkpoint.
+
    ///
+
    /// This method:
+
    /// 1. Iterates through all repositories in storage
+
    /// 2. For each repository, scans the specified COB types
+
    /// 3. For each COB object, processes operations that occurred since the last known operation
+
    /// 4. Updates the checkpoint with the latest operations
+
    /// 5. Identifies and reports deleted COBs (objects that no longer exist)
+
    pub fn scan_new_records<F>(&mut self, mut sink_fn: F) -> anyhow::Result<()>
+
    where
+
        F: FnMut(Record),
+
    {
+
        // XXX: This could be turned into an iterator to better control the portion of records.
+

+
        let mut visited_objects = HashSet::<cob::ObjectId>::new();
+
        for repo in self.storage.repositories()? {
+
            let rid = repo.rid.to_string();
+

+
            let repo = self
+
                .storage
+
                .repository(repo.rid)
+
                .context(format!("load {}", rid))?;
+

+
            for type_name in &self.checkpoint.type_names {
+
                let references = repo.types(type_name)?;
+

+
                for (object_id, _) in references {
+
                    let history = cob::stream::CobRange::new(type_name, &object_id);
+
                    let stream = cob::stream::Stream::<serde_json::Value>::new(
+
                        &repo.backend,
+
                        history,
+
                        type_name.clone(),
+
                    );
+

+
                    let (iter, is_new) = match self.checkpoint.last_ops.get(&object_id) {
+
                        Some(last_op) => (stream.since(last_op.1.id)?, false),
+
                        None => (stream.all()?, true),
+
                    };
+

+
                    let mut last_op = None;
+
                    let mut first_op = None;
+

+
                    // Walk in historical order.
+
                    for op in iter.skip(if is_new { 0 } else { 1 }) {
+
                        let op = op?;
+
                        first_op.get_or_insert(op.clone());
+
                        last_op = Some(op.clone());
+
                        sink_fn(Record {
+
                            event: Event::Changed,
+
                            rid: repo.id,
+
                            object_id,
+
                            op,
+
                        });
+
                    }
+

+
                    if let Some(first_op) = first_op {
+
                        if is_new {
+
                            sink_fn(Record {
+
                                event: Event::Created,
+
                                rid: repo.id,
+
                                object_id,
+
                                op: first_op.clone(),
+
                            });
+
                        }
+
                    }
+

+
                    if let Some(last_op) = last_op {
+
                        self.checkpoint
+
                            .last_ops
+
                            .insert(object_id, (repo.id, last_op));
+
                    }
+

+
                    visited_objects.insert(object_id);
+
                }
+
            }
+
        }
+

+
        self.checkpoint.last_ops.retain(|k, v| {
+
            if visited_objects.contains(k) {
+
                return true;
+
            }
+
            sink_fn(Record {
+
                event: Event::Deleted,
+
                rid: v.0,
+
                object_id: *k,
+
                op: v.1.clone(),
+
            });
+
            false
+
        });
+

+
        Ok(())
+
    }
+
}