Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: introduce fetcher state
Fintan Halpenny committed 3 months ago
commit b71bac7a20014dbc22a4c8d71fbd4f7605d05be5
parent 3ae632392292b138e216101bb1963c1a1009e78f
30 files changed +2482 -3
modified Cargo.lock
@@ -3098,6 +3098,7 @@ dependencies = [
 "qcheck",
 "qcheck-macros",
 "radicle",
+
 "radicle-core",
 "radicle-crypto",
 "radicle-fetch",
 "radicle-localtime",
modified crates/radicle-cli/src/terminal/highlight.rs
@@ -145,9 +145,11 @@ impl Builder {
                    }
                }
                ts::HighlightEvent::HighlightStart(h) => {
-
                    let name = HIGHLIGHTS[h.0];
-
                    let style =
-
                        term::Style::default().fg(theme.highlight(name).unwrap_or_default());
+
                    let color = HIGHLIGHTS
+
                        .get(h.0)
+
                        .and_then(|name| theme.highlight(name))
+
                        .unwrap_or_default();
+
                    let style = term::Style::default().fg(color);

                    self.advance();
                    self.styles.push(style);
modified crates/radicle-protocol/Cargo.toml
@@ -21,6 +21,7 @@ log = { workspace = true, features = ["std"] }
nonempty = { workspace = true, features = ["serialize"] }
qcheck = { workspace = true, optional = true }
radicle = { workspace = true, features = ["logger"] }
+
radicle-core = { workspace = true }
radicle-fetch = { workspace = true }
radicle-localtime = { workspace = true }
sqlite = { workspace = true, features = ["bundled"] }
added crates/radicle-protocol/src/fetcher.rs
@@ -0,0 +1,5 @@
+
//! Logical fetch tracking for the node.
//!
//! The entry point is [`state::FetcherState`]; the most commonly used types
//! are re-exported here.

pub mod state;

pub use state::{ActiveFetch, Config, FetcherState, MaxQueueSize, Queue, QueueIter, QueuedFetch};

#[cfg(test)]
mod test;
added crates/radicle-protocol/src/fetcher/state.rs
@@ -0,0 +1,454 @@
+
//! Logical state for Git fetches happening in the node.
+
//!
+
//! See [`FetcherState`] for more information.
+
//!
+
//! See the [`command`] module for the inputs accepted by [`FetcherState`].
//! See the [`event`] module for the outputs produced by [`FetcherState`].
+

+
pub mod command;
+
pub mod event;
+

+
pub use command::Command;
+
pub use event::Event;
+

+
use std::collections::{BTreeMap, VecDeque};
+
use std::num::NonZeroUsize;
+
use std::time;
+

+
use radicle::storage::refs::RefsAt;
+
use radicle_core::{NodeId, RepoId};
+

+
/// Default for the maximum items per fetch queue.
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
/// Default for maximum concurrency per node.
///
/// `NonZeroUsize::MIN` is `1`, i.e. a single concurrent fetch per node.
pub const MAX_CONCURRENCY: NonZeroUsize = NonZeroUsize::MIN;
+

+
/// Logical state for Git fetches happening in the node.
///
/// A fetch can either be:
///   - [`ActiveFetch`]: meaning it is currently being fetched from another node on the network
///   - [`QueuedFetch`]: meaning it is expected to be fetched from a given node, but the
///     repository is already being fetched, or the node is at capacity.
///
/// For any given repository, identified by its [`RepoId`], there can only be
/// one fetch occurring for it at a given time. This prevents any concurrent
/// fetches from clobbering overlapping references.
///
/// If the repository is actively being fetched, then that fetch will be queued
/// for a later attempt.
///
/// For any given node, there is a configurable capacity so that only `N` number
/// of fetches can happen with it concurrently. This does not guarantee that the
/// node will actually allow this node to fetch from it – since it will maintain
/// its own capacity for connections and load.
///
/// Construct with [`FetcherState::new`], or via [`Default`] for the default
/// [`Config`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FetcherState {
    /// The active fetches that are occurring, ensuring only one fetch per repository.
    active: BTreeMap<RepoId, ActiveFetch>,
    /// The queued fetches, waiting to happen, where each node maintains its own queue.
    queues: BTreeMap<NodeId, Queue>,
    /// Configuration for maintaining the fetch state.
    config: Config,
}
+

+
impl Default for FetcherState {
+
    fn default() -> Self {
+
        Self::new(Config::default())
+
    }
+
}
+

+
impl FetcherState {
+
    /// Initialize the [`FetcherState`] with the given [`Config`].
+
    pub fn new(config: Config) -> Self {
+
        Self {
+
            active: BTreeMap::new(),
+
            queues: BTreeMap::new(),
+
            config,
+
        }
+
    }
+
}
+

+
impl FetcherState {
+
    /// Process the handling of a [`Command`], delegating to its corresponding
+
    /// method, and returning the corresponding [`Event`].
+
    ///
+
    /// This method is useful if the [`FetcherState`] is used in batch
+
    /// processing and does need to be explicit about the underlying method.
+
    pub fn handle(&mut self, command: Command) -> Event {
+
        match command {
+
            Command::Fetch(fetch) => self.fetch(fetch).into(),
+
            Command::Fetched(fetched) => self.fetched(fetched).into(),
+
            Command::Cancel(cancel) => self.cancel(cancel).into(),
+
        }
+
    }
+

+
    /// Process a [`Fetch`] command, which transitions the given fetch to
+
    /// active, if possible.
+
    ///
+
    /// The fetch will only transition to being active if:
+
    ///
+
    ///   - A fetch is not already happening for that repository, in which case it gets queued.
+
    ///   - The node to be fetched from is not already at capacity, again it will be queued.
+
    ///
+
    /// [`Fetch`]: command::Fetch
+
    pub fn fetch(
+
        &mut self,
+
        command::Fetch {
+
            from,
+
            rid,
+
            refs_at,
+
            timeout,
+
        }: command::Fetch,
+
    ) -> event::Fetch {
+
        if let Some(active) = self.active.get(&rid) {
+
            if active.refs_at == refs_at && active.from == from {
+
                return event::Fetch::AlreadyFetching { rid, from };
+
            } else {
+
                return self.enqueue(rid, from, refs_at, timeout);
+
            }
+
        }
+

+
        if self.is_at_node_capacity(&from) {
+
            self.enqueue(rid, from, refs_at, timeout)
+
        } else {
+
            self.active.insert(
+
                rid,
+
                ActiveFetch {
+
                    from,
+
                    refs_at: refs_at.clone(),
+
                },
+
            );
+
            event::Fetch::Started {
+
                rid,
+
                from,
+
                refs_at,
+
                timeout,
+
            }
+
        }
+
    }
+

+
    /// Process a [`Fetched`] command, which removes the given fetch from the set of active fetches.
+
    /// Note that this is agnostic of whether the fetch succeeded or failed.
+
    ///
+
    /// The caller will be notified if the completed fetch did not exist in the active set.
+
    ///
+
    /// [`Fetched`]: command::Fetched
+
    pub fn fetched(&mut self, command::Fetched { from, rid }: command::Fetched) -> event::Fetched {
+
        match self.active.remove(&rid) {
+
            None => event::Fetched::NotFound { from, rid },
+
            Some(ActiveFetch { from, refs_at }) => event::Fetched::Completed { from, rid, refs_at },
+
        }
+
    }
+

+
    /// Attempt to dequeue a [`QueuedFetch`] for the given node.
+
    ///
+
    /// This will only dequeue the fetch if it is not active, and the given node
+
    /// is not at capacity.
+
    pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
+
        let is_at_capacity = self.is_at_node_capacity(from);
+
        let queue = self.queues.get_mut(from)?;
+
        let active = &self.active;
+
        queue.try_dequeue(|QueuedFetch { rid, .. }| !is_at_capacity && !active.contains_key(rid))
+
    }
+

+
    /// Process a [`Cancel`] command, which cancels any active and/or queued
+
    /// fetches for that given node.
+
    ///
+
    /// [`Cancel`]: command::Cancel
+
    pub fn cancel(&mut self, command::Cancel { from }: command::Cancel) -> event::Cancel {
+
        let cancelled: Vec<_> = self
+
            .active
+
            .iter()
+
            .filter_map(|(rid, f)| (f.from == from).then_some(*rid))
+
            .collect();
+
        let ongoing: BTreeMap<_, _> = cancelled
+
            .iter()
+
            .filter_map(|rid| self.active.remove(rid).map(|f| (*rid, f)))
+
            .collect();
+
        let ongoing = (!ongoing.is_empty()).then_some(ongoing);
+
        let queued = self.queues.remove(&from).filter(|queue| !queue.is_empty());
+

+
        match (ongoing, queued) {
+
            (None, None) => event::Cancel::Unexpected { from },
+
            (ongoing, queued) => event::Cancel::Canceled {
+
                from,
+
                active: ongoing.unwrap_or_default(),
+
                queued: queued.map(|q| q.queue).unwrap_or_default(),
+
            },
+
        }
+
    }
