Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Add a max capacity to fetch queues
cloudhead committed 1 year ago
commit aac23dde0c5dcaf3861ccf6b78181b2cbc07ab7a
parent 3c1c35f259f145d10c64abc6b006825c7bf8243b
2 files changed +40 -15
modified radicle-node/src/service.rs
@@ -1017,12 +1017,9 @@ where
                        timeout,
                        channel,
                    };
+
                    debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");

-
                    if self.queue_fetch(fetch) {
-
                        debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
-
                    } else {
-
                        debug!(target: "service", "Fetch for {rid} with {from} is already queued..");
-
                    }
+
                    self.queue_fetch(fetch);
                }
            }
            Err(TryFetchError::SessionCapacityReached) => {
@@ -1047,12 +1044,15 @@ where
        false
    }

-
    fn queue_fetch(&mut self, fetch: QueuedFetch) -> bool {
+
    fn queue_fetch(&mut self, fetch: QueuedFetch) {
        let Some(s) = self.sessions.get_mut(&fetch.from) else {
            log::error!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
-
            return false;
+
            return;
        };
-
        s.queue_fetch(fetch)
+
        if let Err(e) = s.queue_fetch(fetch) {
+
            let fetch = e.inner();
+
            log::debug!(target: "service", "Unable to queue fetch for {} with {}: {e}", &fetch.rid, &fetch.from);
+
        }
    }

    // TODO: Buffer/throttle fetches.
modified radicle-node/src/service/session.rs
@@ -15,6 +15,8 @@ pub use crate::node::{PingState, State};

/// Time after which a connection is considered stable.
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
+
/// Maximum items in the fetch queue.
+
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;

#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
@@ -46,6 +48,27 @@ impl Error {
    }
}

+
/// Error when trying to queue a fetch.
+
#[derive(thiserror::Error, Debug, Clone)]
+
pub enum QueueError {
+
    /// The item already exists in the queue.
+
    #[error("item is already queued")]
+
    Duplicate(QueuedFetch),
+
    /// The queue is at capacity.
+
    #[error("queue capacity reached")]
+
    CapacityReached(QueuedFetch),
+
}
+

+
impl QueueError {
+
    /// Get the inner [`QueuedFetch`].
+
    pub fn inner(&self) -> &QueuedFetch {
+
        match self {
+
            Self::Duplicate(f) => f,
+
            Self::CapacityReached(f) => f,
+
        }
+
    }
+
}
+

/// Fetch waiting to be processed, in the fetch queue.
#[derive(Debug, Clone)]
pub struct QueuedFetch {
@@ -131,7 +154,7 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
-
            queue: VecDeque::new(),
+
            queue: VecDeque::with_capacity(MAX_FETCH_QUEUE_SIZE),
            attempts: 1,
            rng,
            limits,
@@ -205,15 +228,17 @@ impl Session {

    /// Queue a fetch. Returns `true` if it was added to the queue, and `false` if
    /// it already was present in the queue.
-
    pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> bool {
+
    pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> Result<(), QueueError> {
        assert_eq!(fetch.from, self.id);

-
        if self.queue.contains(&fetch) {
-
            false
-
        } else {
-
            self.queue.push_back(fetch);
-
            true
+
        if self.queue.len() >= MAX_FETCH_QUEUE_SIZE {
+
            return Err(QueueError::CapacityReached(fetch));
+
        } else if self.queue.contains(&fetch) {
+
            return Err(QueueError::Duplicate(fetch));
        }
+
        self.queue.push_back(fetch);
+

+
        Ok(())
    }

    pub fn dequeue_fetch(&mut self) -> Option<QueuedFetch> {