Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Revise fetch dequeue behavior
cloudhead committed 1 year ago
commit dd7a0b3524923fcf51f0b93daa598fa76c529e99
parent 47c9f792e608c80ec4cdf57429a8177efcfcc73f
1 file changed +22 -6
modified radicle-node/src/service.rs
@@ -809,6 +809,7 @@ where
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
+
            self.dequeue_fetch();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
@@ -989,8 +990,7 @@ where
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
-
                    self._fetch(rid, from, status.want, timeout, channel);
-
                    return true;
+
                    return self._fetch(rid, from, status.want, timeout, channel);
                }
            }
            Err(e) => {
@@ -1008,7 +1008,7 @@ where
        from: NodeId,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
-
    ) {
+
    ) -> bool {
        self._fetch(rid, from, vec![], timeout, channel)
    }

@@ -1019,12 +1019,13 @@ where
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
-
    ) {
+
    ) -> bool {
        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
            Ok(fetching) => {
                if let Some(c) = channel {
                    fetching.subscribe(c);
                }
+
                return true;
            }
            Err(TryFetchError::AlreadyFetching(fetching)) => {
                // If we're already fetching the same refs from the requested peer, there's nothing
@@ -1047,7 +1048,7 @@ where
                    if self.queue.contains(&fetch) {
                        debug!(target: "service", "Fetch for {rid} with {from} is already queued..");
                    } else {
-
                        debug!(target: "service", "Queueing fetch for {rid} with {from}..");
+
                        debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
                        self.queue.push_back(fetch);
                    }
                }
@@ -1071,6 +1072,7 @@ where
                }
            }
        }
+
        false
    }

    // TODO: Buffer/throttle fetches.
@@ -1220,6 +1222,8 @@ where
    /// 1. The RID was already being fetched.
    /// 2. The session was already at fetch capacity.
    pub fn dequeue_fetch(&mut self) {
+
        let mut tries = self.queue.len();
+

        while let Some(QueuedFetch {
            rid,
            from,
@@ -1245,7 +1249,19 @@ where
                }
            } else {
                // If no refs are specified, always do a full fetch.
-
                self.fetch(rid, from, timeout, channel);
+
                if self.fetch(rid, from, timeout, channel) {
+
                    break;
+
                }
+
            }
+
            // Nb. Just a precaution, `tries` should always be >= 1 here.
+
            tries = tries.saturating_sub(1);
+
            // To avoid looping forever, only try to dequeue a fixed number of fetches at a time.
+
            if tries == 0 {
+
                debug!(
+
                    target: "service",
+
                    "Giving up on dequeuing, {} item(s) still left in the queue..",
+
                    self.queue.len()
+
                );
                break;
            }
        }