+

+
    fn enqueue(
+
        &mut self,
+
        rid: RepoId,
+
        from: NodeId,
+
        refs_at: Vec<RefsAt>,
+
        timeout: time::Duration,
+
    ) -> event::Fetch {
+
        let queue = self
+
            .queues
+
            .entry(from)
+
            .or_insert(Queue::new(self.config.maximum_queue_size));
+
        match queue.enqueue(QueuedFetch {
+
            rid,
+
            from,
+
            refs_at,
+
            timeout,
+
        }) {
+
            Enqueue::CapacityReached(QueuedFetch {
+
                rid,
+
                from,
+
                refs_at,
+
                timeout,
+
            }) => event::Fetch::QueueAtCapacity {
+
                rid,
+
                from,
+
                refs_at,
+
                timeout,
+
                capacity: queue.len(),
+
            },
+
            Enqueue::Queued => event::Fetch::Queued { rid, from },
+
            Enqueue::Merged => event::Fetch::Queued { rid, from },
+
        }
+
    }
+
}
+

+
impl FetcherState {
+
    /// Get the set of queued fetches.
+
    pub fn queued_fetches(&self) -> &BTreeMap<NodeId, Queue> {
+
        &self.queues
+
    }
+

+
    /// Get the set of active fetches.
+
    pub fn active_fetches(&self) -> &BTreeMap<RepoId, ActiveFetch> {
+
        &self.active
+
    }
+

+
    /// Get the [`ActiveFetch`] for the provided [`RepoId`], returning `None` if
+
    /// it does not exist.
+
    pub fn get_active_fetch(&self, rid: &RepoId) -> Option<&ActiveFetch> {
+
        self.active.get(rid)
+
    }
+

+
    /// Check if the number of fetches exceeds the maximum number of concurrent
+
    /// fetches for a given [`NodeId`].
+
    ///
+
    /// Returns `true` if the fetcher is fetching the maximum number of
+
    /// repositories, for that node.
+
    fn is_at_node_capacity(&self, node: &NodeId) -> bool {
+
        let count = self.active.values().filter(|f| &f.from == node).count();
+
        count >= self.config.maximum_concurrency.into()
+
    }
+
}
+

+
/// Configuration for the [`FetcherState`].
///
/// Defaults to [`MAX_CONCURRENCY`] concurrent fetches per node and a queue
/// bound of [`MAX_FETCH_QUEUE_SIZE`]; override via [`Config::with_max_concurrency`]
/// and [`Config::with_max_capacity`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Config {
    /// Maximum number of concurrent fetches per peer connection.
    maximum_concurrency: NonZeroUsize,
    /// Maximum fetching queue size for a single node.
    maximum_queue_size: MaxQueueSize,
}
+

+
impl Config {
+
    pub fn new() -> Self {
+
        Self::default()
+
    }
+

+
    /// Maximum fetching queue size for a single node.
+
    pub fn with_max_capacity(mut self, capacity: MaxQueueSize) -> Self {
+
        self.maximum_queue_size = capacity;
+
        self
+
    }
+

+
    /// Maximum number of concurrent fetches per peer connection.
+
    pub fn with_max_concurrency(mut self, concurrency: NonZeroUsize) -> Self {
+
        self.maximum_concurrency = concurrency;
+
        self
+
    }
+
}
+

+
impl Default for Config {
+
    fn default() -> Self {
+
        Self {
+
            maximum_concurrency: MAX_CONCURRENCY,
+
            maximum_queue_size: MaxQueueSize::default(),
+
        }
+
    }
+
}
+

+
/// An active fetch represents a repository being fetched by a particular node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActiveFetch {
    /// The node being fetched from.
    pub(super) from: NodeId,
    /// The references being fetched; empty means "fetch everything".
    // NOTE(review): the empty == "everything" convention is inferred from
    // `Queue::enqueue`'s merge logic — confirm it holds here too.
    pub(super) refs_at: Vec<RefsAt>,
}
+

+
impl ActiveFetch {
+
    /// The node from which the repository is being fetched.
+
    pub fn from(&self) -> &NodeId {
+
        &self.from
+
    }
+

+
    /// The set of references that fetch is being performed for.
+
    pub fn refs_at(&self) -> &[RefsAt] {
+
        &self.refs_at
+
    }
+
}
+

+
/// A fetch that is waiting to be processed, in the fetch queue.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueuedFetch {
    /// The repository that will be fetched.
    pub rid: RepoId,
    // TODO(finto): this might be redundant, since queues are per node
    /// The peer from which the repository will be fetched.
    pub from: NodeId,
    /// The references that the fetch is being performed for.
    pub refs_at: Vec<RefsAt>,
    /// The timeout given for the fetch request.
    pub timeout: time::Duration,
}
+

+
/// A queue for keeping track of fetches.
///
/// It ensures that the queue contains unique items for fetching, and does not
/// exceed the provided maximum capacity.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Queue {
    /// FIFO storage; uniqueness by `rid` is enforced in `enqueue`.
    queue: VecDeque<QueuedFetch>,
    /// Upper bound on the number of queued fetches.
    max_queue_size: MaxQueueSize,
}
+

+
/// The maximum number of fetches that can be queued for a single node.
///
/// Guaranteed to be at least `1` — see [`MaxQueueSize::new`] and
/// [`MaxQueueSize::MIN`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaxQueueSize(usize);
+

+
impl MaxQueueSize {
    /// Minimum queue size is `1`.
    pub const MIN: Self = MaxQueueSize(1);

    /// Create a queue size, that must be larger than `0`.
    pub fn new(size: NonZeroUsize) -> Self {
        Self(size.into())
    }

    /// The queue size as a plain `usize`.
    pub fn as_usize(&self) -> usize {
        self.0
    }

    /// Checks if the `n` provided has reached or exceeded the maximum queue
    /// size (i.e. `n >= max`), so that adding one more item would go over the
    /// bound.
    fn is_exceeded_by(&self, n: usize) -> bool {
        n >= self.0
    }
}
+

+
impl Default for MaxQueueSize {
+
    fn default() -> Self {
+
        Self(MAX_FETCH_QUEUE_SIZE)
+
    }
+
}
+

+
/// The result of [`Queue::enqueue`].
#[must_use]
#[derive(Debug, PartialEq, Eq)]
pub(super) enum Enqueue {
    /// The capacity of the queue has been exceeded, and the [`QueuedFetch`] is
    /// returned.
    CapacityReached(QueuedFetch),
    /// The [`QueuedFetch`] was successfully queued.
    Queued,
    /// A fetch for the same repository was already queued; the new request was
    /// merged into the existing entry instead of being appended.
    Merged,
}
+

+
impl Queue {
+
    /// Create the [`Queue`] with the given [`MaxQueueSize`].
+
    pub(super) fn new(max_queue_size: MaxQueueSize) -> Self {
+
        Self {
+
            queue: VecDeque::with_capacity(max_queue_size.0),
+
            max_queue_size,
+
        }
+
    }
+

+
    /// The current number of items in the queue.
+
    pub(super) fn len(&self) -> usize {
+
        self.queue.len()
+
    }
+

+
    /// Returns `true` if the [`Queue`] is empty.
+
    pub(super) fn is_empty(&self) -> bool {
+
        self.queue.is_empty()
+
    }
+

+
    /// Enqueues a fetch onto the back of the queue, and will only succeed if
+
    /// the queue has not reached capacity and if the item is unique.
+
    pub(super) fn enqueue(&mut self, fetch: QueuedFetch) -> Enqueue {
+
        if let Some(existing) = self.queue.iter_mut().find(|qf| qf.rid == fetch.rid) {
+
            if existing.refs_at.is_empty() || fetch.refs_at.is_empty() {
+
                // We fetch everything
+
                existing.refs_at = vec![]
+
            } else {
+
                existing.refs_at.extend(fetch.refs_at);
+
            }
+
            // Take the longer timeout (more generous)
+
            existing.timeout = existing.timeout.max(fetch.timeout);
+
            return Enqueue::Merged;
+
        }
+

+
        if self.max_queue_size.is_exceeded_by(self.queue.len()) {
+
            Enqueue::CapacityReached(fetch)
+
        } else {
+
            self.queue.push_back(fetch);
+
            Enqueue::Queued
+
        }
+
    }
+

+
    /// Try to dequeue the next [`QueuedFetch`], but only if the `predicate`
+
    /// holds, otherwise it will be pushed back to the front of the queue.
+
    pub(super) fn try_dequeue<P>(&mut self, predicate: P) -> Option<QueuedFetch>
+
    where
+
        P: FnOnce(&QueuedFetch) -> bool,
+
    {
+
        let fetch = self.dequeue()?;
+
        if predicate(&fetch) {
+
            Some(fetch)
+
        } else {
+
            self.queue.push_front(fetch);
+
            None
+
        }
+
    }
+

+
    /// Dequeues a fetch from the front of the queue.
+
    pub(super) fn dequeue(&mut self) -> Option<QueuedFetch> {
+
        self.queue.pop_front()
+
    }
+

+
    /// Return an iterator over the queued fetches.
+
    pub fn iter<'a>(&'a self) -> QueueIter<'a> {
