Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
cli: Add `rad node debug` command
Merged did:key:z6MksFqX...wzpT opened 1 year ago

Adds a bunch of metrics to the node.

12 files changed +212 -32 749e8239 30c9b0db
modified radicle-cli/src/commands/node.rs
@@ -33,6 +33,7 @@ Usage
    rad node start [--foreground] [--verbose] [<option>...] [-- <node-option>...]
    rad node stop [<option>...]
    rad node logs [-n <lines>]
+
    rad node debug [<option>...]
    rad node connect <nid>@<addr> [<option>...]
    rad node routing [--rid <rid>] [--nid <nid>] [--json] [<option>...]
    rad node events [--timeout <secs>] [-n <count>] [<option>...]
@@ -98,6 +99,7 @@ pub enum Operation {
        lines: usize,
    },
    Status,
+
    Debug,
    Sessions,
    Stop,
}
@@ -113,6 +115,7 @@ pub enum OperationName {
    Start,
    #[default]
    Status,
+
    Debug,
    Sessions,
    Stop,
}
@@ -152,6 +155,7 @@ impl Args for Options {
                    "status" => op = Some(OperationName::Status),
                    "stop" => op = Some(OperationName::Stop),
                    "sessions" => op = Some(OperationName::Sessions),
+
                    "debug" => op = Some(OperationName::Debug),

                    unknown => anyhow::bail!("unknown operation '{}'", unknown),
                },
@@ -222,6 +226,7 @@ impl Args for Options {
                path: path.unwrap_or(PathBuf::from("radicle-node")),
            },
            OperationName::Status => Operation::Status,
+
            OperationName::Debug => Operation::Debug,
            OperationName::Sessions => Operation::Sessions,
            OperationName::Stop => Operation::Stop,
        };
@@ -250,6 +255,9 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
        Operation::Db { args } => {
            commands::db(&profile, args)?;
        }
+
        Operation::Debug => {
+
            control::debug(&mut node)?;
+
        }
        Operation::Sessions => {
            let sessions = control::sessions(&node)?;
            if let Some(table) = sessions {
modified radicle-cli/src/commands/node/control.rs
@@ -120,6 +120,13 @@ pub fn stop(node: Node) -> anyhow::Result<()> {
    Ok(())
}

+
pub fn debug(node: &mut Node) -> anyhow::Result<()> {
+
    let json = node.debug()?;
+
    term::json::to_pretty(&json, Path::new("debug.json"))?.print();
+

+
    Ok(())
+
}
+

pub fn logs(lines: usize, follow: Option<time::Duration>, profile: &Profile) -> anyhow::Result<()> {
    let logs = profile.home.node().join("node.log");

modified radicle-node/src/control.rs
@@ -199,6 +199,11 @@ where
            }
            Err(e) => return Err(CommandError::Runtime(e)),
        },
+
        Command::Debug => {
+
            let debug = handle.debug()?;
+

+
            CommandResult::Okay(debug).to_writer(writer)?;
+
        }
        Command::Shutdown => {
            log::debug!(target: "control", "Shutdown requested..");
            // Channel might already be disconnected if shutdown
modified radicle-node/src/runtime/handle.rs
@@ -8,6 +8,7 @@ use crossbeam_channel as chan;
use radicle::node::{ConnectOptions, ConnectResult, Link, Seeds};
use radicle::storage::refs::RefsAt;
use reactor::poller::popol::PopolWaker;
+
use serde_json::json;
use thiserror::Error;

use crate::identity::RepoId;
@@ -319,4 +320,45 @@ impl radicle::node::Handle for Handle {
            .shutdown()
            .map_err(|_| Error::ChannelDisconnected)
    }
+

+
    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            let debug = serde_json::json!({
+
                "outboxSize": state.outbox().len(),
+
                "fetching": state.fetching().iter().map(|(rid, state)| {
+
                    json!({
+
                        "rid": rid,
+
                        "from": state.from,
+
                        "refsAt": state.refs_at,
+
                        "subscribers": state.subscribers.len(),
+
                    })
+
                }).collect::<Vec<_>>(),
+
                "queue": state.queue().iter().map(|fetch| {
+
                    json!({
+
                        "rid": fetch.rid,
+
                        "from": fetch.from,
+
                        "refsAt": fetch.refs_at,
+
                    })
+
                }).collect::<Vec<_>>(),
+
                "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
+
                    json!({
+
                        "host": host.to_string(),
+
                        "bucket": bucket
+
                    })
+
                }).collect::<Vec<_>>(),
+
                "metrics": state.metrics(),
+
            });
