Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Update `netservices` to 0.8.0
cloudhead committed 2 years ago
commit f1f1313b6da0a28bddd93a1c1e0b48e9fe43dc30
parent d771ce85224c9abd4fb731f1667259751537ac34
4 files changed +31 -14
modified Cargo.lock
@@ -1838,9 +1838,9 @@ dependencies = [

[[package]]
name = "netservices"
-
version = "0.7.0"
+
version = "0.8.0-beta.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "dcd7e1d8a6763072326ece5a4329c6a924d7e8889a301334169e46613c5ef7a4"
+
checksum = "56bee0bcdc62d83ff53321dde78c51a1ec2d727da9342ffafa8ae5df892e5a7d"
dependencies = [
 "amplify",
 "cyphernet",
modified radicle-node/Cargo.toml
@@ -24,7 +24,7 @@ lexopt = { version = "0.3.0" }
libc = { version = "0.2.137" }
log = { version = "0.4.17", features = ["std"] }
localtime = { version = "1.2.0" }
-
netservices = { version = "0.7.0", features = ["io-reactor", "socket2"] }
+
netservices = { version = "0.8.0-beta.1", features = ["io-reactor", "socket2"] }
nonempty = { version = "0.9.0", features = ["serialize"] }
once_cell = { version = "1.13" }
qcheck = { version = "1", default-features = false, optional = true }
modified radicle-node/src/service.rs
@@ -2188,7 +2188,7 @@ where
            return;
        }

-
        for (id, ka) in self
+
        let available = self
            .available_peers()
            .into_iter()
            .filter_map(|peer| {
@@ -2208,7 +2208,16 @@ where
                    .map(|ka| (peer.nid, ka))
            })
            .take(wanted)
-
        {
+
            .collect::<Vec<_>>();
+

+
        if available.len() < target {
+
            log::warn!(
+
                target: "service",
+
                "Not enough available peers to connect to (available={}, target={target})",
+
                available.len()
+
            );
+
        }
+
        for (id, ka) in available {
            self.connect(id, ka.addr.clone());
        }
    }
modified radicle-node/src/wire/protocol.rs
@@ -1,6 +1,6 @@
//! Implementation of the transport protocol.
//!
-
//! We use the Noise NN handshake pattern to establish an encrypted stream with a remote peer.
+
//! We use the Noise XK handshake pattern to establish an encrypted stream with a remote peer.
//! The handshake itself is implemented in the external [`cyphernet`] and [`netservices`] crates.
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
@@ -47,7 +47,10 @@ pub const NOISE_XK: HandshakePattern = HandshakePattern {
pub const DEFAULT_CHANNEL_TIMEOUT: time::Duration = time::Duration::from_secs(30);

/// Default time to wait until a network connection is considered inactive.
-
pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(30);
+
pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(6);
+

+
/// Default time to wait when dialing a connection, before the remote is considered unreachable.
+
pub const DEFAULT_DIAL_TIMEOUT: time::Duration = time::Duration::from_secs(6);

/// Control message used internally between workers, users, and the service.
#[allow(clippy::large_enum_variant)]
@@ -819,11 +822,6 @@ where
                        NetTransport::<WireSession<G>>::with_session(session, Link::Outbound)
                    }) {
                        Ok(transport) => {
-
                            log::debug!(
-
                                target: "wire",
-
                                "Registering transport for {node_id} (fd={})..",
-
                                transport.as_raw_fd()
-
                            );
                            self.outbound.insert(
                                transport.as_raw_fd(),
                                Outbound {
@@ -832,6 +830,11 @@ where
                                    addr: addr.to_inner(),
                                },
                            );
+
                            log::debug!(
+
                                target: "wire",
+
                                "Registering outbound transport for {node_id} (fd={})..",
+
                                transport.as_raw_fd()
+
                            );
                            self.actions
                                .push_back(reactor::Action::RegisterTransport(transport));
                        }
@@ -917,13 +920,18 @@ pub fn dial<G: Signer + Ecdh<Pk = NodeId>>(
    force_proxy: bool,
) -> io::Result<WireSession<G>> {
    let connection = if force_proxy {
-
        net::TcpStream::connect_nonblocking(proxy_addr, DEFAULT_CONNECTION_TIMEOUT)?
+
        // Nb. This timeout is currently not used by the underlying library due to the
+
        // `socket2` library not supporting non-blocking connect with timeout.
+
        net::TcpStream::connect_nonblocking(proxy_addr, DEFAULT_DIAL_TIMEOUT)?
    } else {
        net::TcpStream::connect_nonblocking(
            remote_addr.connection_addr(proxy_addr),
-
            DEFAULT_CONNECTION_TIMEOUT,
+
            DEFAULT_DIAL_TIMEOUT,
        )?
    };
+
    connection.set_read_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
+
    connection.set_write_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
+

    Ok(session::<G>(
        remote_addr,
        Some(remote_id),