+
        QueueIter {
+
            inner: self.queue.iter(),
+
        }
+
    }
+
}
+

+
/// Iterator over the [`QueuedFetch`]'s of a [`Queue`], front to back.
pub struct QueueIter<'a> {
    // Thin wrapper over the deque's iterator so the storage type stays private.
    inner: std::collections::vec_deque::Iter<'a, QueuedFetch>,
}
+

+
impl<'a> Iterator for QueueIter<'a> {
+
    type Item = &'a QueuedFetch;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        self.inner.next()
+
    }
+
}
+

+
impl<'a> IntoIterator for &'a Queue {
+
    type Item = &'a QueuedFetch;
+
    type IntoIter = QueueIter<'a>;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.iter()
+
    }
+
}
added crates/radicle-protocol/src/fetcher/state/command.rs
@@ -0,0 +1,80 @@
+
use std::time;
+

+
use radicle::storage::refs::RefsAt;
+
use radicle_core::{NodeId, RepoId};
+

+
/// Commands for transitioning the [`FetcherState`].
///
/// [`FetcherState`]: super::FetcherState
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Command {
    /// Request that a fetch become active (or be queued).
    Fetch(Fetch),
    /// Mark a fetch as completed, successfully or not.
    Fetched(Fetched),
    /// Cancel all fetches involving a given node.
    Cancel(Cancel),
}
+

+
impl From<Fetch> for Command {
+
    fn from(v: Fetch) -> Self {
+
        Self::Fetch(v)
+
    }
+
}
+

+
impl From<Fetched> for Command {
+
    fn from(v: Fetched) -> Self {
+
        Self::Fetched(v)
+
    }
+
}
+

+
impl From<Cancel> for Command {
+
    fn from(v: Cancel) -> Self {
+
        Self::Cancel(v)
+
    }
+
}
+

+
impl Command {
+
    pub fn fetch(from: NodeId, rid: RepoId, refs_at: Vec<RefsAt>, timeout: time::Duration) -> Self {
+
        Self::from(Fetch {
+
            from,
+
            rid,
+
            refs_at,
+
            timeout,
+
        })
+
    }
+

+
    pub fn fetched(from: NodeId, rid: RepoId) -> Self {
+
        Self::from(Fetched { from, rid })
+
    }
+

+
    pub fn cancel(from: NodeId) -> Self {
+
        Self::from(Cancel { from })
+
    }
+
}
+

+
/// A fetch wants to be marked as active.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Fetch {
    /// The node from which the repository is being fetched.
    pub from: NodeId,
    /// The repository to fetch.
    pub rid: RepoId,
    /// The references to fetch.
    pub refs_at: Vec<RefsAt>,
    /// The timeout for the fetch process.
    pub timeout: time::Duration,
}
+

+
/// A fetch wants to be marked as completed.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Fetched {
    /// The node from which the repository was fetched.
    pub from: NodeId,
    /// The repository that was fetched.
    pub rid: RepoId,
}
+

+
/// Any fetches are canceled for the given node.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Cancel {
    /// The node for which the fetches should be canceled.
    pub from: NodeId,
}
added crates/radicle-protocol/src/fetcher/state/event.rs
@@ -0,0 +1,112 @@
+
use std::collections::{BTreeMap, VecDeque};
+
use std::time;
+

+
use radicle::storage::refs::RefsAt;
+
use radicle_core::{NodeId, RepoId};
+

+
use super::{ActiveFetch, QueuedFetch};
+

+
/// Event returned from [`FetcherState::handle`].
///
/// [`FetcherState::handle`]: super::FetcherState::handle
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Outcome of a fetch request.
    Fetch(Fetch),
    /// Outcome of marking a fetch as completed.
    Fetched(Fetched),
    /// Outcome of cancelling a node's fetches.
    Cancel(Cancel),
}
+

+
impl From<Cancel> for Event {
+
    fn from(v: Cancel) -> Self {
+
        Self::Cancel(v)
+
    }
+
}
+

+
impl From<Fetched> for Event {
+
    fn from(v: Fetched) -> Self {
+
        Self::Fetched(v)
+
    }
+
}
+

+
impl From<Fetch> for Event {
+
    fn from(v: Fetch) -> Self {
+
        Self::Fetch(v)
+
    }
+
}
+

+
/// Events that occur when a repository is requested to be fetched.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Fetch {
    /// The fetch can be started by the caller.
    Started {
        /// The repository to be fetched.
        rid: RepoId,
        /// The node to fetch from.
        from: NodeId,
        /// The references to be fetched.
        refs_at: Vec<RefsAt>,
        /// The timeout for the fetch process.
        timeout: time::Duration,
    },
    /// The repository is already being fetched from the given node.
    AlreadyFetching {
        /// The repository being actively fetched.
        rid: RepoId,
        /// The node being fetched from.
        from: NodeId,
    },
    /// The queue for the given node is at capacity, and can no longer accept
    /// any more fetch requests.
    QueueAtCapacity {
        /// The rejected repository.
        rid: RepoId,
        /// The node whose queue is at capacity.
        from: NodeId,
        /// The references expected to be fetched.
        refs_at: Vec<RefsAt>,
        /// The timeout for the fetch process.
        timeout: time::Duration,
        /// The capacity of the queue.
        capacity: usize,
    },
    /// The fetch was queued for later processing.
    Queued {
        /// The repository to be fetched.
        rid: RepoId,
        /// The node to fetch from.
        from: NodeId,
    },
}
+

+
/// Events that occur after a repository has been fetched.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Fetched {
    /// There was no ongoing fetch for the given [`NodeId`] and [`RepoId`].
    NotFound { from: NodeId, rid: RepoId },
    /// The active fetch was marked as completed and removed from the active
    /// set.
    Completed {
        /// The node the repository was fetched from.
        from: NodeId,
        /// The repository that was fetched.
        rid: RepoId,
        /// The references that were fetched.
        refs_at: Vec<RefsAt>,
    },
}
+

+
/// Events that occur when a fetch was canceled for a given node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Cancel {
    /// There were no active or queued fetches for the given node.
    Unexpected { from: NodeId },
    /// There were active or queued fetches that were canceled for the given node.
    Canceled {
        /// The node which was canceled.
        from: NodeId,
        /// The active fetches that were canceled.
        active: BTreeMap<RepoId, ActiveFetch>,
        /// The queued fetches that were canceled.
        queued: VecDeque<QueuedFetch>,
    },
}
added crates/radicle-protocol/src/fetcher/test.rs
@@ -0,0 +1,2 @@
+
// NOTE(review): `test/arbitrary.rs` exists alongside these modules but is not
// declared here, so it is never compiled — confirm whether it should be wired
// in as `mod arbitrary;` or deleted.
mod queue;
mod state;
added crates/radicle-protocol/src/fetcher/test/arbitrary.rs
@@ -0,0 +1,52 @@
+
use std::collections::HashSet;
+

+
use radicle::{identity::DocAt, test::arbitrary};
+

+
use crate::fetcher::{commands, Command, Fetched};
+

+
// NOTE(review): this module appears stale. It is not declared by
// `fetcher/test.rs`, it imports `crate::fetcher::commands` (the module shown
// elsewhere in this change is named `command`), and the `Fetched` initializer
// below (`updated`/`namespaces`/`clone`/`doc`) does not match
// `command::Fetched { from, rid }`. Confirm whether it should be updated and
// wired in, or removed. Code left untouched pending that decision.

impl qcheck::Arbitrary for Fetched {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        Fetched {
            updated: vec![],
            namespaces: HashSet::arbitrary(g),
            clone: bool::arbitrary(g),
            doc: DocAt::arbitrary(g),
        }
    }
}

