Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol/connections: ensure no sessions for local node
Fintan Halpenny committed 3 months ago
commit c8095f343ec356105d492cd1a26a5efe4f88f49b
parent 4bfc8fff53a9e4ccd51904c6dca610330568850b
3 files changed +76 -5
modified crates/radicle-protocol/src/connections/state.rs
@@ -71,6 +71,8 @@ use super::Attempts;
/// [`ConnectionType`]: session::ConnectionType
#[derive(Debug)]
pub struct Connections {
+
    /// [`NodeId`] of the running node.
+
    local: NodeId,
    /// The state of the connection lifecycle for each node in the network.
    sessions: Sessions,
    /// Rate limiter of IP hosts.
@@ -83,8 +85,9 @@ impl Connections {
    /// Construct a new [`Connections`] with the provided [`Config`] and [`RateLimiter`].
    ///
    /// The state will start with no [`Sessions`], to begin.
-
    pub fn new(config: Config, limiter: RateLimiter) -> Self {
+
    pub fn new(local: NodeId, config: Config, limiter: RateLimiter) -> Self {
        Self {
+
            local,
            sessions: Sessions::default(),
            limiter,
            config,
@@ -145,6 +148,11 @@ impl Connections {
    ///
    /// Transitions the state of the existing session to `Attempted`.
    pub fn attempted(&mut self, command::Attempt { node }: command::Attempt) -> event::Attempted {
+
        if let Some(event) =
+
            self.guard_self_session(&node, event::Attempted::SelfConnection { node })
+
        {
+
            return event;
+
        }
        self.sessions
            .session_to_attempted(&node)
            .map(event::Attempted::attempt)
@@ -169,6 +177,10 @@ impl Connections {
        if self.is_disconnected(&node) {
            return event::Connect::disconnected(node);
        }
+
        if let Some(event) = self.guard_self_session(&node, event::Connect::SelfConnection { node })
+
        {
+
            return event;
+
        }
        if self.is_connecting(&node) {
            return event::Connect::already_connecting(node);
        }
@@ -211,6 +223,11 @@ impl Connections {
                addr,
                connection_type,
            } => {
+
                if let Some(event) =
+
                    self.guard_self_session(&node, event::Connected::SelfConnection { node })
+
                {
+
                    return event;
+
                }
                // In this scenario, it's possible that our peer is persistent, and
                // disconnected. We get an inbound connection before we attempt a re-connection,
                // and therefore we treat it as a regular inbound connection.
@@ -239,6 +256,11 @@ impl Connections {
                addr: _,
                connection_type,
            } => {
+
                if let Some(event) =
+
                    self.guard_self_session(&node, event::Connected::SelfConnection { node })
+
                {
+
                    return event;
+
                }
                // Transitions the session to connected no matter what state it is in
                match self.sessions.session_to_connected(
                    &node,
@@ -284,6 +306,11 @@ impl Connections {
        }: command::Disconnect,
        reason: &DisconnectReason,
    ) -> event::Disconnected {
+
        if let Some(event) =
+
            self.guard_self_session(&node, event::Disconnected::SelfConnection { node })
+
        {
+
            return event;
+
        }
        let Some(session) = self.sessions.get_session(&node) else {
            return event::Disconnected::missing(node);
        };
@@ -323,6 +350,11 @@ impl Connections {
        &mut self,
        command::Reconnect { node }: command::Reconnect,
    ) -> event::Reconnect {
+
        if let Some(event) =
+
            self.guard_self_session(&node, event::Reconnect::SelfConnection { node })
+
        {
+
            return event;
+
        }
        self.sessions
            .session_to_initial(&node)
            .map(event::Reconnect::reconnecting)
@@ -397,6 +429,11 @@ impl Connections {
        }: command::Message,
        now: LocalTime,
    ) -> event::HandledMessage {
+
        if let Some(event) =
+
            self.guard_self_session(&node, event::HandledMessage::SelfConnection { node })
+
        {
+
            return event;
+
        }
        if self.sessions.is_diconnected(&node) {
            return event::HandledMessage::Disconnected { node };
        }
@@ -502,6 +539,10 @@ impl Connections {
        self.sessions.unresponsive(*now, self.config.stale())
    }

+
    fn guard_self_session<T>(&self, node: &NodeId, event: T) -> Option<T> {
+
        (&self.local == node).then_some(event)
+
    }
+

    fn has_reached_inbound_limit(&self) -> bool {
        self.sessions.connected_inbound() >= self.config.inbound.maximum
    }
modified crates/radicle-protocol/src/connections/state/event.rs
@@ -43,6 +43,8 @@ pub enum Attempted {
    },
    /// The session did not exist for this node, and it was expected to.
    MissingSession { node: NodeId },
+
    /// Attempted to connect to the local node.
+
    SelfConnection { node: NodeId },
}

impl Attempted {
@@ -80,6 +82,8 @@ pub enum Connect {
        /// local node.
        record_ip: Option<IpAddr>,
    },
+
    /// Attempted to connect to the local node.
+
    SelfConnection { node: NodeId },
}

impl Connect {
@@ -119,6 +123,8 @@ pub enum Connected {
    },
    /// An existing session was expected for the node, but there was none.
    MissingSession { node: NodeId },
+
    /// Connection came from the local node.
+
    SelfConnection { node: NodeId },
}

impl Connected {
@@ -169,6 +175,8 @@ pub enum Disconnected {
        /// The link that was expected from the call to disconnect.
        expected: Link,
    },
+
    /// Attempted to disconnect from the local node.
+
    SelfConnection { node: NodeId },
}

impl Disconnected {
@@ -215,6 +223,8 @@ pub enum Reconnect {
    },
    /// An existing session was expected for the node, but there was none.
    MissingSession { node: NodeId },
+
    /// Attempted to reconnect to the local node.
+
    SelfConnection { node: NodeId },
}

impl Reconnect {
@@ -251,6 +261,8 @@ pub enum HandledMessage {
    },
    /// An existing session was expected for the node, but there was none.
    MissingSession { node: NodeId },
+
    /// Message originated from the local node.
+
    SelfConnection { node: NodeId },
}

/// The result of pinging a connected session.
modified crates/radicle-protocol/src/service.rs
@@ -493,6 +493,7 @@ where
                outbound,
            };
            connections::state::Connections::new(
+
                *signer.node_id(),
                connections_config,
                RateLimiter::new(config.peers()),
            )
@@ -1262,6 +1263,9 @@ where
                #[cfg(debug_assertions)]
                panic!("Service::attempted: unknown session {nid}@{addr}");
            }
+
            event::Attempted::SelfConnection { node } => {
+
                debug!(target: "service", "Attempted connection to this running node {node}");
+
            }
        }
    }

@@ -1298,6 +1302,9 @@ where
            event::Connected::MissingSession { node } => {
                debug!(target: "service", "Could not transition {node} to connect since its session is missing");
            }
+
            event::Connected::SelfConnection { node } => {
+
                warn!(target: "service", "Connected to local node {node}");
+
            }
        }
    }

@@ -1361,6 +1368,9 @@ where
                // connections. In that case we don't want the service to remove the session.
                trace!(target: "service", "Conflicting sessions {node} found={found} expected={expected}");
            }
+
            event::Disconnected::SelfConnection { node } => {
+
                warn!(target: "service", "Disconnection came for local node {node}");
+
            }
        }

        let cmd = fetcher::state::command::Cancel { from: remote };
@@ -1756,6 +1766,10 @@ where
            HandledMessage::Connected { session } => session,
            HandledMessage::Subscribed { session } => session,
            HandledMessage::Pinged { session, pinged: _ } => session,
+
            HandledMessage::SelfConnection { node } => {
+
                warn!(target: "service", "Message sender is the local node {node}");
+
                return Ok(());
+
            }
        };

        message.log(log::Level::Debug, remote, Link::Inbound);
@@ -2135,6 +2149,10 @@ where
                debug!("Reconnecting to missing session for {node}");
                false
            }
+
            event::Reconnect::SelfConnection { node } => {
+
                warn!(target: "service", "Attempted reconnect with local node {node}");
+
                false
+
            }
        }
    }

@@ -2142,16 +2160,16 @@ where
        use connections::state::command;
        use connections::state::event;

-
        if nid == self.node_id() {
-
            return Err(ConnectError::SelfConnection);
-
        }
-

        let command = command::Connect {
            node: nid,
            addr: addr.clone(),
            connection_type: self.connection_type(&nid),
        };
        match self.connections.connect(command, self.clock) {
+
            event::Connect::SelfConnection { node } => {
+
                debug!(target: "service", "Attempted connect to local node {node}");
+
                Err(ConnectError::SelfConnection)
+
            }
            event::Connect::AlreadyConnected { session } => {
                let node = session.node();
                trace!(target: "service", "Connected to {node} already");