+
            sender.send(debug).ok();
+

+
            Ok(())
+
        });
+
        let (err_sender, err_receiver) = chan::bounded(1);
+
        self.command(service::Command::QueryState(query, err_sender))?;
+
        err_receiver.recv()??;
+

+
        let debug = receiver.recv()?;
+

+
        Ok(debug)
+
    }
}
modified radicle-node/src/service.rs
@@ -108,6 +108,38 @@ pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references imposed by message size limits.
pub use message::REF_REMOTE_LIMIT;

+
/// Metrics we track.
+
#[derive(Clone, Debug, Default, serde::Serialize)]
+
#[serde(rename_all = "camelCase")]
+
pub struct Metrics {
+
    peers: HashMap<NodeId, PeerMetrics>,
+
}
+

+
impl Metrics {
+
    /// Get metrics for the given peer.
+
    pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
+
        self.peers.entry(nid).or_default()
+
    }
+
}
+

+
/// Per-peer metrics we track.
+
#[derive(Clone, Debug, Default, serde::Serialize)]
+
#[serde(rename_all = "camelCase")]
+
pub struct PeerMetrics {
+
    pub received_git_bytes: usize,
+
    pub received_fetch_requests: usize,
+
    pub received_bytes: usize,
+
    pub received_gossip_messages: usize,
+
    pub sent_bytes: usize,
+
    pub sent_fetch_requests: usize,
+
    pub sent_git_bytes: usize,
+
    pub sent_gossip_messages: usize,
+
    pub streams_opened: usize,
+
    pub inbound_connection_attempts: usize,
+
    pub outbound_connection_attempts: usize,
+
    pub disconnects: usize,
+
}
+

/// Result of syncing our routing table with a node's inventory.
#[derive(Default)]
struct SyncedRouting {
@@ -252,13 +284,13 @@ enum TryFetchError<'a> {

/// Fetch state for an ongoing fetch.
#[derive(Debug)]
-
struct FetchState {
+
pub struct FetchState {
    /// Node we're fetching from.
-
    from: NodeId,
+
    pub from: NodeId,
    /// What refs we're fetching.
-
    refs_at: Vec<RefsAt>,
+
    pub refs_at: Vec<RefsAt>,
    /// Channels waiting for fetch results.
-
    subscribers: Vec<chan::Sender<FetchResult>>,
+
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}

impl FetchState {
@@ -272,13 +304,13 @@ impl FetchState {

/// Fetch waiting to be processed, in the fetch queue.
#[derive(Debug)]
-
struct QueuedFetch {
+
pub struct QueuedFetch {
    /// Repo being fetched.
-
    rid: RepoId,
+
    pub rid: RepoId,
    /// Peer being fetched from.
-
    from: NodeId,
+
    pub from: NodeId,
    /// Refs being fetched.
-
    refs_at: Vec<RefsAt>,
+
    pub refs_at: Vec<RefsAt>,
    /// The timeout given for the fetch request.
    timeout: time::Duration,
    /// Result channel.
@@ -409,6 +441,8 @@ pub struct Service<D, S, G> {
    emitter: Emitter<Event>,
    /// Local listening addresses.
    listening: Vec<net::SocketAddr>,
+
    /// Latest metrics for all nodes connected to since the last start.
+
    metrics: Metrics,
}

impl<D, S, G> Service<D, S, G>
@@ -476,6 +510,7 @@ where
            started_at: None, // Updated on initialize.
            emitter,
            listening: vec![],
+
            metrics: Metrics::default(),
        }
    }

@@ -695,7 +730,7 @@ where
        Ok(())
    }

-
    pub fn tick(&mut self, now: LocalTime) {
+
    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
        trace!(
            target: "service",
            "Tick +{}",
@@ -712,6 +747,7 @@ where
                "System clock is not monotonic: {now} is not greater or equal to {}", self.clock
            );
        }
+
        self.metrics = metrics.clone();
    }