impl qcheck::Arbitrary for Command {
    // Left unimplemented upstream.
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        todo!()
    }
}

impl qcheck::Arbitrary for commands::Fetch {
    // Left unimplemented upstream.
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        todo!()
    }
}

impl qcheck::Arbitrary for commands::Fetched {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        g.choose(&[
            commands::Fetched::DequeueFetches,
            commands::Fetched::Fetched {
                from: arbitrary::gen(g.size()),
                rid: arbitrary::gen(g.size()),
            },
        ])
        .cloned()
        .unwrap()
    }
}

impl qcheck::Arbitrary for commands::Dequeue {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        g.choose(&[commands::Dequeue::Nodes {
            nodes: arbitrary::gen(5),
        }])
        .cloned()
        .unwrap()
    }
}
added crates/radicle-protocol/src/fetcher/test/queue.rs
@@ -0,0 +1,35 @@
+
mod helpers;
+
mod properties;
+
mod unit;
+

+
use std::num::NonZeroUsize;
+
use std::time::Duration;
+

+
use qcheck::Arbitrary;
+

+
use radicle::storage::refs::RefsAt;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{MaxQueueSize, QueuedFetch};
+

+
impl Arbitrary for QueuedFetch {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        // Limit refs_at size to avoid slow shrinking
+
        let refs_at_len = usize::arbitrary(g) % 4;
+
        let refs_at: Vec<RefsAt> = (0..refs_at_len).map(|_| RefsAt::arbitrary(g)).collect();
+

+
        QueuedFetch {
+
            rid: RepoId::arbitrary(g),
+
            from: NodeId::arbitrary(g),
+
            refs_at,
+
            timeout: Duration::from_secs(u64::arbitrary(g) % 3600),
+
        }
+
    }
+
}
+

+
impl Arbitrary for MaxQueueSize {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        let size = NonZeroUsize::MIN.saturating_add(usize::arbitrary(g) % 255);
+
        MaxQueueSize::new(size)
+
    }
+
}
added crates/radicle-protocol/src/fetcher/test/queue/helpers.rs
@@ -0,0 +1,25 @@
+
use std::{num::NonZeroUsize, time::Duration};
+

+
use radicle::test::arbitrary;
+

+
use crate::fetcher::{MaxQueueSize, Queue, QueuedFetch};
+

+
pub fn create_queue(capacity: usize) -> Queue {
+
    Queue::new(MaxQueueSize::new(
+
        NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
+
    ))
+
}
+

+
pub fn create_fetch() -> QueuedFetch {
+
    QueuedFetch {
+
        rid: arbitrary::gen(1),
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    }
+
}
+

+
/// Generate a vector of unique QueuedFetch items (unique by rid)
+
pub fn unique_fetches(count: usize) -> Vec<QueuedFetch> {
+
    (0..count).map(|_| create_fetch()).collect()
+
}
added crates/radicle-protocol/src/fetcher/test/queue/properties.rs
@@ -0,0 +1,5 @@
+
mod capacity;
+
mod dequeue;
+
mod equality;
+
mod fifo;
+
mod merge;
added crates/radicle-protocol/src/fetcher/test/queue/properties/capacity.rs
@@ -0,0 +1,74 @@
+
use qcheck_macros::quickcheck;
+

+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::{state::Enqueue, MaxQueueSize};
+
use crate::fetcher::{Queue, QueuedFetch};
+

+
#[quickcheck]
+
fn bounded(max_size: MaxQueueSize, num_enqueues: u8) -> bool {
+
    let mut queue = Queue::new(max_size);
+

+
    for _ in 0..num_enqueues {
+
        let _ = queue.enqueue(create_fetch());
+

+
        // Invariant: length never exceeds capacity
+
        if queue.len() > max_size.as_usize() {
+
            return false;
+
        }
+
    }
+
    true
+
}
+

+
#[quickcheck]
+
fn rejection(max_size: MaxQueueSize) -> bool {
+
    let mut queue = Queue::new(max_size);
+

+
    // Fill to capacity with unique items
+
    let items = unique_fetches(max_size.as_usize());
+
    for item in &items {
+
        if queue.enqueue(item.clone()) != Enqueue::Queued {
+
            return false;
+
        }
+
    }
+

+
    // Next enqueue of a NEW item must be rejected
+
    matches!(queue.enqueue(create_fetch()), Enqueue::CapacityReached(_))
+
}
+

+
#[quickcheck]
+
fn restored_after_dequeue(max_size: MaxQueueSize, dequeue_count: u8) -> bool {
+
    let mut queue = Queue::new(max_size);
+

+
    // Fill to capacity
+
    for _ in 0..max_size.as_usize() {
+
        let _ = queue.enqueue(create_fetch());
+
    }
+

+
    // Dequeue some items
+
    let to_dequeue = (dequeue_count as usize).min(max_size.as_usize());
+
    for _ in 0..to_dequeue {
+
        let _ = queue.dequeue();
+
    }
+

+
    // Should be able to enqueue exactly that many items again
+
    for _ in 0..to_dequeue {
+
        if queue.enqueue(create_fetch()) != Enqueue::Queued {
+
            return false;
+
        }
+
    }
+

+
    // Next enqueue should fail
+
    matches!(queue.enqueue(create_fetch()), Enqueue::CapacityReached(_))
+
}
+

+
// The item handed back by `CapacityReached` must be the exact item that was
// rejected, so callers can retry it later.
#[quickcheck]
fn capacity_reached_returns_same_item(item: QueuedFetch) -> bool {
    let mut queue = create_queue(1);
    let _ = queue.enqueue(create_fetch()); // Fill the queue

    match queue.enqueue(item.clone()) {
        Enqueue::CapacityReached(returned) => returned == item,
        Enqueue::Merged => true, // If same rid, merge takes precedence
        _ => false,
    }
}
added crates/radicle-protocol/src/fetcher/test/queue/properties/dequeue.rs
@@ -0,0 +1,56 @@
+
use qcheck_macros::quickcheck;
+

+
use crate::fetcher::state::Enqueue;
+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::{MaxQueueSize, Queue};
+

+
#[quickcheck]
+
fn enables_reenqueue(count: u8) -> bool {
+
    let count = ((count as usize) % 20).max(1);
+
    let items = unique_fetches(count);
+

+
    let mut queue = create_queue(count); // Exact capacity
+

+
    for item in &items {
+
        let _ = queue.enqueue(item.clone());
+
    }
+

+
    // Queue is full, dequeue first item
+
    let dequeued = queue.dequeue();
+
    if dequeued.is_none() {
+
        return false;
+
    }
+

+
    // Should be able to enqueue a new item now
+
    queue.enqueue(create_fetch()) == Enqueue::Queued
+
}
+

+
#[quickcheck]
+
fn empty_queue_returns_none(max_size: MaxQueueSize, dequeue_attempts: u8) -> bool {
+
    let mut queue = Queue::new(max_size);
+

+
    // Multiple dequeues from empty queue should all return None
+
    for _ in 0..dequeue_attempts {
+
        if queue.dequeue().is_some() {
+
            return false;
+
        }
+
    }
+
    true
+
}
+

+
#[quickcheck]
+
fn drained_queue_returns_none(max_size: MaxQueueSize, fill_count: u8) -> bool {
+
    let mut queue = Queue::new(max_size);
+
    let fill = (fill_count as usize).min(max_size.as_usize());
+

+
    // Fill then drain
+
    for _ in 0..fill {
+
        let _ = queue.enqueue(create_fetch());
+
    }
+
    for _ in 0..fill {
+
        let _ = queue.dequeue();
+
    }
+

+
    // Should return None now
+
    queue.dequeue().is_none()
+
}
added crates/radicle-protocol/src/fetcher/test/queue/properties/equality.rs
@@ -0,0 +1,22 @@
+
use qcheck_macros::quickcheck;
+

+
use crate::fetcher::QueuedFetch;
+

+
#[quickcheck]
+
fn reflexive(item: QueuedFetch) -> bool {
+
    item == item.clone()
+
}
+

+
#[quickcheck]
+
fn symmetric(a: QueuedFetch, b: QueuedFetch) -> bool {
+
    (a == b) == (b == a)
+
}
+

+
#[quickcheck]
+
fn transitive(a: QueuedFetch, b: QueuedFetch, c: QueuedFetch) -> bool {
+
    if a == b && b == c {
+
        a == c
+
    } else {
+
        true
+
    }
+
}
added crates/radicle-protocol/src/fetcher/test/queue/properties/fifo.rs
@@ -0,0 +1,75 @@
+
use qcheck_macros::quickcheck;
+

