Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Make fetch timeout configurable
cloudhead committed 2 years ago
commit 09a284f09cdf2700d6817b8cebc7e11187d2018b
parent a1df6d37488c01b6db5a791a87d6f15279068c3e
15 files changed +127 -65
modified radicle-cli/examples/rad-init-private.md
@@ -18,11 +18,12 @@ Bob tries to clone it, and even though he's connected to Alice, it fails.
``` ~bob
$ rad track rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu
✓ Tracking policy updated for rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu with scope 'trusted'
+
$ rad ls
```
``` ~bob (fail)
-
$ rad sync rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu --fetch --seed z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi --timeout 3
-
✗ Fetching rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu from z6MknSL…StBU8Vi.. <canceled>
-
✗ Sync failed: failed to call node: i/o: timed out reading from control socket
+
$ rad sync rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu --fetch --seed z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi --timeout 1
+
✗ Fetching rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu from z6MknSL…StBU8Vi.. error: connection reset
+
✗ Sync failed: repository fetch from 1 seed(s) failed
```

She allows Bob to view the repository. And when she syncs, one node (Bob) gets
@@ -34,7 +35,7 @@ e98cd6a0a3e94837b382e59e02b3ea83991a8244
$ rad id accept e98cd6a0a3e94837b382e59e02b3ea83991a8244 -q
$ rad id commit e98cd6a0a3e94837b382e59e02b3ea83991a8244 -q
c568f8aac97db40a5e63e1261872bfbd9a3a61e4
-
$ rad sync --announce --timeout 1
+
$ rad sync --announce --timeout 3
✓ Synced with 1 node(s)
```

modified radicle-cli/src/commands/sync.rs
@@ -262,7 +262,7 @@ pub fn fetch(
        SyncMode::Seeds(seeds) => {
            let mut results = FetchResults::default();
            for seed in seeds {
-
                let result = fetch_from(rid, &seed, node)?;
+
                let result = fetch_from(rid, &seed, timeout, node)?;
                results.push(seed, result);
            }
            Ok(results)
@@ -284,7 +284,7 @@ fn fetch_all(

    // Fetch from connected seeds.
    for seed in connected.iter().take(count) {
-
        let result = fetch_from(rid, &seed.nid, node)?;
+
        let result = fetch_from(rid, &seed.nid, timeout, node)?;
        results.push(seed.nid, result);
    }

@@ -312,7 +312,7 @@ fn fetch_all(
            match cr {
                node::ConnectResult::Connected => {
                    spinner.finish();
-
                    let result = fetch_from(rid, &seed.nid, node)?;
+
                    let result = fetch_from(rid, &seed.nid, timeout, node)?;
                    results.push(seed.nid, result);
                    break;
                }
@@ -327,13 +327,18 @@ fn fetch_all(
    Ok(results)
}

-
fn fetch_from(rid: Id, seed: &NodeId, node: &mut Node) -> Result<FetchResult, node::Error> {
+
fn fetch_from(
+
    rid: Id,
+
    seed: &NodeId,
+
    timeout: time::Duration,
+
    node: &mut Node,
+
) -> Result<FetchResult, node::Error> {
    let spinner = term::spinner(format!(
        "Fetching {} from {}..",
        term::format::tertiary(rid),
        term::format::tertiary(term::format::node(seed))
    ));
-
    let result = node.fetch(rid, *seed)?;
+
    let result = node.fetch(rid, *seed, timeout)?;

    match &result {
        FetchResult::Success { .. } => {
modified radicle-cli/tests/commands.rs
@@ -6,8 +6,8 @@ use radicle::git;
use radicle::node;
use radicle::node::address::Store as _;
use radicle::node::routing::Store as _;
-
use radicle::node::Alias;
use radicle::node::Handle as _;
+
use radicle::node::{Alias, DEFAULT_TIMEOUT};
use radicle::prelude::Id;
use radicle::profile::Home;
use radicle::storage::{ReadRepository, ReadStorage};
@@ -864,7 +864,7 @@ fn test_cob_deletion() {
    log::debug!(target: "test", "Removing issue..");

    radicle::assert_matches!(
-
        bob.handle.fetch(rid, alice.id).unwrap(),
+
        bob.handle.fetch(rid, alice.id, DEFAULT_TIMEOUT).unwrap(),
        radicle::node::FetchResult::Success { .. }
    );
    let bob_repo = bob.storage.repository(rid).unwrap();
modified radicle-node/src/control.rs
@@ -102,8 +102,8 @@ where
                }
            }
        }
-
        Command::Fetch { rid, nid } => {
-
            fetch(rid, nid, writer, &mut handle)?;
+
        Command::Fetch { rid, nid, timeout } => {
+
            fetch(rid, nid, timeout, writer, &mut handle)?;
        }
        Command::Seeds { rid } => {
            let seeds = handle.seeds(rid)?;
@@ -201,10 +201,11 @@ where
fn fetch<W: Write, H: Handle<Error = runtime::HandleError>>(
    id: Id,
    node: NodeId,
+
    timeout: time::Duration,
    mut writer: W,
    handle: &mut H,
) -> Result<(), CommandError> {
-
    match handle.fetch(id, node) {
+
    match handle.fetch(id, node, timeout) {
        Ok(result) => {
            json::to_writer(&mut writer, &result)?;
        }
modified radicle-node/src/runtime/handle.rs
@@ -183,9 +183,14 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

-
    fn fetch(&mut self, id: Id, from: NodeId) -> Result<FetchResult, Error> {
+
    fn fetch(
+
        &mut self,
+
        id: Id,
+
        from: NodeId,
+
        timeout: time::Duration,
+
    ) -> Result<FetchResult, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Fetch(id, from, sender))?;
+
        self.command(service::Command::Fetch(id, from, timeout, sender))?;
        receiver.recv().map_err(Error::from)
    }

modified radicle-node/src/service.rs
@@ -10,9 +10,9 @@ pub mod tracking;

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
-
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
+
use std::{fmt, time};

use crossbeam_channel as chan;
use fastrand::Rng;
@@ -79,6 +79,8 @@ pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
+
/// How long to wait for a fetch to stall before aborting.
+
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(9);

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
@@ -139,7 +141,7 @@ pub enum Command {
    /// Lookup seeds for the given repository in the routing table.
    Seeds(Id, chan::Sender<Seeds>),
    /// Fetch the given repository from the network.
-
    Fetch(Id, NodeId, chan::Sender<FetchResult>),
+
    Fetch(Id, NodeId, time::Duration, chan::Sender<FetchResult>),
    /// Track the given repository.
    TrackRepo(Id, Scope, chan::Sender<bool>),
    /// Untrack the given repository.
@@ -161,7 +163,7 @@ impl fmt::Debug for Command {
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
-
            Self::Fetch(id, node, _) => write!(f, "Fetch({id}, {node})"),
+
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
            Self::TrackRepo(id, scope, _) => write!(f, "TrackRepo({id}, {scope})"),
            Self::UntrackRepo(id, _) => write!(f, "UntrackRepo({id})"),
            Self::TrackNode(id, _, _) => write!(f, "TrackNode({id})"),
@@ -530,10 +532,10 @@ where
                    error!(target: "service", "Error reading routing table for {rid}: {e}");
                }
            },
-
            Command::Fetch(rid, seed, resp) => {
+
            Command::Fetch(rid, seed, timeout, resp) => {
                // TODO: Establish connections to unconnected seeds, and retry.
                self.fetch_reqs.insert((rid, seed), resp);
-
                self.fetch(rid, &seed);
+
                self.fetch(rid, &seed, timeout);
            }
            Command::TrackRepo(rid, scope, resp) => {
                // Update our tracking policy.
@@ -596,7 +598,7 @@ where
    }

    /// Initiate an outgoing fetch for some repository.
-
    fn fetch(&mut self, rid: Id, from: &NodeId) {
+
    fn fetch(&mut self, rid: Id, from: &NodeId, timeout: time::Duration) {
        let Some(session) = self.sessions.get_mut(from) else {
            error!(target: "service", "Session {from} does not exist; cannot initiate fetch");
            return;
@@ -618,7 +620,7 @@ where

                match self.tracking.namespaces_for(&self.storage, &rid) {
                    Ok(namespaces) => {
-
                        self.outbox.fetch(session, rid, namespaces);
+
                        self.outbox.fetch(session, rid, namespaces, timeout);
                    }
                    Err(err) => {
                        error!(target: "service", "Error getting namespaces for {rid}: {err}");
@@ -717,7 +719,7 @@ where
            if let Some(dequeued) = s.fetched(rid) {
                debug!(target: "service", "Dequeued fetch {dequeued} from session {remote}..");

-
                self.fetch(dequeued, &remote);
+
                self.fetch(dequeued, &remote, FETCH_TIMEOUT);
            }
        }
    }
@@ -933,7 +935,7 @@ where
                                Ok(false) => {
                                    debug!(target: "service", "Missing tracked inventory {id}; initiating fetch..");

-
                                    self.fetch(*id, announcer);
+
                                    self.fetch(*id, announcer, FETCH_TIMEOUT);
                                }
                                Err(e) => {
                                    error!(target: "service", "Error checking local inventory: {e}");
@@ -1002,7 +1004,7 @@ where
                    // which is required by the protocol to only announce refs it has.
                    if self.sessions.is_connected(announcer) {
                        match self.should_fetch_refs_announcement(message, &repo_entry.scope) {
-
                            Ok(true) => self.fetch(message.rid, announcer),
+
                            Ok(true) => self.fetch(message.rid, announcer, FETCH_TIMEOUT),
                            Ok(false) => {}
                            Err(e) => {
                                error!(target: "service", "Failed to check refs announcement: {e}");
@@ -1502,7 +1504,7 @@ where
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
-
                            self.fetch(rid, &seed.nid);
+
                            self.fetch(rid, &seed.nid, FETCH_TIMEOUT);
                        }
                    } else {
                        // TODO: We should make sure that this fetch is retried later, either
modified radicle-node/src/service/io.rs
@@ -1,4 +1,5 @@
use std::collections::VecDeque;
+
use std::time;

use log::*;

@@ -26,6 +27,8 @@ pub enum Io {
        remote: NodeId,
        /// Namespaces being fetched.
        namespaces: Namespaces,
+
        /// Fetch timeout.
+
        timeout: time::Duration,
    },
    /// Ask for a wakeup in a specified amount of time.
    Wakeup(LocalDuration),
@@ -77,11 +80,18 @@ impl Outbox {
        self.io.push_back(Io::Wakeup(after));
    }

-
    pub fn fetch(&mut self, remote: &mut Session, rid: Id, namespaces: Namespaces) {
+
    pub fn fetch(
+
        &mut self,
+
        remote: &mut Session,
+
        rid: Id,
+
        namespaces: Namespaces,
+
        timeout: time::Duration,
+
    ) {
        self.io.push_back(Io::Fetch {
            rid,
            namespaces,
            remote: remote.id,
+
            timeout,
        });
    }

modified radicle-node/src/test/handle.rs
@@ -41,7 +41,12 @@ impl radicle::node::Handle for Handle {
        unimplemented!();
    }

-
    fn fetch(&mut self, _id: Id, _from: NodeId) -> Result<FetchResult, Self::Error> {
+
    fn fetch(
+
        &mut self,
+
        _id: Id,
+
        _from: NodeId,
+
        _timeout: time::Duration,
+
    ) -> Result<FetchResult, Self::Error> {
        Ok(FetchResult::Success {
            updated: vec![],
            namespaces: HashSet::new(),
modified radicle-node/src/test/peer.rs
@@ -392,6 +392,7 @@ where
                rid,
                remote,
                namespaces,
+
                ..
            } = io
            {
                Some((rid, remote, namespaces))
modified radicle-node/src/test/simulator.rs
@@ -626,6 +626,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                rid,
                remote,
                namespaces,
+
                ..
            } => {
                log::info!(
                    target: "sim",
modified radicle-node/src/tests.rs
@@ -10,7 +10,7 @@ use crossbeam_channel as chan;
use netservices::Direction as Link;
use radicle::identity::Visibility;
use radicle::node::routing::Store as _;
-
use radicle::node::ConnectOptions;
+
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
use radicle::storage::ReadRepository;

use crate::collections::{RandomMap, RandomSet};
@@ -1167,15 +1167,15 @@ fn test_queued_fetch() {

    // Send the first fetch.
    let (send, _recv1) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid1, bob.id, send));
+
    alice.command(Command::Fetch(rid1, bob.id, DEFAULT_TIMEOUT, send));

    // Send the 2nd fetch that will be queued.
    let (send2, _recv2) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid2, bob.id, send2));
+
    alice.command(Command::Fetch(rid2, bob.id, DEFAULT_TIMEOUT, send2));

    // Send the 3rd fetch that will be queued.
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid3, bob.id, send3));
+
    alice.command(Command::Fetch(rid3, bob.id, DEFAULT_TIMEOUT, send3));

    // The first fetch is initiated.
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid1);
modified radicle-node/src/tests/e2e.rs
@@ -2,7 +2,7 @@ use std::{collections::HashSet, thread, time};

use radicle::crypto::{test::signer::MockSigner, Signer};
use radicle::git;
-
use radicle::node::{Alias, FetchResult, Handle as _};
+
use radicle::node::{Alias, FetchResult, Handle as _, DEFAULT_TIMEOUT};
use radicle::storage::{ReadRepository, ReadStorage, WriteRepository, WriteStorage};
use radicle::test::fixtures;
use radicle::{assert_matches, rad};
@@ -166,7 +166,7 @@ fn test_replication() {
    let seeds = alice.handle.seeds(acme).unwrap();
    assert!(seeds.is_connected(&bob.id));

-
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());

    let updated = match result {
@@ -224,7 +224,7 @@ fn test_replication_no_delegates() {
    converge([&alice, &bob]);

    alice.handle.track_repo(acme, Scope::All).unwrap();
-
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();

    assert_matches!(
        result,
@@ -271,7 +271,7 @@ fn test_replication_invalid() {

    alice.handle.track_node(*carol.public_key(), None).unwrap();
    alice.handle.track_repo(acme, Scope::Trusted).unwrap();
-
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();

    // Fetch is successful despite not fetching Carol's refs, since she isn't a delegate.
    assert!(result.is_success());
@@ -303,7 +303,7 @@ fn test_migrated_clone() {
    let tracked = bob.handle.track_repo(acme, Scope::All).unwrap();
    assert!(tracked);

-
    let result = bob.handle.fetch(acme, alice.id).unwrap();
+
    let result = bob.handle.fetch(acme, alice.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());

    log::debug!(target: "test", "Fetch complete with {}", alice.id);
@@ -314,7 +314,7 @@ fn test_migrated_clone() {
        std::fs::remove_dir_all(path).unwrap();
    }
    assert!(!alice.storage.contains(&acme).unwrap());
-
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());

    let alice_repo = alice.storage.repository(acme).unwrap();
@@ -352,13 +352,13 @@ fn test_dont_fetch_owned_refs() {

    assert!(bob.handle.track_repo(acme, Scope::Trusted).unwrap());

-
    let result = bob.handle.fetch(acme, alice.id).unwrap();
+
    let result = bob.handle.fetch(acme, alice.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());

    log::debug!(target: "test", "Fetch complete with {}", bob.id);

    alice.issue(acme, "Don't fetch self", "Use ^");
-
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success())
}

@@ -400,7 +400,7 @@ fn test_fetch_trusted_remotes() {
        assert!(bob.handle.track_node(*nid, None).unwrap());
    }

-
    let result = bob.handle.fetch(acme, alice.id).unwrap();
+
    let result = bob.handle.fetch(acme, alice.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());

    log::debug!(target: "test", "Fetch complete with {}", bob.id);
@@ -435,13 +435,13 @@ fn test_missing_remote() {

    assert!(bob.handle.track_repo(acme, Scope::Trusted).unwrap());
    assert!(bob.handle.track_node(*carol.public_key(), None).unwrap());
-
    let result = bob.handle.fetch(acme, alice.id).unwrap();
+
    let result = bob.handle.fetch(acme, alice.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());
    log::debug!(target: "test", "Fetch complete with {}", bob.id);
    rad::fork_remote(acme, &alice.id, &carol, &bob.storage).unwrap();

    alice.issue(acme, "Missing Remote", "Fixing the missing remote issue");
-
    let result = bob.handle.fetch(acme, alice.id).unwrap();
+
    let result = bob.handle.fetch(acme, alice.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());
    log::debug!(target: "test", "Fetch complete with {}", bob.id);
}
@@ -463,7 +463,7 @@ fn test_fetch_preserve_owned_refs() {
    assert!(bob.handle.track_repo(acme, Scope::Trusted).unwrap());
    assert!(bob.handle.track_node(alice.id, None).unwrap());

-
    let result = bob.handle.fetch(acme, alice.id).unwrap();
+
    let result = bob.handle.fetch(acme, alice.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());

    log::debug!(target: "test", "Fetch complete with {}", bob.id);
@@ -478,7 +478,7 @@ fn test_fetch_preserve_owned_refs() {
        .unwrap();

    // Fetch shouldn't prune any of our own refs.
-
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
    let (updated, _) = result.success().unwrap();
    assert_eq!(updated, vec![]);

@@ -513,7 +513,7 @@ fn test_clone() {
    let seeds = alice.handle.seeds(acme).unwrap();
    assert!(seeds.is_connected(&bob.id));

-
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());

    rad::fork(acme, &alice.signer, &alice.storage).unwrap();
@@ -568,11 +568,11 @@ fn test_fetch_up_to_date() {
    transport::local::register(alice.storage.clone());

    let _ = alice.handle.track_repo(acme, Scope::All).unwrap();
-
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
    assert!(result.is_success());

    // Fetch again! This time, everything's up to date.
-
    let result = alice.handle.fetch(acme, bob.id).unwrap();
+
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
    assert_eq!(
        result.success(),
        Some((vec![], HashSet::from_iter([bob.id])))
@@ -791,9 +791,9 @@ fn test_non_fastforward_sigrefs() {
    converge([&alice, &bob, &eve]);

    // Eve fetches the inital project from Bob.
-
    eve.handle.fetch(rid, bob.id).unwrap();
+
    eve.handle.fetch(rid, bob.id, DEFAULT_TIMEOUT).unwrap();
    // Alice fetches it too.
-
    alice.handle.fetch(rid, bob.id).unwrap();
+
    alice.handle.fetch(rid, bob.id, DEFAULT_TIMEOUT).unwrap();

    // Now Eve disconnects from Bob so she doesn't fetch his update.
    eve.handle
@@ -807,12 +807,12 @@ fn test_non_fastforward_sigrefs() {
        "Updated sigrefs are harshing my vibes",
    );
    // Alice fetches from Bob.
-
    alice.handle.fetch(rid, bob.id).unwrap();
+
    alice.handle.fetch(rid, bob.id, DEFAULT_TIMEOUT).unwrap();

    // Now Alice has the latest, and when she tries to fetch from Eve, it breaks because
    // Eve has old refs.
    assert_matches!(
-
        alice.handle.fetch(rid, eve.id).unwrap(),
+
        alice.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap(),
        FetchResult::Success { .. }
    );
}
@@ -840,11 +840,11 @@ fn test_outdated_sigrefs() {
    eve.connect(&alice);
    converge([&alice, &bob, &eve]);

-
    bob.handle.fetch(rid, alice.id).unwrap();
+
    bob.handle.fetch(rid, alice.id, DEFAULT_TIMEOUT).unwrap();
    assert!(bob.storage.contains(&rid).unwrap());
    rad::fork(rid, &bob.signer, &bob.storage).unwrap();

-
    eve.handle.fetch(rid, alice.id).unwrap();
+
    eve.handle.fetch(rid, alice.id, DEFAULT_TIMEOUT).unwrap();
    assert!(eve.storage.contains(&rid).unwrap());
    rad::fork(rid, &eve.signer, &eve.storage).unwrap();

@@ -852,12 +852,12 @@ fn test_outdated_sigrefs() {
        .handle
        .track_node(eve.id, Some(Alias::new("eve")))
        .unwrap();
-
    alice.handle.fetch(rid, eve.id).unwrap();
+
    alice.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap();
    let repo = alice.storage.repository(rid).unwrap();
    assert!(repo.remote(&eve.id).is_ok());

    assert_matches!(
-
        bob.handle.fetch(rid, eve.id).unwrap(),
+
        bob.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap(),
        FetchResult::Success { .. }
    );
    let repo = bob.storage.repository(rid).unwrap();
@@ -878,7 +878,7 @@ fn test_outdated_sigrefs() {
    // Get the current state of eve's refs in alice's storage
    log::debug!(target: "test", "Alice fetches from Eve..");
    assert_matches!(
-
        alice.handle.fetch(rid, eve.id).unwrap(),
+
        alice.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap(),
        FetchResult::Success { .. }
    );
    let repo = alice.storage.repository(rid).unwrap();
@@ -894,7 +894,7 @@ fn test_outdated_sigrefs() {
        .track_node(bob.id, Some(Alias::new("bob")))
        .unwrap();
    assert_matches!(
-
        alice.handle.fetch(rid, bob.id).unwrap(),
+
        alice.handle.fetch(rid, bob.id, DEFAULT_TIMEOUT).unwrap(),
        FetchResult::Success { .. }
    );

modified radicle-node/src/wire/protocol.rs
@@ -828,6 +828,7 @@ where
                    rid,
                    remote,
                    namespaces,
+
                    timeout,
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");

@@ -850,6 +851,7 @@ where
                            rid,
                            namespaces,
                            remote,
+
                            timeout,
                        },
                        stream,
                        channels,
modified radicle-node/src/worker.rs
@@ -1,3 +1,4 @@
+
#![allow(clippy::too_many_arguments)]
mod channels;
mod fetch;
mod tunnel;
@@ -93,6 +94,8 @@ pub enum FetchRequest {
        namespaces: Namespaces,
        /// Remote peer we are interacting with.
        remote: NodeId,
+
        /// Fetch timeout.
+
        timeout: time::Duration,
    },
    /// Server is responding to a fetch request by uploading the
    /// specified `refspecs` sent by the client.
@@ -197,9 +200,10 @@ impl Worker {
                rid,
                namespaces,
                remote,
+
                timeout,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", rid);
-
                let result = self.fetch(rid, remote, stream, &namespaces, channels);
+
                let result = self.fetch(rid, remote, stream, &namespaces, channels, timeout);

                FetchResult::Initiator { rid, result }
            }
@@ -230,6 +234,7 @@ impl Worker {
        stream: StreamId,
        namespaces: &Namespaces,
        mut channels: Channels,
+
        timeout: time::Duration,
    ) -> Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError> {
        let staging =
            fetch::StagingPhaseInitial::new(&self.storage, rid, self.nid, namespaces.clone())?;
@@ -241,6 +246,7 @@ impl Worker {
                staging.refspecs(),
                stream,
                &mut channels,
+
                timeout,
            ) {
                Ok(_) => {
                    log::debug!(target: "worker", "Initial fetch for {rid} exited successfully")
@@ -279,6 +285,7 @@ impl Worker {
            staging.refspecs(),
            stream,
            &mut channels,
+
            timeout,
        ) {
            Ok(()) => log::debug!(target: "worker", "Final fetch for {rid} exited successfully"),
            Err(e) => {
@@ -496,6 +503,7 @@ impl Worker {
        specs: S,
        stream: StreamId,
        channels: &mut Channels,
+
        timeout: time::Duration,
    ) -> Result<(), FetchError>
    where
        S: IntoIterator<Item = fetch::Refspec>,
@@ -546,7 +554,7 @@ impl Worker {
            }
        });

-
        tunnel.run(self.timeout)?;
+
        tunnel.run(timeout)?;

        let result = child.wait()?;
        if result.success() {
modified radicle/src/node.rs
@@ -355,7 +355,11 @@ pub enum Command {

    /// Fetch the given repository from the network.
    #[serde(rename_all = "camelCase")]
-
    Fetch { rid: Id, nid: NodeId },
+
    Fetch {
+
        rid: Id,
+
        nid: NodeId,
+
        timeout: time::Duration,
+
    },

    /// Track the given repository.
    #[serde(rename_all = "camelCase")]
@@ -652,7 +656,12 @@ pub trait Handle: Clone + Sync + Send {
    /// Lookup the seeds of a given repository in the routing table.
    fn seeds(&mut self, id: Id) -> Result<Seeds, Self::Error>;
    /// Fetch a repository from the network.
-
    fn fetch(&mut self, id: Id, from: NodeId) -> Result<FetchResult, Self::Error>;
+
    fn fetch(
+
        &mut self,
+
        id: Id,
+
        from: NodeId,
+
        timeout: time::Duration,
+
    ) -> Result<FetchResult, Self::Error>;
    /// Start tracking the given project. Doesn't do anything if the project is already
    /// tracked.
    fn track_repo(&mut self, id: Id, scope: tracking::Scope) -> Result<bool, Self::Error>;
@@ -824,9 +833,21 @@ impl Handle for Node {
        Ok(seeds.with(profile::env::rng()))
    }

-
    fn fetch(&mut self, rid: Id, from: NodeId) -> Result<FetchResult, Error> {
+
    fn fetch(
+
        &mut self,
+
        rid: Id,
+
        from: NodeId,
+
        timeout: time::Duration,
+
    ) -> Result<FetchResult, Error> {
        let result = self
-
            .call(Command::Fetch { rid, nid: from }, DEFAULT_TIMEOUT)?
+
            .call(
+
                Command::Fetch {
+
                    rid,
+
                    nid: from,
+
                    timeout,
+
                },
+
                DEFAULT_TIMEOUT,
+
            )?
            .next()
            .ok_or(Error::EmptyResponse)??;