    pub fn wake(&mut self) {
@@ -2391,6 +2427,14 @@ pub trait ServiceState {
    fn nid(&self) -> &NodeId;
    /// Get the existing sessions.
    fn sessions(&self) -> &Sessions;
+
    /// Get fetch state.
+
    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
+
    /// Get fetch queue.
+
    fn queue(&self) -> &VecDeque<QueuedFetch>;
+
    /// Get outbox.
+
    fn outbox(&self) -> &Outbox;
+
    /// Get rate limitter.
+
    fn limiter(&self) -> &RateLimiter;
    /// Get a repository from storage.
    fn get(&self, rid: RepoId) -> Result<Option<Doc<Verified>>, RepositoryError>;
    /// Get the clock.
@@ -2399,6 +2443,8 @@ pub trait ServiceState {
    fn clock_mut(&mut self) -> &mut LocalTime;
    /// Get service configuration.
    fn config(&self) -> &Config;
+
    /// Get service metrics.
+
    fn metrics(&self) -> &Metrics;
}

impl<D, S, G> ServiceState for Service<D, S, G>
@@ -2415,6 +2461,22 @@ where
        &self.sessions
    }

+
    fn fetching(&self) -> &HashMap<RepoId, FetchState> {
+
        &self.fetching
+
    }
+

+
    fn queue(&self) -> &VecDeque<QueuedFetch> {
+
        &self.queue
+
    }
+

+
    fn outbox(&self) -> &Outbox {
+
        &self.outbox
+
    }
+

+
    fn limiter(&self) -> &RateLimiter {
+
        &self.limiter
+
    }
+

    fn get(&self, rid: RepoId) -> Result<Option<Doc<Verified>>, RepositoryError> {
        self.storage.get(rid)
    }
@@ -2430,6 +2492,10 @@ where
    fn config(&self) -> &Config {
        &self.config
    }
+

+
    fn metrics(&self) -> &Metrics {
+
        &self.metrics
+
    }
}

/// Disconnect reason.
modified radicle-node/src/service/io.rs
@@ -177,6 +177,12 @@ impl Outbox {
        }
    }

+
    /// Number of items in outbox.
+
    #[allow(clippy::len_without_is_empty)]
+
    pub fn len(&self) -> usize {
+
        self.io.len()
+
    }
+