+
use crate::fetcher::state::Enqueue;
+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::QueuedFetch;
+

+
// FIFO: items come out in the order they went in.
//
// NOTE(review): this relies on `unique_fetches` producing pairwise-distinct
// rids; if the generator repeats a rid, the duplicate enqueue is `Merged`
// rather than `Queued` and the test fails spuriously — confirm the helper's
// uniqueness guarantee.
#[quickcheck]
fn ordering(count: u8) -> bool {
    let count = (count as usize) % 50; // Reasonable upper bound
    if count == 0 {
        return true;
    }

    let items = unique_fetches(count);
    let mut queue = create_queue(count);

    // Enqueue all items
    for item in &items {
        if queue.enqueue(item.clone()) != Enqueue::Queued {
            return false;
        }
    }

    // Dequeue and verify order
    for expected in items {
        match queue.dequeue() {
            Some(actual) if actual.rid == expected.rid => continue,
            _ => return false,
        }
    }

    queue.is_empty()
}
+

+
// FIFO order is preserved under an arbitrary interleaving of enqueues (`true`)
// and dequeues (`false`), tracked against a shadow list of what was actually
// accepted into the queue.
#[quickcheck]
fn interleaved_operations(ops: Vec<bool>) -> bool {
    // Limit operations to avoid slow tests
    let ops: Vec<_> = ops.into_iter().take(100).collect();
    let capacity = ops.len().max(1);

    let mut queue = create_queue(capacity);
    // Shadow model: items confirmed `Queued`, in insertion order.
    let mut expected_order: Vec<QueuedFetch> = Vec::new();
    // Index of the next shadow item we expect a dequeue to return.
    let mut dequeue_index = 0;

    for op in ops {
        if op {
            // Enqueue
            let item = create_fetch();
            match queue.enqueue(item.clone()) {
                Enqueue::Queued => expected_order.push(item),
                Enqueue::CapacityReached(_) => {} // Expected when full
                Enqueue::Merged => {}             // Can happen if same rid generated
            }
        } else {
            // Dequeue
            match queue.dequeue() {
                Some(item) => {
                    // Must match the shadow model positionally (by rid).
                    if dequeue_index >= expected_order.len()
                        || item.rid != expected_order[dequeue_index].rid
                    {
                        return false;
                    }
                    dequeue_index += 1;
                }
                None => {
                    // Should only happen if we've dequeued everything we enqueued
                    if dequeue_index != expected_order.len() {
                        return false;
                    }
                }
            }
        }
    }
    true
}
added crates/radicle-protocol/src/fetcher/test/queue/properties/merge.rs
@@ -0,0 +1,221 @@
+
use std::time::Duration;
+

+
use qcheck_macros::quickcheck;
+
use radicle::storage::refs::RefsAt;
+
use radicle::test::arbitrary;
+
use radicle_core::RepoId;
+

+
use crate::fetcher::state::Enqueue;
+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::{MaxQueueSize, Queue, QueuedFetch};
+

+
#[quickcheck]
+
fn same_rid_merges_anywhere_in_queue(max_size: MaxQueueSize, merge_index: usize) -> bool {
+
    if max_size.as_usize() < 2 {
+
        return true; // Need at least 2 slots to test properly
+
    }
+

+
    let mut queue = Queue::new(max_size);
+
    let items = unique_fetches(max_size.as_usize() - 1); // Leave room for potential new item
+

+
    for item in &items {
+
        let _ = queue.enqueue(item.clone());
+
    }
+

+
    if items.is_empty() {
+
        return true;
+
    }
+

+
    // Try to enqueue an item with same rid as one already in queue
+
    let target_index = merge_index % items.len();
+
    let same_rid_item = QueuedFetch {
+
        rid: items[target_index].rid,
+
        from: arbitrary::gen(1), // Different from
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(60),
+
    };
+

+
    matches!(queue.enqueue(same_rid_item), Enqueue::Merged)
+
}
+

+
#[quickcheck]
+
fn combines_refs(base_refs_count: u8, merge_refs_count: u8) -> bool {
+
    let base_refs_count = (base_refs_count as usize) % 5;
+
    let merge_refs_count = (merge_refs_count as usize) % 5;
+

+
    let mut queue = create_queue(10);
+

+
    let rid: RepoId = arbitrary::gen(1);
+
    let base_refs: Vec<RefsAt> = (0..base_refs_count).map(|_| arbitrary::gen(1)).collect();
+
    let merge_refs: Vec<RefsAt> = (0..merge_refs_count).map(|_| arbitrary::gen(1)).collect();
+

+
    let base_item = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: base_refs.clone(),
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let merge_item = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: merge_refs.clone(),
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let _ = queue.enqueue(base_item);
+
    let result = queue.enqueue(merge_item);
+

+
    if result != Enqueue::Merged {
+
        return false;
+
    }
+

+
    let dequeued = queue.dequeue().unwrap();
+

+
    // If either was empty, result should be empty (fetch everything)
+
    if base_refs.is_empty() || merge_refs.is_empty() {
+
        dequeued.refs_at.is_empty()
+
    } else {
+
        // Otherwise refs should be combined
+
        dequeued.refs_at.len() == base_refs_count + merge_refs_count
+
    }
+
}
+

+
#[quickcheck]
+
fn empty_refs_fetches_all() -> bool {
+
    let mut queue = create_queue(10);
+
    let rid: RepoId = arbitrary::gen(1);
+

+
    // First enqueue with specific refs
+
    let item_with_refs = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1), arbitrary::gen(1)],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    // Second enqueue with empty refs (fetch everything)
+
    let item_empty_refs = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let _ = queue.enqueue(item_with_refs);
+
    let _ = queue.enqueue(item_empty_refs);
+

+
    let dequeued = queue.dequeue().unwrap();
+
    dequeued.refs_at.is_empty() // Should fetch everything
+
}
+

+
#[quickcheck]
+
fn longer_timeout_preserved(short_secs: u16, long_secs: u16) -> bool {
+
    let short = Duration::from_secs(short_secs.min(long_secs) as u64);
+
    let long = Duration::from_secs(short_secs.max(long_secs) as u64);
+

+
    let mut queue = create_queue(10);
+
    let rid: RepoId = arbitrary::gen(1);
+

+
    let item_short = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: short,
+
    };
+

+
    let item_long = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: long,
+
    };
+

+
    // Test both orderings
+
    let _ = queue.enqueue(item_short.clone());
+
    let _ = queue.enqueue(item_long.clone());
+
    let dequeued1 = queue.dequeue().unwrap();
+

+
    let mut queue2 = create_queue(10);
+
    let _ = queue2.enqueue(item_long);
+
    let _ = queue2.enqueue(item_short);
+
    let dequeued2 = queue2.dequeue().unwrap();
+

+
    dequeued1.timeout == long && dequeued2.timeout == long
+
}
+

+
#[quickcheck]
+
fn does_not_increase_queue_length() -> bool {
+
    let mut queue = create_queue(10);
+
    let rid: RepoId = arbitrary::gen(1);
+

+
    let item1 = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let item2 = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(60),
+
    };
+

+
    let _ = queue.enqueue(item1);
+
    let len_after_first = queue.len();
+

+
    let _ = queue.enqueue(item2);
+
    let len_after_merge = queue.len();
+

+
    len_after_first == 1 && len_after_merge == 1
+
}
+

+
#[quickcheck]
+
fn different_rid_accepted(base_item: QueuedFetch) -> bool {
+
    let mut queue = create_queue(10);
+
    let _ = queue.enqueue(base_item.clone());
+

+
    // Item with different rid should be queued (not merged)
+
    let different_rid = QueuedFetch {
+
        rid: arbitrary::gen(1),
+
        ..base_item
+
    };
+

+
    queue.enqueue(different_rid) == Enqueue::Queued
+
}
+

