Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Handle connection state discrepancies
cloudhead committed 1 year ago
commit 47c9f792e608c80ec4cdf57429a8177efcfcc73f
parent a1c9a927c28539cc0435269c3fa8121956eb89f0
2 files changed +58 -38
modified radicle-node/src/service.rs
@@ -1314,11 +1314,17 @@ where
            }
        } else {
            match self.sessions.entry(remote) {
-
                Entry::Occupied(e) => {
-
                    warn!(
+
                Entry::Occupied(mut e) => {
+
                    // In this scenario, it's possible that our peer is persistent, and
+
                    // disconnected. We get an inbound connection before we attempt a re-connection,
+
                    // and therefore we treat it as a regular inbound connection.
+
                    let peer = e.get_mut();
+
                    debug!(
                        target: "service",
-
                        "Connecting peer {remote} already has a session open ({})", e.get()
+
                        "Connecting peer {remote} already has a session open ({peer})"
                    );
+
                    peer.to_connected(self.clock);
+
                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    if let HostName::Ip(ip) = addr.host {
@@ -1772,15 +1778,39 @@ where
        }
        message.log(log::Level::Debug, remote, Link::Inbound);

-
        trace!(target: "service", "Received message {:?} from {}", &message, peer.id);
+
        let connected = match &mut peer.state {
+
            session::State::Disconnected { .. } => {
+
                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
+
                return Ok(());
+
            }
+
            // In case of a discrepancy between the service state and the state of the underlying
+
            // wire protocol, we may receive a message from a peer that we consider not fully connected
+
            // at the service level. To remedy this, we simply transition the peer to a connected state.
+
            //
+
            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
+
            // solution to converge towards the same state.
+
            session::State::Attempted { .. } | session::State::Initial => {
+
                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
+
                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);
+

+
                peer.to_connected(self.clock);
+

+
                None
+
            }
+
            session::State::Connected {
+
                ping, latencies, ..
+
            } => Some((ping, latencies)),
+
        };

-
        match (&mut peer.state, message) {
+
        trace!(target: "service", "Received message {message:?} from {remote}");
+

+
        match message {
            // Process a peer announcement.
-
            (session::State::Connected { .. }, Message::Announcement(ann)) => {
-
                let relayer = peer.id;
+
            Message::Announcement(ann) => {
+
                let relayer = remote;
                let relayer_addr = peer.addr.clone();

-
                if let Some(id) = self.handle_announcement(&relayer, &relayer_addr, &ann)? {
+
                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
                        if let AnnouncementMessage::Inventory(_) = ann.message {
                            if let Err(e) = self
@@ -1797,7 +1827,7 @@ where
                    }
                }
            }
-
            (session::State::Connected { .. }, Message::Subscribe(subscribe)) => {
+
            Message::Subscribe(subscribe) => {
                // Filter announcements by interest.
                match self
                    .db
@@ -1829,11 +1859,10 @@ where
                }
                peer.subscribe = Some(subscribe);
            }
-
            (session::State::Connected { .. }, Message::Info(info)) => {
-
                let remote = peer.id;
-
                self.handle_info(remote, &info)?;
+
            Message::Info(info) => {
+
                self.handle_info(*remote, &info)?;
            }
-
            (session::State::Connected { .. }, Message::Ping(Ping { ponglen, .. })) => {
+
            Message::Ping(Ping { ponglen, .. }) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
@@ -1845,33 +1874,24 @@ where
                    },
                );
            }
-
            (
-
                session::State::Connected {
-
                    ping, latencies, ..
-
                },
-
                Message::Pong { zeroes },
-
            ) => {
-
                if let session::PingState::AwaitingResponse {
-
                    len: ponglen,
-
                    since,
-
                } = *ping
-
                {
-
                    if (ponglen as usize) == zeroes.len() {
-
                        *ping = session::PingState::Ok;
-
                        // Keep track of peer latency.
-
                        latencies.push_back(self.clock - since);
-
                        if latencies.len() > MAX_LATENCIES {
-
                            latencies.pop_front();
+
            Message::Pong { zeroes } => {
+
                if let Some((ping, latencies)) = connected {
+
                    if let session::PingState::AwaitingResponse {
+
                        len: ponglen,
+
                        since,
+
                    } = *ping
+
                    {
+
                        if (ponglen as usize) == zeroes.len() {
+
                            *ping = session::PingState::Ok;
+
                            // Keep track of peer latency.
+
                            latencies.push_back(self.clock - since);
+
                            if latencies.len() > MAX_LATENCIES {
+
                                latencies.pop_front();
+
                            }
                        }
                    }
                }
            }
-
            (session::State::Attempted { .. } | session::State::Initial, msg) => {
-
                debug!(target: "service", "Ignoring unexpected message {:?} from connecting peer {}", msg, peer.id);
-
            }
-
            (session::State::Disconnected { .. }, msg) => {
-
                debug!(target: "service", "Ignoring {:?} from disconnected peer {}", msg, peer.id);
-
            }
        }
        Ok(())
    }
modified radicle-node/src/service/session.rs
@@ -230,8 +230,8 @@ impl Session {
    pub fn to_connected(&mut self, since: LocalTime) {
        self.last_active = since;

-
        let State::Attempted = &self.state else {
-
            panic!("Session::to_connected: can only transition to 'connected' state from 'attempted' state");
+
        if let State::Connected { .. } = &self.state {
+
            panic!("Session::to_connected: session is already in 'connected' state");
        };
        self.state = State::Connected {
            since,