Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
protocol/fetcher: Introduce `RefsToFetch` and tidy
Merged lorenz opened 2 months ago

Using empty Vecs to mean “all refs” can be confusing. Avoid the chance of confusion by introduction of a new type that clearly describes which refs should be fetched.

21 files changed +269 -286 41f77a49 865abd35
modified crates/radicle-node/src/runtime/handle.rs
@@ -359,7 +359,7 @@ impl radicle::node::Handle for Handle {
                        json!({
                            "rid": rid,
                            "from": active.from(),
-
                            "refsAt": active.refs_at(),
+
                            "refsAt": active.refs(),
                        })
                    }).collect::<Vec<_>>(),
                "queue": fetcher_state.queued_fetches().iter().map(|(node, queue)| {
@@ -368,8 +368,7 @@ impl radicle::node::Handle for Handle {
                        "queue": queue.iter().map(|fetch| {
                            json!({
                                "rid": fetch.rid,
-
                                "from": fetch.from,
-
                                "refsAt": fetch.refs_at,
+
                                "refsAt": fetch.refs,
                            })
                        }).collect::<Vec<_>>()
                    })
modified crates/radicle-protocol/src/fetcher.rs
@@ -1,3 +1,7 @@
+
use nonempty::NonEmpty;
+
use radicle::storage::refs::RefsAt;
+
use serde::{Deserialize, Serialize};
+

pub mod service;
pub use service::FetcherService;

@@ -10,3 +14,54 @@ mod test;
// TODO(finto): `Service::fetch_refs_at` and the use of `refs_status_of` is a
// layer above the `Fetcher` where it would perform I/O, mocked out by a trait,
// to check if there are wants and add a fetch to the Fetcher.
+

+
/// Represents references to fetch, in the context of a repository.
+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+
pub enum RefsToFetch {
+
    /// Indicates that all references should be fetched.
+
    All,
+
    /// Contains a non-empty collection of specific references to fetch.
+
    Refs(NonEmpty<RefsAt>),
+
}
+

+
impl RefsToFetch {
+
    /// Merges another `RefsToFetch` into this one, resulting in a new
+
    /// `RefsToFetch` that represents the combined set of references to fetch.
+
    /// If either `RefsToFetch` is `All`, the result will be `All`. If both are
+
    /// `Refs`, their contents will be combined into a single `Refs` variant.
+
    pub(super) fn merge(self, other: RefsToFetch) -> Self {
+
        match (self, other) {
+
            (RefsToFetch::All, _) | (_, RefsToFetch::All) => RefsToFetch::All,
+
            (RefsToFetch::Refs(mut ours), RefsToFetch::Refs(theirs)) => {
+
                ours.extend(theirs);
+
                RefsToFetch::Refs(ours)
+
            }
+
        }
+
    }
+

+
    #[cfg(test)]
+
    pub fn len(&self) -> Option<std::num::NonZeroUsize> {
+
        match self {
+
            RefsToFetch::All => None,
+
            RefsToFetch::Refs(refs) => std::num::NonZeroUsize::new(refs.len()),
+
        }
+
    }
+
}
+

+
impl From<RefsToFetch> for Vec<RefsAt> {
+
    fn from(val: RefsToFetch) -> Self {
+
        match val {
+
            RefsToFetch::All => Vec::new(),
+
            RefsToFetch::Refs(refs) => refs.into(),
+
        }
+
    }
+
}
+

+
impl From<Vec<RefsAt>> for RefsToFetch {
+
    fn from(refs_at: Vec<RefsAt>) -> Self {
+
        match NonEmpty::from_vec(refs_at) {
+
            Some(refs) => RefsToFetch::Refs(refs),
+
            None => RefsToFetch::All,
+
        }
+
    }
+
}
modified crates/radicle-protocol/src/fetcher/service.rs
@@ -1,9 +1,14 @@
use std::collections::HashMap;

-
use radicle::storage::refs::RefsAt;
use radicle_core::{NodeId, RepoId};

-
use crate::fetcher::state::{command, event, Config, FetcherState, QueuedFetch};
+
use crate::fetcher::{
+
    state::{
+
        command::{self},
+
        event, Config, FetcherState, QueuedFetch,
+
    },
+
    RefsToFetch,
+
};

/// Service layer that wraps [`FetcherState`] and manages subscriber coalescing.
///
@@ -38,12 +43,12 @@ impl<S> FetcherService<S> {
struct FetchKey {
    rid: RepoId,
    node: NodeId,
-
    refs_at: Vec<RefsAt>,
+
    refs: RefsToFetch,
}

impl FetchKey {
-
    fn new(rid: RepoId, node: NodeId, refs_at: Vec<RefsAt>) -> Self {
-
        Self { rid, node, refs_at }
+
    fn new(rid: RepoId, node: NodeId, refs: RefsToFetch) -> Self {
+
        Self { rid, node, refs }
    }
}

@@ -88,7 +93,7 @@ impl<S> FetcherService<S> {
    ///
    /// See [`FetcherState::fetch`].
    pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
-
        let key = FetchKey::new(cmd.rid, cmd.from, cmd.refs_at.clone());
+
        let key = FetchKey::new(cmd.rid, cmd.from, cmd.refs.clone());
        let event = self.state.fetch(cmd);

        let rejected = match &event {
@@ -115,8 +120,8 @@ impl<S> FetcherService<S> {
                event: e,
                subscribers: vec![],
            },
-
            ref e @ event::Fetched::Completed { ref refs_at, .. } => {
-
                let key = FetchKey::new(cmd.rid, cmd.from, refs_at.clone());
+
            ref e @ event::Fetched::Completed { ref refs, .. } => {
+
                let key = FetchKey::new(cmd.rid, cmd.from, refs.clone());
                let subscribers = self.subscribers.remove(&key).unwrap_or_default();
                FetchCompleted {
                    event: e.clone(),
@@ -157,6 +162,7 @@ impl<S> FetcherService<S> {

#[cfg(test)]
mod tests {
+
    use radicle::storage::refs::RefsAt;
    use radicle::test::arbitrary;
    use std::num::NonZeroUsize;
    use std::time::Duration;
@@ -182,7 +188,7 @@ mod tests {
            command::Fetch {
                from: node,
                rid: repo,
-
                refs_at: refs_specific.clone(),
+
                refs: refs_specific.clone().into(),
                timeout,
            },
            Some(1),
@@ -195,7 +201,7 @@ mod tests {
            command::Fetch {
                from: node,
                rid: repo,
-
                refs_at: refs_all.clone(),
+
                refs: refs_all.clone().into(),
                timeout,
            },
            Some(2),
@@ -213,8 +219,8 @@ mod tests {
        });

        match completed.event {
-
            event::Fetched::Completed { ref refs_at, .. } => {
-
                assert_eq!(refs_at, &refs_specific);
+
            event::Fetched::Completed { ref refs, .. } => {
+
                assert_eq!(refs, &refs_specific.into());
            }
            _ => panic!("Expected Completed event"),
        }
@@ -223,11 +229,13 @@ mod tests {
        assert_eq!(completed.subscribers, vec![1]);

        // subscriber 2 should still be waiting
-
        assert!(service
-
            .subscribers
-
            .contains_key(&FetchKey::new(repo, node, refs_all.clone())));
+
        assert!(service.subscribers.contains_key(&FetchKey::new(
+
            repo,
+
            node,
+
            refs_all.clone().into()
+
        )));

-
        let remaining = &service.subscribers[&FetchKey::new(repo, node, refs_all)];
+
        let remaining = &service.subscribers[&FetchKey::new(repo, node, refs_all.into())];
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0], 2);
    }
modified crates/radicle-protocol/src/fetcher/state.rs
@@ -15,9 +15,10 @@ use std::collections::{BTreeMap, VecDeque};
use std::num::NonZeroUsize;
use std::time;

-
use radicle::storage::refs::RefsAt;
use radicle_core::{NodeId, RepoId};

+
use crate::fetcher::RefsToFetch;
+

/// Default for the maximum items per fetch queue.
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
/// Default for maximum concurrency per node.
@@ -96,32 +97,32 @@ impl FetcherState {
        command::Fetch {
            from,
            rid,
-
            refs_at,
+
            refs,
            timeout,
        }: command::Fetch,
    ) -> event::Fetch {
        if let Some(active) = self.active.get(&rid) {
-
            if active.refs_at == refs_at && active.from == from {
+
            if active.refs == refs && active.from == from {
                return event::Fetch::AlreadyFetching { rid, from };
            } else {
-
                return self.enqueue(rid, from, refs_at, timeout);
+
                return self.enqueue(rid, from, refs, timeout);
            }
        }

        if self.is_at_node_capacity(&from) {
-
            self.enqueue(rid, from, refs_at, timeout)
+
            self.enqueue(rid, from, refs, timeout)
        } else {
            self.active.insert(
                rid,
                ActiveFetch {
                    from,
-
                    refs_at: refs_at.clone(),
+
                    refs: refs.clone(),
                },
            );
            event::Fetch::Started {
                rid,
                from,
-
                refs_at,
+
                refs,
                timeout,
            }
        }
@@ -136,7 +137,7 @@ impl FetcherState {
    pub fn fetched(&mut self, command::Fetched { from, rid }: command::Fetched) -> event::Fetched {
        match self.active.remove(&rid) {
            None => event::Fetched::NotFound { from, rid },
-
            Some(ActiveFetch { from, refs_at }) => event::Fetched::Completed { from, rid, refs_at },
+
            Some(ActiveFetch { from, refs }) => event::Fetched::Completed { from, rid, refs },
        }
    }

@@ -182,31 +183,23 @@ impl FetcherState {
        &mut self,
        rid: RepoId,
        from: NodeId,
-
        refs_at: Vec<RefsAt>,
+
        refs: RefsToFetch,
        timeout: time::Duration,
    ) -> event::Fetch {
        let queue = self
            .queues
            .entry(from)
            .or_insert(Queue::new(self.config.maximum_queue_size));
-
        match queue.enqueue(QueuedFetch {
-
            rid,
-
            from,
-
            refs_at,
-
            timeout,
-
        }) {
-
            Enqueue::CapacityReached(QueuedFetch {
-
                rid,
-
                from,
-
                refs_at,
-
                timeout,
-
            }) => event::Fetch::QueueAtCapacity {
-
                rid,
-
                from,
-
                refs_at,
-
                timeout,
-
                capacity: queue.len(),
-
            },
+
        match queue.enqueue(QueuedFetch { rid, refs, timeout }) {
+
            Enqueue::CapacityReached(QueuedFetch { rid, refs, timeout }) => {
+
                event::Fetch::QueueAtCapacity {
+
                    rid,
+
                    from,
+
                    refs,
+
                    timeout,
+
                    capacity: queue.len(),
+
                }
+
            }
            Enqueue::Queued => event::Fetch::Queued { rid, from },
            Enqueue::Merged => event::Fetch::Queued { rid, from },
        }
@@ -281,7 +274,7 @@ impl Default for Config {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActiveFetch {
    pub from: NodeId,
-
    pub refs_at: Vec<RefsAt>,
+
    pub refs: RefsToFetch,
}

impl ActiveFetch {
@@ -291,8 +284,8 @@ impl ActiveFetch {
    }

    /// The set of references that fetch is being performed for.
-
    pub fn refs_at(&self) -> &[RefsAt] {
-
        &self.refs_at
+
    pub fn refs(&self) -> &RefsToFetch {
+
        &self.refs
    }
}

@@ -301,11 +294,8 @@ impl ActiveFetch {
pub struct QueuedFetch {
    /// The repository that will be fetched.
    pub rid: RepoId,
-
    // TODO(finto): this might be redundant, since queues are per node
-
    /// The peer from which the repository will be fetched from.
-
    pub from: NodeId,
    /// The references that the fetch is being performed for.
-
    pub refs_at: Vec<RefsAt>,
+
    pub refs: RefsToFetch,
    /// The timeout given for the fetch request.
    pub timeout: time::Duration,
}
@@ -384,12 +374,7 @@ impl Queue {
    /// the queue has not reached capacity and if the item is unique.
    pub(super) fn enqueue(&mut self, fetch: QueuedFetch) -> Enqueue {
        if let Some(existing) = self.queue.iter_mut().find(|qf| qf.rid == fetch.rid) {
-
            if existing.refs_at.is_empty() || fetch.refs_at.is_empty() {
-
                // We fetch everything
-
                existing.refs_at = vec![]
-
            } else {
-
                existing.refs_at.extend(fetch.refs_at);
-
            }
+
            existing.refs = existing.refs.clone().merge(fetch.refs);
            // Take the longer timeout (more generous)
            existing.timeout = existing.timeout.max(fetch.timeout);
            return Enqueue::Merged;
modified crates/radicle-protocol/src/fetcher/state/command.rs
@@ -1,8 +1,9 @@
use std::time;

-
use radicle::storage::refs::RefsAt;
use radicle_core::{NodeId, RepoId};

+
use crate::fetcher::RefsToFetch;
+

/// Commands for transitioning the [`FetcherState`].
///
/// [`FetcherState`]: super::FetcherState
@@ -32,11 +33,11 @@ impl From<Cancel> for Command {
}

impl Command {
-
    pub fn fetch(from: NodeId, rid: RepoId, refs_at: Vec<RefsAt>, timeout: time::Duration) -> Self {
+
    pub fn fetch(from: NodeId, rid: RepoId, refs: RefsToFetch, timeout: time::Duration) -> Self {
        Self::from(Fetch {
            from,
            rid,
-
            refs_at,
+
            refs,
            timeout,
        })
    }
@@ -58,7 +59,7 @@ pub struct Fetch {
    /// The repository to fetch.
    pub rid: RepoId,
    /// The references to fetch.
-
    pub refs_at: Vec<RefsAt>,
+
    pub refs: RefsToFetch,
    /// The timeout for the fetch process.
    pub timeout: time::Duration,
}
@@ -68,7 +69,7 @@ pub struct Fetch {
pub struct Fetched {
    /// The node from which the repository was fetched from.
    pub from: NodeId,
-
    /// The repository that was fetch.
+
    /// The repository that was fetched.
    pub rid: RepoId,
}

modified crates/radicle-protocol/src/fetcher/state/event.rs
@@ -1,9 +1,10 @@
use std::collections::{BTreeMap, VecDeque};
use std::time;

-
use radicle::storage::refs::RefsAt;
use radicle_core::{NodeId, RepoId};

+
use crate::fetcher::RefsToFetch;
+

use super::{ActiveFetch, QueuedFetch};

/// Event returned from [`FetchState::handle`].
@@ -44,7 +45,7 @@ pub enum Fetch {
        /// The node to fetch from.
        from: NodeId,
        /// The references to be fetched.
-
        refs_at: Vec<RefsAt>,
+
        refs: RefsToFetch,
        /// The timeout for the fetch process.
        timeout: time::Duration,
    },
@@ -63,7 +64,7 @@ pub enum Fetch {
        /// The node who's queue is at capacity.
        from: NodeId,
        /// The references expected to be fetched.
-
        refs_at: Vec<RefsAt>,
+
        refs: RefsToFetch,
        /// The timeout for the fetch process.
        timeout: time::Duration,
        /// The capacity of the queue.
@@ -91,7 +92,7 @@ pub enum Fetched {
        /// The repository that was fetched.
        rid: RepoId,
        /// The references that were fetched.
-
        refs_at: Vec<RefsAt>,
+
        refs: RefsToFetch,
    },
}

deleted crates/radicle-protocol/src/fetcher/test/arbitrary.rs
@@ -1,52 +0,0 @@
-
use std::collections::HashSet;
-

-
use radicle::{identity::DocAt, test::arbitrary};
-

-
use crate::fetcher::{commands, Command, Fetched};
-

-
impl qcheck::Arbitrary for Fetched {
-
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
-
        Fetched {
-
            updated: vec![],
-
            namespaces: HashSet::arbitrary(g),
-
            clone: bool::arbitrary(g),
-
            doc: DocAt::arbitrary(g),
-
        }
-
    }
-
}
-

-
impl qcheck::Arbitrary for Command {
-
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
-
        todo!()
-
    }
-
}
-

-
impl qcheck::Arbitrary for commands::Fetch {
-
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
-
        todo!()
-
    }
-
}
-

-
impl qcheck::Arbitrary for commands::Fetched {
-
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
-
        g.choose(&[
-
            commands::Fetched::DequeueFetches,
-
            commands::Fetched::Fetched {
-
                from: arbitrary::gen(g.size()),
-
                rid: arbitrary::gen(g.size()),
-
            },
-
        ])
-
        .cloned()
-
        .unwrap()
-
    }
-
}
-

-
impl qcheck::Arbitrary for commands::Dequeue {
-
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
-
        g.choose(&[commands::Dequeue::Nodes {
-
            nodes: arbitrary::gen(5),
-
        }])
-
        .cloned()
-
        .unwrap()
-
    }
-
}
modified crates/radicle-protocol/src/fetcher/test/queue.rs
@@ -8,7 +8,7 @@ use std::time::Duration;
use qcheck::Arbitrary;

use radicle::storage::refs::RefsAt;
-
use radicle_core::{NodeId, RepoId};
+
use radicle_core::RepoId;

use crate::fetcher::state::{MaxQueueSize, QueuedFetch};

@@ -20,8 +20,7 @@ impl Arbitrary for QueuedFetch {

        QueuedFetch {
            rid: RepoId::arbitrary(g),
-
            from: NodeId::arbitrary(g),
-
            refs_at,
+
            refs: refs_at.into(),
            timeout: Duration::from_secs(u64::arbitrary(g) % 3600),
        }
    }
modified crates/radicle-protocol/src/fetcher/test/queue/helpers.rs
@@ -2,7 +2,7 @@ use std::{num::NonZeroUsize, time::Duration};

use radicle::test::arbitrary;

-
use crate::fetcher::{MaxQueueSize, Queue, QueuedFetch};
+
use crate::fetcher::{MaxQueueSize, Queue, QueuedFetch, RefsToFetch};

pub fn create_queue(capacity: usize) -> Queue {
    Queue::new(MaxQueueSize::new(
@@ -13,8 +13,7 @@ pub fn create_queue(capacity: usize) -> Queue {
pub fn create_fetch() -> QueuedFetch {
    QueuedFetch {
        rid: arbitrary::gen(1),
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: Duration::from_secs(30),
    }
}
modified crates/radicle-protocol/src/fetcher/test/queue/properties/merge.rs
@@ -1,3 +1,4 @@
+
use std::num::NonZeroUsize;
use std::time::Duration;

use qcheck_macros::quickcheck;
@@ -7,6 +8,7 @@ use radicle_core::RepoId;

use crate::fetcher::state::Enqueue;
use crate::fetcher::test::queue::helpers::*;
+
use crate::fetcher::RefsToFetch;
use crate::fetcher::{MaxQueueSize, Queue, QueuedFetch};

#[quickcheck]
@@ -30,8 +32,7 @@ fn same_rid_merges_anywhere_in_queue(max_size: MaxQueueSize, merge_index: usize)
    let target_index = merge_index % items.len();
    let same_rid_item = QueuedFetch {
        rid: items[target_index].rid,
-
        from: arbitrary::gen(1), // Different from
-
        refs_at: vec![arbitrary::gen(1)],
+
        refs: vec![arbitrary::gen(1)].into(),
        timeout: Duration::from_secs(60),
    };

@@ -51,15 +52,13 @@ fn combines_refs(base_refs_count: u8, merge_refs_count: u8) -> bool {

    let base_item = QueuedFetch {
        rid,
-
        from: arbitrary::gen(1),
-
        refs_at: base_refs.clone(),
+
        refs: base_refs.clone().into(),
        timeout: Duration::from_secs(30),
    };

    let merge_item = QueuedFetch {
        rid,
-
        from: arbitrary::gen(1),
-
        refs_at: merge_refs.clone(),
+
        refs: merge_refs.clone().into(),
        timeout: Duration::from_secs(30),
    };

@@ -74,10 +73,10 @@ fn combines_refs(base_refs_count: u8, merge_refs_count: u8) -> bool {

    // If either was empty, result should be empty (fetch everything)
    if base_refs.is_empty() || merge_refs.is_empty() {
-
        dequeued.refs_at.is_empty()
+
        dequeued.refs == RefsToFetch::All
    } else {
        // Otherwise refs should be combined
-
        dequeued.refs_at.len() == base_refs_count + merge_refs_count
+
        dequeued.refs.len() == Some(NonZeroUsize::new(base_refs_count + merge_refs_count).unwrap())
    }
}

@@ -89,16 +88,14 @@ fn empty_refs_fetches_all() -> bool {
    // First enqueue with specific refs
    let item_with_refs = QueuedFetch {
        rid,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![arbitrary::gen(1), arbitrary::gen(1)],
+
        refs: vec![arbitrary::gen(1), arbitrary::gen(1)].into(),
        timeout: Duration::from_secs(30),
    };

    // Second enqueue with empty refs (fetch everything)
    let item_empty_refs = QueuedFetch {
        rid,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: Duration::from_secs(30),
    };

@@ -106,7 +103,7 @@ fn empty_refs_fetches_all() -> bool {
    let _ = queue.enqueue(item_empty_refs);

    let dequeued = queue.dequeue().unwrap();
-
    dequeued.refs_at.is_empty() // Should fetch everything
+
    dequeued.refs == RefsToFetch::All // Should fetch everything
}

#[quickcheck]
@@ -119,15 +116,13 @@ fn longer_timeout_preserved(short_secs: u16, long_secs: u16) -> bool {

    let item_short = QueuedFetch {
        rid,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: short,
    };

    let item_long = QueuedFetch {
        rid,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: long,
    };

@@ -151,15 +146,13 @@ fn does_not_increase_queue_length() -> bool {

    let item1 = QueuedFetch {
        rid,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![arbitrary::gen(1)],
+
        refs: vec![arbitrary::gen(1)].into(),
        timeout: Duration::from_secs(30),
    };

    let item2 = QueuedFetch {
        rid,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![arbitrary::gen(1)],
+
        refs: vec![arbitrary::gen(1)].into(),
        timeout: Duration::from_secs(60),
    };

@@ -194,22 +187,19 @@ fn succeed_when_at_capacity() -> bool {

    let item1 = QueuedFetch {
        rid,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: Duration::from_secs(30),
    };

    let item2 = QueuedFetch {
        rid: arbitrary::gen(1), // Different rid
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: Duration::from_secs(30),
    };

    let merge_item = QueuedFetch {
        rid, // Same as item1
-
        from: arbitrary::gen(1),
-
        refs_at: vec![arbitrary::gen(1)],
+
        refs: vec![arbitrary::gen(1)].into(),
        timeout: Duration::from_secs(60),
    };

modified crates/radicle-protocol/src/fetcher/test/queue/unit.rs
@@ -1,19 +1,19 @@
use std::time::Duration;

use radicle::test::arbitrary;
-
use radicle_core::{NodeId, RepoId};
+
use radicle_core::RepoId;

use crate::fetcher::state::Enqueue;
use crate::fetcher::test::queue::helpers::*;
use crate::fetcher::QueuedFetch;
+
use crate::fetcher::RefsToFetch;

#[test]
fn zero_timeout_accepted() {
    let mut queue = create_queue(10);
    let item = QueuedFetch {
        rid: arbitrary::gen(1),
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: Duration::ZERO,
    };
    assert_eq!(queue.enqueue(item), Enqueue::Queued);
@@ -24,29 +24,25 @@ fn max_timeout_accepted() {
    let mut queue = create_queue(10);
    let item = QueuedFetch {
        rid: arbitrary::gen(1),
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: Duration::MAX,
    };
    assert_eq!(queue.enqueue(item), Enqueue::Queued);
}

#[test]
-
fn empty_refs_at_items_can_be_equal() {
+
fn empty_refs_items_can_be_equal() {
    let rid: RepoId = arbitrary::gen(1);
-
    let from: NodeId = arbitrary::gen(1);
    let timeout = Duration::from_secs(30);

    let item1 = QueuedFetch {
        rid,
-
        from,
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout,
    };
    let item2 = QueuedFetch {
        rid,
-
        from,
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout,
    };

@@ -64,28 +60,24 @@ fn merge_preserves_position_in_queue() {
    // Enqueue three items
    let _ = queue.enqueue(QueuedFetch {
        rid: rid_first,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: Duration::from_secs(30),
    });
    let _ = queue.enqueue(QueuedFetch {
        rid: rid_second,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: Duration::from_secs(30),
    });
    let _ = queue.enqueue(QueuedFetch {
        rid: rid_third,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout: Duration::from_secs(30),
    });

    // Merge into the second item
    let result = queue.enqueue(QueuedFetch {
        rid: rid_second,
-
        from: arbitrary::gen(1),
-
        refs_at: vec![arbitrary::gen(1)],
+
        refs: vec![arbitrary::gen(1)].into(),
        timeout: Duration::from_secs(60),
    });
    assert_eq!(result, Enqueue::Merged);
modified crates/radicle-protocol/src/fetcher/test/state/command/cancel.rs
@@ -12,13 +12,13 @@ fn single_ongoing() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
-
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let refs_1 = helpers::gen_refs(1);
    let timeout = Duration::from_secs(30);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });

@@ -36,7 +36,7 @@ fn single_ongoing() {
                ongoing.get(&repo_1),
                Some(&ActiveFetch {
                    from: node_a,
-
                    refs_at: refs_at_1,
+
                    refs: refs_1,
                })
            );
            assert!(queued.is_empty());
@@ -58,19 +58,19 @@ fn ongoing_and_queued() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -112,13 +112,13 @@ fn cancellation_is_isolated() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_b,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

modified crates/radicle-protocol/src/fetcher/test/state/command/fetch.rs
@@ -5,6 +5,7 @@ use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event};
use crate::fetcher::test::state::helpers;
+
use crate::fetcher::RefsToFetch;
use crate::fetcher::{ActiveFetch, FetcherState};

#[test]
@@ -12,13 +13,13 @@ fn fetch_start_first_fetch_for_node() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
-
    let refs_at_1 = helpers::gen_refs_at(2);
+
    let refs_1 = helpers::gen_refs(2);
    let timeout = Duration::from_secs(30);

    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });

@@ -27,7 +28,7 @@ fn fetch_start_first_fetch_for_node() {
        event::Fetch::Started {
            rid: repo_1,
            from: node_a,
-
            refs_at: refs_at_1.clone(),
+
            refs: refs_1.clone(),
            timeout,
        }
    );
@@ -35,7 +36,7 @@ fn fetch_start_first_fetch_for_node() {
        state.get_active_fetch(&repo_1),
        Some(&ActiveFetch {
            from: node_a,
-
            refs_at: refs_at_1,
+
            refs: refs_1,
        })
    );
}
@@ -51,7 +52,7 @@ fn fetch_different_repo_same_node_within_capacity() {
    let event1 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    assert!(matches!(event1, event::Fetch::Started { .. }));
@@ -59,7 +60,7 @@ fn fetch_different_repo_same_node_within_capacity() {
    let event2 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -74,13 +75,13 @@ fn fetch_same_repo_different_nodes_queues_second() {
    let node_a: NodeId = arbitrary::gen(1);
    let node_b: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
-
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let refs_1 = helpers::gen_refs(1);
    let timeout = Duration::from_secs(30);

    let event1 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });
    assert!(matches!(event1, event::Fetch::Started { .. }));
@@ -89,7 +90,7 @@ fn fetch_same_repo_different_nodes_queues_second() {
    let event2 = state.fetch(command::Fetch {
        from: node_b,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });

@@ -107,20 +108,20 @@ fn fetch_duplicate_returns_already_fetching() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
-
    let refs_at_1 = helpers::gen_refs_at(2);
+
    let refs_1 = helpers::gen_refs(2);
    let timeout = Duration::from_secs(30);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });

    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });

@@ -138,21 +139,21 @@ fn fetch_same_repo_different_refs_enqueues() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
-
    let refs_at_1 = helpers::gen_refs_at(1);
-
    let refs_at_2 = helpers::gen_refs_at(2);
+
    let refs_1 = helpers::gen_refs(1);
+
    let refs_2 = helpers::gen_refs(2);
    let timeout = Duration::from_secs(30);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });

    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_2.clone(),
+
        refs: refs_2.clone(),
        timeout,
    });

@@ -176,14 +177,14 @@ fn fetch_at_capacity_enqueues() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -212,7 +213,7 @@ fn fetch_queue_rejected_capacity_reached() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -220,22 +221,22 @@ fn fetch_queue_rejected_capacity_reached() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

    // Exceed queue capacity
-
    let refs_at_4 = helpers::gen_refs_at(1);
+
    let refs_4 = helpers::gen_refs(1);
    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_4,
-
        refs_at: refs_at_4.clone(),
+
        refs: refs_4.clone(),
        timeout,
    });

@@ -244,7 +245,7 @@ fn fetch_queue_rejected_capacity_reached() {
        event::Fetch::QueueAtCapacity {
            rid: repo_4,
            from: node_a,
-
            refs_at: refs_at_4,
+
            refs: refs_4,
            timeout,
            capacity: 2,
        }
@@ -257,21 +258,21 @@ fn fetch_queue_merges_already_queued() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
-
    let refs_at_2a = helpers::gen_refs_at(1);
-
    let refs_at_2b = helpers::gen_refs_at(1);
+
    let refs_2a = helpers::gen_refs(1);
+
    let refs_2b = helpers::gen_refs(1);
    let timeout = Duration::from_secs(30);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: refs_at_2a.clone(),
+
        refs: refs_2a.clone(),
        timeout,
    });

@@ -279,7 +280,7 @@ fn fetch_queue_merges_already_queued() {
    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: refs_at_2b.clone(),
+
        refs: refs_2b.clone(),
        timeout,
    });

@@ -299,8 +300,16 @@ fn fetch_queue_merges_already_queued() {
    });
    let queued = state.dequeue(&node_a).unwrap();
    assert_eq!(queued.rid, repo_2);
-
    // refs_at should contain both sets of refs
-
    assert_eq!(queued.refs_at.len(), refs_at_2a.len() + refs_at_2b.len());
+
    // `queued.refs` should be the union of both sets of refs.
+
    assert_eq!(
+
        queued.refs.len(),
+
        Some(
+
            refs_2a
+
                .len()
+
                .unwrap()
+
                .saturating_add(refs_2b.len().unwrap().into())
+
        )
+
    );
}

#[test]
@@ -309,13 +318,13 @@ fn fetch_queue_merge_empty_refs_fetches_all() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
-
    let refs_at_2 = helpers::gen_refs_at(2);
+
    let refs_2 = helpers::gen_refs(2);
    let timeout = Duration::from_secs(30);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -323,7 +332,7 @@ fn fetch_queue_merge_empty_refs_fetches_all() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: refs_at_2.clone(),
+
        refs: refs_2.clone(),
        timeout,
    });

@@ -331,7 +340,7 @@ fn fetch_queue_merge_empty_refs_fetches_all() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: vec![],
+
        refs: RefsToFetch::All,
        timeout,
    });

@@ -342,7 +351,7 @@ fn fetch_queue_merge_empty_refs_fetches_all() {
    });
    let queued = state.dequeue(&node_a).unwrap();
    assert_eq!(queued.rid, repo_2);
-
    assert!(queued.refs_at.is_empty());
+
    assert_eq!(queued.refs, RefsToFetch::All);
}

#[test]
@@ -357,7 +366,7 @@ fn fetch_queue_merge_takes_longer_timeout() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout: short_timeout,
    });

@@ -365,7 +374,7 @@ fn fetch_queue_merge_takes_longer_timeout() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout: short_timeout,
    });

@@ -373,7 +382,7 @@ fn fetch_queue_merge_takes_longer_timeout() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout: long_timeout,
    });

@@ -391,13 +400,13 @@ fn fetch_after_previous_completed() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
-
    let refs_at_1 = helpers::gen_refs_at(1);
+
    let refs_1 = helpers::gen_refs(1);
    let timeout = Duration::from_secs(30);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });
    state.fetched(command::Fetched {
@@ -408,7 +417,7 @@ fn fetch_after_previous_completed() {
    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });

modified crates/radicle-protocol/src/fetcher/test/state/command/fetched.rs
@@ -12,13 +12,13 @@ fn complete_single_ongoing() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
-
    let refs_at_1 = helpers::gen_refs_at(2);
+
    let refs_1 = helpers::gen_refs(2);
    let timeout = Duration::from_secs(30);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: refs_at_1.clone(),
+
        refs: refs_1.clone(),
        timeout,
    });