+
#[quickcheck]
+
fn succeed_when_at_capacity() -> bool {
+
    // When queue is at capacity, merging with existing item should still work
+
    let mut queue = create_queue(2);
+
    let rid: RepoId = arbitrary::gen(1);
+

+
    let item1 = QueuedFetch {
+
        rid,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let item2 = QueuedFetch {
+
        rid: arbitrary::gen(1), // Different rid
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    };
+

+
    let merge_item = QueuedFetch {
+
        rid, // Same as item1
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(60),
+
    };
+

+
    let _ = queue.enqueue(item1);
+
    let _ = queue.enqueue(item2);
+

+
    // Queue is now at capacity, but merge should still work
+
    queue.enqueue(merge_item) == Enqueue::Merged
+
}
added crates/radicle-protocol/src/fetcher/test/queue/unit.rs
@@ -0,0 +1,113 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::Enqueue;
+
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::QueuedFetch;
+

+
#[test]
+
fn zero_timeout_accepted() {
+
    let mut queue = create_queue(10);
+
    let item = QueuedFetch {
+
        rid: arbitrary::gen(1),
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::ZERO,
+
    };
+
    assert_eq!(queue.enqueue(item), Enqueue::Queued);
+
}
+

+
#[test]
+
fn max_timeout_accepted() {
+
    let mut queue = create_queue(10);
+
    let item = QueuedFetch {
+
        rid: arbitrary::gen(1),
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::MAX,
+
    };
+
    assert_eq!(queue.enqueue(item), Enqueue::Queued);
+
}
+

+
#[test]
+
fn empty_refs_at_items_can_be_equal() {
+
    let rid: RepoId = arbitrary::gen(1);
+
    let from: NodeId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    let item1 = QueuedFetch {
+
        rid,
+
        from,
+
        refs_at: vec![],
+
        timeout,
+
    };
+
    let item2 = QueuedFetch {
+
        rid,
+
        from,
+
        refs_at: vec![],
+
        timeout,
+
    };
+

+
    assert_eq!(item1, item2);
+
}
+

+
#[test]
+
fn merge_preserves_position_in_queue() {
+
    let mut queue = create_queue(10);
+

+
    let rid_first: RepoId = arbitrary::gen(1);
+
    let rid_second: RepoId = arbitrary::gen(2);
+
    let rid_third: RepoId = arbitrary::gen(3);
+

+
    // Enqueue three items
+
    let _ = queue.enqueue(QueuedFetch {
+
        rid: rid_first,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    });
+
    let _ = queue.enqueue(QueuedFetch {
+
        rid: rid_second,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    });
+
    let _ = queue.enqueue(QueuedFetch {
+
        rid: rid_third,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![],
+
        timeout: Duration::from_secs(30),
+
    });
+

+
    // Merge into the second item
+
    let result = queue.enqueue(QueuedFetch {
+
        rid: rid_second,
+
        from: arbitrary::gen(1),
+
        refs_at: vec![arbitrary::gen(1)],
+
        timeout: Duration::from_secs(60),
+
    });
+
    assert_eq!(result, Enqueue::Merged);
+

+
    // Order should be preserved: first, second (merged), third
+
    assert_eq!(queue.dequeue().unwrap().rid, rid_first);
+
    assert_eq!(queue.dequeue().unwrap().rid, rid_second);
+
    assert_eq!(queue.dequeue().unwrap().rid, rid_third);
+
}
+

+
#[test]
+
fn capacity_takes_precedence_over_merge_for_new_items() {
+
    let mut queue = create_queue(2);
+

+
    // Fill to capacity with unique items
+
    let _ = queue.enqueue(create_fetch());
+
    let _ = queue.enqueue(create_fetch());
+

+
    // New item (different rid) should be rejected
+
    let new_item = create_fetch();
+
    match queue.enqueue(new_item.clone()) {
+
        Enqueue::CapacityReached(returned) => assert_eq!(returned, new_item),
+
        _ => panic!("Expected CapacityReached"),
+
    }
+
}
added crates/radicle-protocol/src/fetcher/test/state.rs
@@ -0,0 +1,7 @@
+
//! Test suite for the fetcher state machine; each submodule exercises a
//! different aspect of `FetcherState`'s behavior.

mod command;
mod concurrent;
mod config;
mod dequeue;
mod helpers;
mod invariant;
mod multinode;
added crates/radicle-protocol/src/fetcher/test/state/command.rs
@@ -0,0 +1,3 @@
+
//! Tests for the fetcher-state command handlers, one module per command.

mod cancel;
mod fetch;
mod fetched;
added crates/radicle-protocol/src/fetcher/test/state/command/cancel.rs
@@ -0,0 +1,129 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::{ActiveFetch, FetcherState};
+

+
#[test]
+
fn single_ongoing() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    let event = state.cancel(command::Cancel { from: node_a });
+

+
    match event {
+
        event::Cancel::Canceled {
+
            from,
+
            active: ongoing,
+
            queued,
+
        } => {
+
            assert_eq!(from, node_a);
+
            assert_eq!(ongoing.len(), 1);
+
            assert_eq!(
+
                ongoing.get(&repo_1),
+
                Some(&ActiveFetch {
+
                    from: node_a,
+
                    refs_at: refs_at_1,
+
                })
+
            );
+
            assert!(queued.is_empty());
+
        }
+
        _ => panic!("Expected Canceled event"),
+
    }
+
    assert!(state.get_active_fetch(&repo_1).is_none());
+
}
+

+
#[test]
+
fn ongoing_and_queued() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    let event = state.cancel(command::Cancel { from: node_a });
+

+
    match event {
+
        event::Cancel::Canceled {
+
            active: ongoing,
+
            queued,
+
            ..
+
        } => {
+
            assert_eq!(ongoing.len(), 1);
+
            assert!(ongoing.contains_key(&repo_1));
+
            assert_eq!(queued.len(), 2);
+
        }
+
        _ => panic!("Expected Canceled event"),
+
    }
+
}
+

+
#[test]
+
fn non_existent_returns_unexpected() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_unknown: NodeId = arbitrary::gen(1);
+

+
    let event = state.cancel(command::Cancel { from: node_unknown });
+

+
    assert_eq!(event, event::Cancel::Unexpected { from: node_unknown });
+
}
+

+
#[test]
+
fn cancellation_is_isolated() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let node_b: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_b,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    state.cancel(command::Cancel { from: node_a });
+

+
    assert!(state.get_active_fetch(&repo_1).is_none());
+
    assert!(state.get_active_fetch(&repo_2).is_some());
+
}
added crates/radicle-protocol/src/fetcher/test/state/command/fetch.rs
@@ -0,0 +1,416 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::{ActiveFetch, FetcherState};
+

+
#[test]
+
fn fetch_start_first_fetch_for_node() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::Started {
+
            rid: repo_1,
+
            from: node_a,
+
            refs_at: refs_at_1.clone(),
+
            timeout,
+
        }
+
    );
+
    assert_eq!(
+
        state.get_active_fetch(&repo_1),
+
        Some(&ActiveFetch {
+
            from: node_a,
+
            refs_at: refs_at_1,
+
        })
+
    );
+
}
+

+
#[test]
+
fn fetch_different_repo_same_node_within_capacity() {
+
    let mut state = FetcherState::new(helpers::config(2, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    let event1 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    assert!(matches!(event1, event::Fetch::Started { .. }));
+

+
    let event2 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    assert!(matches!(event2, event::Fetch::Started { rid, .. } if rid == repo_2));
+
    assert!(state.get_active_fetch(&repo_1).is_some());
+
    assert!(state.get_active_fetch(&repo_2).is_some());
+
}
+

+
#[test]
+
fn fetch_same_repo_different_nodes_queues_second() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let node_b: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    let event1 = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+
    assert!(matches!(event1, event::Fetch::Started { .. }));
+

+
    // Same repo from different node - gets queued since repo_1 is already active
+
    let event2 = state.fetch(command::Fetch {
+
        from: node_b,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    assert!(
+
        matches!(event2, event::Fetch::Queued { rid, from } if rid == repo_1 && from == node_b)
+
    );
+
    // Only node_a's fetch is active
+
    let active = state.get_active_fetch(&repo_1);
+
    assert!(active.is_some());
+
    assert_eq!(*active.unwrap().from(), node_a);
+
}
+

+
#[test]
+
fn fetch_duplicate_returns_already_fetching() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::AlreadyFetching {
+
            rid: repo_1,
+
            from: node_a,
+
        }
+
    );
+
}
+

+
#[test]
+
fn fetch_same_repo_different_refs_enqueues() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let refs_at_2 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_2.clone(),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::Queued {
+
            rid: repo_1,
+
            from: node_a,
+
        }
+
    );
+
}
+

+
#[test]
+
fn fetch_at_capacity_enqueues() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::Queued {
+
            rid: repo_2,
+
            from: node_a,
+
        }
+
    );
+
    assert!(state.get_active_fetch(&repo_1).is_some());
+
    assert!(state.get_active_fetch(&repo_2).is_none());
+
}
+

+
#[test]
+
fn fetch_queue_rejected_capacity_reached() {
+
    let mut state = FetcherState::new(helpers::config(1, 2));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let repo_4: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    // Fill concurrency
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Fill queue (capacity 2)
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Exceed queue capacity
+
    let refs_at_4 = helpers::gen_refs_at(1);
+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_4,
+
        refs_at: refs_at_4.clone(),
+
        timeout,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetch::QueueAtCapacity {
+
            rid: repo_4,
+
            from: node_a,
+
            refs_at: refs_at_4,
+
            timeout,
+
            capacity: 2,
+
        }
+
    );
