Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Upgrade `netservices` and `io-reactor`
cloudhead committed 2 years ago
commit 25ca4c8b92d49763049fdc9c4ad7a1f02f8b8f21
parent 2e781b1efdbca8781c43b43e350cf17fce8d6750
3 files changed +142 -149
modified Cargo.lock
@@ -80,9 +80,9 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"

[[package]]
name = "amplify"
-
version = "4.0.0"
+
version = "4.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "f26966af46e0d200e8bf2b7f16230997c1c3f2d141bc27ccc091c012ed527b58"
+
checksum = "8629db306c0bbeb0a402e2918bdcf0026b5ddb24c46460f3bf5410b350d98710"
dependencies = [
 "amplify_derive",
 "amplify_num",
@@ -92,9 +92,9 @@ dependencies = [

[[package]]
name = "amplify_derive"
-
version = "3.0.1"
+
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "c87df0f28e6eb1f2d355f29ba6793fa9ca643967528609608d5cbd70bd68f9d1"
+
checksum = "759dcbfaf94d838367a86d493ec34ccc8aa6fe365cb7880d6bf89006de24d9c1"
dependencies = [
 "amplify_syn",
 "proc-macro2",
@@ -1677,9 +1677,9 @@ dependencies = [

[[package]]
name = "io-reactor"
-
version = "0.2.1"
+
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "850495a01d6f0b6d29adf5849a75b5a82449d9181bdd642cd681ce3d6f14b58c"
+
checksum = "2457e8fb1b1f298809fcd93cd15d485f52ef565f7ad47970583c7a80ae6c7e78"
dependencies = [
 "amplify",
 "crossbeam-channel",
@@ -1748,9 +1748,9 @@ checksum = "478ee9e62aaeaf5b140bd4138753d1f109765488581444218d3ddda43234f3e8"

[[package]]
name = "libc"
-
version = "0.2.150"
+
version = "0.2.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
+
checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7"

[[package]]
name = "libgit2-sys"
@@ -1908,9 +1908,9 @@ dependencies = [

[[package]]
name = "netservices"
-
version = "0.4.0"
+
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "ee13a6ce51c79cf719cea8a3f0d3584098e529c726634bc1197cbbb6967b7de2"
+
checksum = "e227ff39744a414d32b9b473266c1739a950b89cd03694a58b624f4bde926c10"
dependencies = [
 "amplify",
 "cyphernet",
modified radicle-node/Cargo.toml
@@ -19,12 +19,12 @@ colored = { version = "1.9.0" }
crossbeam-channel = { version = "0.5.6" }
cyphernet = { version = "0.4.1", features = ["tor", "dns", "ed25519", "p2p-ed25519"] }
fastrand = { version = "2.0.0" }
-
io-reactor = { version = "0.2.1", features = ["popol"] }
+
io-reactor = { version = "0.3.0", features = ["popol"] }
lexopt = { version = "0.2.1" }
libc = { version = "0.2.137" }
log = { version = "0.4.17", features = ["std"] }
localtime = { version = "1.2.0" }
-
netservices = { version = "0.4.0", features = ["io-reactor", "socket2"] }
+
netservices = { version = "0.5.0", features = ["io-reactor", "socket2"] }
nonempty = { version = "0.8.1", features = ["serialize"] }
once_cell = { version = "1.13" }
qcheck = { version = "1", default-features = false, optional = true }
modified radicle-node/src/wire/protocol.rs
@@ -4,8 +4,7 @@
//! The handshake itself is implemented in the external [`cyphernet`] and [`netservices`] crates.
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
-
use std::os::unix::io::AsRawFd;
-
use std::os::unix::prelude::RawFd;
+
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use std::{io, net, time};

@@ -19,7 +18,7 @@ use localtime::LocalTime;
use netservices::resource::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
use netservices::session::{ProtocolArtifact, Socks5Session};
use netservices::{NetConnection, NetProtocol, NetReader, NetWriter};
-
use reactor::Timestamp;
+
use reactor::{ResourceId, Timestamp};

use radicle::collections::RandomMap;
use radicle::node::NodeId;
@@ -141,15 +140,28 @@ impl Streams {
    }
}

+
/// The initial state of an outbound peer before handshake is completed.
+
#[derive(Debug)]
+
struct Outbound {
+
    /// Resource ID, if registered.
+
    id: Option<ResourceId>,
+
    /// Remote address.
+
    addr: NetAddr<HostName>,
+
    /// Remote Node ID.
+
    nid: NodeId,
+
}
+

+
/// The initial state of an inbound peer before handshake is completed.
+
#[derive(Debug)]
+
struct Inbound {
+
    /// Resource ID, if registered.
+
    id: Option<ResourceId>,
+
    /// Remote address.
+
    addr: NetAddr<HostName>,
+
}
+

/// Peer connection state machine.
enum Peer {
-
    /// The initial state of an inbound peer before handshake is completed.
-
    Inbound { addr: NetAddr<HostName> },
-
    /// The initial state of an outbound peer before handshake is completed.
-
    Outbound {
-
        addr: NetAddr<HostName>,
-
        nid: NodeId,
-
    },
    /// The state after handshake is completed.
    /// Peers in this state are handled by the underlying service.
    Connected {
@@ -171,8 +183,6 @@ enum Peer {
impl std::fmt::Debug for Peer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
-
            Self::Inbound { addr } => write!(f, "Inbound({addr})"),
-
            Self::Outbound { nid, .. } => write!(f, "Outbound({nid})"),
            Self::Connected { link, nid, .. } => write!(f, "Connected({link:?}, {nid})"),
            Self::Disconnecting { .. } => write!(f, "Disconnecting"),
        }
@@ -183,58 +193,19 @@ impl Peer {
    /// Return the peer's id, if any.
    fn id(&self) -> Option<&NodeId> {
        match self {
-
            Peer::Outbound { nid, .. }
-
            | Peer::Connected { nid, .. }
-
            | Peer::Disconnecting { nid: Some(nid), .. } => Some(nid),
-
            Peer::Inbound { .. } => None,
+
            Peer::Connected { nid, .. } | Peer::Disconnecting { nid: Some(nid), .. } => Some(nid),
            Peer::Disconnecting { nid: None, .. } => None,
        }
    }

-
    /// Return a new inbound connecting peer.
-
    fn inbound(addr: NetAddr<HostName>) -> Self {
-
        Self::Inbound { addr }
-
    }
-

-
    /// Return a new outbound connecting peer.
-
    fn outbound(addr: NetAddr<HostName>, nid: NodeId) -> Self {
-
        Self::Outbound { addr, nid }
-
    }
-

-
    /// Switch to connected state.
-
    fn connected(&mut self, nid: NodeId) -> (NetAddr<HostName>, Link) {
-
        if let Self::Inbound { addr } = self {
-
            let link = Link::Inbound;
-
            let addr = addr.clone();
-

-
            *self = Self::Connected {
-
                link,
-
                addr: addr.clone(),
-
                nid,
-
                inbox: Deserializer::default(),
-
                streams: Streams::new(link),
-
            };
-
            (addr, link)
-
        } else if let Self::Outbound {
+
    /// Connected peer.
+
    fn connected(nid: NodeId, addr: NetAddr<HostName>, link: Link) -> Self {
+
        Self::Connected {
+
            link,
            addr,
-
            nid: expected,
-
        } = self
-
        {
-
            assert_eq!(nid, *expected);
-

-
            let link = Link::Outbound;
-
            let addr = addr.clone();
-

-
            *self = Self::Connected {
-
                link,
-
                addr: addr.clone(),
-
                nid,
-
                inbox: Deserializer::default(),
-
                streams: Streams::new(link),
-
            };
-
            (addr, link)
-
        } else {
-
            panic!("Peer::connected: session for {nid} is already established");
+
            nid,
+
            inbox: Deserializer::default(),
+
            streams: Streams::new(link),
        }
    }

@@ -247,67 +218,59 @@ impl Peer {
                nid: Some(*nid),
                reason,
            };
-
        } else if let Self::Inbound { .. } = self {
-
            *self = Self::Disconnecting { nid: None, reason };
-
        } else if let Self::Outbound { nid, .. } = self {
-
            *self = Self::Disconnecting {
-
                nid: Some(*nid),
-
                reason,
-
            };
        } else {
            panic!("Peer::disconnected: session is not connected ({self:?})");
        }
    }
}

-
struct Peers(RandomMap<RawFd, Peer>);
+
/// Holds connected peers.
+
struct Peers(RandomMap<ResourceId, Peer>);

impl Peers {
-
    fn get_mut(&mut self, fd: &RawFd) -> Option<&mut Peer> {
-
        self.0.get_mut(fd)
+
    fn get_mut(&mut self, id: &ResourceId) -> Option<&mut Peer> {
+
        self.0.get_mut(id)
    }

-
    fn entry(&mut self, fd: RawFd) -> Entry<RawFd, Peer> {
-
        self.0.entry(fd)
+
    fn entry(&mut self, id: ResourceId) -> Entry<ResourceId, Peer> {
+
        self.0.entry(id)
    }

-
    fn insert(&mut self, fd: RawFd, peer: Peer) {
-
        if self.0.insert(fd, peer).is_some() {
-
            log::warn!(target: "wire", "Replacing existing peer fd={fd}");
+
    fn insert(&mut self, id: ResourceId, peer: Peer) {
+
        if self.0.insert(id, peer).is_some() {
+
            log::warn!(target: "wire", "Replacing existing peer id={id}");
        }
    }

-
    fn remove(&mut self, fd: &RawFd) -> Option<Peer> {
-
        self.0.remove(fd)
+
    fn remove(&mut self, id: &ResourceId) -> Option<Peer> {
+
        self.0.remove(id)
    }

-
    fn lookup(&self, node_id: &NodeId) -> Option<(RawFd, &Peer)> {
+
    fn lookup(&self, node_id: &NodeId) -> Option<(ResourceId, &Peer)> {
        self.0
            .iter()
            .find(|(_, peer)| peer.id() == Some(node_id))
            .map(|(fd, peer)| (*fd, peer))
    }

-
    fn lookup_mut(&mut self, node_id: &NodeId) -> Option<(RawFd, &mut Peer)> {
+
    fn lookup_mut(&mut self, node_id: &NodeId) -> Option<(ResourceId, &mut Peer)> {
        self.0
            .iter_mut()
            .find(|(_, peer)| peer.id() == Some(node_id))
            .map(|(fd, peer)| (*fd, peer))
    }

-
    fn active(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
-
        self.0.iter().filter_map(|(fd, peer)| match peer {
-
            Peer::Inbound { .. } => None,
-
            Peer::Outbound { nid, .. } => Some((*fd, nid)),
-
            Peer::Connected { nid, .. } => Some((*fd, nid)),
+
    fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId)> {
+
        self.0.iter().filter_map(|(id, peer)| match peer {
+
            Peer::Connected { nid, .. } => Some((*id, nid)),
            Peer::Disconnecting { .. } => None,
        })
    }

-
    fn connected(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
-
        self.0.iter().filter_map(|(fd, peer)| {
-
            if let Peer::Connected { nid: id, .. } = peer {
-
                Some((*fd, id))
+
    fn connected(&self) -> impl Iterator<Item = (ResourceId, &NodeId)> {
+
        self.0.iter().filter_map(|(id, peer)| {
+
            if let Peer::Connected { nid, .. } = peer {
+
                Some((*id, nid))
            } else {
                None
            }
@@ -325,7 +288,11 @@ pub struct Wire<D, S, G: Signer + Ecdh> {
    signer: G,
    /// Internal queue of actions to send to the reactor.
    actions: VecDeque<Action<G>>,
-
    /// Peer sessions.
+
    /// Outbound attempted peers without a session.
+
    outbound: RandomMap<RawFd, Outbound>,
+
    /// Inbound peers without a session.
+
    inbound: RandomMap<RawFd, Inbound>,
+
    /// Peer (established) sessions.
    peers: Peers,
    /// SOCKS5 proxy address.
    proxy: net::SocketAddr,
@@ -351,6 +318,8 @@ where
            signer,
            proxy,
            actions: VecDeque::new(),
+
            inbound: RandomMap::default(),
+
            outbound: RandomMap::default(),
            peers: Peers(RandomMap::default()),
        }
    }
@@ -359,19 +328,19 @@ where
        self.actions.push_back(Action::RegisterListener(socket));
    }

-
    fn disconnect(&mut self, fd: RawFd, reason: DisconnectReason) {
-
        match self.peers.get_mut(&fd) {
+
    fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) {
+
        match self.peers.get_mut(&id) {
            Some(Peer::Disconnecting { .. }) => {
-
                log::error!(target: "wire", "Peer (fd={fd}) is already disconnecting");
+
                log::error!(target: "wire", "Peer (id={id}) is already disconnecting");
            }
            Some(peer) => {
-
                log::debug!(target: "wire", "Disconnecting peer (fd={fd}): {reason}");
+
                log::debug!(target: "wire", "Disconnecting peer (id={id}): {reason}");

                peer.disconnecting(reason);
-
                self.actions.push_back(Action::UnregisterTransport(fd));
+
                self.actions.push_back(Action::UnregisterTransport(id));
            }
            None => {
-
                log::error!(target: "wire", "Unknown peer (fd={fd}) cannot be disconnected");
+
                log::error!(target: "wire", "Unknown peer (id={id}) cannot be disconnected");
            }
        }
    }
@@ -467,22 +436,20 @@ where

    fn handle_listener_event(
        &mut self,
-
        _sock: net::SocketAddr,
+
        _: ResourceId, // Nb. This is the ID of the listener socket.
        event: ListenerEvent<WireSession<G>>,
        _: Timestamp,
    ) {
        match event {
            ListenerEvent::Accepted(connection) => {
                let addr = connection.remote_addr();
-
                log::debug!(target: "wire", "Accepting inbound peer connection from {addr}..");
-

-
                self.peers
-
                    .insert(connection.as_raw_fd(), Peer::inbound(addr.clone().into()));
+
                let fd = connection.as_raw_fd();
+
                log::debug!(target: "wire", "Accepting inbound connection from {addr} (fd={fd})..");

                // If the service doesn't want to accept this connection,
                // we drop the connection here, which disconnects the socket.
                if !self.service.accepted(NetAddr::from(addr.clone()).into()) {
-
                    log::debug!(target: "wire", "Dropping inbound connection from {addr}..");
+
                    log::debug!(target: "wire", "Rejecting inbound connection from {addr} (fd={fd})..");
                    drop(connection);

                    return;
@@ -496,6 +463,14 @@ where
                        return;
                    }
                };
+

+
                self.inbound.insert(
+
                    fd,
+
                    Inbound {
+
                        id: None,
+
                        addr: addr.into(),
+
                    },
+
                );
                self.actions
                    .push_back(reactor::Action::RegisterTransport(transport))
            }
@@ -505,45 +480,62 @@ where
        }
    }

+
    fn handle_registered(&mut self, fd: RawFd, id: ResourceId) {
+
        if let Some(outbound) = self.outbound.get_mut(&fd) {
+
            log::debug!(target: "wire", "Outbound peer resource registered with id={id} (fd={fd})");
+
            outbound.id = Some(id);
+
        } else if let Some(inbound) = self.inbound.get_mut(&fd) {
+
            log::debug!(target: "wire", "Inbound peer resource registered with id={id} (fd={fd})");
+
            inbound.id = Some(id);
+
        } else {
+
            log::error!(target: "wire", "Unknown peer registered with fd={fd} and id={id}");
+
        }
+
    }
+

    fn handle_transport_event(
        &mut self,
-
        fd: RawFd,
+
        id: ResourceId,
        event: SessionEvent<WireSession<G>>,
        _: Timestamp,
    ) {
        match event {
-
            SessionEvent::Established(ProtocolArtifact { state, .. }) => {
+
            SessionEvent::Established(fd, ProtocolArtifact { state, .. }) => {
                // SAFETY: With the NoiseXK protocol, there is always a remote static key.
-
                let id: NodeId = state.remote_static_key.unwrap();
+
                let nid: NodeId = state.remote_static_key.unwrap();

-
                log::debug!(target: "wire", "Session established with {id} (fd={fd})");
+
                log::debug!(target: "wire", "Session established with {nid} (id={id}) (fd={fd})");

                let conflicting = self
                    .peers
                    .active()
-
                    .filter(|(other, d)| **d == id && *other != fd)
-
                    .map(|(fd, _)| fd)
+
                    .filter(|(other, d)| **d == nid && *other != id)
+
                    .map(|(id, _)| id)
                    .collect::<Vec<_>>();

-
                for fd in conflicting {
+
                for id in conflicting {
                    log::warn!(
-
                        target: "wire", "Closing conflicting session with {id} (fd={fd})"
+
                        target: "wire", "Closing conflicting session with {nid} (id={id})"
                    );
                    self.disconnect(
-
                        fd,
+
                        id,
                        DisconnectReason::Dial(Arc::new(io::Error::from(
                            io::ErrorKind::AlreadyExists,
                        ))),
                    );
                }

-
                let Some(peer) = self.peers.get_mut(&fd) else {
-
                    log::error!(target: "wire", "Session not found for fd {fd}");
+
                let (addr, link) = if let Some(peer) = self.inbound.remove(&fd) {
+
                    (peer.addr, Link::Inbound)
+
                } else if let Some(peer) = self.outbound.remove(&fd) {
+
                    assert_eq!(nid, peer.nid);
+
                    (peer.addr, Link::Outbound)
+
                } else {
+
                    log::error!(target: "wire", "Session for {nid} (id={id}) not found");
                    return;
                };
-
                let (addr, link) = peer.connected(id);
-

-
                self.service.connected(id, addr.into(), link);
+
                self.peers
+
                    .insert(id, Peer::connected(nid, addr.clone(), link));
+
                self.service.connected(nid, addr.into(), link);
            }
            SessionEvent::Data(data) => {
                if let Some(Peer::Connected {
@@ -551,7 +543,7 @@ where
                    inbox,
                    streams,
                    ..
-
                }) = self.peers.get_mut(&fd)
+
                }) = self.peers.get_mut(&id)
                {
                    inbox.input(&data);

@@ -631,7 +623,7 @@ where
                                    log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.unparsed().count());
                                }
                                self.disconnect(
-
                                    fd,
+
                                    id,
                                    DisconnectReason::Session(session::Error::Misbehavior),
                                );
                                break;
@@ -639,11 +631,11 @@ where
                        }
                    }
                } else {
-
                    log::warn!(target: "wire", "Dropping message from unconnected peer (fd={fd})");
+
                    log::warn!(target: "wire", "Dropping message from unconnected peer (id={id})");
                }
            }
            SessionEvent::Terminated(err) => {
-
                self.disconnect(fd, DisconnectReason::Connection(Arc::new(err)));
+
                self.disconnect(id, DisconnectReason::Connection(Arc::new(err)));
            }
        }
    }
@@ -702,36 +694,35 @@ where
        }
    }

-
    fn handover_listener(&mut self, _listener: Self::Listener) {
+
    fn handover_listener(&mut self, _id: ResourceId, _listener: Self::Listener) {
        panic!("Wire::handover_listener: listener handover is not supported");
    }

-
    fn handover_transport(&mut self, transport: Self::Transport) {
+
    fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport) {
        let fd = transport.as_raw_fd();
-
        log::debug!(target: "wire", "Received transport handover (fd={fd})");

-
        match self.peers.entry(fd) {
+
        match self.peers.entry(id) {
            Entry::Occupied(e) => {
                match e.get() {
-
                    Peer::Disconnecting {
-
                        nid: id, reason, ..
-
                    } => {
+
                    Peer::Disconnecting { nid, reason, .. } => {
+
                        log::debug!(target: "wire", "Received transport handover for disconnecting peer with id={id} (fd={fd})");
+

                        // Disconnect TCP stream.
                        drop(transport);

-
                        // If there is no ID, the service is not aware of the peer.
-
                        if let Some(id) = id {
-
                            self.service.disconnected(*id, reason);
+
                        // If there is no NID, the service is not aware of the peer.
+
                        if let Some(nid) = nid {
+
                            self.service.disconnected(*nid, reason);
                        }
                        e.remove();
                    }
-
                    _ => {
-
                        panic!("Wire::handover_transport: Unexpected peer with fd {fd} handed over from the reactor");
+
                    Peer::Connected { nid, .. } => {
+
                        panic!("Wire::handover_transport: Unexpected handover of connected peer {} with id={id} (fd={fd})", nid);
                    }
                }
            }
            Entry::Vacant(_) => {
-
                panic!("Wire::handover_transport: Unknown peer with fd {fd} handed over");
+
                panic!("Wire::handover_transport: Unknown peer with id={id} (fd={fd}) handed over");
            }
        }
    }
@@ -795,11 +786,13 @@ where
                    }) {
                        Ok(transport) => {
                            self.service.attempted(node_id, addr.clone());
-
                            // TODO: Keep track of peer address for when peer disconnects before
-
                            // handshake is complete.
-
                            self.peers.insert(
+
                            self.outbound.insert(
                                transport.as_raw_fd(),
-
                                Peer::outbound(addr.to_inner(), node_id),
+
                                Outbound {
+
                                    id: None,
+
                                    nid: node_id,
+
                                    addr: addr.to_inner(),
+
                                },
                            );
                            self.actions
                                .push_back(reactor::Action::RegisterTransport(transport));
@@ -813,8 +806,8 @@ where
                    }
                }
                Io::Disconnect(nid, reason) => {
-
                    if let Some((fd, Peer::Connected { .. })) = self.peers.lookup(&nid) {
-
                        self.disconnect(fd, reason);
+
                    if let Some((id, Peer::Connected { .. })) = self.peers.lookup(&nid) {
+
                        self.disconnect(id, reason);
                    } else {
                        log::warn!(target: "wire", "Peer {nid} is not connected: ignoring disconnect");
                    }