Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Make sure we properly disconnect outbounds
cloudhead committed 2 years ago
commit 8858cefc89b52ce5072f6e859961a4ff3cd5344a
parent e58419a9b0cc21a7353c7a94756bb6bf42326ba4
2 files changed +20 -10
modified radicle-node/src/service.rs
@@ -2205,6 +2205,12 @@ impl DisconnectReason {
    pub fn is_connection_err(&self) -> bool {
        matches!(self, Self::Connection(_))
    }
+

+
    pub fn connection() -> Self {
+
        DisconnectReason::Connection(Arc::new(std::io::Error::from(
+
            std::io::ErrorKind::ConnectionReset,
+
        )))
+
    }
}

impl fmt::Display for DisconnectReason {
modified radicle-node/src/wire/protocol.rs
@@ -419,8 +419,10 @@ where
    fn cleanup(&mut self, id: ResourceId, fd: RawFd) {
        if self.inbound.remove(&fd).is_some() {
            log::debug!(target: "wire", "Cleaning up inbound peer state with id={id} (fd={fd})");
-
        } else if self.outbound.remove(&fd).is_some() {
+
        } else if let Some(outbound) = self.outbound.remove(&fd) {
            log::debug!(target: "wire", "Cleaning up outbound peer state with id={id} (fd={fd})");
+
            self.service
+
                .disconnected(outbound.nid, &DisconnectReason::connection());
        } else {
            log::warn!(target: "wire", "Tried to cleanup unknown peer with id={id} (fd={fd})");
        }
@@ -494,7 +496,7 @@ where

    fn handle_registered(&mut self, fd: RawFd, id: ResourceId) {
        if let Some(outbound) = self.outbound.get_mut(&fd) {
-
            log::debug!(target: "wire", "Outbound peer resource registered with id={id} (fd={fd})");
+
            log::debug!(target: "wire", "Outbound peer resource registered for {} with id={id} (fd={fd})", outbound.nid);
            outbound.id = Some(id);
        } else if let Some(inbound) = self.inbound.get_mut(&fd) {
            log::debug!(target: "wire", "Inbound peer resource registered with id={id} (fd={fd})");
@@ -673,28 +675,25 @@ where
                // TODO: This should be a fatal error, there's nothing we can do here.
                log::error!(target: "wire", "Received error: listener {} disconnected", id);
            }
-
            reactor::Error::TransportDisconnect(id, session) => {
-
                let fd = session.as_raw_fd();
+
            reactor::Error::TransportDisconnect(id, transport) => {
+
                let fd = transport.as_raw_fd();
                log::error!(target: "wire", "Received error: peer id={id} (fd={fd}) disconnected");

                // We're dropping the TCP connection here.
-
                drop(session);
+
                drop(transport);

                // The peer transport is already disconnected and removed from the reactor;
                // therefore there is no need to initiate a disconnection. We simply remove
                // the peer from the map.
                match self.peers.remove(&id) {
                    Some(mut peer) => {
-
                        let reason = DisconnectReason::Connection(Arc::new(io::Error::from(
-
                            io::ErrorKind::ConnectionReset,
-
                        )));
-

                        if let Peer::Connected { streams, .. } = &mut peer {
                            streams.shutdown();
                        }

                        if let Some(id) = peer.id() {
-
                            self.service.disconnected(*id, &reason);
+
                            self.service
+
                                .disconnected(*id, &DisconnectReason::connection());
                        } else {
                            log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
                        }
@@ -795,6 +794,11 @@ where
                        NetTransport::<WireSession<G>>::with_session(session, Link::Outbound)
                    }) {
                        Ok(transport) => {
+
                            log::debug!(
+
                                target: "wire",
+
                                "Registering transport for {node_id} (fd={})..",
+
                                transport.as_raw_fd()
+
                            );
                            self.outbound.insert(
                                transport.as_raw_fd(),
                                Outbound {