+
}
+

+
#[test]
+
fn fetch_queue_merges_already_queued() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let refs_at_2a = helpers::gen_refs_at(1);
+
    let refs_at_2b = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2a.clone(),
+
        timeout,
+
    });
+

+
    // Second fetch for same queued repo - should merge refs
+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2b.clone(),
+
        timeout,
+
    });
+

+
    // Returns Queued (merged)
+
    assert_eq!(
+
        event,
+
        event::Fetch::Queued {
+
            rid: repo_2,
+
            from: node_a,
+
        }
+
    );
+

+
    // Dequeue and verify refs were merged
+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    let queued = state.dequeue(&node_a).unwrap();
+
    assert_eq!(queued.rid, repo_2);
+
    // refs_at should contain both sets of refs
+
    assert_eq!(queued.refs_at.len(), refs_at_2a.len() + refs_at_2b.len());
+
}
+

+
#[test]
+
fn fetch_queue_merge_empty_refs_fetches_all() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let refs_at_2 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Queue with specific refs
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2.clone(),
+
        timeout,
+
    });
+

+
    // Queue again with empty refs (fetch everything)
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: vec![],
+
        timeout,
+
    });
+

+
    // Dequeue and verify refs became empty (fetch all)
+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    let queued = state.dequeue(&node_a).unwrap();
+
    assert_eq!(queued.rid, repo_2);
+
    assert!(queued.refs_at.is_empty());
+
}
+

+
#[test]
+
fn fetch_queue_merge_takes_longer_timeout() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let short_timeout = Duration::from_secs(10);
+
    let long_timeout = Duration::from_secs(60);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout: short_timeout,
+
    });
+

+
    // Queue with short timeout
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout: short_timeout,
+
    });
+

+
    // Queue again with longer timeout
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout: long_timeout,
+
    });
+

+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+
    // Dequeue and verify timeout is the longer one
+
    let queued = state.dequeue(&node_a).unwrap();
+
    assert_eq!(queued.timeout, long_timeout);
+
}
+

+
#[test]
+
fn fetch_after_previous_completed() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+
    state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+

+
    let event = state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    assert!(matches!(event, event::Fetch::Started { .. }));
+
}
added crates/radicle-protocol/src/fetcher/test/state/command/fetched.rs
@@ -0,0 +1,145 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
+
fn complete_single_ongoing() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let refs_at_1 = helpers::gen_refs_at(2);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: refs_at_1.clone(),
+
        timeout,
+
    });
+

+
    let event = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetched::Completed {
+
            from: node_a,
+
            rid: repo_1,
+
            refs_at: refs_at_1,
+
        }
+
    );
+
    assert!(state.get_active_fetch(&repo_1).is_none());
+
}
+

+
#[test]
+
fn complete_then_dequeue_fifo() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let refs_at_2 = helpers::gen_refs_at(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    // Queue repo_2 first, then repo_3
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: refs_at_2.clone(),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    let event = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+

+
    assert!(matches!(event, event::Fetched::Completed { .. }));
+

+
    // Dequeue next - FIFO: repo_2 was queued first
+
    let queued = state.dequeue(&node_a);
+
    assert!(queued.is_some());
+
    let queued = queued.unwrap();
+
    assert_eq!(queued.rid, repo_2);
+
    assert_eq!(queued.from, node_a);
+
    assert_eq!(queued.refs_at, refs_at_2);
+
}
+

+
#[test]
+
fn complete_one_of_multiple() {
+
    let mut state = FetcherState::new(helpers::config(3, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+
    let repo_2: RepoId = arbitrary::gen(1);
+
    let repo_3: RepoId = arbitrary::gen(1);
+
    let timeout = Duration::from_secs(30);
+

+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_1,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_2,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+
    state.fetch(command::Fetch {
+
        from: node_a,
+
        rid: repo_3,
+
        refs_at: helpers::gen_refs_at(1),
+
        timeout,
+
    });
+

+
    let event = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_2,
+
    });
+

+
    assert!(matches!(event, event::Fetched::Completed { rid, .. } if rid == repo_2));
+
    assert!(state.get_active_fetch(&repo_1).is_some());
+
    assert!(state.get_active_fetch(&repo_2).is_none());
+
    assert!(state.get_active_fetch(&repo_3).is_some());
+
}
+

+
#[test]
+
fn non_existent_returns_not_found() {
+
    let mut state = FetcherState::new(helpers::config(1, 10));
+
    let node_a: NodeId = arbitrary::gen(1);
+
    let repo_1: RepoId = arbitrary::gen(1);
+

+
    let event = state.fetched(command::Fetched {
+
        from: node_a,
+
        rid: repo_1,
+
    });
+

+
    assert_eq!(
+
        event,
+
        event::Fetched::NotFound {
+
            from: node_a,
+
            rid: repo_1,
+
        }
+
    );
+
}
added crates/radicle-protocol/src/fetcher/test/state/concurrent.rs
@@ -0,0 +1,106 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
fn interleaved_operations() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::gen(1);
    let node_b: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    let repo_3: RepoId = arbitrary::gen(1);
    let timeout = Duration::from_secs(30);

    // Small builder to keep the interleaving below readable.
    let fetch = |rid: RepoId, from: NodeId| command::Fetch {
        from,
        rid,
        refs_at: helpers::gen_refs_at(1),
        timeout,
    };

    // A starts fetching r1, then B starts r2 — distinct nodes, so both run.
    assert!(matches!(
        state.fetch(fetch(repo_1, node_a)),
        event::Fetch::Started { .. }
    ));
    assert!(matches!(
        state.fetch(fetch(repo_2, node_b)),
        event::Fetch::Started { .. }
    ));

    // r1 finishes, freeing A's slot so r3 can start immediately.
    assert!(matches!(
        state.fetched(command::Fetched {
            from: node_a,
            rid: repo_1,
        }),
        event::Fetched::Completed { .. }
    ));
    assert!(matches!(
        state.fetch(fetch(repo_3, node_a)),
        event::Fetch::Started { .. }
    ));

    // r2 finishes as well.
    assert!(matches!(
        state.fetched(command::Fetched {
            from: node_b,
            rid: repo_2,
        }),
        event::Fetched::Completed { .. }
    ));

    // Final state: only r3 from A is still in flight.
    assert!(state.get_active_fetch(&repo_1).is_none());
    assert!(state.get_active_fetch(&repo_2).is_none());
    assert!(state.get_active_fetch(&repo_3).is_some());
}
+

+
#[test]
fn fetched_then_cancel() {
    let mut state = FetcherState::new(helpers::config(2, 10));
    let remote: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    let timeout = Duration::from_secs(30);

    // Two concurrent fetches from the same node (capacity is 2).
    for rid in [repo_1, repo_2] {
        state.fetch(command::Fetch {
            from: remote,
            rid,
            refs_at: helpers::gen_refs_at(1),
            timeout,
        });
    }

    // Finish the first fetch normally.
    let completed = state.fetched(command::Fetched {
        from: remote,
        rid: repo_1,
    });
    assert!(matches!(completed, event::Fetched::Completed { .. }));

    // Cancelling the node now only tears down the one remaining fetch.
    let event::Cancel::Canceled { active, .. } = state.cancel(command::Cancel { from: remote })
    else {
        panic!("Expected Canceled");
    };
    assert_eq!(active.len(), 1);
    assert!(active.contains_key(&repo_2));
}
added crates/radicle-protocol/src/fetcher/test/state/config.rs
@@ -0,0 +1,72 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
fn high_concurrency() {
    const FETCHES: usize = 100;

    let mut state = FetcherState::new(helpers::config(FETCHES, 10));
    let remote: NodeId = arbitrary::gen(1);
    let timeout = Duration::from_secs(30);

    // With a per-node concurrency of 100, all 100 fetches start right away.
    for i in 0..FETCHES {
        let rid: RepoId = arbitrary::gen(i + 1);
        let event = state.fetch(command::Fetch {
            from: remote,
            rid,
            refs_at: helpers::gen_refs_at(1),
            timeout,
        });
        assert!(
            matches!(event, event::Fetch::Started { .. }),
            "Fetch {} should start",
            i
        );
    }

    // Every one of them is accounted as active for this node.
    let active = state
        .active_fetches()
        .iter()
        .filter(|(_, fetch)| *fetch.from() == remote)
        .count();
    assert_eq!(active, FETCHES);
}
+

