Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Make sure all channels we use are bounded
Merged did:key:z6MksFqX...wzpT opened 1 year ago

This prevents potential memory leaks. We also ensure that sends fail instead of blocking, in case the channels are full.

Additionally, we add some metrics to report on channel size.

6 files changed +72 -10 345ca923 5c0d1b10
modified radicle-node/src/runtime.rs
@@ -36,6 +36,9 @@ pub use handle::Error as HandleError;
pub use handle::Handle;
pub use node::events::Emitter;

+
/// Maximum pending worker tasks allowed.
+
pub const MAX_PENDING_TASKS: usize = 1024;
+

/// A client error.
#[derive(Error, Debug)]
pub enum Error {
@@ -214,7 +217,7 @@ impl Runtime {
        );
        service.initialize(clock)?;

-
        let (worker_send, worker_recv) = chan::unbounded::<worker::Task>();
+
        let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
        let mut wire = Wire::new(service, worker_send, signer.clone());
        let mut local_addrs = Vec::new();

modified radicle-node/src/runtime/handle.rs
@@ -347,6 +347,10 @@ impl radicle::node::Handle for Handle {
                        "bucket": bucket
                    })
                }).collect::<Vec<_>>(),
+
                "events": json!({
+
                    "subscribers": state.emitter().subscriptions(),
+
                    "pending": state.emitter().pending(),
+
                }),
                "metrics": state.metrics(),
            });
            sender.send(debug).ok();
modified radicle-node/src/service.rs
@@ -117,7 +117,12 @@ pub use message::REF_REMOTE_LIMIT;
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
-
    peers: HashMap<NodeId, PeerMetrics>,
+
    /// Metrics for each peer.
+
    pub peers: HashMap<NodeId, PeerMetrics>,
+
    /// Tasks queued in worker queue.
+
    pub worker_queue_size: usize,
+
    /// Current open channel count.
+
    pub open_channels: usize,
}

impl Metrics {
@@ -2559,8 +2564,10 @@ pub trait ServiceState {
    fn queue(&self) -> &VecDeque<QueuedFetch>;
    /// Get outbox.
    fn outbox(&self) -> &Outbox;
-
    /// Get rate limitter.
+
    /// Get rate limiter.
    fn limiter(&self) -> &RateLimiter;
+
    /// Get event emitter.
+
    fn emitter(&self) -> &Emitter<Event>;
    /// Get a repository from storage.
    fn get(&self, rid: RepoId) -> Result<Option<Doc<Verified>>, RepositoryError>;
    /// Get the clock.
@@ -2603,6 +2610,10 @@ where
        &self.limiter
    }

+
    fn emitter(&self) -> &Emitter<Event> {
+
        &self.emitter
+
    }
+

    fn get(&self, rid: RepoId) -> Result<Option<Doc<Verified>>, RepositoryError> {
        self.storage.get(rid)
    }
modified radicle-node/src/wire/protocol.rs
@@ -300,6 +300,10 @@ impl Peers {
            }
        })
    }
+

+
    fn iter(&self) -> impl Iterator<Item = &Peer> {
+
        self.0.values()
+
    }
}

/// Wire protocol implementation for a set of peers.
@@ -504,6 +508,18 @@ where
    type Command = Control;

    fn tick(&mut self, time: Timestamp) {
+
        self.metrics.open_channels = self
+
            .peers
+
            .iter()
+
            .filter_map(|p| {
+
                if let Peer::Connected { streams, .. } = p {
+
                    Some(streams.streams.len())
+
                } else {
+
                    None
+
                }
+
            })
+
            .sum();
+
        self.metrics.worker_queue_size = self.worker.len();
        self.service.tick(
            LocalTime::from_millis(time.as_millis() as u128),
            &self.metrics,
@@ -761,8 +777,11 @@ where
                                    stream,
                                    channels,
                                };
-
                                if self.worker.send(task).is_err() {
-
                                    log::error!(target: "wire", "Worker pool is disconnected; cannot send task");
+
                                if let Err(e) = self.worker.try_send(task) {
+
                                    log::error!(
+
                                        target: "wire",
+
                                        "Worker pool failed to accept incoming fetch request: {e}"
+
                                    );
                                }
                            }
                            Ok(Some(Frame {
@@ -1088,8 +1107,11 @@ where
                            "Worker pool is busy: {} tasks pending, fetch requests may be delayed", self.worker.len()
                        );
                    }
-
                    if self.worker.send(task).is_err() {
-
                        log::error!(target: "wire", "Worker pool is disconnected; cannot send fetch request");
+
                    if let Err(e) = self.worker.try_send(task) {
+
                        log::error!(
+
                            target: "wire",
+
                            "Worker pool failed to accept outgoing fetch request: {e}"
+
                        );
                    }
                    let metrics = self.metrics.peer(remote);
                    metrics.streams_opened += 1;
modified radicle-node/src/worker/channels.rs
@@ -9,6 +9,9 @@ use radicle::node::NodeId;
use crate::runtime::Handle;
use crate::wire::StreamId;

+
/// Maximum size of channel used to communicate with a worker.
+
pub const MAX_WORKER_CHANNEL_SIZE: usize = 4096;
+

/// A reader and writer pair that can be used in the fetch protocol.
///
/// It implements [`radicle::fetch::transport::ConnectionStream`] to
@@ -96,8 +99,8 @@ impl<T: AsRef<[u8]>> Channels<T> {
    }

    pub fn pair(timeout: time::Duration) -> io::Result<(Channels<T>, Channels<T>)> {
-
        let (l_send, r_recv) = chan::unbounded::<ChannelEvent<T>>();
-
        let (r_send, l_recv) = chan::unbounded::<ChannelEvent<T>>();
+
        let (l_send, r_recv) = chan::bounded::<ChannelEvent<T>>(MAX_WORKER_CHANNEL_SIZE);
+
        let (r_send, l_recv) = chan::bounded::<ChannelEvent<T>>(MAX_WORKER_CHANNEL_SIZE);

        let l = Channels::new(l_send, l_recv, timeout);
        let r = Channels::new(r_send, r_recv, timeout);
modified radicle/src/node/events.rs
@@ -14,6 +14,9 @@ use crate::node;
use crate::prelude::*;
use crate::storage::{refs, RefUpdate};

+
/// Maximum unconsumed events allowed per subscription.
+
pub const MAX_PENDING_EVENTS: usize = 8192;
+

/// A service event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
@@ -149,6 +152,7 @@ impl<T> Default for Emitter<T> {

impl<T: Clone> Emitter<T> {
    /// Emit event to subscribers and drop those who can't receive it.
+
    /// Nb. subscribers are also dropped if their channel is full.
    pub fn emit(&self, event: T) {
        self.subscribers
            .lock()
@@ -158,10 +162,25 @@ impl<T: Clone> Emitter<T> {

    /// Subscribe to events stream.
    pub fn subscribe(&self) -> chan::Receiver<T> {
-
        let (sender, receiver) = chan::unbounded();
+
        let (sender, receiver) = chan::bounded(MAX_PENDING_EVENTS);
        let mut subs = self.subscribers.lock().unwrap();
        subs.push(sender);

        receiver
    }
+

+
    /// Number of subscribers.
+
    pub fn subscriptions(&self) -> usize {
+
        self.subscribers.lock().unwrap().len()
+
    }
+

+
    /// Number of messages that have not yet been received.
+
    pub fn pending(&self) -> usize {
+
        self.subscribers
+
            .lock()
+
            .unwrap()
+
            .iter()
+
            .map(|ch| ch.len())
+
            .sum()
+
    }
}