Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Close worker streams on disconnect
Alexis Sellier committed 3 years ago
commit 83547f9c8abf1ac4b3405bad9ca9f130ffd0566a
parent 60fc50644fd6bf3fc8833d434777852ea17c2bea
2 files changed +26 -3
modified radicle-node/src/wire/protocol.rs
@@ -128,6 +128,14 @@ impl Streams {
    fn unregister(&mut self, stream: &StreamId) -> Option<worker::Channels> {
        self.streams.remove(stream)
    }
+

+
    /// Close all streams.
+
    fn shutdown(&mut self) {
+
        for (sid, chans) in self.streams.drain() {
+
            log::debug!(target: "wire", "Closing worker stream {sid}");
+
            chans.close().ok();
+
        }
+
    }
}

/// Peer connection state machine.
@@ -215,9 +223,11 @@ impl Peer {

    /// Switch to disconnecting state.
    fn disconnecting(&mut self, reason: DisconnectReason) {
-
        if let Self::Connected { nid: id, .. } = self {
+
        if let Self::Connected { nid, streams, .. } = self {
+
            streams.shutdown();
+

            *self = Self::Disconnecting {
-
                id: Some(*id),
+
                id: Some(*nid),
                reason,
            };
        } else if let Self::Inbound {} = self {
@@ -666,11 +676,15 @@ where
                // therefore there is no need to initiate a disconnection. We simply remove
                // the peer from the map.
                match self.peers.remove(&fd) {
-
                    Some(peer) => {
+
                    Some(mut peer) => {
                        let reason = DisconnectReason::Connection(Arc::new(io::Error::from(
                            io::ErrorKind::ConnectionReset,
                        )));

+
                        if let Peer::Connected { streams, .. } = &mut peer {
+
                            streams.shutdown();
+
                        }
+

                        if let Some(id) = peer.id() {
                            self.service.disconnected(*id, &reason);
                        } else {
modified radicle-node/src/worker/channels.rs
@@ -70,6 +70,10 @@ impl<T: AsRef<[u8]>> Channels<T> {
    pub fn send(&self, event: ChannelEvent<T>) -> io::Result<()> {
        self.sender.send(event)
    }
+

+
    pub fn close(self) -> Result<(), chan::SendError<ChannelEvent<T>>> {
+
        self.sender.close()
+
    }
}

/// Wraps a [`chan::Receiver`] and provides it with [`io::Read`].
@@ -175,4 +179,9 @@ impl<T: AsRef<[u8]>> ChannelWriter<T> {
    pub fn eof(&self) -> Result<(), chan::SendError<ChannelEvent<T>>> {
        self.sender.send(ChannelEvent::Eof)
    }
+

+
    /// Permanently close this stream.
+
    pub fn close(self) -> Result<(), chan::SendError<ChannelEvent<T>>> {
+
        self.sender.send(ChannelEvent::Close)
+
    }
}