Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Simplify and fix fetch dequeueing
Merged did:key:z6MksFqX...wzpT opened 2 years ago

There was redundant code and a missing break.

3 files changed +43 -21 bd8e0ebc ef1ed621
modified radicle-fetch/src/lib.rs
@@ -103,13 +103,23 @@ where
    let result = state
        .run(handle, &handshake, limit, remote, None)
        .map_err(Error::Protocol);
+
    let elapsed = start.elapsed().as_millis();
+
    let rid = handle.repo.id();

-
    log::debug!(
-
        target: "fetch",
-
        "Finished clone of {} ({}ms)",
-
        handle.repo.id(),
-
        start.elapsed().as_millis(),
-
    );
+
    match &result {
+
        Ok(_) => {
+
            log::debug!(
+
                target: "fetch",
+
                "Finished clone of {rid} from {remote} ({elapsed}ms)",
+
            );
+
        }
+
        Err(e) => {
+
            log::debug!(
+
                target: "fetch",
+
                "Clone of {rid} from {remote} failed with '{e}' ({elapsed}ms)",
+
            );
+
        }
+
    }
    result
}

modified radicle-node/src/service.rs
@@ -281,6 +281,16 @@ struct QueuedFetch {
    channel: Option<chan::Sender<FetchResult>>,
}

+
impl PartialEq for QueuedFetch {
+
    fn eq(&self, other: &Self) -> bool {
+
        self.rid == other.rid
+
            && self.from == other.from
+
            && self.refs_at == other.refs_at
+
            && self.channel.is_none()
+
            && other.channel.is_none()
+
    }
+
}
+

/// Holds all node stores.
#[derive(Debug)]
pub struct Stores<D>(D);
@@ -907,13 +917,18 @@ where
                        fetching.subscribe(c);
                    }
                } else {
-
                    debug!(target: "service", "Queueing fetch for {rid} with {from}..");
-
                    self.queue.push_back(QueuedFetch {
+
                    let fetch = QueuedFetch {
                        rid,
                        refs_at,
                        from,
                        channel,
-
                    });
+
                    };
+
                    if self.queue.contains(&fetch) {
+
                        debug!(target: "service", "Fetch for {rid} with {from} is already queued..");
+
                    } else {
+
                        debug!(target: "service", "Queueing fetch for {rid} with {from}..");
+
                        self.queue.push_back(fetch);
+
                    }
                }
            }
            Err(TryFetchError::SessionCapacityReached) => {
@@ -1088,23 +1103,20 @@ where
        {
            debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");

-
            // If no refs are specified, always do a full fetch.
-
            if refs_at.is_empty() {
-
                self.fetch(rid, from, FETCH_TIMEOUT, channel);
-
                return;
-
            }
-

-
            let repo_entry = self
-
                .policies
-
                .seed_policy(&rid)
-
                .expect("Service::dequeue_fetch: error accessing repo seeding configuration");
-

            if let Some(refs) = NonEmpty::from_vec(refs_at) {
+
                let repo_entry = self
+
                    .policies
+
                    .seed_policy(&rid)
+
                    .expect("Service::dequeue_fetch: error accessing repo seeding configuration");
+

+
                // Keep dequeueing if there was nothing to fetch, otherwise break.
                if self.fetch_refs_at(rid, from, refs, repo_entry.scope, FETCH_TIMEOUT, channel) {
                    break;
                }
            } else {
+
                // If no refs are specified, always do a full fetch.
                self.fetch(rid, from, FETCH_TIMEOUT, channel);
+
                break;
            }
        }
    }
modified radicle-node/src/wire/protocol.rs
@@ -372,7 +372,7 @@ where
    }

    fn worker_result(&mut self, task: TaskResult) {
-
        log::trace!(
+
        log::debug!(
            target: "wire",
            "Received fetch result from worker for stream {}, remote {}: {:?}",
            task.stream, task.remote, task.result