+
#[test]
fn min_queue_size() {
    // Concurrency 1 and queue capacity 1: one active slot, one queued slot.
    let mut state = FetcherState::new(helpers::config(1, 1));
    let remote: NodeId = arbitrary::gen(1);
    let timeout = Duration::from_secs(30);

    let fetch = |rid: RepoId| command::Fetch {
        from: remote,
        rid,
        refs_at: helpers::gen_refs_at(1),
        timeout,
    };

    // First fetch occupies the single active slot.
    state.fetch(fetch(arbitrary::gen(1)));

    // Second fetch lands in the (size-1) queue.
    let second = state.fetch(fetch(arbitrary::gen(1)));
    assert!(matches!(second, event::Fetch::Queued { .. }));

    // Third fetch finds the queue already full.
    let third = state.fetch(fetch(arbitrary::gen(1)));
    assert!(matches!(third, event::Fetch::QueueAtCapacity { .. }));
}
added crates/radicle-protocol/src/fetcher/test/state/dequeue.rs
@@ -0,0 +1,112 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::command;
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
fn cannot_dequeue_while_node_at_capacity() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let remote: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    let refs_at_2 = helpers::gen_refs_at(3);
    let timeout_2 = Duration::from_secs(42);

    // Occupy the single active slot with repo_1…
    state.fetch(command::Fetch {
        from: remote,
        rid: repo_1,
        refs_at: helpers::gen_refs_at(1),
        timeout: Duration::from_secs(10),
    });

    // …and queue repo_2 behind it.
    state.fetch(command::Fetch {
        from: remote,
        rid: repo_2,
        refs_at: refs_at_2.clone(),
        timeout: timeout_2,
    });

    // Nothing can be dequeued while the node is saturated.
    assert!(state.dequeue(&remote).is_none());

    // Completing repo_1 frees the slot…
    state.fetched(command::Fetched {
        from: remote,
        rid: repo_1,
    });

    // …and the queued fetch comes back exactly as it was submitted.
    let queued = state
        .dequeue(&remote)
        .expect("queued fetch should be available");
    assert_eq!(queued.rid, repo_2);
    assert_eq!(queued.from, remote);
    assert_eq!(queued.refs_at, refs_at_2);
    assert_eq!(queued.timeout, timeout_2);
}
+

+
#[test]
fn maintains_fifo_order() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let remote: NodeId = arbitrary::gen(1);
    let timeout = Duration::from_secs(30);
    let first: RepoId = arbitrary::gen(1);
    let queued: [RepoId; 3] = [arbitrary::gen(1), arbitrary::gen(1), arbitrary::gen(1)];

    // Saturate the node with one active fetch…
    state.fetch(command::Fetch {
        from: remote,
        rid: first,
        refs_at: helpers::gen_refs_at(1),
        timeout,
    });
    // …then queue three more, in order.
    for rid in queued {
        state.fetch(command::Fetch {
            from: remote,
            rid,
            refs_at: helpers::gen_refs_at(1),
            timeout,
        });
    }

    // As each fetch completes, the next queued fetch must surface in
    // submission (FIFO) order.
    let mut current = first;
    for expected in queued {
        state.fetched(command::Fetched {
            from: remote,
            rid: current,
        });
        current = state
            .dequeue(&remote)
            .expect("queue should yield the next fetch")
            .rid;
        assert_eq!(current, expected);
    }

    // Queue is now drained.
    assert!(state.dequeue(&remote).is_none());
}
+

+
#[test]
fn empty_queue_returns_none() {
    // A node with no queued fetches has nothing to dequeue.
    let mut state = FetcherState::new(helpers::config(1, 10));
    let remote: NodeId = arbitrary::gen(1);

    assert!(state.dequeue(&remote).is_none());
}
added crates/radicle-protocol/src/fetcher/test/state/helpers.rs
@@ -0,0 +1,17 @@
+
use std::num::NonZeroUsize;
+

+
use radicle::{storage::refs::RefsAt, test::arbitrary};
+

+
use crate::fetcher::{Config, MaxQueueSize};
+

+
pub fn config(max_concurrency: usize, max_queue_size: usize) -> Config {
+
    Config::new()
+
        .with_max_concurrency(NonZeroUsize::new(max_concurrency).unwrap())
+
        .with_max_capacity(MaxQueueSize::new(
+
            NonZeroUsize::new(max_queue_size).unwrap(),
+
        ))
+
}
+

+
pub fn gen_refs_at(count: usize) -> Vec<RefsAt> {
+
    (0..count).map(|_| arbitrary::gen(1)).collect()
+
}
added crates/radicle-protocol/src/fetcher/test/state/invariant.rs
@@ -0,0 +1,53 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::command;
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
/// Queuing the same repo twice for one node must merge into a single queue
/// entry, not enqueue a duplicate.
#[test]
fn queue_integrity_after_merge() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let remote: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    // Two distinct refs-at payloads for the two submissions of repo_2.
    let refs_at_2a = helpers::gen_refs_at(1);
    let refs_at_2b = helpers::gen_refs_at(1);
    let timeout = Duration::from_secs(30);

    // Saturate the node so subsequent fetches are queued.
    state.fetch(command::Fetch {
        from: remote,
        rid: repo_1,
        refs_at: helpers::gen_refs_at(1),
        timeout,
    });

    // Queue repo_2 twice; the payloads are moved rather than cloned since
    // they are not needed afterwards.
    state.fetch(command::Fetch {
        from: remote,
        rid: repo_2,
        refs_at: refs_at_2a,
        timeout,
    });
    state.fetch(command::Fetch {
        from: remote,
        rid: repo_2,
        refs_at: refs_at_2b,
        timeout,
    });

    // Free the active slot and drain the queue: exactly one merged entry.
    state.fetched(command::Fetched {
        from: remote,
        rid: repo_1,
    });
    let merged = state
        .dequeue(&remote)
        .expect("merged entry should be queued");
    assert_eq!(merged.rid, repo_2);

    assert!(state.dequeue(&remote).is_none());
}
added crates/radicle-protocol/src/fetcher/test/state/multinode.rs
@@ -0,0 +1,83 @@
+
use std::time::Duration;
+

+
use radicle::test::arbitrary;
+
use radicle_core::{NodeId, RepoId};
+

+
use crate::fetcher::state::{command, event};
+
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::FetcherState;
+

+
#[test]
fn independent_queues() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::gen(1);
    let node_b: NodeId = arbitrary::gen(1);
    let repo_a_active: RepoId = arbitrary::gen(1);
    let repo_b_active: RepoId = arbitrary::gen(2);
    let repo_a_queued: RepoId = arbitrary::gen(10);
    let repo_b_queued: RepoId = arbitrary::gen(20);
    let timeout = Duration::from_secs(30);

    // Saturate both nodes, then queue one extra fetch on each.
    for (from, rid) in [
        (node_a, repo_a_active),
        (node_b, repo_b_active),
        (node_a, repo_a_queued),
        (node_b, repo_b_queued),
    ] {
        state.fetch(command::Fetch {
            from,
            rid,
            refs_at: helpers::gen_refs_at(1),
            timeout,
        });
    }

    // Draining A's queue must not disturb B's.
    state.fetched(command::Fetched {
        from: node_a,
        rid: repo_a_active,
    });
    let dequeued_a = state.dequeue(&node_a).expect("A should have a queued fetch");
    assert_eq!(dequeued_a.rid, repo_a_queued);

    state.fetched(command::Fetched {
        from: node_b,
        rid: repo_b_active,
    });
    let dequeued_b = state.dequeue(&node_b).expect("B should have a queued fetch");
    assert_eq!(dequeued_b.rid, repo_b_queued);
}
+

+
#[test]
fn high_count() {
    const NODES: usize = 100;

    let mut state = FetcherState::new(helpers::config(1, 10));
    let timeout = Duration::from_secs(30);

    // One fetch per distinct node: the per-node concurrency limit of 1 is
    // never hit, so every fetch starts immediately.
    for i in 0..NODES {
        let node: NodeId = arbitrary::gen(i + 1);
        let repo: RepoId = arbitrary::gen(i + 1);
        let event = state.fetch(command::Fetch {
            from: node,
            rid: repo,
            refs_at: helpers::gen_refs_at(1),
            timeout,
        });
        assert!(matches!(event, event::Fetch::Started { .. }));
    }

    assert_eq!(state.active_fetches().len(), NODES);
}
modified crates/radicle-protocol/src/lib.rs
@@ -1,5 +1,6 @@
pub mod bounded;
pub mod deserializer;
+
pub mod fetcher;
pub mod service;
pub mod wire;
pub mod worker;