rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Sans-IO Fetcher
Merged · fintohaps opened 4 months ago

This patch series introduces a new family of types for keeping track of fetch state in the protocol.

It consolidates the tracking of fetch state in one place and removes it from the connection session data.

It uses sans-IO patterns so that the state transitions can be more easily tested without relying on complicated setup logic.
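The sans-IO idea can be illustrated with a minimal, self-contained state machine: commands go in, events come out, and the caller performs all I/O around it. This is a hedged sketch only; `Rid` and `Nid` are plain-string stand-ins, not the patch's `RepoId`/`NodeId`, and the real `FetcherState` carries more detail (queues, capacities, timeouts).

```rust
// Pure fetch-state machine: no sockets, no Git — state in, event out.
use std::collections::BTreeMap;

type Rid = &'static str; // stand-in for RepoId
type Nid = &'static str; // stand-in for NodeId

#[derive(Debug, PartialEq)]
enum Event {
    Started { rid: Rid, from: Nid },
    AlreadyFetching { rid: Rid, from: Nid },
    Completed { rid: Rid, from: Nid },
    NotFound { rid: Rid, from: Nid },
}

#[derive(Default)]
struct Fetcher {
    // One active fetch per repository at most.
    active: BTreeMap<Rid, Nid>,
}

impl Fetcher {
    fn fetch(&mut self, rid: Rid, from: Nid) -> Event {
        if self.active.contains_key(rid) {
            Event::AlreadyFetching { rid, from }
        } else {
            self.active.insert(rid, from);
            Event::Started { rid, from }
        }
    }

    fn fetched(&mut self, rid: Rid, from: Nid) -> Event {
        match self.active.remove(rid) {
            Some(_) => Event::Completed { rid, from },
            None => Event::NotFound { rid, from },
        }
    }
}

fn main() {
    let mut fetcher = Fetcher::default();
    assert_eq!(fetcher.fetch("rad:a", "alice"), Event::Started { rid: "rad:a", from: "alice" });
    // Only one fetch per repository at a time.
    assert_eq!(fetcher.fetch("rad:a", "bob"), Event::AlreadyFetching { rid: "rad:a", from: "bob" });
    assert_eq!(fetcher.fetched("rad:a", "alice"), Event::Completed { rid: "rad:a", from: "alice" });
    println!("ok");
}
```

Because the transitions are pure functions over in-memory state, tests can drive them directly, which is exactly the testability benefit the patch describes.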

This data is then wired up to the Service to maintain the same (or as close as possible) semantics for fetching in the running node.

Note there are some breaking changes due to the removal of the fetching state from the State type – which in turn was used in Seeds.

41 files changed +2867 -386 eccfd6ba 1cab036c
modified CHANGELOG.md
@@ -27,6 +27,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
  pushed the default branch to the local user's namespace. The command is now
  deprecated, and the user should use `git push` instead.

+## Breaking Changes
+
+- The `Connected` state of a peer no longer contains fetching information. This
+  information was returned when requesting `Seeds` on the control socket.
+  Callers should no longer expect the `fetching` field inside that JSON result.
+- The `rad debug` information for ongoing fetches contained the number of
+  subscribers awaiting results; this was removed.
+
## 1.6.1

## Fixed Bugs
modified Cargo.lock
@@ -3098,6 +3098,7 @@ dependencies = [
 "qcheck",
 "qcheck-macros",
 "radicle",
+ "radicle-core",
 "radicle-crypto",
 "radicle-fetch",
 "radicle-localtime",
modified crates/radicle-node/src/runtime/handle.rs
@@ -350,20 +350,22 @@ impl radicle::node::Handle for Handle {
    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        let query: Arc<QueryState> = Arc::new(move |state| {
+            let fetcher_state = state.fetching();
            let debug = serde_json::json!({
                "outboxSize": state.outbox().len(),
-                "fetching": state.fetching().iter().map(|(rid, state)| {
-                    json!({
-                        "rid": rid,
-                        "from": state.from,
-                        "refsAt": state.refs_at,
-                        "subscribers": state.subscribers.len(),
-                    })
-                }).collect::<Vec<_>>(),
-                "queue": state.sessions().values().map(|sess| {
+                "fetching": fetcher_state.active_fetches()
+                    .iter()
+                    .map(|(rid, active)| {
+                        json!({
+                            "rid": rid,
+                            "from": active.from(),
+                            "refsAt": active.refs_at(),
+                        })
+                    }).collect::<Vec<_>>(),
+                "queue": fetcher_state.queued_fetches().iter().map(|(node, queue)| {
                    json!({
-                        "nid": sess.id,
-                        "queue": sess.queue.iter().map(|fetch| {
+                        "nid": node,
+                        "queue": queue.iter().map(|fetch| {
                            json!({
                                "rid": fetch.rid,
                                "from": fetch.from,
modified crates/radicle-node/src/tests.rs
@@ -1512,6 +1512,7 @@ fn test_queued_fetch_max_capacity() {

    // Finish the 1st fetch.
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::new(doc.clone())));
+
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid2);
    // ... but not the third.
modified crates/radicle-protocol/Cargo.toml
@@ -21,6 +21,7 @@ log = { workspace = true, features = ["std"] }
nonempty = { workspace = true, features = ["serialize"] }
qcheck = { workspace = true, optional = true }
radicle = { workspace = true, features = ["logger"] }
+radicle-core = { workspace = true }
radicle-fetch = { workspace = true }
radicle-localtime = { workspace = true }
sqlite = { workspace = true, features = ["bundled"] }
added crates/radicle-protocol/src/fetcher.rs
@@ -0,0 +1,12 @@
+pub mod service;
+pub use service::FetcherService;
+
+pub mod state;
+pub use state::{ActiveFetch, Config, FetcherState, MaxQueueSize, Queue, QueueIter, QueuedFetch};
+
+#[cfg(test)]
+mod test;
+
+// TODO(finto): `Service::fetch_refs_at` and the use of `refs_status_of` is a
+// layer above the `Fetcher` where it would perform I/O, mocked out by a trait,
+// to check if there are wants and add a fetch to the Fetcher.
added crates/radicle-protocol/src/fetcher/service.rs
@@ -0,0 +1,142 @@
+use std::collections::HashMap;
+
+use radicle_core::{NodeId, RepoId};
+
+use crate::fetcher::state::{command, event, Config, FetcherState, QueuedFetch};
+
+/// Service layer that wraps [`FetcherState`] and manages subscriber coalescing.
+///
+/// When multiple callers request the same fetch, their subscribers are collected
+/// and all notified when the fetch completes.
+///
+/// # Type Parameter
+/// - `S`: The subscriber type (e.g., `chan::Sender<FetchResult>`).
+#[derive(Debug)]
+pub struct FetcherService<S> {
+    state: FetcherState,
+    subscribers: HashMap<FetchKey, Vec<S>>,
+}
+
+impl<S> FetcherService<S> {
+    /// Initialize the [`FetcherService`] with the given [`Config`].
+    pub fn new(config: Config) -> Self {
+        Self {
+            state: FetcherState::new(config),
+            subscribers: HashMap::new(),
+        }
+    }
+
+    /// Provide a reference handle to the [`FetcherState`].
+    pub fn state(&self) -> &FetcherState {
+        &self.state
+    }
+}
+
+/// Key for pending subscribers.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+struct FetchKey {
+    rid: RepoId,
+    node: NodeId,
+}
+
+impl FetchKey {
+    fn new(rid: RepoId, node: NodeId) -> Self {
+        Self { rid, node }
+    }
+}
+
+/// The result of calling [`FetcherService::fetch`].
+#[must_use]
+#[derive(Debug)]
+pub struct FetchInitiated<S> {
+    /// The underlying result from calling [`FetcherState::fetch`].
+    pub event: event::Fetch,
+    /// Subscriber returned if the fetch was rejected (queue at capacity).
+    pub rejected: Option<S>,
+}
+
+/// The result of calling [`FetcherService::fetched`].
+#[must_use]
+#[derive(Debug)]
+pub struct FetchCompleted<S> {
+    /// The underlying result from calling [`FetcherState::fetched`].
+    pub event: event::Fetched,
+    /// All the subscribers that were interested in this given fetch.
+    pub subscribers: Vec<S>,
+}
+
+/// The result of calling [`FetcherService::cancel`].
+#[must_use]
+#[derive(Debug)]
+pub struct FetchesCancelled<S> {
+    /// The underlying result from calling [`FetcherState::cancel`].
+    pub event: event::Cancel,
+    /// Orphaned subscribers paired with their [`RepoId`].
+    pub orphaned: Vec<(RepoId, S)>,
+}
+
+impl<S> FetcherService<S> {
+    /// Initiate a fetch, optionally registering a subscriber.
+    ///
+    /// Subscribers are coalesced: if the same `(rid, node)` is already being
+    /// fetched or queued, the subscriber joins the existing waiters.
+    ///
+    /// If the fetch could not be initiated, and also could not be queued, then
+    /// the subscriber is returned to notify of the rejection.
+    ///
+    /// See [`FetcherState::fetch`].
+    pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
+        let key = FetchKey::new(cmd.rid, cmd.from);
+        let event = self.state.fetch(cmd);
+
+        let rejected = match &event {
+            event::Fetch::QueueAtCapacity { .. } => subscriber,
+            _ => {
+                if let Some(r) = subscriber {
+                    self.subscribers.entry(key).or_default().push(r);
+                }
+                None
+            }
+        };
+
+        FetchInitiated { event, rejected }
+    }
+
+    /// Mark a fetch as completed and retrieve waiting subscribers.
+    ///
+    /// See [`FetcherState::fetched`].
+    pub fn fetched(&mut self, cmd: command::Fetched) -> FetchCompleted<S> {
+        let key = FetchKey::new(cmd.rid, cmd.from);
+        let event = self.state.fetched(cmd);
+        let subscribers = self.subscribers.remove(&key).unwrap_or_default();
+        FetchCompleted { event, subscribers }
+    }
+
+    /// Cancel all fetches for a disconnected peer, returning any orphaned
+    /// subscribers.
+    ///
+    /// See [`FetcherState::cancel`].
+    pub fn cancel(&mut self, cmd: command::Cancel) -> FetchesCancelled<S> {
+        let from = cmd.from;
+        let event = self.state.cancel(cmd);
+
+        let mut orphaned = Vec::new();
+        self.subscribers.retain(|key, subscribers| {
+            if key.node == from {
+                orphaned.extend(subscribers.drain(..).map(|r| (key.rid, r)));
+                false
+            } else {
+                true
+            }
+        });
+
+        FetchesCancelled { event, orphaned }
+    }
+
+    /// Dequeue the next fetch for a node.
+    ///
+    /// See [`FetcherState::dequeue`].
+    pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
+        self.state.dequeue(from)
+    }
+}
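The subscriber coalescing described in the doc comments above can be modelled in a few lines. This is a hedged, self-contained sketch, not the real `FetcherService`: plain string tuples stand in for `RepoId`/`NodeId`, and a `u32` stands in for a channel sender.

```rust
// Simplified model of subscriber coalescing: two callers asking for the
// same (rid, node) fetch are both notified by the single completion.
use std::collections::HashMap;

type Key = (&'static str, &'static str); // (rid, node) — illustrative stand-ins

#[derive(Default)]
struct Service {
    // u32 stands in for a real subscriber such as a channel sender.
    subscribers: HashMap<Key, Vec<u32>>,
}

impl Service {
    fn fetch(&mut self, key: Key, subscriber: u32) {
        // Coalesce: later requests for the same fetch join the existing waiters.
        self.subscribers.entry(key).or_default().push(subscriber);
    }

    fn fetched(&mut self, key: Key) -> Vec<u32> {
        // All waiters are handed back to be notified at once.
        self.subscribers.remove(&key).unwrap_or_default()
    }
}

fn main() {
    let mut svc = Service::default();
    svc.fetch(("rad:a", "bob"), 1);
    svc.fetch(("rad:a", "bob"), 2); // second caller joins the first
    assert_eq!(svc.fetched(("rad:a", "bob")), vec![1, 2]);
    assert!(svc.fetched(("rad:a", "bob")).is_empty()); // nothing left behind
    println!("ok");
}
```

The design point is that completion is a single state transition: one `fetched` call drains every waiter for that key, so no subscriber can be notified twice or left dangling.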
added crates/radicle-protocol/src/fetcher/state.rs
@@ -0,0 +1,454 @@
+//! Logical state for Git fetches happening in the node.
+//!
+//! See [`FetcherState`] for more information.
+//!
+//! See [`command`] for inputs to [`FetcherState`].
+//! See [`event`] for outputs from [`FetcherState`].
+
+pub mod command;
+pub mod event;
+
+pub use command::Command;
+pub use event::Event;
+
+use std::collections::{BTreeMap, VecDeque};
+use std::num::NonZeroUsize;
+use std::time;
+
+use radicle::storage::refs::RefsAt;
+use radicle_core::{NodeId, RepoId};
+
+/// Default for the maximum items per fetch queue.
+pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
+/// Default for maximum concurrency per node.
+pub const MAX_CONCURRENCY: NonZeroUsize = NonZeroUsize::MIN;
+
+/// Logical state for Git fetches happening in the node.
+///
+/// A fetch can either be:
+///   - [`ActiveFetch`]: meaning it is currently being fetched from another node on the network
+///   - [`QueuedFetch`]: meaning it is expected to be fetched from a given node, but the
+///     repository is already being fetched, or the node is at capacity.
+///
+/// For any given repository, identified by its [`RepoId`], there can only be
+/// one fetch occurring for it at a given time. This prevents any concurrent
+/// fetches from clobbering overlapping references.
+///
+/// If the repository is actively being fetched, then that fetch will be queued
+/// for a later attempt.
+///
+/// For any given node, there is a configurable capacity so that only `N`
+/// fetches can happen with it concurrently. This does not guarantee that the
+/// node will actually allow this node to fetch from it – since it will maintain
+/// its own capacity for connections and load.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct FetcherState {
+    /// The active fetches that are occurring, ensuring only one fetch per repository.
+    active: BTreeMap<RepoId, ActiveFetch>,
+    /// The queued fetches, waiting to happen, where each node maintains its own queue.
+    queues: BTreeMap<NodeId, Queue>,
+    /// Configuration for maintaining the fetch state.
+    config: Config,
+}
+
+impl Default for FetcherState {
+    fn default() -> Self {
+        Self::new(Config::default())
+    }
+}
+
+impl FetcherState {
+    /// Initialize the [`FetcherState`] with the given [`Config`].
+    pub fn new(config: Config) -> Self {
+        Self {
+            active: BTreeMap::new(),
+            queues: BTreeMap::new(),
+            config,
+        }
+    }
+}
+
+impl FetcherState {
+    /// Process the handling of a [`Command`], delegating to its corresponding
+    /// method, and returning the corresponding [`Event`].
+    ///
+    /// This method is useful if the [`FetcherState`] is used in batch
+    /// processing and does not need to be explicit about the underlying method.
+    pub fn handle(&mut self, command: Command) -> Event {
+        match command {
+            Command::Fetch(fetch) => self.fetch(fetch).into(),
+            Command::Fetched(fetched) => self.fetched(fetched).into(),
+            Command::Cancel(cancel) => self.cancel(cancel).into(),
+        }
+    }
+
+    /// Process a [`Fetch`] command, which transitions the given fetch to
+    /// active, if possible.
+    ///
+    /// The fetch will only transition to being active if:
+    ///
+    ///   - A fetch is not already happening for that repository, in which case it gets queued.
+    ///   - The node to be fetched from is not already at capacity, again it will be queued.
+    ///
+    /// [`Fetch`]: command::Fetch
+    pub fn fetch(
+        &mut self,
+        command::Fetch {
+            from,
+            rid,
+            refs_at,
+            timeout,
+        }: command::Fetch,
+    ) -> event::Fetch {
+        if let Some(active) = self.active.get(&rid) {
+            if active.refs_at == refs_at && active.from == from {
+                return event::Fetch::AlreadyFetching { rid, from };
+            } else {
+                return self.enqueue(rid, from, refs_at, timeout);
+            }
+        }
+
+        if self.is_at_node_capacity(&from) {
+            self.enqueue(rid, from, refs_at, timeout)
+        } else {
+            self.active.insert(
+                rid,
+                ActiveFetch {
+                    from,
+                    refs_at: refs_at.clone(),
+                },
+            );
+            event::Fetch::Started {
+                rid,
+                from,
+                refs_at,
+                timeout,
+            }
+        }
+    }
+
+    /// Process a [`Fetched`] command, which removes the given fetch from the set of active fetches.
+    /// Note that this is agnostic of whether the fetch succeeded or failed.
+    ///
+    /// The caller will be notified if the completed fetch did not exist in the active set.
+    ///
+    /// [`Fetched`]: command::Fetched
+    pub fn fetched(&mut self, command::Fetched { from, rid }: command::Fetched) -> event::Fetched {
+        match self.active.remove(&rid) {
+            None => event::Fetched::NotFound { from, rid },
+            Some(ActiveFetch { from, refs_at }) => event::Fetched::Completed { from, rid, refs_at },
+        }
+    }
+
+    /// Attempt to dequeue a [`QueuedFetch`] for the given node.
+    ///
+    /// This will only dequeue the fetch if it is not active, and the given node
+    /// is not at capacity.
+    pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
+        let is_at_capacity = self.is_at_node_capacity(from);
+        let queue = self.queues.get_mut(from)?;
+        let active = &self.active;
+        queue.try_dequeue(|QueuedFetch { rid, .. }| !is_at_capacity && !active.contains_key(rid))
+    }
+
+    /// Process a [`Cancel`] command, which cancels any active and/or queued
+    /// fetches for that given node.
+    ///
+    /// [`Cancel`]: command::Cancel
+    pub fn cancel(&mut self, command::Cancel { from }: command::Cancel) -> event::Cancel {
+        let cancelled: Vec<_> = self
+            .active
+            .iter()
+            .filter_map(|(rid, f)| (f.from == from).then_some(*rid))
+            .collect();
+        let ongoing: BTreeMap<_, _> = cancelled
+            .iter()
+            .filter_map(|rid| self.active.remove(rid).map(|f| (*rid, f)))
+            .collect();
+        let ongoing = (!ongoing.is_empty()).then_some(ongoing);
+        let queued = self.queues.remove(&from).filter(|queue| !queue.is_empty());
+
+        match (ongoing, queued) {
+            (None, None) => event::Cancel::Unexpected { from },
+            (ongoing, queued) => event::Cancel::Canceled {
+                from,
+                active: ongoing.unwrap_or_default(),
+                queued: queued.map(|q| q.queue).unwrap_or_default(),
+            },
+        }
+    }
+
+    fn enqueue(
+        &mut self,
+        rid: RepoId,
+        from: NodeId,
+        refs_at: Vec<RefsAt>,
+        timeout: time::Duration,
+    ) -> event::Fetch {
+        let queue = self
+            .queues
+            .entry(from)
+            .or_insert(Queue::new(self.config.maximum_queue_size));
+        match queue.enqueue(QueuedFetch {
+            rid,
+            from,
+            refs_at,
+            timeout,
+        }) {
+            Enqueue::CapacityReached(QueuedFetch {
+                rid,
+                from,
+                refs_at,
+                timeout,
+            }) => event::Fetch::QueueAtCapacity {
+                rid,
+                from,
+                refs_at,
+                timeout,
+                capacity: queue.len(),
+            },
+            Enqueue::Queued => event::Fetch::Queued { rid, from },
+            Enqueue::Merged => event::Fetch::Queued { rid, from },
+        }
+    }
+}
+
+impl FetcherState {
+    /// Get the set of queued fetches.
+    pub fn queued_fetches(&self) -> &BTreeMap<NodeId, Queue> {
+        &self.queues
+    }
+
+    /// Get the set of active fetches.
+    pub fn active_fetches(&self) -> &BTreeMap<RepoId, ActiveFetch> {
+        &self.active
+    }
+
+    /// Get the [`ActiveFetch`] for the provided [`RepoId`], returning `None` if
+    /// it does not exist.
+    pub fn get_active_fetch(&self, rid: &RepoId) -> Option<&ActiveFetch> {
+        self.active.get(rid)
+    }
+
+    /// Check if the number of fetches exceeds the maximum number of concurrent
+    /// fetches for a given [`NodeId`].
+    ///
+    /// Returns `true` if the fetcher is fetching the maximum number of
+    /// repositories, for that node.
+    fn is_at_node_capacity(&self, node: &NodeId) -> bool {
+        let count = self.active.values().filter(|f| &f.from == node).count();
+        count >= self.config.maximum_concurrency.into()
+    }
+}
+
+/// Configuration for the [`FetcherState`].
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub struct Config {
+    /// Maximum number of concurrent fetches per peer connection.
+    maximum_concurrency: NonZeroUsize,
+    /// Maximum fetching queue size for a single node.
+    maximum_queue_size: MaxQueueSize,
+}
+
+impl Config {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Maximum fetching queue size for a single node.
+    pub fn with_max_capacity(mut self, capacity: MaxQueueSize) -> Self {
+        self.maximum_queue_size = capacity;
+        self
+    }
+
+    /// Maximum number of concurrent fetches per peer connection.
+    pub fn with_max_concurrency(mut self, concurrency: NonZeroUsize) -> Self {
+        self.maximum_concurrency = concurrency;
+        self
+    }
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            maximum_concurrency: MAX_CONCURRENCY,
+            maximum_queue_size: MaxQueueSize::default(),
+        }
+    }
+}
+
+/// An active fetch represents a repository being fetched from a particular node.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct ActiveFetch {
+    pub from: NodeId,
+    pub refs_at: Vec<RefsAt>,
+}
+
+impl ActiveFetch {
+    /// The node from which the repository is being fetched.
+    pub fn from(&self) -> &NodeId {
+        &self.from
+    }
+
+    /// The set of references that the fetch is being performed for.
+    pub fn refs_at(&self) -> &[RefsAt] {
+        &self.refs_at
+    }
+}
+
+/// A fetch that is waiting to be processed, in the fetch queue.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct QueuedFetch {
+    /// The repository that will be fetched.
+    pub rid: RepoId,
+    // TODO(finto): this might be redundant, since queues are per node
+    /// The peer from which the repository will be fetched.
+    pub from: NodeId,
+    /// The references that the fetch is being performed for.
+    pub refs_at: Vec<RefsAt>,
+    /// The timeout given for the fetch request.
+    pub timeout: time::Duration,
+}
+
+/// A queue for keeping track of fetches.
+///
+/// It ensures that the queue contains unique items for fetching, and does not
+/// exceed the provided maximum capacity.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct Queue {
+    queue: VecDeque<QueuedFetch>,
+    max_queue_size: MaxQueueSize,
+}
+
+/// The maximum number of fetches that can be queued for a single node.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct MaxQueueSize(usize);
+
+impl MaxQueueSize {
+    /// Minimum queue size is `1`.
+    pub const MIN: Self = MaxQueueSize(1);
+
+    /// Create a queue size, which must be larger than `0`.
+    pub fn new(size: NonZeroUsize) -> Self {
+        Self(size.into())
+    }
+
+    pub fn as_usize(&self) -> usize {
+        self.0
+    }
+
+    /// Checks if the `n` provided reaches or exceeds the maximum queue size.
+    fn is_exceeded_by(&self, n: usize) -> bool {
+        n >= self.0
+    }
+}
+
+impl Default for MaxQueueSize {
+    fn default() -> Self {
+        Self(MAX_FETCH_QUEUE_SIZE)
+    }
+}
+
+/// The result of [`Queue::enqueue`].
+#[must_use]
+#[derive(Debug, PartialEq, Eq)]
+pub(super) enum Enqueue {
+    /// The capacity of the queue has been exceeded, and the [`QueuedFetch`] is
+    /// returned.
+    CapacityReached(QueuedFetch),
+    /// The [`QueuedFetch`] was successfully queued.
+    Queued,
+    /// The [`QueuedFetch`] was merged into an existing entry for the same repository.
+    Merged,
+}
+
+impl Queue {
+    /// Create the [`Queue`] with the given [`MaxQueueSize`].
+    pub(super) fn new(max_queue_size: MaxQueueSize) -> Self {
+        Self {
+            queue: VecDeque::with_capacity(max_queue_size.0),
+            max_queue_size,
+        }
+    }
+
+    /// The current number of items in the queue.
+    pub(super) fn len(&self) -> usize {
+        self.queue.len()
+    }
+
+    /// Returns `true` if the [`Queue`] is empty.
+    pub(super) fn is_empty(&self) -> bool {
+        self.queue.is_empty()
+    }
+
+    /// Enqueues a fetch onto the back of the queue, and will only succeed if
+    /// the queue has not reached capacity and if the item is unique.
+    pub(super) fn enqueue(&mut self, fetch: QueuedFetch) -> Enqueue {
+        if let Some(existing) = self.queue.iter_mut().find(|qf| qf.rid == fetch.rid) {
+            if existing.refs_at.is_empty() || fetch.refs_at.is_empty() {
+                // We fetch everything
+                existing.refs_at = vec![]
+            } else {
+                existing.refs_at.extend(fetch.refs_at);
+            }
+            // Take the longer timeout (more generous)
+            existing.timeout = existing.timeout.max(fetch.timeout);
+            return Enqueue::Merged;
+        }
+
+        if self.max_queue_size.is_exceeded_by(self.queue.len()) {
+            Enqueue::CapacityReached(fetch)
+        } else {
+            self.queue.push_back(fetch);
+            Enqueue::Queued
+        }
+    }
+
+    /// Try to dequeue the next [`QueuedFetch`], but only if the `predicate`
+    /// holds, otherwise it will be pushed back to the front of the queue.
+    pub(super) fn try_dequeue<P>(&mut self, predicate: P) -> Option<QueuedFetch>
+    where
+        P: FnOnce(&QueuedFetch) -> bool,
+    {
+        let fetch = self.dequeue()?;
+        if predicate(&fetch) {
+            Some(fetch)
+        } else {
+            self.queue.push_front(fetch);
+            None
+        }
+    }
+
+    /// Dequeues a fetch from the front of the queue.
+    pub(super) fn dequeue(&mut self) -> Option<QueuedFetch> {
+        self.queue.pop_front()
+    }
+
+    /// Return an iterator over the queued fetches.
+    pub fn iter<'a>(&'a self) -> QueueIter<'a> {
+        QueueIter {
+            inner: self.queue.iter(),
+        }
+    }
+}
+
+/// Iterator over the [`QueuedFetch`]es.
+pub struct QueueIter<'a> {
+    inner: std::collections::vec_deque::Iter<'a, QueuedFetch>,
+}
+
+impl<'a> Iterator for QueueIter<'a> {
+    type Item = &'a QueuedFetch;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.inner.next()
+    }
+}
+
+impl<'a> IntoIterator for &'a Queue {
+    type Item = &'a QueuedFetch;
+    type IntoIter = QueueIter<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
+    }
+}
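The merge rule in `Queue::enqueue` above (an empty `refs_at` means "fetch everything", and the more generous timeout wins) can be exercised in isolation. This is a hedged sketch with simplified stand-in types, not the patch's `Queue`:

```rust
// Self-contained model of the queue-merge rule: a second request for a
// repository already in the queue is merged rather than duplicated.
use std::collections::VecDeque;
use std::time::Duration;

#[derive(Debug, PartialEq)]
struct QueuedFetch {
    rid: &'static str,          // stand-in for RepoId
    refs_at: Vec<&'static str>, // stand-in for Vec<RefsAt>
    timeout: Duration,
}

fn enqueue(queue: &mut VecDeque<QueuedFetch>, fetch: QueuedFetch) {
    if let Some(existing) = queue.iter_mut().find(|qf| qf.rid == fetch.rid) {
        if existing.refs_at.is_empty() || fetch.refs_at.is_empty() {
            existing.refs_at = vec![]; // empty set means "fetch everything"
        } else {
            existing.refs_at.extend(fetch.refs_at);
        }
        // Take the longer timeout (more generous).
        existing.timeout = existing.timeout.max(fetch.timeout);
    } else {
        queue.push_back(fetch);
    }
}

fn main() {
    let mut queue = VecDeque::new();
    enqueue(&mut queue, QueuedFetch { rid: "rad:a", refs_at: vec!["alice"], timeout: Duration::from_secs(30) });
    enqueue(&mut queue, QueuedFetch { rid: "rad:a", refs_at: vec!["bob"], timeout: Duration::from_secs(60) });
    // Still one entry: refs merged, longer timeout kept.
    assert_eq!(queue.len(), 1);
    assert_eq!(queue[0].refs_at, vec!["alice", "bob"]);
    assert_eq!(queue[0].timeout, Duration::from_secs(60));
    println!("ok");
}
```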
added crates/radicle-protocol/src/fetcher/state/command.rs
@@ -0,0 +1,80 @@
+use std::time;
+
+use radicle::storage::refs::RefsAt;
+use radicle_core::{NodeId, RepoId};
+
+/// Commands for transitioning the [`FetcherState`].
+///
+/// [`FetcherState`]: super::FetcherState
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum Command {
+    Fetch(Fetch),
+    Fetched(Fetched),
+    Cancel(Cancel),
+}
+
+impl From<Fetch> for Command {
+    fn from(v: Fetch) -> Self {
+        Self::Fetch(v)
+    }
+}
+
+impl From<Fetched> for Command {
+    fn from(v: Fetched) -> Self {
+        Self::Fetched(v)
+    }
+}
+
+impl From<Cancel> for Command {
+    fn from(v: Cancel) -> Self {
+        Self::Cancel(v)
+    }
+}
+
+impl Command {
+    pub fn fetch(from: NodeId, rid: RepoId, refs_at: Vec<RefsAt>, timeout: time::Duration) -> Self {
+        Self::from(Fetch {
+            from,
+            rid,
+            refs_at,
+            timeout,
+        })
+    }
+
+    pub fn fetched(from: NodeId, rid: RepoId) -> Self {
+        Self::from(Fetched { from, rid })
+    }
+
+    pub fn cancel(from: NodeId) -> Self {
+        Self::from(Cancel { from })
+    }
+}
+
+/// A fetch wants to be marked as active.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct Fetch {
+    /// The node from which the repository is being fetched.
+    pub from: NodeId,
+    /// The repository to fetch.
+    pub rid: RepoId,
+    /// The references to fetch.
+    pub refs_at: Vec<RefsAt>,
+    /// The timeout for the fetch process.
+    pub timeout: time::Duration,
+}
+
+/// A fetch wants to be marked as completed.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub struct Fetched {
+    /// The node from which the repository was fetched.
+    pub from: NodeId,
+    /// The repository that was fetched.
+    pub rid: RepoId,
+}
+
+/// Any fetches are canceled for the given node.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub struct Cancel {
+    /// The node for which the fetches should be canceled.
+    pub from: NodeId,
+}
added crates/radicle-protocol/src/fetcher/state/event.rs
@@ -0,0 +1,112 @@
+use std::collections::{BTreeMap, VecDeque};
+use std::time;
+
+use radicle::storage::refs::RefsAt;
+use radicle_core::{NodeId, RepoId};
+
+use super::{ActiveFetch, QueuedFetch};
+
+/// Event returned from [`FetcherState::handle`].
+///
+/// [`FetcherState::handle`]: super::FetcherState::handle
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum Event {
+    Fetch(Fetch),
+    Fetched(Fetched),
+    Cancel(Cancel),
+}
+
+impl From<Cancel> for Event {
+    fn from(v: Cancel) -> Self {
+        Self::Cancel(v)
+    }
+}
+
+impl From<Fetched> for Event {
+    fn from(v: Fetched) -> Self {
+        Self::Fetched(v)
+    }
+}
+
+impl From<Fetch> for Event {
+    fn from(v: Fetch) -> Self {
+        Self::Fetch(v)
+    }
+}
+
+/// Events that occur when a repository is requested to be fetched.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum Fetch {
+    /// The fetch can be started by the caller.
+    Started {
+        /// The repository to be fetched.
+        rid: RepoId,
+        /// The node to fetch from.
+        from: NodeId,
+        /// The references to be fetched.
+        refs_at: Vec<RefsAt>,
+        /// The timeout for the fetch process.
+        timeout: time::Duration,
+    },
+    /// The repository is already being fetched from the given node.
+    AlreadyFetching {
+        /// The repository being actively fetched.
+        rid: RepoId,
+        /// The node being fetched from.
+        from: NodeId,
+    },
+    /// The queue for the given node is at capacity, and can no longer accept
+    /// any more fetch requests.
+    QueueAtCapacity {
+        /// The rejected repository.
+        rid: RepoId,
+        /// The node whose queue is at capacity.
+        from: NodeId,
+        /// The references expected to be fetched.
+        refs_at: Vec<RefsAt>,
+        /// The timeout for the fetch process.
+        timeout: time::Duration,
+        /// The capacity of the queue.
+        capacity: usize,
+    },
+    /// The fetch was queued for later processing.
+    Queued {
+        /// The repository to be fetched.
+        rid: RepoId,
+        /// The node to fetch from.
+        from: NodeId,
+    },
+}
+
+/// Events that occur after a repository has been fetched.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum Fetched {
+    /// There was no ongoing fetch for the given [`NodeId`] and [`RepoId`].
+    NotFound { from: NodeId, rid: RepoId },
+    /// The active fetch was marked as completed and removed from the active
+    /// set.
+    Completed {
+        /// The node the repository was fetched from.
+        from: NodeId,
+        /// The repository that was fetched.
+        rid: RepoId,
+        /// The references that were fetched.
+        refs_at: Vec<RefsAt>,
+    },
+}
+
+/// Events that occur when a fetch was canceled for a given node.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum Cancel {
+    /// There were no active or queued fetches for the given node.
+    Unexpected { from: NodeId },
+    /// There were active or queued fetches that were canceled for the given node.
+    Canceled {
+        /// The node whose fetches were canceled.
+        from: NodeId,
+        /// The active fetches that were canceled.
+        active: BTreeMap<RepoId, ActiveFetch>,
+        /// The queued fetches that were canceled.
+        queued: VecDeque<QueuedFetch>,
+    },
+}
added crates/radicle-protocol/src/fetcher/test.rs
@@ -0,0 +1,2 @@
+mod queue;
+mod state;
added crates/radicle-protocol/src/fetcher/test/arbitrary.rs
@@ -0,0 +1,52 @@
+use std::collections::HashSet;
+
+use radicle::{identity::DocAt, test::arbitrary};
+
+use crate::fetcher::{commands, Command, Fetched};
+
+impl qcheck::Arbitrary for Fetched {
+    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+        Fetched {
+            updated: vec![],
+            namespaces: HashSet::arbitrary(g),
+            clone: bool::arbitrary(g),
+            doc: DocAt::arbitrary(g),
+        }
+    }
+}
+
+impl qcheck::Arbitrary for Command {
+    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+        todo!()
+    }
+}
+
+impl qcheck::Arbitrary for commands::Fetch {
+    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+        todo!()
+    }
+}
+
+impl qcheck::Arbitrary for commands::Fetched {
+    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+        g.choose(&[
+            commands::Fetched::DequeueFetches,
+            commands::Fetched::Fetched {
+                from: arbitrary::gen(g.size()),
+                rid: arbitrary::gen(g.size()),
+            },
+        ])
+        .cloned()
+        .unwrap()
+    }
+}
+
+impl qcheck::Arbitrary for commands::Dequeue {
+    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+        g.choose(&[commands::Dequeue::Nodes {
+            nodes: arbitrary::gen(5),
+        }])
+        .cloned()
+        .unwrap()
+    }
+}
added crates/radicle-protocol/src/fetcher/test/queue.rs
@@ -0,0 +1,35 @@
+
mod helpers;
+
mod properties;
+
mod unit;
+

+
use std::num::NonZeroUsize;
+
use std::time::Duration;
+

+
use qcheck::Arbitrary;
+

+
use radicle::storage::refs::RefsAt;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{MaxQueueSize, QueuedFetch};
+

+
impl Arbitrary for QueuedFetch {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        // Limit the `refs_at` size to avoid slow shrinking
+
        let refs_at_len = usize::arbitrary(g) % 4;
+
        let refs_at: Vec<RefsAt> = (0..refs_at_len).map(|_| RefsAt::arbitrary(g)).collect();
+

+
        QueuedFetch {
+
            rid: RepoId::arbitrary(g),
+
            from: NodeId::arbitrary(g),
+
            refs_at,
+
            timeout: Duration::from_secs(u64::arbitrary(g) % 3600),
+
        }
+
    }
+
}
+

+
impl Arbitrary for MaxQueueSize {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        let size = NonZeroUsize::MIN.saturating_add(usize::arbitrary(g) % 255);
+
        MaxQueueSize::new(size)
+
    }
+
}
added crates/radicle-protocol/src/fetcher/test/queue/helpers.rs
@@ -0,0 +1,25 @@
+
use std::{num::NonZeroUsize, time::Duration};
+

+
use radicle::test::arbitrary;
+

+
use crate::fetcher::{MaxQueueSize, Queue, QueuedFetch};
+

+
pub fn create_queue(capacity: usize) -> Queue {
+
    Queue::new(MaxQueueSize::new(
+
        NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
+
    ))
+
}
+

+
pub fn create_fetch() -> QueuedFetch {
+
    QueuedFetch {
+
        rid: arbitrary::gen(1),
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    }
+
}
+

+
/// Generate a vector of unique `QueuedFetch` items (unique by `rid`).
+
pub fn unique_fetches(count: usize) -> Vec<QueuedFetch> {
+
    (0..count).map(|_| create_fetch()).collect()
+
}
added crates/radicle-protocol/src/fetcher/test/queue/properties.rs
@@ -0,0 +1,5 @@
+
mod capacity;
+
mod dequeue;
+
mod equality;
+
mod fifo;
+
mod merge;
added crates/radicle-protocol/src/fetcher/test/queue/properties/capacity.rs
@@ -0,0 +1,74 @@
+
use qcheck_macros::quickcheck;
+

+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::{state::Enqueue, MaxQueueSize};
+
use crate::fetcher::{Queue, QueuedFetch};
+

+
#[quickcheck]
+
fn bounded(max_size: MaxQueueSize, num_enqueues: u8) -> bool {
+
    let mut queue = Queue::new(max_size);
+

+
    for _ in 0..num_enqueues {
+
        let _ = queue.enqueue(create_fetch());
+

+
        // Invariant: length never exceeds capacity
+
        if queue.len() > max_size.as_usize() {
+
            return false;
+
        }
+
    }
+
    true
+
}
+

+
#[quickcheck]
+
fn rejection(max_size: MaxQueueSize) -> bool {
+
    let mut queue = Queue::new(max_size);
+

+
    // Fill to capacity with unique items
+
    let items = unique_fetches(max_size.as_usize());
+
    for item in &items {
+
        if queue.enqueue(item.clone()) != Enqueue::Queued {
+
            return false;
+
        }
+
    }
+

+
    // The next enqueue of a *new* item must be rejected
+
    matches!(queue.enqueue(create_fetch()), Enqueue::CapacityReached(_))
+
}
+

+
#[quickcheck]
+
fn restored_after_dequeue(max_size: MaxQueueSize, dequeue_count: u8) -> bool {
+
    let mut queue = Queue::new(max_size);
+

+
    // Fill to capacity
+
    for _ in 0..max_size.as_usize() {
+
        let _ = queue.enqueue(create_fetch());
+
    }
+

+
    // Dequeue some items
+
    let to_dequeue = (dequeue_count as usize).min(max_size.as_usize());
+
    for _ in 0..to_dequeue {
+
        let _ = queue.dequeue();
+
    }
+

+
    // Should be able to enqueue exactly that many items again
+
    for _ in 0..to_dequeue {
+
        if queue.enqueue(create_fetch()) != Enqueue::Queued {
+
            return false;
+
        }
+
    }
+

+
    // Next enqueue should fail
+
    matches!(queue.enqueue(create_fetch()), Enqueue::CapacityReached(_))
+
}
+

+
#[quickcheck]
+
fn capacity_reached_returns_same_item(item: QueuedFetch) -> bool {
+
    let mut queue = create_queue(1);
+
    let _ = queue.enqueue(create_fetch()); // Fill the queue
+

+
    match queue.enqueue(item.clone()) {
+
        Enqueue::CapacityReached(returned) => returned == item,
+
        Enqueue::Merged => true, // If same rid, merge takes precedence
+
        _ => false,
+
    }
+
}
added crates/radicle-protocol/src/fetcher/test/queue/properties/dequeue.rs
@@ -0,0 +1,56 @@
+
use qcheck_macros::quickcheck;
+

+
use crate::fetcher::state::Enqueue;
+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::{MaxQueueSize, Queue};
+

+
#[quickcheck]
+
fn enables_reenqueue(count: u8) -> bool {
+
    let count = ((count as usize) % 20).max(1);
+
    let items = unique_fetches(count);
+

+
    let mut queue = create_queue(count); // Exact capacity
+

+
    for item in &items {
+
        let _ = queue.enqueue(item.clone());
+
    }
+

+
    // Queue is full, dequeue first item
+
    let dequeued = queue.dequeue();
+
    if dequeued.is_none() {
+
        return false;
+
    }
+

+
    // Should be able to enqueue a new item now
+
    queue.enqueue(create_fetch()) == Enqueue::Queued
+
}
+

+
#[quickcheck]
+
fn empty_queue_returns_none(max_size: MaxQueueSize, dequeue_attempts: u8) -> bool {
+
    let mut queue = Queue::new(max_size);
+

+
    // Multiple dequeues from empty queue should all return None
+
    for _ in 0..dequeue_attempts {
+
        if queue.dequeue().is_some() {
+
            return false;
+
        }
+
    }
+
    true
+
}
+

+
#[quickcheck]
+
fn drained_queue_returns_none(max_size: MaxQueueSize, fill_count: u8) -> bool {
+
    let mut queue = Queue::new(max_size);
+
    let fill = (fill_count as usize).min(max_size.as_usize());
+

+
    // Fill then drain
+
    for _ in 0..fill {
+
        let _ = queue.enqueue(create_fetch());
+
    }
+
    for _ in 0..fill {
+
        let _ = queue.dequeue();
+
    }
+

+
    // Should return None now
+
    queue.dequeue().is_none()
+
}
added crates/radicle-protocol/src/fetcher/test/queue/properties/equality.rs
@@ -0,0 +1,22 @@
+
use qcheck_macros::quickcheck;
+

+
use crate::fetcher::QueuedFetch;
+

+
#[quickcheck]
+
fn reflexive(item: QueuedFetch) -> bool {
+
    item == item.clone()
+
}
+

+
#[quickcheck]
+
fn symmetric(a: QueuedFetch, b: QueuedFetch) -> bool {
+
    (a == b) == (b == a)
+
}
+

+
#[quickcheck]
+
fn transitive(a: QueuedFetch, b: QueuedFetch, c: QueuedFetch) -> bool {
+
    if a == b && b == c {
+
        a == c
+
    } else {
+
        true
+
    }
+
}
added crates/radicle-protocol/src/fetcher/test/queue/properties/fifo.rs
@@ -0,0 +1,75 @@
+
use qcheck_macros::quickcheck;
+

+
use crate::fetcher::state::Enqueue;
+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::QueuedFetch;
+

+
#[quickcheck]
+
fn ordering(count: u8) -> bool {
+
    let count = (count as usize) % 50; // Reasonable upper bound
+
    if count == 0 {
+
        return true;
+
    }
+

+
    let items = unique_fetches(count);
+
    let mut queue = create_queue(count);
+

+
    // Enqueue all items
+
    for item in &items {
+
        if queue.enqueue(item.clone()) != Enqueue::Queued {
+
            return false;
+
        }
+
    }
+

+
    // Dequeue and verify order
+
    for expected in items {
+
        match queue.dequeue() {
+
            Some(actual) if actual.rid == expected.rid => continue,
+
            _ => return false,
+
        }
+
    }
+

+
    queue.is_empty()
+
}
+

+
#[quickcheck]
+
fn interleaved_operations(ops: Vec<bool>) -> bool {
+
    // Limit operations to avoid slow tests
+
    let ops: Vec<_> = ops.into_iter().take(100).collect();
+
    let capacity = ops.len().max(1);
+

+
    let mut queue = create_queue(capacity);
+
    let mut expected_order: Vec<QueuedFetch> = Vec::new();
+
    let mut dequeue_index = 0;
+

+
    for op in ops {
+
        if op {
+
            // Enqueue
+
            let item = create_fetch();
+
            match queue.enqueue(item.clone()) {
+
                Enqueue::Queued => expected_order.push(item),
+
                Enqueue::CapacityReached(_) => {} // Expected when full
+
                Enqueue::Merged => {}             // Can happen if same rid generated
+
            }
+
        } else {
+
            // Dequeue
+
            match queue.dequeue() {
+
                Some(item) => {
+
                    if dequeue_index >= expected_order.len()
+
                        || item.rid != expected_order[dequeue_index].rid
+
                    {
+
                        return false;
+
                    }
+
                    dequeue_index += 1;
+
                }
+
                None => {
+
                    // This should only happen once we've dequeued everything we enqueued
+
                    if dequeue_index != expected_order.len() {
+
                        return false;
+
                    }
+
                }
+
            }
+
        }
+
    }
+
    true
+
}
added crates/radicle-protocol/src/fetcher/test/queue/properties/merge.rs
@@ -0,0 +1,221 @@
+
use std::time::Duration;
+

+
use qcheck_macros::quickcheck;
+
use radicle::storage::refs::RefsAt;
+
use radicle::test::arbitrary;
+
use radicle_core::RepoId;
+

+
use crate::fetcher::state::Enqueue;
+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::{MaxQueueSize, Queue, QueuedFetch};
+

+
#[quickcheck]
+
fn same_rid_merges_anywhere_in_queue(max_size: MaxQueueSize, merge_index: usize) -> bool {
+
    if max_size.as_usize() < 2 {
+
        return true; // Need at least 2 slots to test properly
+
    }
+

+
    let mut queue = Queue::new(max_size);
+
    let items = unique_fetches(max_size.as_usize() - 1); // Leave room for potential new item
+

+
    for item in &items {
+
        let _ = queue.enqueue(item.clone());
+
    }
+

+
    if items.is_empty() {
+
        return true;
+
    }
+

+
    // Try to enqueue an item with same rid as one already in queue
+
    let target_index = merge_index % items.len();
+
    let same_rid_item = QueuedFetch {
+
        rid: items[target_index].rid,
+
        from: arbitrary::gen(1), // A different `from` node
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(60),
+
    };
+

+
    matches!(queue.enqueue(same_rid_item), Enqueue::Merged)
+
}
+

+
#[quickcheck]
+
fn combines_refs(base_refs_count: u8, merge_refs_count: u8) -> bool {
+
    let base_refs_count = (base_refs_count as usize) % 5;
+
    let merge_refs_count = (merge_refs_count as usize) % 5;
+

+
    let mut queue = create_queue(10);
+

+
    let rid: RepoId = arbitrary::gen(1);
+
    let base_refs: Vec<RefsAt> = (0..base_refs_count).map(|_| arbitrary::gen(1)).collect();
+
    let merge_refs: Vec<RefsAt> = (0..merge_refs_count).map(|_| arbitrary::gen(1)).collect();
+

+
    let base_item = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: base_refs.clone(),
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let merge_item = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: merge_refs.clone(),
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let _ = queue.enqueue(base_item);
+
    let result = queue.enqueue(merge_item);
+

+
    if result != Enqueue::Merged {
+
        return false;
+
    }
+

+
    let dequeued = queue.dequeue().unwrap();
+

+
    // If either was empty, result should be empty (fetch everything)
+
    if base_refs.is_empty() || merge_refs.is_empty() {
+
        dequeued.refs_at.is_empty()
+
    } else {
+
        // Otherwise refs should be combined
+
        dequeued.refs_at.len() == base_refs_count + merge_refs_count
+
    }
+
}
+

+
#[quickcheck]
+
fn empty_refs_fetches_all() -> bool {
+
    let mut queue = create_queue(10);
+
    let rid: RepoId = arbitrary::gen(1);
+

+
    // First enqueue with specific refs
+
    let item_with_refs = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1), arbitrary::gen(1)],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    // Second enqueue with empty refs (fetch everything)
+
    let item_empty_refs = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let _ = queue.enqueue(item_with_refs);
+
    let _ = queue.enqueue(item_empty_refs);
+

+
    let dequeued = queue.dequeue().unwrap();
+
    dequeued.refs_at.is_empty() // Should fetch everything
+
}
+

+
#[quickcheck]
+
fn longer_timeout_preserved(short_secs: u16, long_secs: u16) -> bool {
+
    let short = Duration::from_secs(short_secs.min(long_secs) as u64);
+
    let long = Duration::from_secs(short_secs.max(long_secs) as u64);
+

+
    let mut queue = create_queue(10);
+
    let rid: RepoId = arbitrary::gen(1);
+

+
    let item_short = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: short,
+
    };
+

+
    let item_long = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: long,
+
    };
+

+
    // Test both orderings
+
    let _ = queue.enqueue(item_short.clone());
+
    let _ = queue.enqueue(item_long.clone());
+
    let dequeued1 = queue.dequeue().unwrap();
+

+
    let mut queue2 = create_queue(10);
+
    let _ = queue2.enqueue(item_long);
+
    let _ = queue2.enqueue(item_short);
+
    let dequeued2 = queue2.dequeue().unwrap();
+

+
    dequeued1.timeout == long && dequeued2.timeout == long
+
}
+

+
#[quickcheck]
+
fn does_not_increase_queue_length() -> bool {
+
    let mut queue = create_queue(10);
+
    let rid: RepoId = arbitrary::gen(1);
+

+
    let item1 = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let item2 = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(60),
+
    };
+

+
    let _ = queue.enqueue(item1);
+
    let len_after_first = queue.len();
+

+
    let _ = queue.enqueue(item2);
+
    let len_after_merge = queue.len();
+

+
    len_after_first == 1 && len_after_merge == 1
+
}
+

+
#[quickcheck]
+
fn different_rid_accepted(base_item: QueuedFetch) -> bool {
+
    let mut queue = create_queue(10);
+
    let _ = queue.enqueue(base_item.clone());
+

+
    // Item with different rid should be queued (not merged)
+
    let different_rid = QueuedFetch {
+
        rid: arbitrary::gen(1),
+
        ..base_item
+
    };
+

+
    queue.enqueue(different_rid) == Enqueue::Queued
+
}
+

+
#[quickcheck]
+
fn succeed_when_at_capacity() -> bool {
+
    // When queue is at capacity, merging with existing item should still work
+
    let mut queue = create_queue(2);
+
    let rid: RepoId = arbitrary::gen(1);
+

+
    let item1 = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let item2 = QueuedFetch {
+
        rid: arbitrary::gen(1), // Different rid
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let merge_item = QueuedFetch {
+
        rid, // Same as item1
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(60),
+
    };
+

+
    let _ = queue.enqueue(item1);
+
    let _ = queue.enqueue(item2);
+

+
    // Queue is now at capacity, but merge should still work
+
    queue.enqueue(merge_item) == Enqueue::Merged
+
}
added crates/radicle-protocol/src/fetcher/test/queue/unit.rs
@@ -0,0 +1,113 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::Enqueue;
+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::QueuedFetch;
+

+
#[test]
+
fn zero_timeout_accepted() {
+
    let mut queue = create_queue(10);
+
    let item = QueuedFetch {
+
        rid: arbitrary::gen(1),
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::ZERO,
+
    };
+
    assert_eq!(queue.enqueue(item), Enqueue::Queued);
+
}
+

+
#[test]
+
fn max_timeout_accepted() {
+
    let mut queue = create_queue(10);
+
    let item = QueuedFetch {
+
        rid: arbitrary::gen(1),
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::MAX,
+
    };
+
    assert_eq!(queue.enqueue(item), Enqueue::Queued);
+
}
+

+
#[test]
+
fn empty_refs_at_items_can_be_equal() {
+
    let rid: RepoId = arbitrary::gen(1);
+
    let from: NodeId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    let item1 = QueuedFetch {
+
        rid,
+
        from,
+
        refs_at: vec![],
+
        timeout,
+
    };
+
    let item2 = QueuedFetch {
+
        rid,
+
        from,
+
        refs_at: vec![],
+
        timeout,
+
    };
+

+
    assert_eq!(item1, item2);
+
}
+

+
#[test]
+
fn merge_preserves_position_in_queue() {
+
    let mut queue = create_queue(10);
+

+
    let rid_first: RepoId = arbitrary::gen(1);
+
    let rid_second: RepoId = arbitrary::gen(2);
+
    let rid_third: RepoId = arbitrary::gen(3);
+

+
    // Enqueue three items
+
    let _ = queue.enqueue(QueuedFetch {
+
        rid: rid_first,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    });
+
    let _ = queue.enqueue(QueuedFetch {
+
        rid: rid_second,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    });
+
    let _ = queue.enqueue(QueuedFetch {
+
        rid: rid_third,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    });
+

+
    // Merge into the second item
+
    let result = queue.enqueue(QueuedFetch {
+
        rid: rid_second,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(60),
+
    });
+
    assert_eq!(result, Enqueue::Merged);
+

+
    // Order should be preserved: first, second (merged), third
+
    assert_eq!(queue.dequeue().unwrap().rid, rid_first);
+
    assert_eq!(queue.dequeue().unwrap().rid, rid_second);
+
    assert_eq!(queue.dequeue().unwrap().rid, rid_third);
+
}
+

+
#[test]
+
fn capacity_takes_precedence_over_merge_for_new_items() {
+
    let mut queue = create_queue(2);
+

+
    // Fill to capacity with unique items
+
    let _ = queue.enqueue(create_fetch());
+
    let _ = queue.enqueue(create_fetch());
+

+
    // New item (different rid) should be rejected
+
    let new_item = create_fetch();
+
    match queue.enqueue(new_item.clone()) {
+
        Enqueue::CapacityReached(returned) => assert_eq!(returned, new_item),
+
        _ => panic!("Expected CapacityReached"),
+
    }
+
}
added crates/radicle-protocol/src/fetcher/test/state.rs
@@ -0,0 +1,7 @@
+
mod command;
+
mod concurrent;
+
mod config;
+
mod dequeue;
+
mod helpers;
+
mod invariant;
+
mod multinode;
added crates/radicle-protocol/src/fetcher/test/state/command.rs
@@ -0,0 +1,3 @@
+
mod cancel;
+
mod fetch;
+
mod fetched;
added crates/radicle-protocol/src/fetcher/test/state/command/cancel.rs
@@ -0,0 +1,129 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::{ActiveFetch, FetcherState};
+

+
#[test]
+
fn single_ongoing() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    let event = state.cancel(command::Cancel { from: node_a });
+

+
    match event {
+
        event::Cancel::Canceled {
+
            from,
+
            active: ongoing,
+
            queued,
+
        } => {
+
            assert_eq!(from, node_a);
+
            assert_eq!(ongoing.len(), 1);
+
            assert_eq!(
+
                ongoing.get(&repo_1),
+
                Some(&ActiveFetch {
+
                    from: node_a,
+
                    refs_at: refs_at_1,
+
                })
+
            );
+
            assert!(queued.is_empty());
+
        }
+
        _ => panic!("Expected Canceled event"),
+
    }
+
    assert!(state.get_active_fetch(&repo_1).is_none());
+
}
+

+
#[test]
+
fn ongoing_and_queued() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    let event = state.cancel(command::Cancel { from: node_a });
+

+
    match event {
+
        event::Cancel::Canceled {
+
            active: ongoing,
+
            queued,
+
            ..
+
        } => {
+
            assert_eq!(ongoing.len(), 1);
+
            assert!(ongoing.contains_key(&repo_1));
+
            assert_eq!(queued.len(), 2);
+
        }
+
        _ => panic!("Expected Canceled event"),
+
    }
+
}
+

+
#[test]
+
fn non_existent_returns_unexpected() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_unknown: NodeId = arbitrary::gen(1);
+

+
    let event = state.cancel(command::Cancel { from: node_unknown });
+

+
    assert_eq!(event, event::Cancel::Unexpected { from: node_unknown });
+
}
+

+
#[test]
+
fn cancellation_is_isolated() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let node_b: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_b,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    state.cancel(command::Cancel { from: node_a });
+

+
    assert!(state.get_active_fetch(&repo_1).is_none());
+
    assert!(state.get_active_fetch(&repo_2).is_some());
+
}
added crates/radicle-protocol/src/fetcher/test/state/command/fetch.rs
@@ -0,0 +1,416 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::{ActiveFetch, FetcherState};
+

+
#[test]
+
fn fetch_start_first_fetch_for_node() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::Started {
+
            rid: repo_1,
+
            from: node_a,
+
            refs_at: refs_at_1.clone(),
+
            timeout,
+
        }
+
    );
+
    assert_eq!(
+
        state.get_active_fetch(&repo_1),
+
        Some(&ActiveFetch {
+
            from: node_a,
+
            refs_at: refs_at_1,
+
        })
+
    );
+
}
+

+
#[test]
+
fn fetch_different_repo_same_node_within_capacity() {
+
    let mut state = FetcherState::new(helpers::config(2, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    let event1 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    assert!(matches!(event1, event::Fetch::Started { .. }));
+

+
    let event2 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    assert!(matches!(event2, event::Fetch::Started { rid, .. } if rid == repo_2));
+
    assert!(state.get_active_fetch(&repo_1).is_some());
+
    assert!(state.get_active_fetch(&repo_2).is_some());
+
}
+

+
#[test]
+
fn fetch_same_repo_different_nodes_queues_second() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let node_b: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    let event1 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+
    assert!(matches!(event1, event::Fetch::Started { .. }));
+

+
    // The same repo from a different node gets queued, since repo_1 is already active
+
    let event2 = state.fetch(command::Fetch {
+
        from: node_b,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    assert!(
+
        matches!(event2, event::Fetch::Queued { rid, from } if rid == repo_1 && from == node_b)
+
    );
+
    // Only node_a's fetch is active
+
    let active = state.get_active_fetch(&repo_1);
+
    assert!(active.is_some());
+
    assert_eq!(*active.unwrap().from(), node_a);
+
}
+

+
#[test]
+
fn fetch_duplicate_returns_already_fetching() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::AlreadyFetching {
+
            rid: repo_1,
+
            from: node_a,
+
        }
+
    );
+
}
+

+
#[test]
+
fn fetch_same_repo_different_refs_enqueues() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let refs_at_2 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_2.clone(),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::Queued {
+
            rid: repo_1,
+
            from: node_a,
+
        }
+
    );
+
}
+

+
#[test]
+
fn fetch_at_capacity_enqueues() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::Queued {
+
            rid: repo_2,
+
            from: node_a,
+
        }
+
    );
+
    assert!(state.get_active_fetch(&repo_1).is_some());
+
    assert!(state.get_active_fetch(&repo_2).is_none());
+
}
+

+
#[test]
+
fn fetch_queue_rejected_capacity_reached() {
+
    let mut state = FetcherState::new(helpers::config(1, 2));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let repo_4: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    // Fill concurrency
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Fill queue (capacity 2)
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Exceed queue capacity
+
    let refs_at_4 = helpers::gen_refs_at(1);
+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_4,
+
        refs_at: refs_at_4.clone(),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::QueueAtCapacity {
+
            rid: repo_4,
+
            from: node_a,
+
            refs_at: refs_at_4,
+
            timeout,
+
            capacity: 2,
+
        }
+
    );
+
}
+

+
#[test]
+
fn fetch_queue_merges_already_queued() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let refs_at_2a = helpers::gen_refs_at(1);
+
    let refs_at_2b = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2a.clone(),
+
        timeout,
+
    });
+

+
    // A second fetch for the same queued repo should merge the refs
+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2b.clone(),
+
        timeout,
+
    });
+

+
    // Returns Queued (merged)
+
    assert_eq!(
+
        event,
+
        event::Fetch::Queued {
+
            rid: repo_2,
+
            from: node_a,
+
        }
+
    );
+

+
    // Dequeue and verify refs were merged
+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    let queued = state.dequeue(&node_a).unwrap();
+
    assert_eq!(queued.rid, repo_2);
+
    // refs_at should contain both sets of refs
+
    assert_eq!(queued.refs_at.len(), refs_at_2a.len() + refs_at_2b.len());
+
}
+

+
#[test]
+
fn fetch_queue_merge_empty_refs_fetches_all() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let refs_at_2 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Queue with specific refs
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2.clone(),
+
        timeout,
+
    });
+

+
    // Queue again with empty refs (fetch everything)
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: vec![],
+
        timeout,
+
    });
+

+
    // Dequeue and verify refs became empty (fetch all)
+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    let queued = state.dequeue(&node_a).unwrap();
+
    assert_eq!(queued.rid, repo_2);
+
    assert!(queued.refs_at.is_empty());
+
}
+

+
#[test]
+
fn fetch_queue_merge_takes_longer_timeout() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let short_timeout = Duration::from_secs(10);
+
    let long_timeout = Duration::from_secs(60);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout: short_timeout,
+
    });
+

+
    // Queue with short timeout
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout: short_timeout,
+
    });
+

+
    // Queue again with longer timeout
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout: long_timeout,
+
    });
+

+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    // Dequeue and verify timeout is the longer one
+
    let queued = state.dequeue(&node_a).unwrap();
+
    assert_eq!(queued.timeout, long_timeout);
+
}
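
The three merge tests above exercise the queue-merge rule: merging two queued requests for the same repo combines their refs, an empty `refs_at` ("fetch everything") subsumes any specific set, and the longer timeout wins. A minimal standalone sketch of that rule, using a hypothetical `Queued` stand-in rather than the crate's actual types:

```rust
use std::time::Duration;

// Hypothetical stand-in for a queued fetch; `u64` substitutes for `RefsAt`.
#[derive(Debug, Clone, PartialEq)]
struct Queued {
    refs_at: Vec<u64>,
    timeout: Duration,
}

// Sketch of the merge rule the tests above assume; not the crate's
// actual implementation.
fn merge(existing: &mut Queued, incoming: Queued) {
    // An empty `refs_at` means "fetch everything", which subsumes any
    // specific set of refs on either side.
    if existing.refs_at.is_empty() || incoming.refs_at.is_empty() {
        existing.refs_at.clear();
    } else {
        existing.refs_at.extend(incoming.refs_at);
    }
    // Take the longer timeout so neither request is cut short.
    existing.timeout = existing.timeout.max(incoming.timeout);
}

fn main() {
    let mut q = Queued {
        refs_at: vec![1],
        timeout: Duration::from_secs(10),
    };
    merge(
        &mut q,
        Queued {
            refs_at: vec![2],
            timeout: Duration::from_secs(60),
        },
    );
    assert_eq!(q.refs_at, vec![1, 2]); // refs combined
    assert_eq!(q.timeout, Duration::from_secs(60)); // longer timeout wins

    merge(
        &mut q,
        Queued {
            refs_at: vec![],
            timeout: Duration::from_secs(5),
        },
    );
    assert!(q.refs_at.is_empty()); // empty refs means fetch-all
    println!("ok");
}
```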
+

+
#[test]
+
fn fetch_after_previous_completed() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    assert!(matches!(event, event::Fetch::Started { .. }));
+
}
added crates/radicle-protocol/src/fetcher/test/state/command/fetched.rs
@@ -0,0 +1,145 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
+
fn complete_single_ongoing() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    let event = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetched::Completed {
+
            from: node_a,
+
            rid: repo_1,
+
            refs_at: refs_at_1,
+
        }
+
    );
+
    assert!(state.get_active_fetch(&repo_1).is_none());
+
}
+

+
#[test]
+
fn complete_then_dequeue_fifo() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let refs_at_2 = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Queue repo_2 first, then repo_3
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2.clone(),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    let event = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+

+
    assert!(matches!(event, event::Fetched::Completed { .. }));
+

+
    // Dequeue the next fetch in FIFO order: repo_2 was queued first
+
    let queued = state.dequeue(&node_a);
+
    assert!(queued.is_some());
+
    let queued = queued.unwrap();
+
    assert_eq!(queued.rid, repo_2);
+
    assert_eq!(queued.from, node_a);
+
    assert_eq!(queued.refs_at, refs_at_2);
+
}
+

+
#[test]
+
fn complete_one_of_multiple() {
+
    let mut state = FetcherState::new(helpers::config(3, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    let event = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_2,
+
    });
+

+
    assert!(matches!(event, event::Fetched::Completed { rid, .. } if rid == repo_2));
+
    assert!(state.get_active_fetch(&repo_1).is_some());
+
    assert!(state.get_active_fetch(&repo_2).is_none());
+
    assert!(state.get_active_fetch(&repo_3).is_some());
+
}
+

+
#[test]
+
fn non_existent_returns_not_found() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+

+
    let event = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetched::NotFound {
+
            from: node_a,
+
            rid: repo_1,
+
        }
+
    );
+
}
added crates/radicle-protocol/src/fetcher/test/state/concurrent.rs
@@ -0,0 +1,106 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
+
fn interleaved_operations() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let node_b: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    // fetch(A, r1)
+
    let e1 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    assert!(matches!(e1, event::Fetch::Started { .. }));
+

+
    // fetch(B, r2)
+
    let e2 = state.fetch(command::Fetch {
+
        from: node_b,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    assert!(matches!(e2, event::Fetch::Started { .. }));
+

+
    // fetched(A, r1)
+
    let e3 = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    assert!(matches!(e3, event::Fetched::Completed { .. }));
+

+
    // fetch(A, r3)
+
    let e4 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    assert!(matches!(e4, event::Fetch::Started { .. }));
+

+
    // fetched(B, r2)
+
    let e5 = state.fetched(command::Fetched {
+
        from: node_b,
+
        rid: repo_2,
+
    });
+
    assert!(matches!(e5, event::Fetched::Completed { .. }));
+

+
    // Final state: only r3 from A ongoing
+
    assert!(state.get_active_fetch(&repo_1).is_none());
+
    assert!(state.get_active_fetch(&repo_2).is_none());
+
    assert!(state.get_active_fetch(&repo_3).is_some());
+
}
+

+
#[test]
+
fn fetched_then_cancel() {
+
    let mut state = FetcherState::new(helpers::config(2, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Complete repo_1
+
    let e1 = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    assert!(matches!(e1, event::Fetched::Completed { .. }));
+

+
    // Cancel remaining
+
    let e2 = state.cancel(command::Cancel { from: node_a });
+
    match e2 {
+
        event::Cancel::Canceled {
+
            active: ongoing, ..
+
        } => {
+
            assert_eq!(ongoing.len(), 1);
+
            assert!(ongoing.contains_key(&repo_2));
+
        }
+
        _ => panic!("Expected Canceled"),
+
    }
+
}
added crates/radicle-protocol/src/fetcher/test/state/config.rs
@@ -0,0 +1,72 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
+
fn high_concurrency() {
+
    let mut state = FetcherState::new(helpers::config(100, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    for i in 0..100 {
+
        let repo: RepoId = arbitrary::gen(i + 1);
+
        let event = state.fetch(command::Fetch {
+
            from: node_a,
+
            rid: repo,
+
            refs_at: helpers::gen_refs_at(1),
+
            timeout,
+
        });
+
        assert!(
+
            matches!(event, event::Fetch::Started { .. }),
+
            "Fetch {} should start",
+
            i
+
        );
+
    }
+

+
    assert_eq!(
+
        state
+
            .active_fetches()
+
            .iter()
+
            .filter(|(_, f)| *f.from() == node_a)
+
            .count(),
+
        100
+
    );
+
}
+

+
#[test]
+
fn min_queue_size() {
+
    let mut state = FetcherState::new(helpers::config(1, 1));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    let event1 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    assert!(matches!(event1, event::Fetch::Queued { .. }));
+

+
    let event2 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    assert!(matches!(event2, event::Fetch::QueueAtCapacity { .. }));
+
}
added crates/radicle-protocol/src/fetcher/test/state/dequeue.rs
@@ -0,0 +1,112 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::command;
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
+
fn cannot_dequeue_while_node_at_capacity() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let refs_at_2 = helpers::gen_refs_at(3);
+
    let timeout_2 = Duration::from_secs(42);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout: Duration::from_secs(10),
+
    });
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2.clone(),
+
        timeout: timeout_2,
+
    });
+

+
    let result = state.dequeue(&node_a);
+
    assert!(result.is_none());
+

+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+

+
    let result = state.dequeue(&node_a);
+
    let queued = result.unwrap();
+
    assert_eq!(queued.rid, repo_2);
+
    assert_eq!(queued.from, node_a);
+
    assert_eq!(queued.refs_at, refs_at_2);
+
    assert_eq!(queued.timeout, timeout_2);
+
}
+

+
#[test]
+
fn maintains_fifo_order() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let repo_4: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Queue in order: repo_2, repo_3, repo_4
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_4,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    assert_eq!(state.dequeue(&node_a).unwrap().rid, repo_2);
+

+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_2,
+
    });
+
    assert_eq!(state.dequeue(&node_a).unwrap().rid, repo_3);
+

+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_3,
+
    });
+
    assert_eq!(state.dequeue(&node_a).unwrap().rid, repo_4);
+
    assert!(state.dequeue(&node_a).is_none());
+
}
+

+
#[test]
+
fn empty_queue_returns_none() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+

+
    assert!(state.dequeue(&node_a).is_none());
+
}
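
The dequeue tests above rely on per-node FIFO queues gated by a concurrency limit: `dequeue` yields nothing while the node is at capacity, and queued items come out in insertion order once a fetch completes. A minimal sketch of that behavior, with hypothetical string IDs in place of `NodeId`/`RepoId`:

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical per-node queue; a sketch of the semantics the tests
// assume, not the crate's actual data structure.
struct Queues {
    max_concurrency: usize,
    active: HashMap<&'static str, usize>, // active fetch count per node
    queued: HashMap<&'static str, VecDeque<&'static str>>, // rid queue per node
}

impl Queues {
    // Yields the next queued rid only if the node is below its limit,
    // and marks it active.
    fn dequeue(&mut self, node: &'static str) -> Option<&'static str> {
        let active = self.active.entry(node).or_insert(0);
        if *active >= self.max_concurrency {
            return None; // node still at capacity
        }
        let rid = self.queued.get_mut(node)?.pop_front()?;
        *active += 1;
        Some(rid)
    }

    // A completed fetch frees one slot on that node.
    fn fetched(&mut self, node: &'static str) {
        if let Some(a) = self.active.get_mut(node) {
            *a = a.saturating_sub(1);
        }
    }
}

fn main() {
    let mut q = Queues {
        max_concurrency: 1,
        active: HashMap::from([("a", 1)]), // one fetch ongoing
        queued: HashMap::from([("a", VecDeque::from(["r2", "r3"]))]),
    };
    assert!(q.dequeue("a").is_none()); // at capacity, nothing dequeued
    q.fetched("a"); // first fetch completes
    assert_eq!(q.dequeue("a"), Some("r2")); // FIFO: queued first, out first
    q.fetched("a");
    assert_eq!(q.dequeue("a"), Some("r3"));
    q.fetched("a");
    assert!(q.dequeue("a").is_none()); // queue drained
    println!("ok");
}
```

Note that `dequeue` itself claims the freed slot, which mirrors the tests' alternating `fetched`/`dequeue` pattern.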
added crates/radicle-protocol/src/fetcher/test/state/helpers.rs
@@ -0,0 +1,17 @@
+
use std::num::NonZeroUsize;
+

+
use radicle::{storage::refs::RefsAt, test::arbitrary};
+

+
use crate::fetcher::{Config, MaxQueueSize};
+

+
pub fn config(max_concurrency: usize, max_queue_size: usize) -> Config {
+
    Config::new()
+
        .with_max_concurrency(NonZeroUsize::new(max_concurrency).unwrap())
+
        .with_max_capacity(MaxQueueSize::new(
+
            NonZeroUsize::new(max_queue_size).unwrap(),
+
        ))
+
}
+

+
pub fn gen_refs_at(count: usize) -> Vec<RefsAt> {
+
    (0..count).map(|_| arbitrary::gen(1)).collect()
+
}
added crates/radicle-protocol/src/fetcher/test/state/invariant.rs
@@ -0,0 +1,53 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::command;
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
+
fn queue_integrity_after_merge() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let refs_at_2a = helpers::gen_refs_at(1);
+
    let refs_at_2b = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2a.clone(),
+
        timeout,
+
    });
+

+
    // A second fetch for the same queued repo should merge into the existing entry
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2b.clone(),
+
        timeout,
+
    });
+

+
    // Queue should have exactly one repo_2 entry (merged)
+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    let first = state.dequeue(&node_a);
+
    assert!(first.is_some());
+
    assert_eq!(first.unwrap().rid, repo_2);
+

+
    let second = state.dequeue(&node_a);
+
    assert!(second.is_none());
+
}
added crates/radicle-protocol/src/fetcher/test/state/multinode.rs
@@ -0,0 +1,83 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
+
fn independent_queues() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let node_b: NodeId = arbitrary::gen(1);
+
    let repo_a_active: RepoId = arbitrary::gen(1);
+
    let repo_b_active: RepoId = arbitrary::gen(2);
+
    let repo_a_queued: RepoId = arbitrary::gen(10);
+
    let repo_b_queued: RepoId = arbitrary::gen(20);
+
    let timeout = Duration::from_secs(30);
+

+
    // Fill capacity for both nodes
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_a_active,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_b,
+
        rid: repo_b_active,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Queue for both
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_a_queued,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_b,
+
        rid: repo_b_queued,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Dequeue from A doesn't affect B
+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_a_active,
+
    });
+
    let a_item = state.dequeue(&node_a);
+
    assert_eq!(a_item.unwrap().rid, repo_a_queued);
+

+
    state.fetched(command::Fetched {
+
        from: node_b,
+
        rid: repo_b_active,
+
    });
+
    let b_item = state.dequeue(&node_b);
+
    assert_eq!(b_item.unwrap().rid, repo_b_queued);
+
}
+

+
#[test]
+
fn high_count() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let timeout = Duration::from_secs(30);
+

+
    for i in 0..100 {
+
        let node: NodeId = arbitrary::gen(i + 1);
+
        let repo: RepoId = arbitrary::gen(i + 1);
+
        let event = state.fetch(command::Fetch {
+
            from: node,
+
            rid: repo,
+
            refs_at: helpers::gen_refs_at(1),
+
            timeout,
+
        });
+
        assert!(matches!(event, event::Fetch::Started { .. }));
+
    }
+

+
    assert_eq!(state.active_fetches().len(), 100);
+
}
modified crates/radicle-protocol/src/lib.rs
@@ -1,5 +1,6 @@
pub mod bounded;
pub mod deserializer;
+
pub mod fetcher;
pub mod service;
pub mod wire;
pub mod worker;
modified crates/radicle-protocol/src/service.rs
@@ -38,6 +38,9 @@ use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::RepositoryError;
use radicle_fetch::policy::SeedingPolicy;

+
use crate::fetcher;
+
use crate::fetcher::service::FetcherService;
+
use crate::fetcher::FetcherState;
use crate::service::gossip::Store as _;
use crate::service::message::{
    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
@@ -221,7 +224,9 @@ pub enum ConnectError {
    SelfConnection,
    #[error("outbound connection limit reached when attempting {nid} ({addr})")]
    LimitReached { nid: NodeId, addr: Address },
-
    #[error("attempted connection to {nid}, via {addr} but addresses of this kind are not supported")]
+
    #[error(
+
        "attempted connection to {nid}, via {addr} but addresses of this kind are not supported"
+
    )]
    UnsupportedAddress { nid: NodeId, addr: Address },
}

@@ -301,25 +306,6 @@ pub enum CommandError {
    Policy(#[from] policy::Error),
}

-
/// Error returned by [`Service::try_fetch`].
-
#[derive(thiserror::Error, Debug)]
-
enum TryFetchError<'a> {
-
    #[error("ongoing fetch for repository exists")]
-
    AlreadyFetching(&'a mut FetchState),
-
    #[error("peer is not connected; cannot initiate fetch")]
-
    SessionNotConnected,
-
    #[error("peer fetch capacity reached; cannot initiate fetch")]
-
    SessionCapacityReached,
-
    #[error(transparent)]
-
    Namespaces(Box<NamespacesError>),
-
}
-

-
impl From<NamespacesError> for TryFetchError<'_> {
-
    fn from(e: NamespacesError) -> Self {
-
        Self::Namespaces(Box::new(e))
-
    }
-
}
-

/// Fetch state for an ongoing fetch.
#[derive(Debug)]
pub struct FetchState {
@@ -331,15 +317,6 @@ pub struct FetchState {
    pub subscribers: Vec<chan::Sender<FetchResult>>,
}

-
impl FetchState {
-
    /// Add a subscriber to this fetch.
-
    fn subscribe(&mut self, c: chan::Sender<FetchResult>) {
-
        if !self.subscribers.iter().any(|s| s.same_channel(&c)) {
-
            self.subscribers.push(c);
-
        }
-
    }
-
}
-

/// Holds all node stores.
#[derive(Debug)]
pub struct Stores<D>(D);
@@ -439,8 +416,7 @@ pub struct Service<D, S, G> {
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
-
    /// Ongoing fetches.
-
    fetching: HashMap<RepoId, FetchState>,
+
    fetcher: FetcherService<chan::Sender<FetchResult>>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
@@ -508,7 +484,15 @@ where
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default(); // Updated on initialize.
        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.
-

+
        let fetcher = {
+
            let config = fetcher::Config::new()
+
                .with_max_concurrency(
+
                    std::num::NonZeroUsize::new(config.limits.fetch_concurrency.into())
+
                        .expect("fetch concurrency was zero, must be at least 1"),
+
                )
+
                .with_max_capacity(fetcher::MaxQueueSize::default());
+
            FetcherService::new(config)
+
        };
        Self {
            config,
            storage,
@@ -522,7 +506,7 @@ where
            outbox: Outbox::default(),
            limiter,
            sessions,
-
            fetching: HashMap::new(),
+
            fetcher,
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
@@ -623,6 +607,10 @@ where
        Events::from(self.emitter.subscribe())
    }

+
    pub fn fetcher(&self) -> &FetcherState {
+
        self.fetcher.state()
+
    }
+

    /// Get I/O outbox.
    pub fn outbox(&mut self) -> &mut Outbox {
        &mut self.outbox
@@ -898,7 +886,7 @@ where
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
-
                self.fetch(rid, seed, timeout, Some(resp));
+
                self.fetch(rid, seed, vec![], timeout, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                // Update our seeding policy.
@@ -990,7 +978,8 @@ where
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
-
                    return self._fetch(rid, from, status.want, timeout, channel);
+
                    self.fetch(rid, from, status.want, timeout, channel);
+
                    return true;
                }
            }
            Err(e) => {
@@ -1001,247 +990,176 @@ where
        false
    }

-
    /// Initiate an outgoing fetch for some repository.
    fn fetch(
        &mut self,
        rid: RepoId,
        from: NodeId,
-
        timeout: time::Duration,
-
        channel: Option<chan::Sender<FetchResult>>,
-
    ) -> bool {
-
        self._fetch(rid, from, vec![], timeout, channel)
-
    }
-

-
    fn _fetch(
-
        &mut self,
-
        rid: RepoId,
-
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
-
    ) -> bool {
-
        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
-
            Ok(fetching) => {
+
    ) {
+
        let session = {
+
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
+
            let Some(session) = self.sessions.get_mut(&from) else {
                if let Some(c) = channel {
-
                    fetching.subscribe(c);
+
                    c.send(FetchResult::Failed { reason }).ok();
                }
-
                return true;
-
            }
-
            Err(TryFetchError::AlreadyFetching(fetching)) => {
-
                // If we're already fetching the same refs from the requested peer, there's nothing
-
                // to do, we simply add the supplied channel to the list of subscribers so that it
-
                // is notified on completion. Otherwise, we queue a fetch with the requested peer.
-
                if fetching.from == from && fetching.refs_at == refs_at {
-
                    debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");
-

-
                    if let Some(c) = channel {
-
                        fetching.subscribe(c);
-
                    }
-
                } else {
-
                    let fetch = QueuedFetch {
-
                        rid,
-
                        refs_at,
-
                        from,
-
                        timeout,
-
                        channel,
-
                    };
-
                    debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
-

-
                    self.queue_fetch(fetch);
-
                }
-
            }
-
            Err(TryFetchError::SessionCapacityReached) => {
-
                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
-
                self.queue_fetch(QueuedFetch {
-
                    rid,
-
                    refs_at,
-
                    from,
-
                    timeout,
-
                    channel,
-
                });
-
            }
-
            Err(e) => {
+
                return;
+
            };
+
            if !session.is_connected() {
                if let Some(c) = channel {
-
                    c.send(FetchResult::Failed {
-
                        reason: e.to_string(),
-
                    })
-
                    .ok();
+
                    c.send(FetchResult::Failed { reason }).ok();
                }
+
                return;
            }
-
        }
-
        false
-
    }
-

-
    fn queue_fetch(&mut self, fetch: QueuedFetch) {
-
        let Some(s) = self.sessions.get_mut(&fetch.from) else {
-
            log::debug!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
-
            return;
+
            session
        };
-
        if let Err(e) = s.queue_fetch(fetch) {
-
            let fetch = e.inner();
-
            log::debug!(target: "service", "Unable to queue fetch for {} with {}: {e}", &fetch.rid, &fetch.from);
-
        }
-
    }

-
    // TODO: Buffer/throttle fetches.
-
    fn try_fetch(
-
        &mut self,
-
        rid: RepoId,
-
        from: &NodeId,
-
        refs_at: Vec<RefsAt>,
-
        timeout: time::Duration,
-
    ) -> Result<&mut FetchState, TryFetchError<'_>> {
-
        let from = *from;
-
        let Some(session) = self.sessions.get_mut(&from) else {
-
            return Err(TryFetchError::SessionNotConnected);
-
        };
-
        let fetching = self.fetching.entry(rid);
-

-
        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");
-

-
        let fetching = match fetching {
-
            Entry::Vacant(fetching) => fetching,
-
            Entry::Occupied(fetching) => {
-
                // We're already fetching this repo from some peer.
-
                return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
-
            }
-
        };
-
        // Sanity check: We shouldn't be fetching from this session, since we return above if we're
-
        // fetching from any session.
-
        debug_assert!(!session.is_fetching(&rid));
-

-
        if !session.is_connected() {
-            // This can happen if a session disconnects in the time between asking for seeds to
-            // fetch from, and initiating the fetch from one of those seeds.
-            return Err(TryFetchError::SessionNotConnected);
-        }
-        if session.is_at_capacity() {
-            // If we're already fetching multiple repos from this peer.
-            return Err(TryFetchError::SessionCapacityReached);
-        }
-
-        let fetching = fetching.insert(FetchState {
+        let cmd = fetcher::state::command::Fetch {
            from,
-            refs_at: refs_at.clone(),
-            subscribers: vec![],
-        });
-        self.outbox.fetch(
-            session,
            rid,
            refs_at,
            timeout,
-            self.config.limits.fetch_pack_receive,
-        );
+        };
+        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);

-        Ok(fetching)
+        if let Some(c) = rejected {
+            c.send(FetchResult::Failed {
+                reason: "fetch queue at capacity".to_string(),
+            })
+            .ok();
+        }
+
+        match event {
+            fetcher::state::event::Fetch::Started {
+                rid,
+                from,
+                refs_at,
+                timeout,
+            } => {
+                debug!(target: "service", "Starting fetch for {rid} from {from}");
+                self.outbox.fetch(
+                    session,
+                    rid,
+                    refs_at,
+                    timeout,
+                    self.config.limits.fetch_pack_receive,
+                );
+            }
+            fetcher::state::event::Fetch::Queued { rid, from } => {
+                debug!(target: "service", "Queued fetch for {rid} from {from}");
+            }
+            fetcher::state::event::Fetch::AlreadyFetching { rid, from } => {
+                debug!(target: "service", "Already fetching {rid} from {from}");
+            }
+            fetcher::state::event::Fetch::QueueAtCapacity { rid, from, .. } => {
+                debug!(target: "service", "Queue at capacity for {from}, rejected {rid}");
+            }
+        }
    }

    pub fn fetched(
        &mut self,
        rid: RepoId,
-        remote: NodeId,
+        from: NodeId,
        result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
    ) {
-        let Some(fetching) = self.fetching.remove(&rid) else {
-            debug!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
-            return;
-        };
-        debug_assert_eq!(fetching.from, remote);
-
-        if let Some(s) = self.sessions.get_mut(&remote) {
-            // Mark this RID as fetched for this session.
-            s.fetched(rid);
-        }
+        let cmd = fetcher::state::command::Fetched { from, rid };
+        let fetcher::service::FetchCompleted { event, subscribers } = self.fetcher.fetched(cmd);

-        // Notify all fetch subscribers of the fetch result. This is used when the user requests
-        // a fetch via the CLI, for example.
-        for sub in &fetching.subscribers {
-            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");
+        // Dequeue next fetches
+        self.dequeue_fetches();

-            let result = match &result {
-                Ok(success) => FetchResult::Success {
-                    updated: success.updated.clone(),
-                    namespaces: success.namespaces.clone(),
-                    clone: success.clone,
-                },
-                Err(e) => FetchResult::Failed {
-                    reason: e.to_string(),
-                },
-            };
-            if sub.send(result).is_err() {
-                debug!(target: "service", "Failed to send fetch result for {rid} from {remote}..");
-            } else {
-                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
-            }
-        }
-
-        match result {
-            Ok(crate::worker::fetch::FetchResult {
-                updated,
-                canonical,
-                namespaces,
-                clone,
-                doc,
-            }) => {
-                info!(target: "service", "Fetched {rid} from {remote} successfully");
-                // Update our routing table in case this fetch was user-initiated and doesn't
-                // come from an announcement.
-                self.seed_discovered(rid, remote, self.clock.into());
-
-                for update in &updated {
-                    if update.is_skipped() {
-                        trace!(target: "service", "Ref skipped: {update} for {rid}");
-                    } else {
-                        debug!(target: "service", "Ref updated: {update} for {rid}");
-                    }
+        match event {
+            fetcher::state::event::Fetched::NotFound { from, rid } => {
+                debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
+            }
+            fetcher::state::event::Fetched::Completed {
+                from,
+                rid,
+                refs_at: _,
+            } => {
+                // Notify responders
+                let fetch_result = match &result {
+                    Ok(success) => FetchResult::Success {
+                        updated: success.updated.clone(),
+                        namespaces: success.namespaces.clone(),
+                        clone: success.clone,
+                    },
+                    Err(e) => FetchResult::Failed {
+                        reason: e.to_string(),
+                    },
+                };
+                for responder in subscribers {
+                    responder.send(fetch_result.clone()).ok();
                }
-                self.emitter.emit(Event::RefsFetched {
-                    remote,
-                    rid,
-                    updated: updated.clone(),
-                });
-                self.emitter
-                    .emit_all(canonical.into_iter().map(|(refname, target)| {
-                        Event::CanonicalRefUpdated {
-                            rid,
-                            refname,
-                            target,
+                match result {
+                    Ok(crate::worker::fetch::FetchResult {
+                        updated,
+                        canonical,
+                        namespaces,
+                        clone,
+                        doc,
+                    }) => {
+                        info!(target: "service", "Fetched {rid} from {from} successfully");
+                        // Update our routing table in case this fetch was user-initiated and doesn't
+                        // come from an announcement.
+                        self.seed_discovered(rid, from, self.clock.into());
+
+                        for update in &updated {
+                            if update.is_skipped() {
+                                trace!(target: "service", "Ref skipped: {update} for {rid}");
+                            } else {
+                                debug!(target: "service", "Ref updated: {update} for {rid}");
+                            }
                        }
-                    }));
+                        self.emitter.emit(Event::RefsFetched {
+                            remote: from,
+                            rid,
+                            updated: updated.clone(),
+                        });
+                        self.emitter
+                            .emit_all(canonical.into_iter().map(|(refname, target)| {
+                                Event::CanonicalRefUpdated {
+                                    rid,
+                                    refname,
+                                    target,
+                                }
+                            }));

-                // Announce our new inventory if this fetch was a full clone.
-                // Only update and announce inventory for public repositories.
-                if clone && doc.is_public() {
-                    debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");
+                        // Announce our new inventory if this fetch was a full clone.
+                        // Only update and announce inventory for public repositories.
+                        if clone && doc.is_public() {
+                            debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");

-                    if let Err(e) = self.add_inventory(rid) {
-                        warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
-                    }
-                }
+                            if let Err(e) = self.add_inventory(rid) {
+                                warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
+                            }
+                        }

-                // It's possible for a fetch to succeed but nothing was updated.
-                if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
-                    debug!(target: "service", "Nothing to announce, no refs were updated..");
-                } else {
-                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
-                    // beyond just knowing that we have added an item to our inventory.
-                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
-                        warn!(target: "service", "Failed to announce new refs: {e}");
+                        // It's possible for a fetch to succeed but nothing was updated.
+                        if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
+                            debug!(target: "service", "Nothing to announce, no refs were updated..");
+                        } else {
+                            // Finally, announce the refs. This is useful for nodes to know what we've synced,
+                            // beyond just knowing that we have added an item to our inventory.
+                            if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
+                                warn!(target: "service", "Failed to announce new refs: {e}");
+                            }
+                        }
                    }
-                }
-            }
-            Err(err) => {
-                warn!(target: "service", "Fetch failed for {rid} from {remote}: {err}");
+                    Err(err) => {
+                        warn!(target: "service", "Fetch failed for {rid} from {from}: {err}");

-                // For now, we only disconnect the remote in case of timeout. In the future,
-                // there may be other reasons to disconnect.
-                if err.is_timeout() {
-                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
+                        // For now, we only disconnect the peer in case of timeout. In the future,
+                        // there may be other reasons to disconnect.
+                        if err.is_timeout() {
+                            self.outbox.disconnect(from, DisconnectReason::Fetch(err));
+                        }
+                    }
                }
            }
        }
-        // We can now try to dequeue more fetches.
-        self.dequeue_fetches();
    }

    /// Attempt to dequeue fetches from all peers.
@@ -1258,38 +1176,42 @@ where
            .map(|(k, _)| *k)
            .collect::<Vec<_>>();

-        // Try to dequeue once per session.
        for nid in sessions {
-            // SAFETY: All the keys we are iterating on exist.
            #[allow(clippy::unwrap_used)]
            let sess = self.sessions.get_mut(&nid).unwrap();
-            if !sess.is_connected() || sess.is_at_capacity() {
+            if !sess.is_connected() {
                continue;
            }

-            if let Some(QueuedFetch {
+            let Some(fetcher::QueuedFetch {
                rid,
                from,
                refs_at,
                timeout,
-                channel,
-            }) = sess.dequeue_fetch()
-            {
-                debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");
+            }) = self.fetcher.dequeue(&nid)
+            else {
+                continue;
+            };

-                if let Some(refs) = NonEmpty::from_vec(refs_at) {
-                    let repo_entry = self.policies.seed_policy(&rid).expect(
-                        "Service::dequeue_fetch: error accessing repo seeding configuration",
-                    );
-                    let SeedingPolicy::Allow { scope } = repo_entry.policy else {
-                        debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
-                        continue;
-                    };
-                    self.fetch_refs_at(rid, from, refs, scope, timeout, channel);
-                } else {
-                    // If no refs are specified, always do a full fetch.
-                    self.fetch(rid, from, timeout, channel);
-                }
+            // Check seeding policy
+            let repo_entry = self
+                .policies
+                .seed_policy(&rid)
+                .expect("error accessing repo seeding configuration");
+
+            let SeedingPolicy::Allow { scope } = repo_entry.policy else {
+                debug!(target: "service", "Repository {} no longer seeded, skipping", rid);
+                continue;
+            };
+
+            debug!(target: "service", "Dequeued fetch for {} from {}", rid, from);
+
+            // Channel is `None` in both cases since they will already be
+            // registered with the fetcher service.
+            if let Some(refs) = NonEmpty::from_vec(refs_at.clone()) {
+                self.fetch_refs_at(rid, from, refs, scope, timeout, None);
+            } else {
+                self.fetch(rid, from, refs_at, timeout, None);
            }
        }
    }
@@ -1391,7 +1313,6 @@ where
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
-                        self.config.limits.clone(),
                    ));
                    self.outbox.write_all(peer, msgs);
                }
@@ -1422,19 +1343,30 @@ where
        let link = session.link;
        let addr = session.addr.clone();

-        self.fetching.retain(|_, fetching| {
-            if fetching.from != remote {
-                return true;
+        let cmd = fetcher::state::command::Cancel { from: remote };
+        let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);
+
+        match event {
+            fetcher::state::event::Cancel::Unexpected { from } => {
+                debug!(target: "service", "No fetches to cancel for {from}");
            }
-            // Remove and fail any pending fetches from this remote node.
-            for resp in &fetching.subscribers {
-                resp.send(FetchResult::Failed {
-                    reason: format!("disconnected: {reason}"),
+            fetcher::state::event::Cancel::Canceled {
+                from,
+                active,
+                queued,
+            } => {
+                debug!(target: "service", "Cancelled {} ongoing, {} queued for {from}", active.len(), queued.len());
+            }
+        }
+
+        // Notify orphaned responders
+        for (rid, responder) in orphaned {
+            responder
+                .send(FetchResult::Failed {
+                    reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
                })
                .ok();
-            }
-            false
-        });
+        }

        // Attempt to re-connect to persistent peers.
        if self.config.is_persistent(&remote) {
@@ -1651,7 +1583,7 @@ where

                for rid in missing {
                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
-                    self.fetch(rid, *announcer, FETCH_TIMEOUT, None);
+                    self.fetch(rid, *announcer, vec![], FETCH_TIMEOUT, None);
                }
                return Ok(relay);
            }
@@ -2284,13 +2216,7 @@ where
        }
        self.sessions.insert(
            nid,
-            Session::outbound(
-                nid,
-                addr.clone(),
-                persistent,
-                self.rng.clone(),
-                self.config.limits.clone(),
-            ),
+            Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
        );
        self.outbox.connect(nid, addr);

@@ -2563,7 +2489,7 @@ where
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
-                            self.fetch(rid, seed.nid, FETCH_TIMEOUT, None);
+                            self.fetch(rid, seed.nid, vec![], FETCH_TIMEOUT, None);
                        }
                    } else {
                        // TODO: We should make sure that this fetch is retried later, either
@@ -2717,7 +2643,7 @@ pub trait ServiceState {
    /// Get the existing sessions.
    fn sessions(&self) -> &Sessions;
    /// Get fetch state.
-    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
+    fn fetching(&self) -> &FetcherState;
    /// Get outbox.
    fn outbox(&self) -> &Outbox;
    /// Get rate limiter.
@@ -2750,8 +2676,8 @@ where
        &self.sessions
    }

-    fn fetching(&self) -> &HashMap<RepoId, FetchState> {
-        &self.fetching
+    fn fetching(&self) -> &FetcherState {
+        self.fetcher.state()
    }

    fn outbox(&self) -> &Outbox {
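The command/event pattern the service uses above can be illustrated with a minimal, self-contained sketch. This is not the real `radicle-fetch` API; `Fetcher`, `FetchEvent`, and the numeric `NodeId`/`RepoId` stand-ins are hypothetical simplifications. The point of the sans-IO style is that every transition is a pure function over in-memory state that returns an event, and the caller decides what IO (outbox messages, disconnects) to perform:

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical, simplified stand-ins for the real protocol types.
type NodeId = u32;
type RepoId = u32;

/// Events returned by the pure state transitions; the caller (the
/// service) is responsible for performing any actual network IO.
#[derive(Debug, PartialEq)]
enum FetchEvent {
    Started { rid: RepoId, from: NodeId },
    Queued { rid: RepoId, from: NodeId },
    AlreadyFetching { rid: RepoId, from: NodeId },
    QueueAtCapacity { rid: RepoId, from: NodeId },
}

/// Central fetch-state tracker: no sockets, no timers, just data.
struct Fetcher {
    active: HashMap<RepoId, NodeId>,
    queues: HashMap<NodeId, VecDeque<RepoId>>,
    concurrency: usize, // max concurrent fetches per peer
    capacity: usize,    // max queued fetches per peer
}

impl Fetcher {
    fn new(concurrency: usize, capacity: usize) -> Self {
        Self {
            active: HashMap::new(),
            queues: HashMap::new(),
            concurrency,
            capacity,
        }
    }

    /// Pure transition: record the request and report what happened.
    fn fetch(&mut self, rid: RepoId, from: NodeId) -> FetchEvent {
        if self.active.contains_key(&rid) {
            return FetchEvent::AlreadyFetching { rid, from };
        }
        let in_flight = self.active.values().filter(|n| **n == from).count();
        if in_flight < self.concurrency {
            self.active.insert(rid, from);
            FetchEvent::Started { rid, from }
        } else {
            let queue = self.queues.entry(from).or_default();
            if queue.len() >= self.capacity {
                FetchEvent::QueueAtCapacity { rid, from }
            } else {
                queue.push_back(rid);
                FetchEvent::Queued { rid, from }
            }
        }
    }

    /// Complete a fetch and promote the next queued one, if any.
    fn fetched(&mut self, rid: RepoId, from: NodeId) -> Option<RepoId> {
        self.active.remove(&rid);
        let next = self.queues.get_mut(&from)?.pop_front()?;
        self.active.insert(next, from);
        Some(next)
    }
}

fn main() {
    let mut f = Fetcher::new(1, 1);
    assert_eq!(f.fetch(10, 1), FetchEvent::Started { rid: 10, from: 1 });
    assert_eq!(f.fetch(10, 1), FetchEvent::AlreadyFetching { rid: 10, from: 1 });
    assert_eq!(f.fetch(11, 1), FetchEvent::Queued { rid: 11, from: 1 });
    assert_eq!(f.fetch(12, 1), FetchEvent::QueueAtCapacity { rid: 12, from: 1 });
    // Completing the active fetch dequeues the next one for that peer.
    assert_eq!(f.fetched(10, 1), Some(11));
}
```

Because the transitions touch no IO, the queue/capacity/dedup semantics can be exercised in unit tests without the connection-setup machinery the patch description mentions.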
modified crates/radicle-protocol/src/service/io.rs
@@ -138,8 +138,6 @@ impl Outbox {
        timeout: time::Duration,
        reader_limit: FetchPackSizeLimit,
    ) {
-        peer.fetching(rid);
-
        let refs_at = (!refs_at.is_empty()).then_some(refs_at);

        if let Some(refs_at) = &refs_at {
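The `(!refs_at.is_empty()).then_some(refs_at)` expression above is the standard idiom for converting an "empty means absent" value into an `Option`; a quick stand-alone illustration:

```rust
fn main() {
    // bool::then_some consumes the value only when the condition holds.
    let refs_at: Vec<u32> = vec![1, 2];
    assert_eq!((!refs_at.is_empty()).then_some(refs_at), Some(vec![1, 2]));

    // An empty vector yields None instead of Some(vec![]).
    let empty: Vec<u32> = vec![];
    assert_eq!((!empty.is_empty()).then_some(empty), None);
}
```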
modified crates/radicle-protocol/src/service/session.rs
@@ -1,8 +1,7 @@
-use std::collections::{HashSet, VecDeque};
+use std::collections::VecDeque;
use std::{fmt, time};

use crossbeam_channel as chan;
-use radicle::node::config::Limits;
use radicle::node::{FetchResult, Severity};
use radicle::node::{Link, Timestamp};
pub use radicle::node::{PingState, State};
@@ -111,8 +110,6 @@ pub struct Session {
    pub subscribe: Option<message::Subscribe>,
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,
-    /// Fetch queue.
-    pub queue: VecDeque<QueuedFetch>,

    /// Connection attempts. For persistent peers, tracks
    /// how many times we've attempted to connect. We reset this to zero
@@ -120,8 +117,6 @@ pub struct Session {
    attempts: usize,
    /// Source of entropy.
    rng: Rng,
-    /// Protocol limits.
-    limits: Limits,
}

impl fmt::Display for Session {
@@ -159,7 +154,7 @@ impl From<&Session> for radicle::node::Session {
}

impl Session {
-    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, limits: Limits) -> Self {
+    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng) -> Self {
        Self {
            id,
            addr,
@@ -168,28 +163,18 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
-            queue: VecDeque::with_capacity(MAX_FETCH_QUEUE_SIZE),
            attempts: 1,
            rng,
-            limits,
        }
    }

-    pub fn inbound(
-        id: NodeId,
-        addr: Address,
-        persistent: bool,
-        rng: Rng,
-        time: LocalTime,
-        limits: Limits,
-    ) -> Self {
+    pub fn inbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, time: LocalTime) -> Self {
        Self {
            id,
            addr,
            state: State::Connected {
                since: time,
                ping: PingState::default(),
-                fetching: HashSet::default(),
                latencies: VecDeque::default(),
                stable: false,
            },
@@ -197,10 +182,8 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: time,
-            queue: VecDeque::new(),
            attempts: 0,
            rng,
-            limits,
        }
    }

@@ -224,41 +207,6 @@ impl Session {
        matches!(self.state, State::Initial)
    }

-    pub fn is_at_capacity(&self) -> bool {
-        if let State::Connected { fetching, .. } = &self.state {
-            if fetching.len() >= self.limits.fetch_concurrency.into() {
-                return true;
-            }
-        }
-        false
-    }
-
-    pub fn is_fetching(&self, rid: &RepoId) -> bool {
-        if let State::Connected { fetching, .. } = &self.state {
-            return fetching.contains(rid);
-        }
-        false
-    }
-
-    /// Queue a fetch. Returns `true` if it was added to the queue, and `false` if
-    /// it already was present in the queue.
-    pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> Result<(), QueueError> {
-        assert_eq!(fetch.from, self.id);
-
-        if self.queue.len() >= MAX_FETCH_QUEUE_SIZE {
-            return Err(QueueError::CapacityReached(fetch));
-        } else if self.queue.contains(&fetch) {
-            return Err(QueueError::Duplicate(fetch));
-        }
-        self.queue.push_back(fetch);
-
-        Ok(())
-    }
-
-    pub fn dequeue_fetch(&mut self) -> Option<QueuedFetch> {
-        self.queue.pop_front()
-    }
-
    pub fn attempts(&self) -> usize {
        self.attempts
    }
@@ -279,33 +227,6 @@ impl Session {
        }
    }

-    /// Mark this session as fetching the given RID.
-    ///
-    /// # Panics
-    ///
-    /// If it is already fetching that RID, or the session is disconnected.
-    pub fn fetching(&mut self, rid: RepoId) {
-        if let State::Connected { fetching, .. } = &mut self.state {
-            assert!(
-                fetching.insert(rid),
-                "Session must not already be fetching {rid}"
-            );
-        } else {
-            panic!(
-                "Attempting to fetch {rid} from disconnected session {}",
-                self.id
-            );
-        }
-    }
-
-    pub fn fetched(&mut self, rid: RepoId) {
-        if let State::Connected { fetching, .. } = &mut self.state {
-            if !fetching.remove(&rid) {
-                log::debug!(target: "service", "Fetched unknown repository {rid}");
-            }
-        }
-    }
-
    pub fn to_attempted(&mut self) {
        assert!(
            self.is_initial(),
@@ -324,7 +245,6 @@ impl Session {
        self.state = State::Connected {
            since,
            ping: PingState::default(),
-            fetching: HashSet::default(),
            latencies: VecDeque::default(),
            stable: false,
        };
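The `Session::queue_fetch` removed above (its semantics now live in the central fetcher) enforced a size cap and duplicate rejection on a `VecDeque`. Those semantics can be reproduced in isolation; this sketch uses a plain `u32` as a stand-in for `QueuedFetch`, and an illustrative queue size rather than the real `MAX_FETCH_QUEUE_SIZE`:

```rust
use std::collections::VecDeque;

// Illustrative cap; the real MAX_FETCH_QUEUE_SIZE differs.
const MAX_FETCH_QUEUE_SIZE: usize = 3;

#[derive(Debug, PartialEq)]
enum QueueError {
    CapacityReached(u32),
    Duplicate(u32),
}

/// Queue a fetch, rejecting duplicates and respecting the size cap,
/// mirroring the removed `Session::queue_fetch` behavior.
fn queue_fetch(queue: &mut VecDeque<u32>, fetch: u32) -> Result<(), QueueError> {
    if queue.len() >= MAX_FETCH_QUEUE_SIZE {
        return Err(QueueError::CapacityReached(fetch));
    } else if queue.contains(&fetch) {
        return Err(QueueError::Duplicate(fetch));
    }
    queue.push_back(fetch);
    Ok(())
}

fn main() {
    let mut q = VecDeque::new();
    assert_eq!(queue_fetch(&mut q, 1), Ok(()));
    // The same fetch cannot be queued twice.
    assert_eq!(queue_fetch(&mut q, 1), Err(QueueError::Duplicate(1)));
    assert_eq!(queue_fetch(&mut q, 2), Ok(()));
    assert_eq!(queue_fetch(&mut q, 3), Ok(()));
    // A full queue rejects further fetches.
    assert_eq!(queue_fetch(&mut q, 4), Err(QueueError::CapacityReached(4)));
}
```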
modified crates/radicle-schemars/src/main.rs
@@ -87,7 +87,7 @@ fn print_schema() -> io::Result<()> {
                    #[schemars(with = "radicle::schemars_ext::crypto::PublicKey")]
                    radicle::node::NodeId,
                ),
-                Config(radicle::node::Config),
+                Config(Box<radicle::node::Config>),
                ListenAddrs(ListenAddrs),
                ConnectResult(radicle::node::ConnectResult),
                Success(radicle::node::Success),
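Boxing the `Config` variant above keeps the enum itself small: a Rust enum is at least as large as its largest inline variant, so one big payload inflates every variant (the pattern clippy flags as `large_enum_variant`). A minimal illustration with a hypothetical `Config` stand-in:

```rust
// Hypothetical large payload standing in for radicle::node::Config.
#[allow(dead_code)]
struct Config {
    buf: [u8; 1024],
}

#[allow(dead_code)]
enum Unboxed {
    Config(Config),
    Ping,
}

#[allow(dead_code)]
enum Boxed {
    Config(Box<Config>),
    Ping,
}

fn main() {
    // The unboxed enum must reserve inline space for the largest variant...
    assert!(std::mem::size_of::<Unboxed>() >= 1024);
    // ...while boxing stores the payload behind a pointer.
    assert!(std::mem::size_of::<Boxed>() <= std::mem::size_of::<usize>() * 2);
}
```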
modified crates/radicle/CHANGELOG.md
@@ -46,6 +46,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Removed

+- The data returned by `Seeds` contains a `state` field, which previously
+  included a `fetching` field listing that node's ongoing fetches while in the
+  `Connected` state. `Connected` no longer contains that field.
+
### Security

## 0.20.0
modified crates/radicle/src/node.rs
@@ -107,8 +107,6 @@ pub enum State {
        /// Ping state.
        #[serde(skip)]
        ping: PingState,
-        /// Ongoing fetches.
-        fetching: HashSet<RepoId>,
        /// Measured latencies for this peer.
        #[serde(skip)]
        latencies: VecDeque<LocalDuration>,
@@ -696,7 +694,7 @@ impl From<Vec<Seed>> for Seeds {
    }
}

-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum FetchResult {
@@ -1486,7 +1484,6 @@ mod test {
            &serde_json::to_string(&CommandResult::Okay(State::Connected {
                since: LocalTime::now(),
                ping: Default::default(),
-                fetching: Default::default(),
                latencies: VecDeque::default(),
                stable: false,
            }))
modified crates/radicle/src/node/command.rs
@@ -313,7 +313,6 @@ mod test {
            &serde_json::to_string(&CommandResult::Okay(State::Connected {
                since: LocalTime::now(),
                ping: Default::default(),
-                fetching: Default::default(),
                latencies: VecDeque::default(),
                stable: false,
            }))
@@ -329,7 +328,7 @@ mod test {
        );
        assert_matches!(
            json::from_str::<CommandResult<Seeds>>(
-                r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994,"fetching":[]}}}]"#
+                r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994}}}]"#
            ),
            Ok(CommandResult::Okay(_))
        );
modified crates/radicle/src/storage/refs.rs
@@ -371,7 +371,7 @@ impl<V> Deref for SignedRefs<V> {
///
/// `RefsAt` can also be used for communicating announcements of updates
/// references to other nodes.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct RefsAt {
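Adding `Hash` to the `RefsAt` derives above lets values be used in `HashSet`/`HashMap`, which the fetcher's state tracking needs. A sketch with a simplified stand-in struct (not the real `RefsAt` fields):

```rust
use std::collections::HashSet;

// Simplified stand-in: deriving Hash (together with Eq) enables
// set membership and map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct RefsAt {
    remote: u32,
    at: u64,
}

fn main() {
    let mut seen = HashSet::new();
    assert!(seen.insert(RefsAt { remote: 1, at: 42 }));
    // Re-inserting an equal value is a no-op, detected via the Hash impl.
    assert!(!seen.insert(RefsAt { remote: 1, at: 42 }));
}
```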