Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol/service: Wire up `FetcherService`
✗ CI failure Fintan Halpenny committed 3 months ago
commit 9e28c1c56c39ab567352f7a9dc994850e68e4a8b
parent 2663fe96ae621d79af42bc870c417706ccfcd63d
1 failed (1 total) View logs
11 files changed +242 -387
modified CHANGELOG.md
@@ -27,6 +27,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
  pushed the default branch to the local user's namespace. The command is now
  deprecated, and the user should use `git push` instead.

+
## Breaking Changes
+

+
- The `Connected` state of a peer no longer contains fetching information. This
+
  information was returned when requesting `Seeds` on the control socket.
+
  Callers should no longer expect the `fetching` inside that JSON result.
+
- The `rad debug` information for ongoing fetches contained the number of
+
  subscribers awaiting results; this was removed.
+

## 1.6.1

## Fixed Bugs
modified crates/radicle-node/src/runtime/handle.rs
@@ -350,20 +350,22 @@ impl radicle::node::Handle for Handle {
    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            let fetcher_state = state.fetching();
            let debug = serde_json::json!({
                "outboxSize": state.outbox().len(),
-
                "fetching": state.fetching().iter().map(|(rid, state)| {
-
                    json!({
-
                        "rid": rid,
-
                        "from": state.from,
-
                        "refsAt": state.refs_at,
-
                        "subscribers": state.subscribers.len(),
-
                    })
-
                }).collect::<Vec<_>>(),
-
                "queue": state.sessions().values().map(|sess| {
+
                "fetching": fetcher_state.active_fetches()
+
                    .iter()
+
                    .map(|(rid, active)| {
+
                        json!({
+
                            "rid": rid,
+
                            "from": active.from(),
+
                            "refsAt": active.refs_at(),
+
                        })
+
                    }).collect::<Vec<_>>(),
+
                "queue": fetcher_state.queued_fetches().iter().map(|(node, queue)| {
                    json!({
-
                        "nid": sess.id,
-
                        "queue": sess.queue.iter().map(|fetch| {
+
                        "nid": node,
+
                        "queue": queue.iter().map(|fetch| {
                            json!({
                                "rid": fetch.rid,
                                "from": fetch.from,
modified crates/radicle-node/src/tests.rs
@@ -1512,6 +1512,7 @@ fn test_queued_fetch_max_capacity() {

    // Finish the 1st fetch.
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::new(doc.clone())));
+

    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid2);
    // ... but not the third.
modified crates/radicle-protocol/src/fetcher/state.rs
@@ -280,8 +280,8 @@ impl Default for Config {
/// An active fetch represents a repository being fetched from a particular node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActiveFetch {
-
    pub(super) from: NodeId,
-
    pub(super) refs_at: Vec<RefsAt>,
+
    pub from: NodeId,
+
    pub refs_at: Vec<RefsAt>,
}

impl ActiveFetch {
modified crates/radicle-protocol/src/service.rs
@@ -38,6 +38,9 @@ use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::RepositoryError;
use radicle_fetch::policy::SeedingPolicy;

+
use crate::fetcher;
+
use crate::fetcher::service::FetcherService;
+
use crate::fetcher::FetcherState;
use crate::service::gossip::Store as _;
use crate::service::message::{
    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
@@ -221,7 +224,9 @@ pub enum ConnectError {
    SelfConnection,
    #[error("outbound connection limit reached when attempting {nid} ({addr})")]
    LimitReached { nid: NodeId, addr: Address },
-
    #[error("attempted connection to {nid}, via {addr} but addresses of this kind are not supported")]
+
    #[error(
+
        "attempted connection to {nid}, via {addr} but addresses of this kind are not supported"
+
    )]
    UnsupportedAddress { nid: NodeId, addr: Address },
}

@@ -301,25 +306,6 @@ pub enum CommandError {
    Policy(#[from] policy::Error),
}

-
/// Error returned by [`Service::try_fetch`].
-
#[derive(thiserror::Error, Debug)]
-
enum TryFetchError<'a> {
-
    #[error("ongoing fetch for repository exists")]
-
    AlreadyFetching(&'a mut FetchState),
-
    #[error("peer is not connected; cannot initiate fetch")]
-
    SessionNotConnected,
-
    #[error("peer fetch capacity reached; cannot initiate fetch")]
-
    SessionCapacityReached,
-
    #[error(transparent)]
-
    Namespaces(Box<NamespacesError>),
-
}
-

-
impl From<NamespacesError> for TryFetchError<'_> {
-
    fn from(e: NamespacesError) -> Self {
-
        Self::Namespaces(Box::new(e))
-
    }
-
}
-

/// Fetch state for an ongoing fetch.
#[derive(Debug)]
pub struct FetchState {
@@ -331,15 +317,6 @@ pub struct FetchState {
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}

-
impl FetchState {
-
    /// Add a subscriber to this fetch.
-
    fn subscribe(&mut self, c: chan::Sender<FetchResult>) {
-
        if !self.subscribers.iter().any(|s| s.same_channel(&c)) {
-
            self.subscribers.push(c);
-
        }
-
    }
-
}
-

/// Holds all node stores.
#[derive(Debug)]
pub struct Stores<D>(D);
@@ -439,8 +416,7 @@ pub struct Service<D, S, G> {
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
-
    /// Ongoing fetches.
-
    fetching: HashMap<RepoId, FetchState>,
+
    fetcher: FetcherService<chan::Sender<FetchResult>>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
@@ -508,7 +484,15 @@ where
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default(); // Updated on initialize.
        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.
-

+
        let fetcher = {
+
            let config = fetcher::Config::new()
+
                .with_max_concurrency(
+
                    std::num::NonZeroUsize::new(config.limits.fetch_concurrency.into())
+
                        .expect("fetch concurrency was zero, must be at least 1"),
+
                )
+
                .with_max_capacity(fetcher::MaxQueueSize::default());
+
            FetcherService::new(config)
+
        };
        Self {
            config,
            storage,
@@ -522,7 +506,7 @@ where
            outbox: Outbox::default(),
            limiter,
            sessions,
-
            fetching: HashMap::new(),
+
            fetcher,
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
@@ -623,6 +607,10 @@ where
        Events::from(self.emitter.subscribe())
    }

+
    pub fn fetcher(&self) -> &FetcherState {
+
        self.fetcher.state()
+
    }
+

    /// Get I/O outbox.
    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
@@ -898,7 +886,7 @@ where
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
-
                self.fetch(rid, seed, timeout, Some(resp));
+
                self.fetch(rid, seed, vec![], timeout, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                // Update our seeding policy.
@@ -990,7 +978,8 @@ where
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
-
                    return self._fetch(rid, from, status.want, timeout, channel);
+
                    self.fetch(rid, from, status.want, timeout, channel);
+
                    return true;
                }
            }
            Err(e) => {
@@ -1001,247 +990,176 @@ where
        false
    }

-
    /// Initiate an outgoing fetch for some repository.
    fn fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
-
        timeout: time::Duration,
-
        channel: Option<chan::Sender<FetchResult>>,
-
    ) -> bool {
-
        self._fetch(rid, from, vec![], timeout, channel)
-
    }
-

-
    fn _fetch(
-
        &mut self,
-
        rid: RepoId,
-
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
-
    ) -> bool {
-
        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
-
            Ok(fetching) => {
+
    ) {
+
        let session = {
+
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
+
            let Some(session) = self.sessions.get_mut(&from) else {
                if let Some(c) = channel {
-
                    fetching.subscribe(c);
+
                    c.send(FetchResult::Failed { reason }).ok();
                }
-
                return true;
-
            }
-
            Err(TryFetchError::AlreadyFetching(fetching)) => {
-
                // If we're already fetching the same refs from the requested peer, there's nothing
-
                // to do, we simply add the supplied channel to the list of subscribers so that it
-
                // is notified on completion. Otherwise, we queue a fetch with the requested peer.
-
                if fetching.from == from && fetching.refs_at == refs_at {
-
                    debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");
-

-
                    if let Some(c) = channel {
-
                        fetching.subscribe(c);
-
                    }
-
                } else {
-
                    let fetch = QueuedFetch {
-
                        rid,
-
                        refs_at,
-
                        from,
-
                        timeout,
-
                        channel,
-
                    };
-
                    debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
-

-
                    self.queue_fetch(fetch);
-
                }
-
            }
-
            Err(TryFetchError::SessionCapacityReached) => {
-
                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
-
                self.queue_fetch(QueuedFetch {
-
                    rid,
-
                    refs_at,
-
                    from,
-
                    timeout,
-
                    channel,
-
                });
-
            }
-
            Err(e) => {
+
                return;
+
            };
+
            if !session.is_connected() {
                if let Some(c) = channel {
-
                    c.send(FetchResult::Failed {
-
                        reason: e.to_string(),
-
                    })
-
                    .ok();
+
                    c.send(FetchResult::Failed { reason }).ok();
                }
+
                return;
            }
-
        }
-
        false
-
    }
-

-
    fn queue_fetch(&mut self, fetch: QueuedFetch) {
-
        let Some(s) = self.sessions.get_mut(&fetch.from) else {
-
            log::debug!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
-
            return;
+
            session
        };
-
        if let Err(e) = s.queue_fetch(fetch) {
-
            let fetch = e.inner();
-
            log::debug!(target: "service", "Unable to queue fetch for {} with {}: {e}", &fetch.rid, &fetch.from);
-
        }
-
    }

-
    // TODO: Buffer/throttle fetches.
-
    fn try_fetch(
-
        &mut self,
-
        rid: RepoId,
-
        from: &NodeId,
-
        refs_at: Vec<RefsAt>,
-
        timeout: time::Duration,
-
    ) -> Result<&mut FetchState, TryFetchError<'_>> {
-
        let from = *from;
-
        let Some(session) = self.sessions.get_mut(&from) else {
-
            return Err(TryFetchError::SessionNotConnected);
-
        };
-
        let fetching = self.fetching.entry(rid);
-

-
        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");
-

-
        let fetching = match fetching {
-
            Entry::Vacant(fetching) => fetching,
-
            Entry::Occupied(fetching) => {
-
                // We're already fetching this repo from some peer.
-
                return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
-
            }
-
        };
-
        // Sanity check: We shouldn't be fetching from this session, since we return above if we're
-
        // fetching from any session.
-
        debug_assert!(!session.is_fetching(&rid));
-

-
        if !session.is_connected() {
-
            // This can happen if a session disconnects in the time between asking for seeds to
-
            // fetch from, and initiating the fetch from one of those seeds.
-
            return Err(TryFetchError::SessionNotConnected);
-
        }
-
        if session.is_at_capacity() {
-
            // If we're already fetching multiple repos from this peer.
-
            return Err(TryFetchError::SessionCapacityReached);
-
        }
-

-
        let fetching = fetching.insert(FetchState {
+
        let cmd = fetcher::state::command::Fetch {
            from,
-
            refs_at: refs_at.clone(),
-
            subscribers: vec![],
-
        });
-
        self.outbox.fetch(
-
            session,
            rid,
            refs_at,
            timeout,
-
            self.config.limits.fetch_pack_receive,
-
        );
+
        };
+
        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);

-
        Ok(fetching)
+
        if let Some(c) = rejected {
+
            c.send(FetchResult::Failed {
+
                reason: "fetch queue at capacity".to_string(),
+
            })
+
            .ok();
+
        }
+

+
        match event {
+
            fetcher::state::event::Fetch::Started {
+
                rid,
+
                from,
+
                refs_at,
+
                timeout,
+
            } => {
+
                debug!(target: "service", "Starting fetch for {rid} from {from}");
+
                self.outbox.fetch(
+
                    session,
+
                    rid,
+
                    refs_at,
+
                    timeout,
+
                    self.config.limits.fetch_pack_receive,
+
                );
+
            }
+
            fetcher::state::event::Fetch::Queued { rid, from } => {
+
                debug!(target: "service", "Queued fetch for {rid} from {from}");
+
            }
+
            fetcher::state::event::Fetch::AlreadyFetching { rid, from } => {
+
                debug!(target: "service", "Already fetching {rid} from {from}");
+
            }
+
            fetcher::state::event::Fetch::QueueAtCapacity { rid, from, .. } => {
+
                debug!(target: "service", "Queue at capacity for {from}, rejected {rid}");
+
            }
+
        }
    }

    pub fn fetched(
        &mut self,
        rid: RepoId,
-
        remote: NodeId,
+
        from: NodeId,
        result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
    ) {
-
        let Some(fetching) = self.fetching.remove(&rid) else {
-
            debug!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
-
            return;
-
        };
-
        debug_assert_eq!(fetching.from, remote);
-

-
        if let Some(s) = self.sessions.get_mut(&remote) {
-
            // Mark this RID as fetched for this session.
-
            s.fetched(rid);
-
        }
+
        let cmd = fetcher::state::command::Fetched { from, rid };
+
        let fetcher::service::FetchCompleted { event, subscribers } = self.fetcher.fetched(cmd);

-
        // Notify all fetch subscribers of the fetch result. This is used when the user requests
-
        // a fetch via the CLI, for example.
-
        for sub in &fetching.subscribers {
-
            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");
+
        // Dequeue next fetches
+
        self.dequeue_fetches();

-
            let result = match &result {
-
                Ok(success) => FetchResult::Success {
-
                    updated: success.updated.clone(),
-
                    namespaces: success.namespaces.clone(),
-
                    clone: success.clone,
-
                },
-
                Err(e) => FetchResult::Failed {
-
                    reason: e.to_string(),
-
                },
-
            };
-
            if sub.send(result).is_err() {
-
                debug!(target: "service", "Failed to send fetch result for {rid} from {remote}..");
-
            } else {
-
                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
-
            }
-
        }
-

-
        match result {
-
            Ok(crate::worker::fetch::FetchResult {
-
                updated,
-
                canonical,
-
                namespaces,
-
                clone,
-
                doc,
-
            }) => {
-
                info!(target: "service", "Fetched {rid} from {remote} successfully");
-
                // Update our routing table in case this fetch was user-initiated and doesn't
-
                // come from an announcement.
-
                self.seed_discovered(rid, remote, self.clock.into());
-

-
                for update in &updated {
-
                    if update.is_skipped() {
-
                        trace!(target: "service", "Ref skipped: {update} for {rid}");
-
                    } else {
-
                        debug!(target: "service", "Ref updated: {update} for {rid}");
-
                    }
+
        match event {
+
            fetcher::state::event::Fetched::NotFound { from, rid } => {
+
                debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
+
            }
+
            fetcher::state::event::Fetched::Completed {
+
                from,
+
                rid,
+
                refs_at: _,
+
            } => {
+
                // Notify responders
+
                let fetch_result = match &result {
+
                    Ok(success) => FetchResult::Success {
+
                        updated: success.updated.clone(),
+
                        namespaces: success.namespaces.clone(),
+
                        clone: success.clone,
+
                    },
+
                    Err(e) => FetchResult::Failed {
+
                        reason: e.to_string(),
+
                    },
+
                };
+
                for responder in subscribers {
+
                    responder.send(fetch_result.clone()).ok();
                }
-
                self.emitter.emit(Event::RefsFetched {
-
                    remote,
-
                    rid,
-
                    updated: updated.clone(),
-
                });
-
                self.emitter
-
                    .emit_all(canonical.into_iter().map(|(refname, target)| {
-
                        Event::CanonicalRefUpdated {
-
                            rid,
-
                            refname,
-
                            target,
+
                match result {
+
                    Ok(crate::worker::fetch::FetchResult {
+
                        updated,
+
                        canonical,
+
                        namespaces,
+
                        clone,
+
                        doc,
+
                    }) => {
+
                        info!(target: "service", "Fetched {rid} from {from} successfully");
+
                        // Update our routing table in case this fetch was user-initiated and doesn't
+
                        // come from an announcement.
+
                        self.seed_discovered(rid, from, self.clock.into());
+

+
                        for update in &updated {
+
                            if update.is_skipped() {
+
                                trace!(target: "service", "Ref skipped: {update} for {rid}");
+
                            } else {
+
                                debug!(target: "service", "Ref updated: {update} for {rid}");
+
                            }
                        }
-
                    }));
+
                        self.emitter.emit(Event::RefsFetched {
+
                            remote: from,
+
                            rid,
+
                            updated: updated.clone(),
+
                        });
+
                        self.emitter
+
                            .emit_all(canonical.into_iter().map(|(refname, target)| {
+
                                Event::CanonicalRefUpdated {
+
                                    rid,
+
                                    refname,
+
                                    target,
+
                                }
+
                            }));

-
                // Announce our new inventory if this fetch was a full clone.
-
                // Only update and announce inventory for public repositories.
-
                if clone && doc.is_public() {
-
                    debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");
+
                        // Announce our new inventory if this fetch was a full clone.
+
                        // Only update and announce inventory for public repositories.
+
                        if clone && doc.is_public() {
+
                            debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

-
                    if let Err(e) = self.add_inventory(rid) {
-
                        warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
-
                    }
-
                }
+
                            if let Err(e) = self.add_inventory(rid) {
+
                                warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
+
                            }
+
                        }

-
                // It's possible for a fetch to succeed but nothing was updated.
-
                if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
-
                    debug!(target: "service", "Nothing to announce, no refs were updated..");
-
                } else {
-
                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
-
                    // beyond just knowing that we have added an item to our inventory.
-
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
-
                        warn!(target: "service", "Failed to announce new refs: {e}");
+
                        // It's possible for a fetch to succeed but nothing was updated.
+
                        if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
+
                            debug!(target: "service", "Nothing to announce, no refs were updated..");
+
                        } else {
+
                            // Finally, announce the refs. This is useful for nodes to know what we've synced,
+
                            // beyond just knowing that we have added an item to our inventory.
+
                            if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
+
                                warn!(target: "service", "Failed to announce new refs: {e}");
+
                            }
+
                        }
                    }
-
                }
-
            }
-
            Err(err) => {
-
                warn!(target: "service", "Fetch failed for {rid} from {remote}: {err}");
+
                    Err(err) => {
+
                        warn!(target: "service", "Fetch failed for {rid} from {from}: {err}");

-
                // For now, we only disconnect the remote in case of timeout. In the future,
-
                // there may be other reasons to disconnect.
-
                if err.is_timeout() {
-
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
+
                        // For now, we only disconnect the peer in case of timeout. In the future,
+
                        // there may be other reasons to disconnect.
+
                        if err.is_timeout() {
+
                            self.outbox.disconnect(from, DisconnectReason::Fetch(err));
+
                        }
+
                    }
                }
            }
        }
-
        // We can now try to dequeue more fetches.
-
        self.dequeue_fetches();
    }

    /// Attempt to dequeue fetches from all peers.
@@ -1258,38 +1176,42 @@ where
            .map(|(k, _)| *k)
            .collect::<Vec<_>>();

-
        // Try to dequeue once per session.
        for nid in sessions {
-
            // SAFETY: All the keys we are iterating on exist.
            #[allow(clippy::unwrap_used)]
            let sess = self.sessions.get_mut(&nid).unwrap();
-
            if !sess.is_connected() || sess.is_at_capacity() {
+
            if !sess.is_connected() {
                continue;
            }

-
            if let Some(QueuedFetch {
+
            let Some(fetcher::QueuedFetch {
                rid,
                from,
                refs_at,
                timeout,
-
                channel,
-
            }) = sess.dequeue_fetch()
-
            {
-
                debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");
+
            }) = self.fetcher.dequeue(&nid)
+
            else {
+
                continue;
+
            };

-
                if let Some(refs) = NonEmpty::from_vec(refs_at) {
-
                    let repo_entry = self.policies.seed_policy(&rid).expect(
-
                        "Service::dequeue_fetch: error accessing repo seeding configuration",
-
                    );
-
                    let SeedingPolicy::Allow { scope } = repo_entry.policy else {
-
                        debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
-
                        continue;
-
                    };
-
                    self.fetch_refs_at(rid, from, refs, scope, timeout, channel);
-
                } else {
-
                    // If no refs are specified, always do a full fetch.
-
                    self.fetch(rid, from, timeout, channel);
-
                }
+
            // Check seeding policy
+
            let repo_entry = self
+
                .policies
+
                .seed_policy(&rid)
+
                .expect("error accessing repo seeding configuration");
+

+
            let SeedingPolicy::Allow { scope } = repo_entry.policy else {
+
                debug!(target: "service", "Repository {} no longer seeded, skipping", rid);
+
                continue;
+
            };
+

+
            debug!(target: "service", "Dequeued fetch for {} from {}", rid, from);
+

+
            // Channel is `None` in both cases since they will already be
+
            // registered with the fetcher service.
+
            if let Some(refs) = NonEmpty::from_vec(refs_at.clone()) {
+
                self.fetch_refs_at(rid, from, refs, scope, timeout, None);
+
            } else {
+
                self.fetch(rid, from, refs_at, timeout, None);
            }
        }
    }
@@ -1391,7 +1313,6 @@ where
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
-
                        self.config.limits.clone(),
                    ));
                    self.outbox.write_all(peer, msgs);
                }
@@ -1422,19 +1343,30 @@ where
        let link = session.link;
        let addr = session.addr.clone();

-
        self.fetching.retain(|_, fetching| {
-
            if fetching.from != remote {
-
                return true;
+
        let cmd = fetcher::state::command::Cancel { from: remote };
+
        let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);
+

+
        match event {
+
            fetcher::state::event::Cancel::Unexpected { from } => {
+
                debug!(target: "service", "No fetches to cancel for {from}");
            }
-
            // Remove and fail any pending fetches from this remote node.
-
            for resp in &fetching.subscribers {
-
                resp.send(FetchResult::Failed {
-
                    reason: format!("disconnected: {reason}"),
+
            fetcher::state::event::Cancel::Canceled {
+
                from,
+
                active,
+
                queued,
+
            } => {
+
                debug!(target: "service", "Cancelled {} ongoing, {} queued for {from}", active.len(), queued.len());
+
            }
+
        }
+

+
        // Notify orphaned responders
+
        for (rid, responder) in orphaned {
+
            responder
+
                .send(FetchResult::Failed {
+
                    reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
                })
                .ok();
-
            }
-
            false
-
        });
+
        }

        // Attempt to re-connect to persistent peers.
        if self.config.is_persistent(&remote) {
@@ -1651,7 +1583,7 @@ where

                for rid in missing {
                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
-
                    self.fetch(rid, *announcer, FETCH_TIMEOUT, None);
+
                    self.fetch(rid, *announcer, vec![], FETCH_TIMEOUT, None);
                }
                return Ok(relay);
            }
@@ -2284,13 +2216,7 @@ where
        }
        self.sessions.insert(
            nid,
-
            Session::outbound(
-
                nid,
-
                addr.clone(),
-
                persistent,
-
                self.rng.clone(),
-
                self.config.limits.clone(),
-
            ),
+
            Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
        );
        self.outbox.connect(nid, addr);

@@ -2563,7 +2489,7 @@ where
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
-
                            self.fetch(rid, seed.nid, FETCH_TIMEOUT, None);
+
                            self.fetch(rid, seed.nid, vec![], FETCH_TIMEOUT, None);
                        }
                    } else {
                        // TODO: We should make sure that this fetch is retried later, either
@@ -2717,7 +2643,7 @@ pub trait ServiceState {
    /// Get the existing sessions.
    fn sessions(&self) -> &Sessions;
    /// Get fetch state.
-
    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
+
    fn fetching(&self) -> &FetcherState;
    /// Get outbox.
    fn outbox(&self) -> &Outbox;
    /// Get rate limiter.
@@ -2750,8 +2676,8 @@ where
        &self.sessions
    }

-
    fn fetching(&self) -> &HashMap<RepoId, FetchState> {
-
        &self.fetching
+
    fn fetching(&self) -> &FetcherState {
+
        self.fetcher.state()
    }

    fn outbox(&self) -> &Outbox {
modified crates/radicle-protocol/src/service/io.rs
@@ -138,8 +138,6 @@ impl Outbox {
        timeout: time::Duration,
        reader_limit: FetchPackSizeLimit,
    ) {
-
        peer.fetching(rid);
-

        let refs_at = (!refs_at.is_empty()).then_some(refs_at);

        if let Some(refs_at) = &refs_at {
modified crates/radicle-protocol/src/service/session.rs
@@ -1,8 +1,7 @@
-
use std::collections::{HashSet, VecDeque};
+
use std::collections::VecDeque;
use std::{fmt, time};

use crossbeam_channel as chan;
-
use radicle::node::config::Limits;
use radicle::node::{FetchResult, Severity};
use radicle::node::{Link, Timestamp};
pub use radicle::node::{PingState, State};
@@ -111,8 +110,6 @@ pub struct Session {
    pub subscribe: Option<message::Subscribe>,
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,
-
    /// Fetch queue.
-
    pub queue: VecDeque<QueuedFetch>,

    /// Connection attempts. For persistent peers, Tracks
    /// how many times we've attempted to connect. We reset this to zero
@@ -120,8 +117,6 @@ pub struct Session {
    attempts: usize,
    /// Source of entropy.
    rng: Rng,
-
    /// Protocol limits.
-
    limits: Limits,
}

impl fmt::Display for Session {
@@ -159,7 +154,7 @@ impl From<&Session> for radicle::node::Session {
}

impl Session {
-
    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, limits: Limits) -> Self {
+
    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng) -> Self {
        Self {
            id,
            addr,
@@ -168,28 +163,18 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
-
            queue: VecDeque::with_capacity(MAX_FETCH_QUEUE_SIZE),
            attempts: 1,
            rng,
-
            limits,
        }
    }

-
    pub fn inbound(
-
        id: NodeId,
-
        addr: Address,
-
        persistent: bool,
-
        rng: Rng,
-
        time: LocalTime,
-
        limits: Limits,
-
    ) -> Self {
+
    pub fn inbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, time: LocalTime) -> Self {
        Self {
            id,
            addr,
            state: State::Connected {
                since: time,
                ping: PingState::default(),
-
                fetching: HashSet::default(),
                latencies: VecDeque::default(),
                stable: false,
            },
@@ -197,10 +182,8 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: time,
-
            queue: VecDeque::new(),
            attempts: 0,
            rng,
-
            limits,
        }
    }

@@ -224,41 +207,6 @@ impl Session {
        matches!(self.state, State::Initial)
    }

-
    pub fn is_at_capacity(&self) -> bool {
-
        if let State::Connected { fetching, .. } = &self.state {
-
            if fetching.len() >= self.limits.fetch_concurrency.into() {
-
                return true;
-
            }
-
        }
-
        false
-
    }
-

-
    pub fn is_fetching(&self, rid: &RepoId) -> bool {
-
        if let State::Connected { fetching, .. } = &self.state {
-
            return fetching.contains(rid);
-
        }
-
        false
-
    }
-

-
    /// Queue a fetch. Returns `true` if it was added to the queue, and `false` if
-
    /// it already was present in the queue.
-
    pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> Result<(), QueueError> {
-
        assert_eq!(fetch.from, self.id);
-

-
        if self.queue.len() >= MAX_FETCH_QUEUE_SIZE {
-
            return Err(QueueError::CapacityReached(fetch));
-
        } else if self.queue.contains(&fetch) {
-
            return Err(QueueError::Duplicate(fetch));
-
        }
-
        self.queue.push_back(fetch);
-

-
        Ok(())
-
    }
-

-
    pub fn dequeue_fetch(&mut self) -> Option<QueuedFetch> {
-
        self.queue.pop_front()
-
    }
-

    pub fn attempts(&self) -> usize {
        self.attempts
    }
@@ -279,33 +227,6 @@ impl Session {
        }
    }

-
    /// Mark this session as fetching the given RID.
-
    ///
-
    /// # Panics
-
    ///
-
    /// If it is already fetching that RID, or the session is disconnected.
-
    pub fn fetching(&mut self, rid: RepoId) {
-
        if let State::Connected { fetching, .. } = &mut self.state {
-
            assert!(
-
                fetching.insert(rid),
-
                "Session must not already be fetching {rid}"
-
            );
-
        } else {
-
            panic!(
-
                "Attempting to fetch {rid} from disconnected session {}",
-
                self.id
-
            );
-
        }
-
    }
-

-
    pub fn fetched(&mut self, rid: RepoId) {
-
        if let State::Connected { fetching, .. } = &mut self.state {
-
            if !fetching.remove(&rid) {
-
                log::debug!(target: "service", "Fetched unknown repository {rid}");
-
            }
-
        }
-
    }
-

    pub fn to_attempted(&mut self) {
        assert!(
            self.is_initial(),
@@ -324,7 +245,6 @@ impl Session {
        self.state = State::Connected {
            since,
            ping: PingState::default(),
-
            fetching: HashSet::default(),
            latencies: VecDeque::default(),
            stable: false,
        };
modified crates/radicle-schemars/src/main.rs
@@ -87,7 +87,7 @@ fn print_schema() -> io::Result<()> {
                    #[schemars(with = "radicle::schemars_ext::crypto::PublicKey")]
                    radicle::node::NodeId,
                ),
-
                Config(radicle::node::Config),
+
                Config(Box<radicle::node::Config>),
                ListenAddrs(ListenAddrs),
                ConnectResult(radicle::node::ConnectResult),
                Success(radicle::node::Success),
modified crates/radicle/CHANGELOG.md
@@ -46,6 +46,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Removed

+
- The data returned by `Seeds` contains `state`, which previously contained the
+
  field `fetching` for ongoing fetches of that node, if in the `Connected`
+
  state. `Connected` no longer contains that field.
+

### Security

## 0.20.0
modified crates/radicle/src/node.rs
@@ -107,8 +107,6 @@ pub enum State {
        /// Ping state.
        #[serde(skip)]
        ping: PingState,
-
        /// Ongoing fetches.
-
        fetching: HashSet<RepoId>,
        /// Measured latencies for this peer.
        #[serde(skip)]
        latencies: VecDeque<LocalDuration>,
@@ -696,7 +694,7 @@ impl From<Vec<Seed>> for Seeds {
    }
}

-
#[derive(Clone, Debug, Serialize, Deserialize)]
+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum FetchResult {
@@ -1486,7 +1484,6 @@ mod test {
            &serde_json::to_string(&CommandResult::Okay(State::Connected {
                since: LocalTime::now(),
                ping: Default::default(),
-
                fetching: Default::default(),
                latencies: VecDeque::default(),
                stable: false,
            }))
modified crates/radicle/src/node/command.rs
@@ -313,7 +313,6 @@ mod test {
            &serde_json::to_string(&CommandResult::Okay(State::Connected {
                since: LocalTime::now(),
                ping: Default::default(),
-
                fetching: Default::default(),
                latencies: VecDeque::default(),
                stable: false,
            }))
@@ -329,7 +328,7 @@ mod test {
        );
        assert_matches!(
            json::from_str::<CommandResult<Seeds>>(
-
                r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994,"fetching":[]}}}]"#
+
                r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994}}}]"#
            ),
            Ok(CommandResult::Okay(_))
        );