Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
protocol: Allow configuration of `radicle_fetch::Config`
Fintan Halpenny committed 1 month ago
commit 07f3d56559d67a390c0598d2944540d6c4e595a3
parent 81ca4b9
24 files changed +291 -201
modified crates/radicle-fetch/src/state.rs
@@ -94,13 +94,13 @@ pub mod error {
type IdentityTips = BTreeMap<PublicKey, Oid>;
type SigrefTips = BTreeMap<PublicKey, Oid>;

-#[derive(Clone, Copy, Debug)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FetchLimit {
    pub special: u64,
    pub refs: u64,
}

-#[derive(Clone, Copy, Debug, Default)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct Config {
    pub limit: FetchLimit,
    pub level_min: FeatureLevel,
modified crates/radicle-node/src/wire.rs
@@ -1010,9 +1010,9 @@ where
                Io::Fetch {
                    rid,
                    remote,
-                    timeout,
                    reader_limit,
                    refs_at,
+                    config,
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");

@@ -1026,8 +1026,9 @@ where
                        log::debug!(target: "wire", "Peer {remote} is not connected: dropping fetch");
                        continue;
                    };
-                    let (stream, channels) =
-                        streams.open(ChannelsConfig::new(timeout).with_reader_limit(reader_limit));
+                    let (stream, channels) = streams.open(
+                        ChannelsConfig::new(config.timeout()).with_reader_limit(reader_limit),
+                    );

                    log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");

@@ -1037,6 +1038,7 @@ where
                            rid,
                            remote,
                            refs_at,
+                            config: config.fetch_config(),
                        },
                        stream,
                        channels,
