Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Make sure all channels we use are bounded
cloudhead committed 1 year ago
commit 5c0d1b10e0c99e763b377b0098ef3d893fdc8098
parent 345ca923801aa13f124526c2e6431acc5823a0a8
6 files changed +72 -10
modified radicle-node/src/runtime.rs
@@ -36,6 +36,9 @@ pub use handle::Error as HandleError;
pub use handle::Handle;
pub use node::events::Emitter;

+
/// Maximum pending worker tasks allowed.
+
pub const MAX_PENDING_TASKS: usize = 1024;
+

/// A client error.
#[derive(Error, Debug)]
pub enum Error {
@@ -214,7 +217,7 @@ impl Runtime {
        );
        service.initialize(clock)?;

-
        let (worker_send, worker_recv) = chan::unbounded::<worker::Task>();
+
        let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
        let mut wire = Wire::new(service, worker_send, signer.clone());
        let mut local_addrs = Vec::new();

modified radicle-node/src/runtime/handle.rs
@@ -347,6 +347,10 @@ impl radicle::node::Handle for Handle {
                        "bucket": bucket
                    })
                }).collect::<Vec<_>>(),
+
                "events": json!({
+
                    "subscribers": state.emitter().subscriptions(),
+
                    "pending": state.emitter().pending(),
+
                }),
                "metrics": state.metrics(),
            });
            sender.send(debug).ok();
modified radicle-node/src/service.rs
@@ -117,7 +117,12 @@ pub use message::REF_REMOTE_LIMIT;
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
-
    peers: HashMap<NodeId, PeerMetrics>,
+
    /// Metrics for each peer.
+
    pub peers: HashMap<NodeId, PeerMetrics>,
+
    /// Tasks queued in worker queue.
+
    pub worker_queue_size: usize,
+
    /// Current open channel count, summed across all connected peers.
+
    pub open_channels: usize,
}

impl Metrics {
@@ -2559,8 +2564,10 @@ pub trait ServiceState {
    fn queue(&self) -> &VecDeque<QueuedFetch>;
    /// Get outbox.
    fn outbox(&self) -> &Outbox;
-
    /// Get rate limitter.
+
    /// Get rate limiter.
    fn limiter(&self) -> &RateLimiter;
+
    /// Get event emitter.
+
    fn emitter(&self) -> &Emitter<Event>;
    /// Get a repository from storage.
    fn get(&self, rid: RepoId) -> Result<Option<Doc<Verified>>, RepositoryError>;
    /// Get the clock.
@@ -2603,6 +2610,10 @@ where
        &self.limiter
    }

+
    fn emitter(&self) -> &Emitter<Event> {
+
        &self.emitter
+
    }
+

    fn get(&self, rid: RepoId) -> Result<Option<Doc<Verified>>, RepositoryError> {
        self.storage.get(rid)
    }
modified radicle-node/src/wire/protocol.rs
@@ -300,6 +300,10 @@ impl Peers {
            }
        })
    }
+

+
    fn iter(&self) -> impl Iterator<Item = &Peer> {
+
        self.0.values()
+
    }
}

/// Wire protocol implementation for a set of peers.
@@ -504,6 +508,18 @@ where
    type Command = Control;

    fn tick(&mut self, time: Timestamp) {
+
        self.metrics.open_channels = self
+
            .peers
+
            .iter()
+
            .filter_map(|p| {
+
                if let Peer::Connected { streams, .. } = p {
+
                    Some(streams.streams.len())
+
                } else {
+
                    None
+
                }
+
            })
+
            .sum();
+
        self.metrics.worker_queue_size = self.worker.len();
        self.service.tick(
            LocalTime::from_millis(time.as_millis() as u128),
            &self.metrics,
@@ -761,8 +777,11 @@ where
                                    stream,
                                    channels,
                                };
-
                                if self.worker.send(task).is_err() {
-
                                    log::error!(target: "wire", "Worker pool is disconnected; cannot send task");
+
                                if let Err(e) = self.worker.try_send(task) {
+
                                    log::error!(
+
                                        target: "wire",
+
                                        "Worker pool failed to accept incoming fetch request: {e}"
+
                                    );
                                }
                            }
                            Ok(Some(Frame {
@@ -1088,8 +1107,11 @@ where
                            "Worker pool is busy: {} tasks pending, fetch requests may be delayed", self.worker.len()
                        );
                    }
-
                    if self.worker.send(task).is_err() {
-
                        log::error!(target: "wire", "Worker pool is disconnected; cannot send fetch request");
+
                    if let Err(e) = self.worker.try_send(task) {
+
                        log::error!(
+
                            target: "wire",
+
                            "Worker pool failed to accept outgoing fetch request: {e}"
+
                        );
                    }
                    let metrics = self.metrics.peer(remote);
                    metrics.streams_opened += 1;
modified radicle-node/src/worker/channels.rs
@@ -9,6 +9,9 @@ use radicle::node::NodeId;
use crate::runtime::Handle;
use crate::wire::StreamId;

+
/// Maximum number of events that may be queued in a channel used to communicate with a worker.
+
pub const MAX_WORKER_CHANNEL_SIZE: usize = 4096;
+

/// A reader and writer pair that can be used in the fetch protocol.
///
/// It implements [`radicle::fetch::transport::ConnectionStream`] to
@@ -96,8 +99,8 @@ impl<T: AsRef<[u8]>> Channels<T> {
    }

    pub fn pair(timeout: time::Duration) -> io::Result<(Channels<T>, Channels<T>)> {
-
        let (l_send, r_recv) = chan::unbounded::<ChannelEvent<T>>();
-
        let (r_send, l_recv) = chan::unbounded::<ChannelEvent<T>>();
+
        let (l_send, r_recv) = chan::bounded::<ChannelEvent<T>>(MAX_WORKER_CHANNEL_SIZE);
+
        let (r_send, l_recv) = chan::bounded::<ChannelEvent<T>>(MAX_WORKER_CHANNEL_SIZE);

        let l = Channels::new(l_send, l_recv, timeout);
        let r = Channels::new(r_send, r_recv, timeout);
modified radicle/src/node/events.rs
@@ -14,6 +14,9 @@ use crate::node;
use crate::prelude::*;
use crate::storage::{refs, RefUpdate};

+
/// Maximum unconsumed events allowed per subscription.
+
pub const MAX_PENDING_EVENTS: usize = 8192;
+

/// A service event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
@@ -149,6 +152,7 @@ impl<T> Default for Emitter<T> {

impl<T: Clone> Emitter<T> {
    /// Emit event to subscribers and drop those who can't receive it.
+
    /// Nb. subscribers are also dropped if their channel is full.
    pub fn emit(&self, event: T) {
        self.subscribers
            .lock()
@@ -158,10 +162,25 @@ impl<T: Clone> Emitter<T> {

    /// Subscribe to events stream.
    pub fn subscribe(&self) -> chan::Receiver<T> {
-
        let (sender, receiver) = chan::unbounded();
+
        let (sender, receiver) = chan::bounded(MAX_PENDING_EVENTS);
        let mut subs = self.subscribers.lock().unwrap();
        subs.push(sender);

        receiver
    }
+

+
    /// Number of subscribers.
+
    pub fn subscriptions(&self) -> usize {
+
        self.subscribers.lock().unwrap().len()
+
    }
+

+
    /// Total number of events, summed over all subscribers, that have not yet been received.
+
    pub fn pending(&self) -> usize {
+
        self.subscribers
+
            .lock()
+
            .unwrap()
+
            .iter()
+
            .map(|ch| ch.len())
+
            .sum()
+
    }
}