    #[cfg(any(test, feature = "test"))]
    pub(crate) fn queue(&mut self) -> &mut VecDeque<Io> {
        &mut self.io
modified radicle-node/src/service/limiter.rs
@@ -11,8 +11,8 @@ use radicle::node::{address, config, HostName, NodeId};
/// bucket's capacity.
#[derive(Debug, Default)]
pub struct RateLimiter {
-
    buckets: HashMap<HostName, TokenBucket>,
-
    bypass: HashSet<NodeId>,
+
    pub buckets: HashMap<HostName, TokenBucket>,
+
    pub bypass: HashSet<NodeId>,
}

impl RateLimiter {
@@ -74,7 +74,8 @@ impl AsTokens for config::RateLimit {
    }
}

-
#[derive(Debug)]
+
#[derive(Debug, serde::Serialize)]
+
#[serde(rename_all = "camelCase")]
pub struct TokenBucket {
    /// Token refill rate per second.
    rate: f64,
modified radicle-node/src/test/handle.rs
@@ -115,4 +115,8 @@ impl radicle::node::Handle for Handle {
    fn shutdown(self) -> Result<(), Self::Error> {
        Ok(())
    }
+

+
    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
+
        Ok(serde_json::Value::Null)
+
    }
}
modified radicle-node/src/test/peer.rs
@@ -406,13 +406,15 @@ where
    pub fn messages(&mut self, remote: NodeId) -> impl Iterator<Item = Message> {
        let mut msgs = Vec::new();

-
        self.service.outbox().queue().retain(|o| match o {
-
            Io::Write(a, messages) if *a == remote => {
-
                msgs.extend(messages.clone());
-
                false
-
            }
-
            _ => true,
-
        });
+
        Service::outbox(&mut self.service)
+
            .queue()
+
            .retain(|o| match o {
+
                Io::Write(a, messages) if *a == remote => {
+
                    msgs.extend(messages.clone());
+
                    false
+
                }
+
                _ => true,
+
            });

        msgs.into_iter()
    }
@@ -423,7 +425,7 @@ where
        let mut filtered: Vec<Message> = Vec::new();
        let nid = *self.nid();

-
        for o in self.service.outbox().queue() {
+
        for o in Service::outbox(&mut self.service).queue() {
            match o {
                Io::Write(a, messages) if *a == remote => {
                    let (relayed, other): (Vec<Message>, _) =
@@ -448,7 +450,7 @@ where
    pub fn inventory_announcements(&mut self, remote: NodeId) -> impl Iterator<Item = Message> {
        let mut invs: Vec<Message> = Vec::new();

-
        for o in self.service.outbox().queue() {
+
        for o in Service::outbox(&mut self.service).queue() {
            match o {
                Io::Write(a, messages) if *a == remote => {
                    let (inventories, other): (Vec<Message>, _) =
@@ -473,12 +475,12 @@ where

    /// Get a draining iterator over the peer's I/O outbox.
    pub fn outbox(&mut self) -> impl Iterator<Item = Io> + '_ {
-
        iter::from_fn(|| self.service.outbox().next())
+
        iter::from_fn(|| Service::outbox(&mut self.service).next())
    }

    /// Get a draining iterator over the peer's I/O outbox, which only returns fetches.
    pub fn fetches(&mut self) -> impl Iterator<Item = (RepoId, NodeId)> + '_ {
-
        iter::from_fn(|| self.service.outbox().next()).filter_map(|io| {
+
        iter::from_fn(|| Service::outbox(&mut self.service).next()).filter_map(|io| {
            if let Io::Fetch { rid, remote, .. } = io {
                Some((rid, remote))
            } else {
modified radicle-node/src/test/simulator.rs
@@ -17,7 +17,7 @@ use log::*;
use crate::crypto::Signer;
use crate::prelude::{Address, RepoId};
use crate::service::io::Io;
-
use crate::service::{DisconnectReason, Event, Message, NodeId};
+
use crate::service::{DisconnectReason, Event, Message, Metrics, NodeId};
use crate::storage::Namespaces;
use crate::storage::{ReadRepository, WriteStorage};
use crate::test::arbitrary;
@@ -368,7 +368,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
            let Scheduled { input, node, .. } = next;

            if let Some(ref mut p) = nodes.get_mut(&node) {
-
                p.tick(time);
+
                p.tick(time, &Metrics::default());

                match input {
                    Input::Connecting { id, addr } => {
modified radicle-node/src/wire/protocol.rs
@@ -30,7 +30,7 @@ use crate::prelude::Deserializer;
use crate::service;
use crate::service::io::Io;
use crate::service::FETCH_TIMEOUT;
-
use crate::service::{session, DisconnectReason, Service};
+
use crate::service::{session, DisconnectReason, Metrics, Service};
use crate::wire::frame;
use crate::wire::frame::{Frame, FrameData, StreamId};
use crate::wire::Encode;
@@ -309,6 +309,8 @@ pub struct Wire<D, S, G: Signer + Ecdh> {
    worker: chan::Sender<Task>,
    /// Used for authentication.
    signer: G,
+
    /// Node metrics.
+
    metrics: service::Metrics,
    /// Internal queue of actions to send to the reactor.
    actions: VecDeque<Action<G>>,
    /// Outbound attempted peers without a session.
@@ -334,6 +336,7 @@ where
            service,
            worker,
            signer,
+
            metrics: Metrics::default(),
            actions: VecDeque::new(),
            inbound: RandomMap::default(),
            outbound: RandomMap::default(),
@@ -456,11 +459,13 @@ where
            log::debug!(target: "wire", "Stream {stream} cannot be found; ignoring flush");
            return;
        };
+
        let metrics = self.metrics.peer(remote);

        for data in s.channels.try_iter() {
            let frame = match data {
                ChannelEvent::Data(data) => {
-
                    s.sent_bytes += data.len();
+
                    metrics.sent_git_bytes += data.len();
+
                    metrics.sent_bytes += data.len();
                    Frame::git(stream, data)
                }
                ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
@@ -498,8 +503,10 @@ where
    type Command = Control;

    fn tick(&mut self, time: Timestamp) {
-
        self.service
-
            .tick(LocalTime::from_millis(time.as_millis() as u128));
+
        self.service.tick(
+
            LocalTime::from_millis(time.as_millis() as u128),
+
            &self.metrics,
+
        );
    }

    fn handle_timer(&mut self) {
@@ -603,6 +610,7 @@ where
                    return;
                }
                let (addr, link) = if let Some(peer) = self.inbound.remove(&fd) {
+
                    self.metrics.peer(nid).inbound_connection_attempts += 1;
                    (peer.addr, Link::Inbound)
                } else if let Some(peer) = self.outbound.remove(&fd) {
                    assert_eq!(nid, peer.nid);
@@ -711,6 +719,9 @@ where
                    ..
                }) = self.peers.get_mut(&id)
                {
+
                    let metrics = self.metrics.peer(*nid);
+
                    metrics.received_bytes += data.len();
+

                    if inbox.input(&data).is_err() {
                        log::error!(target: "wire", "Maximum inbox size ({MAX_INBOX_SIZE}) reached for peer {nid}");
                        log::error!(target: "wire", "Unable to process messages fast enough for peer {nid}; disconnecting..");
@@ -726,6 +737,8 @@ where
                                ..
                            })) => {
                                log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");
+
                                metrics.streams_opened += 1;
+
                                metrics.received_fetch_requests += 1;

                                let Some(channels) = streams.register(stream, FETCH_TIMEOUT) else {
                                    log::warn!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
@@ -777,6 +790,7 @@ where
                                data: FrameData::Gossip(msg),
                                ..
                            })) => {
+
                                metrics.received_gossip_messages += 1;
                                self.service.received_message(*nid, msg);
                            }
                            Ok(Some(Frame {
@@ -785,7 +799,7 @@ where
                                ..
                            })) => {
                                if let Some(s) = streams.get_mut(&stream) {
-
                                    s.received_bytes += data.len();
+
                                    metrics.received_git_bytes += data.len();

                                    if s.channels.send(ChannelEvent::Data(data)).is_err() {
                                        log::error!(target: "wire", "Worker is disconnected; cannot send data");
@@ -943,13 +957,17 @@ where
                    log::trace!(
                        target: "wire", "Writing {} message(s) to {}", msgs.len(), node_id
                    );
-

                    let mut data = Vec::new();
+
                    let metrics = self.metrics.peer(node_id);
+
                    metrics.sent_gossip_messages += msgs.len();
+

                    for msg in msgs {
                        Frame::gossip(link, msg)
                            .encode(&mut data)
                            .expect("in-memory writes never fail");
                    }
+
                    metrics.sent_bytes += data.len();
+

                    self.actions.push_back(reactor::Action::Send(fd, data));
                }
                Io::Connect(node_id, addr) => {
@@ -963,6 +981,7 @@ where
                        continue;
                    }
                    self.service.attempted(node_id, addr.clone());
+
                    self.metrics.peer(node_id).outbound_connection_attempts += 1;

                    match dial::<G>(
                        addr.to_inner(),
@@ -1003,7 +1022,9 @@ where
                }
                Io::Disconnect(nid, reason) => {
                    if let Some((id, Peer::Connected { .. })) = self.peers.lookup(&nid) {
-
                        self.disconnect(id, reason);
+
                        if let Some((nid, _)) = self.disconnect(id, reason) {
+
                            self.metrics.peer(nid).disconnects += 1;
+
                        }
                    } else {
                        log::warn!(target: "wire", "Peer {nid} is not connected: ignoring disconnect");
                    }
@@ -1053,6 +1074,10 @@ where
                    if self.worker.send(task).is_err() {
                        log::error!(target: "wire", "Worker pool is disconnected; cannot send fetch request");
                    }
+
                    let metrics = self.metrics.peer(remote);
+
                    metrics.streams_opened += 1;
+
                    metrics.sent_fetch_requests += 1;
+

                    self.actions.push_back(Action::Send(
                        fd,
                        Frame::control(link, frame::Control::Open { stream }).to_bytes(),
modified radicle/src/node.rs
@@ -516,6 +516,9 @@ pub enum Command {
    /// Get the node's status.
    Status,

+
    /// Get node debug information.
+
    Debug,
+

    /// Get the node's NID.
    NodeId,

@@ -928,6 +931,8 @@ pub trait Handle: Clone + Sync + Send {
    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
    /// Subscribe to node events.
    fn subscribe(&self, timeout: time::Duration) -> Result<Self::Events, Self::Error>;
+
    /// Return debug information as a JSON value.
+
    fn debug(&self) -> Result<json::Value, Self::Error>;
}

/// Iterator of results `T` when passing a [`Command`] to [`Node::call`].
@@ -1239,6 +1244,15 @@ impl Handle for Node {
        Ok(sessions)
    }

+
    fn debug(&self) -> Result<json::Value, Self::Error> {
+
        let debug = self
+
            .call::<json::Value>(Command::Debug, DEFAULT_TIMEOUT)?
+
            .next()
+
            .ok_or(Error::EmptyResponse {})??;
+

+
        Ok(debug)
+
    }
+

    fn shutdown(self) -> Result<(), Error> {
        for line in self.call::<Success>(Command::Shutdown, DEFAULT_TIMEOUT)? {
            line?;