modified crates/radicle-node/src/worker.rs
@@ -130,9 +130,10 @@ impl Worker {
                rid,
                remote,
                refs_at,
+                config,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
-                let result = self.fetch(rid, remote, refs_at, channels, notifs);
+                let result = self.fetch(rid, remote, refs_at, config, channels, notifs);
                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote, emitter } => {
@@ -238,11 +239,12 @@ impl Worker {
        rid: RepoId,
        remote: NodeId,
        refs_at: Option<Vec<RefsAt>>,
+        fetch_config: radicle_fetch::Config,
        channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
    ) -> Result<fetch::FetchResult, FetchError> {
        let FetchConfig {
-            config,
+            config: _,
            local,
            expiry,
        } = &self.fetch_config;
@@ -266,7 +268,7 @@ impl Worker {
            &self.storage,
            &mut cache,
            &mut self.db,
-            *config,
+            fetch_config,
            remote,
            refs_at,
        )?;
modified crates/radicle-protocol/src/fetcher.rs
@@ -6,7 +6,9 @@ pub mod service;
pub use service::FetcherService;

pub mod state;
-pub use state::{ActiveFetch, Config, FetcherState, MaxQueueSize, Queue, QueueIter, QueuedFetch};
+pub use state::{
+    ActiveFetch, Config, FetchConfig, FetcherState, MaxQueueSize, Queue, QueueIter, QueuedFetch,
+};

#[cfg(test)]
mod test;
modified crates/radicle-protocol/src/fetcher/service.rs
@@ -165,11 +165,10 @@ mod tests {
    use radicle::storage::refs::RefsAt;
    use radicle::test::arbitrary;
    use std::num::NonZeroUsize;
-    use std::time::Duration;

    use super::*;

-    use crate::fetcher::MaxQueueSize;
+    use crate::fetcher::{FetchConfig, MaxQueueSize};

    #[test]
    fn test_fetch_coalescing_different_refs() {
@@ -181,7 +180,7 @@ mod tests {
        let repo = arbitrary::gen(1);
        let refs_specific: Vec<RefsAt> = arbitrary::vec(2);
        let refs_all = vec![];
-        let timeout = Duration::from_secs(30);
+        let config = FetchConfig::default();

        // fetch specific refs (Subscriber 1)
        let initiated1 = service.fetch(
@@ -189,7 +188,7 @@ mod tests {
                from: node,
                rid: repo,
                refs: refs_specific.clone().into(),
-                timeout,
+                config,
            },
            Some(1),
        );
@@ -202,7 +201,7 @@ mod tests {
                from: node,
                rid: repo,
                refs: refs_all.clone().into(),
-                timeout,
+                config,
            },
            Some(2),
        );
modified crates/radicle-protocol/src/fetcher/state.rs
@@ -10,6 +10,7 @@ pub mod event;

pub use command::Command;
pub use event::Event;
+use radicle::storage::refs::FeatureLevel;

use std::collections::{BTreeMap, VecDeque};
use std::num::NonZeroUsize;
@@ -18,12 +19,86 @@ use std::time;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::RefsToFetch;
+use crate::service::FETCH_TIMEOUT;

/// Default for the maximum items per fetch queue.
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
/// Default for maximum concurrency per node.
pub const MAX_CONCURRENCY: NonZeroUsize = NonZeroUsize::MIN;

+/// Configuration options for tuning the fetch process.
+///
+/// Note that these are not used directly by [`FetcherState`], but are
+/// maintained within the state so that the options can be tracked across queued
+/// fetches.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub struct FetchConfig {
+    timeout: time::Duration,
+    protocol: radicle_fetch::Config,
+}
+
+impl FetchConfig {
+    /// Construct the default [`FetchConfig`].
+    pub fn new() -> Self {
+        Self {
+            timeout: FETCH_TIMEOUT,
+            protocol: radicle_fetch::Config::default(),
+        }
+    }
+
+    /// Set the [`FetchConfig::timeout`] to the given [`time::Duration`].
+    pub fn with_timeout(mut self, timeout: time::Duration) -> Self {
+        self.timeout = timeout;
+        self
+    }
+
+    /// Set the [`FetchConfig::fetch_config`] to the given [`radicle_fetch::Config`].
+    pub fn with_fetch_config(mut self, config: radicle_fetch::Config) -> Self {
+        self.protocol = config;
+        self
+    }
+
+    /// Set the minimum feature level, within the [`FetchConfig::fetch_config`],
+    /// to the given [`FeatureLevel`].
+    pub fn with_minimum_feature_level(mut self, feature_level: FeatureLevel) -> Self {
+        self.protocol.level_min = feature_level;
+        self
+    }
+
+    /// Return the timeout duration configured for this fetch.
+    pub fn timeout(&self) -> time::Duration {
+        self.timeout
+    }
+
+    /// Return the [`radicle_fetch::Config`] configured for this fetch.
+    pub fn fetch_config(&self) -> radicle_fetch::Config {
+        self.protocol
+    }
+
+    /// Merge another [`FetchConfig`] with the current one.
+    /// For each field, the following semantics occur:
+    /// - `timeout`: the maximum timeout is taken
+    /// - `protocol.limit.refs`: the maximum limit is taken
+    /// - `protocol.limit.special`: the maximum limit is taken
+    /// - `protocol.level_min`: the minimum level is taken
+    fn merge(&mut self, other: FetchConfig) {
+        self.timeout = self.timeout.max(other.timeout);
+        self.protocol.limit.refs = self.protocol.limit.refs.max(other.protocol.limit.refs);
+        self.protocol.limit.special = self
+            .protocol
+            .limit
+            .special
+            .max(other.protocol.limit.special);
+        self.protocol.level_min = self.protocol.level_min.min(other.protocol.level_min);
+    }
+}
+
+impl Default for FetchConfig {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
/// Logical state for Git fetches happening in the node.
///
/// A fetch can either be:
@@ -98,19 +173,19 @@ impl FetcherState {
            from,
            rid,
            refs,
-            timeout,
+            config,
        }: command::Fetch,
    ) -> event::Fetch {
        if let Some(active) = self.active.get(&rid) {
            if active.refs == refs && active.from == from {
                return event::Fetch::AlreadyFetching { rid, from };
            } else {
-                return self.enqueue(rid, from, refs, timeout);
+                return self.enqueue(rid, from, refs, config);
            }
        }

        if self.is_at_node_capacity(&from) {
-            self.enqueue(rid, from, refs, timeout)
+            self.enqueue(rid, from, refs, config)
        } else {
            self.active.insert(
                rid,
@@ -123,7 +198,7 @@ impl FetcherState {
                rid,
                from,
                refs,
-                timeout,
+                config,
            }
        }
    }
@@ -184,19 +259,19 @@ impl FetcherState {
        rid: RepoId,
        from: NodeId,
        refs: RefsToFetch,
-        timeout: time::Duration,
+        config: FetchConfig,
    ) -> event::Fetch {
        let queue = self
            .queues
            .entry(from)
            .or_insert(Queue::new(self.config.maximum_queue_size));
-        match queue.enqueue(QueuedFetch { rid, refs, timeout }) {
-            Enqueue::CapacityReached(QueuedFetch { rid, refs, timeout }) => {
+        match queue.enqueue(QueuedFetch { rid, refs, config }) {
+            Enqueue::CapacityReached(QueuedFetch { rid, refs, config }) => {
                event::Fetch::QueueAtCapacity {
                    rid,
                    from,
                    refs,
-                    timeout,
+                    config,
                    capacity: queue.len(),
                }
            }
@@ -296,8 +371,8 @@ pub struct QueuedFetch {
    pub rid: RepoId,
    /// The references that the fetch is being performed for.
    pub refs: RefsToFetch,
-    /// The timeout given for the fetch request.
-    pub timeout: time::Duration,
+    /// The configuration options to pass to the fetch process.
+    pub config: FetchConfig,
}

/// A queue for keeping track of fetches.
@@ -375,8 +450,7 @@ impl Queue {
    pub(super) fn enqueue(&mut self, fetch: QueuedFetch) -> Enqueue {
        if let Some(existing) = self.queue.iter_mut().find(|qf| qf.rid == fetch.rid) {
            existing.refs = existing.refs.clone().merge(fetch.refs);
-            // Take the longer timeout (more generous)
-            existing.timeout = existing.timeout.max(fetch.timeout);
+            existing.config.merge(fetch.config);
            return Enqueue::Merged;
        }

modified crates/radicle-protocol/src/fetcher/state/command.rs
@@ -1,9 +1,9 @@
-use std::time;
-
use radicle_core::{NodeId, RepoId};

use crate::fetcher::RefsToFetch;

+use super::FetchConfig;
+
/// Commands for transitioning the [`FetcherState`].
///
/// [`FetcherState`]: super::FetcherState
@@ -33,12 +33,12 @@ impl From<Cancel> for Command {
}

impl Command {
-    pub fn fetch(from: NodeId, rid: RepoId, refs: RefsToFetch, timeout: time::Duration) -> Self {
+    pub fn fetch(from: NodeId, rid: RepoId, refs: RefsToFetch, config: FetchConfig) -> Self {
        Self::from(Fetch {
            from,
            rid,
            refs,
-            timeout,
+            config,
        })
    }

@@ -60,8 +60,8 @@ pub struct Fetch {
    pub rid: RepoId,
    /// The references to fetch.
    pub refs: RefsToFetch,
-    /// The timeout for the fetch process.
-    pub timeout: time::Duration,
+    /// The configuration options for the fetch process.
+    pub config: FetchConfig,
}

/// A fetch wants to be marked as completed.
modified crates/radicle-protocol/src/fetcher/state/event.rs
@@ -1,11 +1,10 @@
use std::collections::{BTreeMap, VecDeque};
-use std::time;

use radicle_core::{NodeId, RepoId};

use crate::fetcher::RefsToFetch;

-use super::{ActiveFetch, QueuedFetch};
+use super::{ActiveFetch, FetchConfig, QueuedFetch};

/// Event returned from [`FetchState::handle`].
///
@@ -46,8 +45,8 @@ pub enum Fetch {
        from: NodeId,
        /// The references to be fetched.
        refs: RefsToFetch,
-        /// The timeout for the fetch process.
-        timeout: time::Duration,
+        /// The configuration options for the fetch process.
+        config: FetchConfig,
    },
    /// The repository is already being fetched from the given node.
    AlreadyFetching {
@@ -65,8 +64,8 @@ pub enum Fetch {
        from: NodeId,
        /// The references expected to be fetched.
        refs: RefsToFetch,
-        /// The timeout for the fetch process.
-        timeout: time::Duration,
+        /// The configuration options for the fetch process.
+        config: FetchConfig,
        /// The capacity of the queue.
        capacity: usize,
    },
modified crates/radicle-protocol/src/fetcher/test/queue.rs
@@ -7,10 +7,13 @@ use std::time::Duration;

use qcheck::Arbitrary;

-use radicle::storage::refs::RefsAt;
+use radicle::storage::refs::{FeatureLevel, RefsAt};
use radicle_core::RepoId;

-use crate::fetcher::state::{MaxQueueSize, QueuedFetch};
+use crate::fetcher::{
+    state::{MaxQueueSize, QueuedFetch},
+    FetchConfig,
+};

impl Arbitrary for QueuedFetch {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
@@ -21,7 +24,9 @@ impl Arbitrary for QueuedFetch {
        QueuedFetch {
            rid: RepoId::arbitrary(g),
            refs: refs_at.into(),
-            timeout: Duration::from_secs(u64::arbitrary(g) % 3600),
+            config: FetchConfig::default()
+                .with_timeout(Duration::from_secs(u64::arbitrary(g) % 3600))
+                .with_minimum_feature_level(FeatureLevel::arbitrary(g)),
        }
    }
}
modified crates/radicle-protocol/src/fetcher/test/queue/helpers.rs
@@ -1,8 +1,8 @@
-use std::{num::NonZeroUsize, time::Duration};
+use std::num::NonZeroUsize;

use radicle::test::arbitrary;

-use crate::fetcher::{MaxQueueSize, Queue, QueuedFetch, RefsToFetch};
+use crate::fetcher::{FetchConfig, MaxQueueSize, Queue, QueuedFetch, RefsToFetch};

pub fn create_queue(capacity: usize) -> Queue {
    Queue::new(MaxQueueSize::new(
@@ -14,7 +14,7 @@ pub fn create_fetch() -> QueuedFetch {
    QueuedFetch {
        rid: arbitrary::gen(1),
        refs: RefsToFetch::All,
-        timeout: Duration::from_secs(30),
+        config: FetchConfig::default(),
    }
}

modified crates/radicle-protocol/src/fetcher/test/queue/properties/merge.rs
@@ -8,7 +8,7 @@ use radicle_core::RepoId;

use crate::fetcher::state::Enqueue;
use crate::fetcher::test::queue::helpers::*;
-use crate::fetcher::RefsToFetch;
+use crate::fetcher::{FetchConfig, RefsToFetch};
use crate::fetcher::{MaxQueueSize, Queue, QueuedFetch};

#[quickcheck]
@@ -33,7 +33,7 @@ fn same_rid_merges_anywhere_in_queue(max_size: MaxQueueSize, merge_index: usize)
    let same_rid_item = QueuedFetch {
        rid: items[target_index].rid,
        refs: vec![arbitrary::gen(1)].into(),
-
        timeout: Duration::from_secs(60),
+
        config: FetchConfig::default(),
    };

    matches!(queue.enqueue(same_rid_item), Enqueue::Merged)
@@ -45,6 +45,7 @@ fn combines_refs(base_refs_count: u8, merge_refs_count: u8) -> bool {
    let merge_refs_count = (merge_refs_count as usize) % 5;

    let mut queue = create_queue(10);
+
    let config = FetchConfig::default();

    let rid: RepoId = arbitrary::gen(1);
    let base_refs: Vec<RefsAt> = (0..base_refs_count).map(|_| arbitrary::gen(1)).collect();
@@ -53,13 +54,13 @@ fn combines_refs(base_refs_count: u8, merge_refs_count: u8) -> bool {
    let base_item = QueuedFetch {
        rid,
        refs: base_refs.clone().into(),
-
        timeout: Duration::from_secs(30),
+
        config: config.with_timeout(Duration::from_secs(30)),
    };

    let merge_item = QueuedFetch {
        rid,
        refs: merge_refs.clone().into(),
-
        timeout: Duration::from_secs(30),
+
        config: config.with_timeout(Duration::from_secs(30)),
    };

    let _ = queue.enqueue(base_item);
@@ -84,19 +85,20 @@ fn combines_refs(base_refs_count: u8, merge_refs_count: u8) -> bool {
fn empty_refs_fetches_all() -> bool {
    let mut queue = create_queue(10);
    let rid: RepoId = arbitrary::gen(1);
+
    let config = FetchConfig::default();

    // First enqueue with specific refs
    let item_with_refs = QueuedFetch {
        rid,
        refs: vec![arbitrary::gen(1), arbitrary::gen(1)].into(),
-
        timeout: Duration::from_secs(30),
+
        config,
    };

    // Second enqueue with empty refs (fetch everything)
    let item_empty_refs = QueuedFetch {
        rid,
        refs: RefsToFetch::All,
-
        timeout: Duration::from_secs(30),
+
        config,
    };

    let _ = queue.enqueue(item_with_refs);
@@ -110,20 +112,20 @@ fn empty_refs_fetches_all() -> bool {
fn longer_timeout_preserved(short_secs: u16, long_secs: u16) -> bool {
    let short = Duration::from_secs(short_secs.min(long_secs) as u64);
    let long = Duration::from_secs(short_secs.max(long_secs) as u64);
-

+
    let config = FetchConfig::default();
    let mut queue = create_queue(10);
    let rid: RepoId = arbitrary::gen(1);

    let item_short = QueuedFetch {
        rid,
        refs: RefsToFetch::All,
-
        timeout: short,
+
        config: config.with_timeout(short),
    };

    let item_long = QueuedFetch {
        rid,
        refs: RefsToFetch::All,
-
        timeout: long,
+
        config: config.with_timeout(long),
    };

    // Test both orderings
@@ -136,24 +138,25 @@ fn longer_timeout_preserved(short_secs: u16, long_secs: u16) -> bool {
    let _ = queue2.enqueue(item_short);
    let dequeued2 = queue2.dequeue().unwrap();

-    dequeued1.timeout == long && dequeued2.timeout == long
+    dequeued1.config.timeout() == long && dequeued2.config.timeout() == long
}

#[quickcheck]
fn does_not_increase_queue_length() -> bool {
    let mut queue = create_queue(10);
    let rid: RepoId = arbitrary::gen(1);
+
    let config = FetchConfig::default();

    let item1 = QueuedFetch {
        rid,
        refs: vec![arbitrary::gen(1)].into(),
-
        timeout: Duration::from_secs(30),
+
        config: config.with_timeout(Duration::from_secs(30)),
    };

    let item2 = QueuedFetch {
        rid,
        refs: vec![arbitrary::gen(1)].into(),
-
        timeout: Duration::from_secs(60),
+
        config: config.with_timeout(Duration::from_secs(60)),
    };

    let _ = queue.enqueue(item1);
@@ -184,23 +187,24 @@ fn succeed_when_at_capacity() -> bool {
    // When queue is at capacity, merging with existing item should still work
    let mut queue = create_queue(2);
    let rid: RepoId = arbitrary::gen(1);
+
    let config = FetchConfig::default();

    let item1 = QueuedFetch {
        rid,
        refs: RefsToFetch::All,
-
        timeout: Duration::from_secs(30),
+
        config: config.with_timeout(Duration::from_secs(30)),
    };

    let item2 = QueuedFetch {
        rid: arbitrary::gen(1), // Different rid
        refs: RefsToFetch::All,
-
        timeout: Duration::from_secs(30),
+
        config: config.with_timeout(Duration::from_secs(30)),
    };

    let merge_item = QueuedFetch {
        rid, // Same as item1
        refs: vec![arbitrary::gen(1)].into(),
-
        timeout: Duration::from_secs(60),
+
        config: config.with_timeout(Duration::from_secs(60)),
    };

    let _ = queue.enqueue(item1);
modified crates/radicle-protocol/src/fetcher/test/queue/unit.rs
@@ -5,16 +5,18 @@ use radicle_core::RepoId;

use crate::fetcher::state::Enqueue;
use crate::fetcher::test::queue::helpers::*;
+use crate::fetcher::FetchConfig;
use crate::fetcher::QueuedFetch;
use crate::fetcher::RefsToFetch;

#[test]
fn zero_timeout_accepted() {
    let mut queue = create_queue(10);
+
    let config = FetchConfig::default().with_timeout(Duration::ZERO);
    let item = QueuedFetch {
        rid: arbitrary::gen(1),
        refs: RefsToFetch::All,
-
        timeout: Duration::ZERO,
+
        config,
    };
    assert_eq!(queue.enqueue(item), Enqueue::Queued);
}
@@ -22,10 +24,11 @@ fn zero_timeout_accepted() {
#[test]
fn max_timeout_accepted() {
    let mut queue = create_queue(10);
+
    let config = FetchConfig::default().with_timeout(Duration::MAX);
    let item = QueuedFetch {
        rid: arbitrary::gen(1),
        refs: RefsToFetch::All,
-
        timeout: Duration::MAX,
+
        config,
    };
    assert_eq!(queue.enqueue(item), Enqueue::Queued);
}
@@ -33,17 +36,17 @@ fn max_timeout_accepted() {
#[test]
fn empty_refs_items_can_be_equal() {
    let rid: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    let item1 = QueuedFetch {
        rid,
        refs: RefsToFetch::All,
-
        timeout,
+
        config,
    };
    let item2 = QueuedFetch {
        rid,
        refs: RefsToFetch::All,
-
        timeout,
+
        config,
    };

    assert_eq!(item1, item2);
@@ -56,29 +59,30 @@ fn merge_preserves_position_in_queue() {
    let rid_first: RepoId = arbitrary::gen(1);
    let rid_second: RepoId = arbitrary::gen(2);
    let rid_third: RepoId = arbitrary::gen(3);
+
    let config = FetchConfig::default();

    // Enqueue three items
    let _ = queue.enqueue(QueuedFetch {
        rid: rid_first,
        refs: RefsToFetch::All,
-
        timeout: Duration::from_secs(30),
+
        config: config.with_timeout(Duration::from_secs(30)),
    });
    let _ = queue.enqueue(QueuedFetch {
        rid: rid_second,
        refs: RefsToFetch::All,
-
        timeout: Duration::from_secs(30),
+
        config: config.with_timeout(Duration::from_secs(30)),
    });
    let _ = queue.enqueue(QueuedFetch {
        rid: rid_third,
        refs: RefsToFetch::All,
-
        timeout: Duration::from_secs(30),
+
        config: config.with_timeout(Duration::from_secs(30)),
    });

    // Merge into the second item
    let result = queue.enqueue(QueuedFetch {
        rid: rid_second,
        refs: vec![arbitrary::gen(1)].into(),
-
        timeout: Duration::from_secs(60),
+
        config: config.with_timeout(Duration::from_secs(60)),
    });
    assert_eq!(result, Enqueue::Merged);

modified crates/radicle-protocol/src/fetcher/test/state/command/cancel.rs
@@ -1,11 +1,9 @@
-use std::time::Duration;
-
use radicle::test::arbitrary;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event};
use crate::fetcher::test::state::helpers;
-use crate::fetcher::{ActiveFetch, FetcherState};
+use crate::fetcher::{ActiveFetch, FetchConfig, FetcherState};

#[test]
fn single_ongoing() {
@@ -13,13 +11,13 @@ fn single_ongoing() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let refs_1 = helpers::gen_refs(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });

    let event = state.cancel(command::Cancel { from: node_a });
@@ -53,25 +51,25 @@ fn ongoing_and_queued() {
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    let repo_3: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    let event = state.cancel(command::Cancel { from: node_a });
@@ -107,19 +105,19 @@ fn cancellation_is_isolated() {
    let node_b: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_b,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    state.cancel(command::Cancel { from: node_a });
modified crates/radicle-protocol/src/fetcher/test/state/command/fetch.rs
@@ -5,8 +5,8 @@ use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event};
use crate::fetcher::test::state::helpers;
-use crate::fetcher::RefsToFetch;
use crate::fetcher::{ActiveFetch, FetcherState};
+use crate::fetcher::{FetchConfig, RefsToFetch};

#[test]
fn fetch_start_first_fetch_for_node() {
@@ -14,13 +14,13 @@ fn fetch_start_first_fetch_for_node() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let refs_1 = helpers::gen_refs(2);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });

    assert_eq!(
@@ -29,7 +29,7 @@ fn fetch_start_first_fetch_for_node() {
            rid: repo_1,
            from: node_a,
            refs: refs_1.clone(),
-
            timeout,
+
            config,
        }
    );
    assert_eq!(
@@ -47,13 +47,13 @@ fn fetch_different_repo_same_node_within_capacity() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    let event1 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    assert!(matches!(event1, event::Fetch::Started { .. }));

@@ -61,7 +61,7 @@ fn fetch_different_repo_same_node_within_capacity() {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    assert!(matches!(event2, event::Fetch::Started { rid, .. } if rid == repo_2));
@@ -76,13 +76,13 @@ fn fetch_same_repo_different_nodes_queues_second() {
    let node_b: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let refs_1 = helpers::gen_refs(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    let event1 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });
    assert!(matches!(event1, event::Fetch::Started { .. }));

@@ -91,7 +91,7 @@ fn fetch_same_repo_different_nodes_queues_second() {
        from: node_b,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });

    assert!(
@@ -109,20 +109,20 @@ fn fetch_duplicate_returns_already_fetching() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let refs_1 = helpers::gen_refs(2);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });

    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });

    assert_eq!(
@@ -141,20 +141,20 @@ fn fetch_same_repo_different_refs_enqueues() {
    let repo_1: RepoId = arbitrary::gen(1);
    let refs_1 = helpers::gen_refs(1);
    let refs_2 = helpers::gen_refs(2);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });

    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_2.clone(),
-
        timeout,
+
        config,
    });

    assert_eq!(
@@ -172,20 +172,20 @@ fn fetch_at_capacity_enqueues() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    let event = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    assert_eq!(
@@ -207,14 +207,14 @@ fn fetch_queue_rejected_capacity_reached() {
    let repo_2: RepoId = arbitrary::gen(1);
    let repo_3: RepoId = arbitrary::gen(1);
    let repo_4: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    // Fill concurrency
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    // Fill queue (capacity 2)
@@ -222,13 +222,13 @@ fn fetch_queue_rejected_capacity_reached() {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    // Exceed queue capacity
@@ -237,7 +237,7 @@ fn fetch_queue_rejected_capacity_reached() {
        from: node_a,
        rid: repo_4,
        refs: refs_4.clone(),
-
        timeout,
+
        config,
    });

    assert_eq!(
@@ -246,7 +246,7 @@ fn fetch_queue_rejected_capacity_reached() {
            rid: repo_4,
            from: node_a,
            refs: refs_4,
-
            timeout,
+
            config,
            capacity: 2,
        }
    );
@@ -260,20 +260,20 @@ fn fetch_queue_merges_already_queued() {
    let repo_2: RepoId = arbitrary::gen(1);
    let refs_2a = helpers::gen_refs(1);
    let refs_2b = helpers::gen_refs(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
        refs: refs_2a.clone(),
-
        timeout,
+
        config,
    });

    // Second fetch for same queued repo - should merge refs
@@ -281,7 +281,7 @@ fn fetch_queue_merges_already_queued() {
        from: node_a,
        rid: repo_2,
        refs: refs_2b.clone(),
-
        timeout,
+
        config,
    });

    // Returns Queued (merged)
@@ -319,13 +319,13 @@ fn fetch_queue_merge_empty_refs_fetches_all() {
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    let refs_2 = helpers::gen_refs(2);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    // Queue with specific refs
@@ -333,7 +333,7 @@ fn fetch_queue_merge_empty_refs_fetches_all() {
        from: node_a,
        rid: repo_2,
        refs: refs_2.clone(),
-
        timeout,
+
        config,
    });

    // Queue again with empty refs (fetch everything)
@@ -341,7 +341,7 @@ fn fetch_queue_merge_empty_refs_fetches_all() {
        from: node_a,
        rid: repo_2,
        refs: RefsToFetch::All,
-
        timeout,
+
        config,
    });

    // Dequeue and verify refs became empty (fetch all)
@@ -362,12 +362,13 @@ fn fetch_queue_merge_takes_longer_timeout() {
    let repo_2: RepoId = arbitrary::gen(1);
    let short_timeout = Duration::from_secs(10);
    let long_timeout = Duration::from_secs(60);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout: short_timeout,
+
        config: config.with_timeout(short_timeout),
    });

    // Queue with short timeout
@@ -375,7 +376,7 @@ fn fetch_queue_merge_takes_longer_timeout() {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout: short_timeout,
+
        config: config.with_timeout(short_timeout),
    });

    // Queue again with longer timeout
@@ -383,7 +384,7 @@ fn fetch_queue_merge_takes_longer_timeout() {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout: long_timeout,
+
        config: config.with_timeout(long_timeout),
    });

    state.fetched(command::Fetched {
@@ -392,7 +393,7 @@ fn fetch_queue_merge_takes_longer_timeout() {
    });
    // Dequeue and verify timeout is the longer one
    let queued = state.dequeue(&node_a).unwrap();
-
    assert_eq!(queued.timeout, long_timeout);
+
    assert_eq!(queued.config.timeout(), long_timeout);
}

#[test]
@@ -401,13 +402,13 @@ fn fetch_after_previous_completed() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let refs_1 = helpers::gen_refs(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });
    state.fetched(command::Fetched {
        from: node_a,
@@ -418,7 +419,7 @@ fn fetch_after_previous_completed() {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });

    assert!(matches!(event, event::Fetch::Started { .. }));
modified crates/radicle-protocol/src/fetcher/test/state/command/fetched.rs
@@ -1,11 +1,9 @@
-
use std::time::Duration;
-

use radicle::test::arbitrary;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event};
use crate::fetcher::test::state::helpers;
-
use crate::fetcher::FetcherState;
+
use crate::fetcher::{FetchConfig, FetcherState};

#[test]
fn complete_single_ongoing() {
@@ -13,13 +11,13 @@ fn complete_single_ongoing() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let refs_1 = helpers::gen_refs(2);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
-
        timeout,
+
        config,
    });

    let event = state.fetched(command::Fetched {
@@ -46,13 +44,13 @@ fn complete_then_dequeue_fifo() {
    let repo_2: RepoId = arbitrary::gen(1);
    let repo_3: RepoId = arbitrary::gen(1);
    let refs_2 = helpers::gen_refs(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    // Queue repo_2 first, then repo_3
@@ -60,13 +58,13 @@ fn complete_then_dequeue_fifo() {
        from: node_a,
        rid: repo_2,
        refs: refs_2.clone(),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    let event = state.fetched(command::Fetched {
@@ -91,25 +89,25 @@ fn complete_one_of_multiple() {
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    let repo_3: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    let event = state.fetched(command::Fetched {
modified crates/radicle-protocol/src/fetcher/test/state/concurrent.rs
@@ -1,11 +1,9 @@
-
use std::time::Duration;
-

use radicle::test::arbitrary;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event};
use crate::fetcher::test::state::helpers;
-
use crate::fetcher::FetcherState;
+
use crate::fetcher::{FetchConfig, FetcherState};

#[test]
fn interleaved_operations() {
@@ -15,14 +13,14 @@ fn interleaved_operations() {
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    let repo_3: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    // fetch(A, r1)
    let e1 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    assert!(matches!(e1, event::Fetch::Started { .. }));

@@ -31,7 +29,7 @@ fn interleaved_operations() {
        from: node_b,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    assert!(matches!(e2, event::Fetch::Started { .. }));

@@ -47,7 +45,7 @@ fn interleaved_operations() {
        from: node_a,
        rid: repo_3,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    assert!(matches!(e4, event::Fetch::Started { .. }));

@@ -70,19 +68,19 @@ fn fetched_then_cancel() {
    let node_a: NodeId = arbitrary::gen(1);
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    // Complete repo_1
modified crates/radicle-protocol/src/fetcher/test/state/config.rs
@@ -1,17 +1,15 @@
-
use std::time::Duration;
-

use radicle::test::arbitrary;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event};
use crate::fetcher::test::state::helpers;
-
use crate::fetcher::FetcherState;
+
use crate::fetcher::{FetchConfig, FetcherState};

#[test]
fn high_concurrency() {
    let mut state = FetcherState::new(helpers::config(100, 10));
    let node_a: NodeId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    for i in 0..100 {
        let repo: RepoId = arbitrary::gen(i + 1);
@@ -19,7 +17,7 @@ fn high_concurrency() {
            from: node_a,
            rid: repo,
            refs: helpers::gen_refs(1),
-
            timeout,
+
            config,
        });
        assert!(
            matches!(event, event::Fetch::Started { .. }),
@@ -45,20 +43,20 @@ fn min_queue_size() {
    let repo_1: RepoId = arbitrary::gen(1);
    let repo_2: RepoId = arbitrary::gen(1);
    let repo_3: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    let event1 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    assert!(matches!(event1, event::Fetch::Queued { .. }));

@@ -66,7 +64,7 @@ fn min_queue_size() {
        from: node_a,
        rid: repo_3,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    assert!(matches!(event2, event::Fetch::QueueAtCapacity { .. }));
}
modified crates/radicle-protocol/src/fetcher/test/state/dequeue.rs
@@ -5,7 +5,7 @@ use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::command;
use crate::fetcher::test::state::helpers;
-
use crate::fetcher::FetcherState;
+
use crate::fetcher::{FetchConfig, FetcherState};

#[test]
fn cannot_dequeue_while_node_at_capacity() {
@@ -20,14 +20,14 @@ fn cannot_dequeue_while_node_at_capacity() {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout: Duration::from_secs(10),
+
        config: FetchConfig::default().with_timeout(Duration::from_secs(10)),
    });

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
        refs: refs_2.clone(),
-
        timeout: timeout_2,
+
        config: FetchConfig::default().with_timeout(timeout_2),
    });

    let result = state.dequeue(&node_a);
@@ -42,7 +42,7 @@ fn cannot_dequeue_while_node_at_capacity() {
    let queued = result.unwrap();
    assert_eq!(queued.rid, repo_2);
    assert_eq!(queued.refs, refs_2);
-
    assert_eq!(queued.timeout, timeout_2);
+
    assert_eq!(queued.config.timeout(), timeout_2);
}

#[test]
@@ -53,13 +53,13 @@ fn maintains_fifo_order() {
    let repo_2: RepoId = arbitrary::gen(1);
    let repo_3: RepoId = arbitrary::gen(1);
    let repo_4: RepoId = arbitrary::gen(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    // Queue in order: repo_2, repo_3, repo_4
@@ -67,19 +67,19 @@ fn maintains_fifo_order() {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_3,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_4,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    state.fetched(command::Fetched {
modified crates/radicle-protocol/src/fetcher/test/state/invariant.rs
@@ -1,11 +1,9 @@
-
use std::time::Duration;
-

use radicle::test::arbitrary;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::command;
use crate::fetcher::test::state::helpers;
-
use crate::fetcher::FetcherState;
+
use crate::fetcher::{FetchConfig, FetcherState};

#[test]
fn queue_integrity_after_merge() {
@@ -15,20 +13,20 @@ fn queue_integrity_after_merge() {
    let repo_2: RepoId = arbitrary::gen(1);
    let refs_2a = helpers::gen_refs(1);
    let refs_2b = helpers::gen_refs(1);
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config,
    });

    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_2,
        refs: refs_2a.clone(),
-
        timeout,
+
        config,
    });

    // Second fetch for same repo - should merge
@@ -36,7 +34,7 @@ fn queue_integrity_after_merge() {
        from: node_a,
        rid: repo_2,
        refs: refs_2b.clone(),
-
        timeout,
+
        config,
    });

    // Queue should have exactly one repo_2 entry (merged)
modified crates/radicle-protocol/src/fetcher/test/state/multinode.rs
@@ -1,11 +1,9 @@
-
use std::time::Duration;
-

use radicle::test::arbitrary;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event};
use crate::fetcher::test::state::helpers;
-
use crate::fetcher::FetcherState;
+
use crate::fetcher::{FetchConfig, FetcherState};

#[test]
fn independent_queues() {
@@ -16,20 +14,20 @@ fn independent_queues() {
    let repo_b_active: RepoId = arbitrary::gen(2);
    let repo_a_queued: RepoId = arbitrary::gen(10);
    let repo_b_queued: RepoId = arbitrary::gen(20);
-
    let timeout = Duration::from_secs(30);
+
    let fetch_config = FetchConfig::default();

    // Fill capacity for both nodes
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_a_active,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config: fetch_config,
    });
    state.fetch(command::Fetch {
        from: node_b,
        rid: repo_b_active,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config: fetch_config,
    });

    // Queue for both
@@ -37,13 +35,13 @@ fn independent_queues() {
        from: node_a,
        rid: repo_a_queued,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config: fetch_config,
    });
    state.fetch(command::Fetch {
        from: node_b,
        rid: repo_b_queued,
        refs: helpers::gen_refs(1),
-
        timeout,
+
        config: fetch_config,
    });

    // Dequeue from A doesn't affect B
@@ -65,7 +63,7 @@ fn independent_queues() {
#[test]
fn high_count() {
    let mut state = FetcherState::new(helpers::config(1, 10));
-
    let timeout = Duration::from_secs(30);
+
    let config = FetchConfig::default();

    for i in 0..100 {
        let node: NodeId = arbitrary::gen(i + 1);
@@ -74,7 +72,7 @@ fn high_count() {
            from: node,
            rid: repo,
            refs: helpers::gen_refs(1),
-
            timeout,
+
            config,
        });
        assert!(matches!(event, event::Fetch::Started { .. }));
    }
modified crates/radicle-protocol/src/service.rs
@@ -864,7 +864,9 @@ where
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
-
                self.fetch(rid, seed, vec![], timeout, Some(resp));
+
                // TODO(finto): pass through feature-level
+
                let config = self.fetch_config().with_timeout(timeout);
+
                self.fetch(rid, seed, vec![], config, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                // Update our seeding policy.
@@ -971,14 +973,14 @@ where
        from: NodeId,
        refs: NonEmpty<RefsAt>,
        scope: Scope,
-
        timeout: time::Duration,
+
        config: fetcher::FetchConfig,
    ) -> bool {
        match self.refs_status_of(rid, refs, &scope) {
            Ok(status) => {
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
-
                    self.fetch(rid, from, status.want, timeout, None);
+
                    self.fetch(rid, from, status.want, config, None);
                    return true;
                }
            }
@@ -995,7 +997,7 @@ where
        rid: RepoId,
        from: NodeId,
        refs_at: Vec<RefsAt>,
-
        timeout: time::Duration,
+
        config: fetcher::FetchConfig,
        channel: Option<command::Responder<FetchResult>>,
    ) {
        let session = {
@@ -1019,7 +1021,7 @@ where
            from,
            rid,
            refs: refs_at.into(),
-
            timeout,
+
            config,
        };
        let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);

@@ -1035,15 +1037,15 @@ where
                rid,
                from,
                refs: refs_at,
-
                timeout,
+
                config,
            } => {
                debug!(target: "service", "Starting fetch for {rid} from {from}");
                self.outbox.fetch(
                    session,
                    rid,
                    refs_at.into(),
-
                    timeout,
                    self.config.limits.fetch_pack_receive,
+
                    config,
                );
            }
            fetcher::state::event::Fetch::Queued { rid, from } => {
@@ -1182,7 +1184,7 @@ where
            let Some(fetcher::QueuedFetch {
                rid,
                refs: refs_at,
-
                timeout,
+
                config,
            }) = self.fetcher.dequeue(&nid)
            else {
                continue;
@@ -1203,12 +1205,12 @@ where

            match refs_at {
                RefsToFetch::Refs(refs) => {
-
                    self.fetch_refs_at(rid, nid, refs, scope, timeout);
+
                    self.fetch_refs_at(rid, nid, refs, scope, config);
                }
                RefsToFetch::All => {
                    // Channel is `None` since they will already be
                    // registered with the fetcher service.
-
                    self.fetch(rid, nid, vec![], timeout, None);
+
                    self.fetch(rid, nid, vec![], config, None);
                }
            }
        }
@@ -1592,7 +1594,7 @@ where

                for rid in missing {
                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
-
                    self.fetch(rid, *announcer, vec![], FETCH_TIMEOUT, None);
+
                    self.fetch(rid, *announcer, vec![], self.fetch_config(), None);
                }
                return Ok(relay);
            }
@@ -1682,7 +1684,7 @@ where
                    return Ok(relay);
                };
                // Finally, start the fetch.
-
                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT);
+
                self.fetch_refs_at(message.rid, remote.id, refs, scope, self.fetch_config());

                return Ok(relay);
            }
@@ -2513,7 +2515,7 @@ where
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
-
                            self.fetch(rid, seed.nid, vec![], FETCH_TIMEOUT, None);
+
                            self.fetch(rid, seed.nid, vec![], self.fetch_config(), None);
                        }
                    } else {
                        // TODO: We should make sure that this fetch is retried later, either
@@ -2661,6 +2663,11 @@ where
            AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
        }
    }
+

+
    fn fetch_config(&self) -> fetcher::FetchConfig {
+
        fetcher::FetchConfig::default()
+
            .with_minimum_feature_level(self.config.fetch.feature_level_min())
+
    }
}

/// Gives read access to the service state.
modified crates/radicle-protocol/src/service/io.rs
@@ -1,5 +1,4 @@
use std::collections::VecDeque;
-
use std::time;

use localtime::LocalDuration;
use log::*;
@@ -9,6 +8,7 @@ use radicle::node::Address;
use radicle::node::NodeId;
use radicle::storage::refs::RefsAt;

+
use crate::fetcher;
use crate::service::message::Message;
use crate::service::session::Session;
use crate::service::DisconnectReason;
@@ -34,10 +34,11 @@ pub enum Io {
        remote: NodeId,
        /// If the node is fetching specific `rad/sigrefs`.
        refs_at: Option<Vec<RefsAt>>,
-
        /// Fetch timeout.
-
        timeout: time::Duration,
        /// Limit the number of bytes fetched.
        reader_limit: FetchPackSizeLimit,
+
        /// Options for configuring the fetch worker, such as timeout, and
+
        /// internal fetch protocol options.
+
        config: fetcher::FetchConfig,
    },
    /// Ask for a wakeup in a specified amount of time.
    Wakeup(LocalDuration),
@@ -135,8 +136,8 @@ impl Outbox {
        peer: &mut Session,
        rid: RepoId,
        refs_at: Vec<RefsAt>,
-
        timeout: time::Duration,
        reader_limit: FetchPackSizeLimit,
+
        config: fetcher::FetchConfig,
    ) {
        let refs_at = (!refs_at.is_empty()).then_some(refs_at);

@@ -153,8 +154,8 @@ impl Outbox {
            rid,
            refs_at,
            remote: peer.id,
-
            timeout,
            reader_limit,
+
            config,
        });
    }

modified crates/radicle-protocol/src/worker.rs
@@ -8,12 +8,8 @@ use radicle::node::Event;
use radicle::prelude::NodeId;
use radicle::storage::refs::RefsAt;

-
// use crate::runtime::{thread, Emitter, Handle};
-

use radicle::node::events::Emitter;

-
// pub use channels::{ChannelEvent, Channels, ChannelsConfig};
-

/// Error returned by fetch.
#[derive(thiserror::Error, Debug)]
pub enum FetchError {
@@ -82,6 +78,10 @@ pub enum FetchRequest {
        remote: NodeId,
        /// If this fetch is for a particular set of `rad/sigrefs`.
        refs_at: Option<Vec<RefsAt>>,
+
        /// [`Config`] options when initiating the fetch protocol.
+
        ///
+
        /// [`Config`]: radicle_fetch::Config
+
        config: radicle_fetch::Config,
    },
    /// Server is responding to a fetch request by uploading the
    /// specified `refspecs` sent by the client.
modified crates/radicle/src/storage/refs.rs
@@ -281,7 +281,9 @@ where
///
/// Feature levels are monotonic, in the sense that a greater feature level
/// encompasses all the features of smaller ones.
-
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
+
#[derive(
+
    Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Default, Hash, Serialize, Deserialize,
+
)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[non_exhaustive]