Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: dynamic fetch timeout via channels
Fintan Halpenny committed 2 years ago
commit fd4c4cd0af8836213a0e846e674fb6554c123e96
parent cadd996a763b4490ef177524d21e5059820ff833
3 files changed +15 -18
modified radicle-node/src/service.rs
@@ -280,6 +280,8 @@ struct QueuedFetch {
    from: NodeId,
    /// Refs being fetched.
    refs_at: Vec<RefsAt>,
+
    /// The timeout given for the fetch request.
+
    timeout: time::Duration,
    /// Result channel.
    channel: Option<chan::Sender<FetchResult>>,
}
@@ -948,6 +950,7 @@ where
                        rid,
                        refs_at,
                        from,
+
                        timeout,
                        channel,
                    };
                    if self.queue.contains(&fetch) {
@@ -964,6 +967,7 @@ where
                    rid,
                    refs_at,
                    from,
+
                    timeout,
                    channel,
                });
            }
@@ -1128,6 +1132,7 @@ where
            rid,
            from,
            refs_at,
+
            timeout,
            channel,
        }) = self.queue.pop_front()
        {
@@ -1140,12 +1145,12 @@ where
                    .expect("Service::dequeue_fetch: error accessing repo seeding configuration");

                // Keep dequeueing if there was nothing to fetch, otherwise break.
-
                if self.fetch_refs_at(rid, from, refs, repo_entry.scope, FETCH_TIMEOUT, channel) {
+
                if self.fetch_refs_at(rid, from, refs, repo_entry.scope, timeout, channel) {
                    break;
                }
            } else {
                // If no refs are specified, always do a full fetch.
-
                self.fetch(rid, from, FETCH_TIMEOUT, channel);
+
                self.fetch(rid, from, timeout, channel);
                break;
            }
        }
modified radicle-node/src/wire/protocol.rs
@@ -29,6 +29,7 @@ use crate::crypto::Signer;
use crate::prelude::Deserializer;
use crate::service;
use crate::service::io::Io;
+
use crate::service::FETCH_TIMEOUT;
use crate::service::{session, DisconnectReason, Service};
use crate::wire::frame;
use crate::wire::frame::{Frame, FrameData, StreamId};
@@ -43,10 +44,6 @@ pub const NOISE_XK: HandshakePattern = HandshakePattern {
    responder: cyphernet::encrypt::noise::OneWayPattern::Known,
};

-
/// Default time to wait to receive something from a worker channel. Applies to
-
/// workers waiting for data from remotes as well.
-
pub const DEFAULT_CHANNEL_TIMEOUT: time::Duration = time::Duration::from_secs(30);
-

/// Default time to wait until a network connection is considered inactive.
pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(6);

@@ -131,22 +128,22 @@ impl Streams {
    }

    /// Open a new stream.
-
    fn open(&mut self) -> (StreamId, worker::Channels) {
+
    fn open(&mut self, timeout: time::Duration) -> (StreamId, worker::Channels) {
        self.seq += 1;

        let id = StreamId::git(self.link)
            .nth(self.seq)
            .expect("Streams::open: too many streams");
        let channels = self
-
            .register(id)
+
            .register(id, timeout)
            .expect("Streams::open: stream was already open");

        (id, channels)
    }

    /// Register an open stream.
-
    fn register(&mut self, stream: StreamId) -> Option<worker::Channels> {
-
        let (wire, worker) = worker::Channels::pair(DEFAULT_CHANNEL_TIMEOUT)
+
    fn register(&mut self, stream: StreamId, timeout: time::Duration) -> Option<worker::Channels> {
+
        let (wire, worker) = worker::Channels::pair(timeout)
            .expect("Streams::register: fatal: unable to create channels");

        match self.streams.entry(stream) {
@@ -718,7 +715,7 @@ where
                            })) => {
                                log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");

-
                                let Some(channels) = streams.register(stream) else {
+
                                let Some(channels) = streams.register(stream, FETCH_TIMEOUT) else {
                                    log::warn!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
                                    continue;
                                };
@@ -1017,7 +1014,7 @@ where
                        log::error!(target: "wire", "Peer {remote} is not connected: dropping fetch");
                        continue;
                    };
-
                    let (stream, channels) = streams.open();
+
                    let (stream, channels) = streams.open(timeout);

                    log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");

@@ -1027,7 +1024,6 @@ where
                            rid,
                            remote,
                            refs_at,
-
                            timeout,
                        },
                        stream,
                        channels,
modified radicle-node/src/worker.rs
@@ -5,8 +5,8 @@ mod upload_pack;
pub mod fetch;
pub mod garbage;

+
use std::io;
use std::path::PathBuf;
-
use std::{io, time};

use crossbeam_channel as chan;

@@ -107,8 +107,6 @@ pub enum FetchRequest {
        remote: NodeId,
        /// If this fetch is for a particular set of `rad/sigrefs`.
        refs_at: Option<Vec<RefsAt>>,
-
        /// Fetch timeout.
-
        timeout: time::Duration,
    },
    /// Server is responding to a fetch request by uploading the
    /// specified `refspecs` sent by the client.
@@ -230,8 +228,6 @@ impl Worker {
                rid,
                remote,
                refs_at,
-
                // TODO: nowhere to use this currently
-
                timeout: _timeout,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
                let result = self.fetch(rid, remote, refs_at, channels, notifs);