@@ -32,7 +32,7 @@ fn complete_single_ongoing() {
        event::Fetched::Completed {
            from: node_a,
            rid: repo_1,
-
            refs_at: refs_at_1,
+
            refs: refs_1,
        }
    );
    assert!(state.get_active_fetch(&repo_1).is_none());
@@ -45,13 +45,13 @@ fn complete_then_dequeue_fifo() {
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    let repo_3: RepoId = arbitrary::gen(1);
-
    let refs_at_2 = helpers::gen_refs_at(1);
+
    let refs_2 = helpers::gen_refs(1);
    let timeout = Duration::from_secs(30);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -59,13 +59,13 @@ fn complete_then_dequeue_fifo() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: refs_at_2.clone(),
+
        refs: refs_2.clone(),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -81,8 +81,7 @@ fn complete_then_dequeue_fifo() {
    assert!(queued.is_some());
    let queued = queued.unwrap();
    assert_eq!(queued.rid, repo_2);
-
    assert_eq!(queued.from, node_a);
-
    assert_eq!(queued.refs_at, refs_at_2);
+
    assert_eq!(queued.refs, refs_2);
}

#[test]
@@ -97,19 +96,19 @@ fn complete_one_of_multiple() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

modified crates/radicle-protocol/src/fetcher/test/state/concurrent.rs
@@ -21,7 +21,7 @@ fn interleaved_operations() {
    let e1 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    assert!(matches!(e1, event::Fetch::Started { .. }));
@@ -30,7 +30,7 @@ fn interleaved_operations() {
    let e2 = state.fetch(command::Fetch {
        from: node_b,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    assert!(matches!(e2, event::Fetch::Started { .. }));
@@ -46,7 +46,7 @@ fn interleaved_operations() {
    let e4 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    assert!(matches!(e4, event::Fetch::Started { .. }));
@@ -75,13 +75,13 @@ fn fetched_then_cancel() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

modified crates/radicle-protocol/src/fetcher/test/state/config.rs
@@ -18,7 +18,7 @@ fn high_concurrency() {
        let event = state.fetch(command::Fetch {
            from: node_a,
            rid: repo,
-
            refs_at: helpers::gen_refs_at(1),
+
            refs: helpers::gen_refs(1),
            timeout,
        });
        assert!(
@@ -50,14 +50,14 @@ fn min_queue_size() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

    let event1 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    assert!(matches!(event1, event::Fetch::Queued { .. }));
@@ -65,7 +65,7 @@ fn min_queue_size() {
    let event2 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    assert!(matches!(event2, event::Fetch::QueueAtCapacity { .. }));
modified crates/radicle-protocol/src/fetcher/test/state/dequeue.rs
@@ -13,20 +13,20 @@ fn cannot_dequeue_while_node_at_capacity() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
-
    let refs_at_2 = helpers::gen_refs_at(3);
+
    let refs_2 = helpers::gen_refs(3);
    let timeout_2 = Duration::from_secs(42);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout: Duration::from_secs(10),
    });

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: refs_at_2.clone(),
+
        refs: refs_2.clone(),
        timeout: timeout_2,
    });

@@ -41,8 +41,7 @@ fn cannot_dequeue_while_node_at_capacity() {
    let result = state.dequeue(&node_a);
    let queued = result.unwrap();
    assert_eq!(queued.rid, repo_2);
-
    assert_eq!(queued.from, node_a);
-
    assert_eq!(queued.refs_at, refs_at_2);
+
    assert_eq!(queued.refs, refs_2);
    assert_eq!(queued.timeout, timeout_2);
}

@@ -59,7 +58,7 @@ fn maintains_fifo_order() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -67,19 +66,19 @@ fn maintains_fifo_order() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_4,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

modified crates/radicle-protocol/src/fetcher/test/state/helpers.rs
@@ -1,8 +1,8 @@
use std::num::NonZeroUsize;

-
use radicle::{storage::refs::RefsAt, test::arbitrary};
+
use radicle::test::arbitrary;

-
use crate::fetcher::{Config, MaxQueueSize};
+
use crate::fetcher::{Config, MaxQueueSize, RefsToFetch};

pub fn config(max_concurrency: usize, max_queue_size: usize) -> Config {
    Config::new()
@@ -12,6 +12,7 @@ pub fn config(max_concurrency: usize, max_queue_size: usize) -> Config {
        ))
}

-
pub fn gen_refs_at(count: usize) -> Vec<RefsAt> {
-
    (0..count).map(|_| arbitrary::gen(1)).collect()
+
pub fn gen_refs(count: usize) -> RefsToFetch {
+
    let refs: Vec<_> = (0..count).map(|_| arbitrary::gen(1)).collect();
+
    refs.into()
}
modified crates/radicle-protocol/src/fetcher/test/state/invariant.rs
@@ -13,21 +13,21 @@ fn queue_integrity_after_merge() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
-
    let refs_at_2a = helpers::gen_refs_at(1);
-
    let refs_at_2b = helpers::gen_refs_at(1);
+
    let refs_2a = helpers::gen_refs(1);
+
    let refs_2b = helpers::gen_refs(1);
    let timeout = Duration::from_secs(30);

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: refs_at_2a.clone(),
+
        refs: refs_2a.clone(),
        timeout,
    });

@@ -35,7 +35,7 @@ fn queue_integrity_after_merge() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
-
        refs_at: refs_at_2b.clone(),
+
        refs: refs_2b.clone(),
        timeout,
    });

modified crates/radicle-protocol/src/fetcher/test/state/multinode.rs
@@ -22,13 +22,13 @@ fn independent_queues() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_a_active,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_b,
        rid: repo_b_active,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -36,13 +36,13 @@ fn independent_queues() {
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_a_queued,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });
    state.fetch(command::Fetch {
        from: node_b,
        rid: repo_b_queued,
-
        refs_at: helpers::gen_refs_at(1),
+
        refs: helpers::gen_refs(1),
        timeout,
    });

