Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix state mismatch between service and wire
Alexis Sellier committed 3 years ago
commit 98ee9c46a9bebe3ad43da688811201b6e41beb2f
parent 5828b917f9bb9f3251284ef7f73036f1c4d28a44
1 file changed +55 -44
modified radicle-node/src/wire/protocol.rs
@@ -2,6 +2,7 @@
//!
//! We use the Noise NN handshake pattern to establish an encrypted stream with a remote peer.
//! The handshake itself is implemented in the external [`cyphernet`] and [`netservices`] crates.
+
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
@@ -80,9 +81,9 @@ enum Peer {
        id: NodeId,
        inbox: VecDeque<u8>,
    },
-
    /// The state after a peer was disconnected, either during handshake,
-
    /// or once connected.
-
    Disconnected {
+
    /// The peer was scheduled for disconnection. Once the transport is handed over
+
    /// by the reactor, we can consider it disconnected.
+
    Disconnecting {
        id: Option<NodeId>,
        reason: DisconnectReason,
    },
@@ -104,7 +105,7 @@ impl std::fmt::Debug for Peer {
            Self::Inbound {} => write!(f, "Inbound"),
            Self::Outbound { id } => write!(f, "Outbound({id})"),
            Self::Connected { link, id, .. } => write!(f, "Connected({link:?}, {id})"),
-
            Self::Disconnected { .. } => write!(f, "Disconnected"),
+
            Self::Disconnecting { .. } => write!(f, "Disconnecting"),
            Self::Upgrading {
                fetch, link, id, ..
            } => write!(
@@ -123,12 +124,12 @@ impl Peer {
        match self {
            Peer::Outbound { id }
            | Peer::Connected { id, .. }
-
            | Peer::Disconnected { id: Some(id), .. }
+
            | Peer::Disconnecting { id: Some(id), .. }
            | Peer::Upgrading { id, .. }
            | Peer::Upgraded { id, .. } => Some(id),

            Peer::Inbound {} => None,
-
            Peer::Disconnected { id: None, .. } => None,
+
            Peer::Disconnecting { id: None, .. } => None,
        }
    }

@@ -168,17 +169,17 @@ impl Peer {
        }
    }

-
    /// Switch to disconnected state.
-
    fn disconnected(&mut self, reason: DisconnectReason) {
+
    /// Switch to disconnecting state.
+
    fn disconnecting(&mut self, reason: DisconnectReason) {
        if let Self::Connected { id, .. } = self {
-
            *self = Self::Disconnected {
+
            *self = Self::Disconnecting {
                id: Some(*id),
                reason,
            };
        } else if let Self::Inbound {} = self {
-
            *self = Self::Disconnected { id: None, reason };
+
            *self = Self::Disconnecting { id: None, reason };
        } else if let Self::Outbound { id } = self {
-
            *self = Self::Disconnected {
+
            *self = Self::Disconnecting {
                id: Some(*id),
                reason,
            };
@@ -320,7 +321,7 @@ where
            Peer::Connected { id, .. } => Some((*fd, id)),
            Peer::Upgrading { id, .. } => Some((*fd, id)),
            Peer::Upgraded { id, .. } => Some((*fd, id)),
-
            Peer::Disconnected { .. } => None,
+
            Peer::Disconnecting { .. } => None,
        })
    }

@@ -335,21 +336,26 @@ where
    }

    fn disconnect(&mut self, fd: RawFd, reason: DisconnectReason) {
-
        let peer = self.peer_mut_by_fd(fd);
-
        if let Peer::Disconnected { .. } = peer {
-
            log::error!(target: "wire", "Peer (fd={fd}) is already disconnected");
-
            return;
-
        };
-
        log::debug!(target: "wire", "Disconnecting peer (fd={fd}): {reason}");
-
        peer.disconnected(reason);
+
        match self.peers.get_mut(&fd) {
+
            Some(Peer::Disconnecting { .. }) => {
+
                log::error!(target: "wire", "Peer (fd={fd}) is already disconnecting");
+
            }
+
            Some(peer) => {
+
                log::debug!(target: "wire", "Disconnecting peer (fd={fd}): {reason}");

-
        self.actions.push_back(Action::UnregisterTransport(fd));
+
                peer.disconnecting(reason);
+
                self.actions.push_back(Action::UnregisterTransport(fd));
+
            }
+
            None => {
+
                log::error!(target: "wire", "Unknown peer (fd={fd}) cannot be disconnected");
+
            }
+
        }
    }

    fn upgrade(&mut self, fd: RawFd, fetch: Fetch) {
        let peer = self.peer_mut_by_fd(fd);
-
        if let Peer::Disconnected { .. } = peer {
-
            log::error!(target: "wire", "Peer (fd={fd}) is already disconnected");
+
        if let Peer::Disconnecting { .. } = peer {
+
            log::error!(target: "wire", "Peer (fd={fd}) is disconnecting");
            return;
        };
        log::debug!(target: "wire", "Requesting transport handover from reactor for peer (fd={fd})");
@@ -387,8 +393,8 @@ where
        let fd = session.as_connection().as_raw_fd();
        let peer = self.peer_mut_by_fd(fd);

-
        let session = if let Peer::Disconnected { .. } = peer {
-
            log::error!(target: "wire", "Peer with fd {fd} is already disconnected");
+
        let session = if let Peer::Disconnecting { .. } = peer {
+
            log::error!(target: "wire", "Peer with fd {fd} is disconnecting");
            return;
        } else if let Peer::Upgraded { link, .. } = peer {
            match NetTransport::with_session(session, *link) {
@@ -587,7 +593,10 @@ where
            reactor::Error::TransportDisconnect(fd, _, _) => {
                log::error!(target: "wire", "Received error: peer (fd={fd}) disconnected");

-
                match self.peers.get_mut(fd) {
+
                // The peer transport is already disconnected and removed from the reactor;
+
                // therefore there is no need to initiate a disconnection. We simply remove
+
                // the peer from the map.
+
                match self.peers.remove(fd) {
                    Some(peer) => {
                        let reason = DisconnectReason::Connection(Arc::new(io::Error::from(
                            io::ErrorKind::ConnectionReset,
@@ -598,7 +607,6 @@ where
                        } else {
                            log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
                        }
-
                        peer.disconnected(reason);
                    }
                    None => {
                        log::warn!(target: "wire", "Peer with fd {fd} is unknown");
@@ -617,33 +625,36 @@ where
    }

    fn handover_listener(&mut self, _listener: Self::Listener) {
-
        panic!("Transport::handover_listener: listener handover is not supported");
+
        panic!("Wire::handover_listener: listener handover is not supported");
    }

    fn handover_transport(&mut self, transport: Self::Transport) {
        let fd = transport.as_raw_fd();
        log::debug!(target: "wire", "Received transport handover (fd={fd})");

-
        match self.peers.get(&fd) {
-
            Some(Peer::Disconnected { id, reason, .. }) => {
-
                // Disconnect TCP stream.
-
                drop(transport);
+
        match self.peers.entry(fd) {
+
            Entry::Occupied(e) => {
+
                match e.get() {
+
                    Peer::Disconnecting { id, reason, .. } => {
+
                        // Disconnect TCP stream.
+
                        drop(transport);

-
                if let Some(id) = id {
-
                    self.service.disconnected(*id, reason);
-
                } else {
-
                    // TODO: Handle this case by calling `disconnected` with the address instead of
-
                    // the node id.
+
                        // If there is no ID, the service is not aware of the peer.
+
                        if let Some(id) = id {
+
                            self.service.disconnected(*id, reason);
+
                        }
+
                        e.remove();
+
                    }
+
                    Peer::Upgrading { .. } => {
+
                        self.upgraded(transport);
+
                    }
+
                    _ => {
+
                        panic!("Wire::handover_transport: Unexpected peer with fd {fd} handed over from the reactor");
+
                    }
                }
            }
-
            Some(Peer::Upgrading { .. }) => {
-
                self.upgraded(transport);
-
            }
-
            Some(_) => {
-
                panic!("Transport::handover_transport: Unexpected peer with fd {fd} handed over from the reactor");
-
            }
-
            None => {
-
                panic!("Transport::handover_transport: Unknown peer with fd {fd} handed over");
+
            Entry::Vacant(_) => {
+
                panic!("Wire::handover_transport: Unknown peer with fd {fd} handed over");
            }
        }
    }