@@ -73,7 +73,7 @@ fn high_count() {
        let event = state.fetch(command::Fetch {
            from: node,
            rid: repo,
-
            refs_at: helpers::gen_refs_at(1),
+
            refs: helpers::gen_refs(1),
            timeout,
        });
        assert!(matches!(event, event::Fetch::Started { .. }));
modified crates/radicle-protocol/src/service.rs
@@ -41,6 +41,7 @@ use radicle_fetch::policy::SeedingPolicy;
use crate::fetcher;
use crate::fetcher::service::FetcherService;
use crate::fetcher::FetcherState;
+
use crate::fetcher::RefsToFetch;
use crate::service::gossip::Store as _;
use crate::service::message::{
    Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
@@ -971,14 +972,13 @@ where
        refs: NonEmpty<RefsAt>,
        scope: Scope,
        timeout: time::Duration,
-
        channel: Option<chan::Sender<FetchResult>>,
    ) -> bool {
        match self.refs_status_of(rid, refs, &scope) {
            Ok(status) => {
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
-
                    self.fetch(rid, from, status.want, timeout, channel);
+
                    self.fetch(rid, from, status.want, timeout, None);
                    return true;
                }
            }
@@ -1018,7 +1018,7 @@ where
        let cmd = fetcher::state::command::Fetch {
            from,
            rid,
-
            refs_at,
+
            refs: refs_at.into(),
            timeout,
        };
        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);
@@ -1034,14 +1034,14 @@ where
            fetcher::state::event::Fetch::Started {
                rid,
                from,
-
                refs_at,
+
                refs: refs_at,
                timeout,
            } => {
                debug!(target: "service", "Starting fetch for {rid} from {from}");
                self.outbox.fetch(
                    session,
                    rid,
-
                    refs_at,
+
                    refs_at.into(),
                    timeout,
                    self.config.limits.fetch_pack_receive,
                );
@@ -1074,11 +1074,7 @@ where
            fetcher::state::event::Fetched::NotFound { from, rid } => {
                debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
            }
-
            fetcher::state::event::Fetched::Completed {
-
                from,
-
                rid,
-
                refs_at: _,
-
            } => {
+
            fetcher::state::event::Fetched::Completed { from, rid, refs: _ } => {
                // Notify responders
                let fetch_result = match &result {
                    Ok(success) => FetchResult::Success {
@@ -1185,8 +1181,7 @@ where

            let Some(fetcher::QueuedFetch {
                rid,
-
                from,
-
                refs_at,
+
                refs: refs_at,
                timeout,
            }) = self.fetcher.dequeue(&nid)
            else {
@@ -1204,14 +1199,17 @@ where
                continue;
            };

-
            debug!(target: "service", "Dequeued fetch for {} from {}", rid, from);
+
            debug!(target: "service", "Dequeued fetch for {} from {}", rid, nid);

-
            // Channel is `None` in both cases since they will already be
-
            // registered with the fetcher service.
-
            if let Some(refs) = NonEmpty::from_vec(refs_at.clone()) {
-
                self.fetch_refs_at(rid, from, refs, scope, timeout, None);
-
            } else {
-
                self.fetch(rid, from, refs_at, timeout, None);
+
            match refs_at {
+
                RefsToFetch::Refs(refs) => {
+
                    self.fetch_refs_at(rid, nid, refs, scope, timeout);
+
                }
+
                RefsToFetch::All => {
+
                    // Channel is `None` since they will already be
+
                    // registered with the fetcher service.
+
                    self.fetch(rid, nid, vec![], timeout, None);
+
                }
            }
        }
    }
@@ -1664,7 +1662,7 @@ where
                    return Ok(relay);
                };
                // Finally, start the fetch.
-
                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT, None);
+
                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT);

                return Ok(relay);
            }