Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: connections manager
Fintan Halpenny committed 3 months ago
commit 1dc8d26821ee1ee39fa84c926a67a9bd687d08dd
parent 52d903a32fc958c3b34f0fda464a6577a41928ac
4 files changed +974 -0
modified crates/radicle-protocol/src/connections.rs
@@ -1,3 +1,46 @@
+
pub mod state;
+

pub mod session;
pub use session::State;
pub use session::{Attempts, Pinged, Session, Sessions};
+

+
use localtime::LocalDuration;
+
use radicle::node::config::RateLimits;
+

+
/// Minimum amount of time to wait before reconnecting to a peer.
+
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
+
/// Maximum amount of time to wait before reconnecting to a peer.
+
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
+

+
#[derive(Debug)]
+
pub struct Config {
+
    /// Duration for a connection to be considered idle.
+
    pub idle: LocalDuration,
+
    /// Duration to wait until a ping is sent to a connection.
+
    pub keep_alive: LocalDuration,
+
    /// Duration to wait until a connection is considered stale.
+
    pub stale_connection: LocalDuration,
+
    /// Allowed number of inbound connections
+
    pub inbound_limit: usize,
+
    /// The number of outbound peers that we want to reach.
+
    pub target_outbound_peers: usize,
+
    pub limits: RateLimits,
+
    pub reconnection_delay: ReconnectionDelay,
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct ReconnectionDelay {
+
    /// The minimum amount of time to wait before attempting a re-connection.
+
    pub min_delta: LocalDuration,
+
    /// The maximum amount of time to wait before attempting a re-connection.
+
    pub max_delta: LocalDuration,
+
}
+

+
impl Default for ReconnectionDelay {
+
    fn default() -> Self {
+
        Self {
+
            min_delta: MIN_RECONNECTION_DELTA,
+
            max_delta: MAX_RECONNECTION_DELTA,
+
        }
+
    }
+
}
added crates/radicle-protocol/src/connections/state.rs
@@ -0,0 +1,565 @@
+
pub mod command;
+
pub mod event;
+

+
use std::net::IpAddr;
+

+
use localtime::{LocalDuration, LocalTime};
+
use radicle::node::config::RateLimit;
+
use radicle::node::{address, Severity};
+
use radicle::node::{HostName, Link, NodeId};
+
use radicle::prelude::RepoId;
+

+
use crate::connections::session;
+
use crate::connections::session::Sessions;
+
use crate::connections::Config;
+
use crate::service::limiter::RateLimiter;
+
use crate::service::{message, DisconnectReason};
+

+
use super::Attempts;
+

+
/// Manage the state of node connections for a running node.
+
///
+
/// Note the following terminology:
+
///
+
/// - Outbound connection is one that is originating from this node to another node.
+
/// - Inbound connection is one that is coming from another node to this node.
+
///
+
/// These [`Sessions`] are categorized into one of the four following states.
+
///
+
/// # Initial
+
///
+
/// - [`Connections::connect`]
+
/// - [`Connections::reconnect`]
+
///
+
/// A connection is in the initial state when the running node has attempted to
+
/// make an outbound connection to another node.
+
///
+
/// It can also be in an initial state when a disconnected node is being
+
/// reconnected, and thus goes back to the initial state.
+
///
+
/// # Attempted
+
///
+
/// - [`Connections::attempted`]
+
///
+
/// A connection is in the attempted state when it was previously in the initial
+
/// state, and an attempt to make a connection was made.
+
///
+
/// # Connected
+
///
+
/// - [`Connections::connected`]
+
///
+
/// A connection is considered connected in one of two cases.
+
///
+
/// The attempted outbound connection was established. In this case, there must
+
/// have been a session to transition to being connected.
+
///
+
/// If the connection is inbound then the connection is simply marked as
+
/// connected, regardless of the state of a previous connection.
+
///
+
/// # Disconnected
+
///
+
/// - [`Connections::disconnected`]
+
///
+
/// A connection is marked as disconnected only if it is considered a persisted
+
/// peer (see [`ConnectionType`]). If this is the case, then a reconnection
+
/// attempt should be made after an appropriate delay.
+
///
+
/// If the connection is not considered for persistence, then it will be removed
+
/// from the [`Sessions`], and may be penalized for the severity of its
+
/// disconnection reason.
+
///
+
/// [`ConnectionType`]: session::ConnectionType
+
#[derive(Debug)]
+
pub struct Connections {
+
    /// The state of the connection lifecycle for each node in the network.
+
    sessions: Sessions,
+
    /// Rate limiter of IP hosts.
+
    limiter: RateLimiter,
+
    /// Configuration for managing connections.
+
    config: Config,
+
}
+

+
impl Connections {
+
    /// Construct a new [`Connections`] with the provided [`Config`] and [`RateLimiter`].
+
    ///
+
    /// The state will start with no [`Sessions`], to begin.
+
    pub fn new(config: Config, limiter: RateLimiter) -> Self {
+
        Self {
+
            sessions: Sessions::default(),
+
            limiter,
+
            config,
+
        }
+
    }
+

+
    /// Return the [`Config`] the [`Connections`] were initialized with.
+
    pub fn config(&self) -> &Config {
+
        &self.config
+
    }
+
}
+

+
impl Connections {
+
    // TODO(finto): we could enforce that only an accepted `IpAddr` is allowed
+
    // for calling to `connected` – which also helps reinforce that they are
+
    // interconnected.
+
    /// Perform checks on whether an incoming IP address should be accepted for
+
    /// connecting to.
+
    ///
+
    /// The caller can decide based on the resulting [`event::Accept`] whether
+
    /// to accept the connection. However, the following events are recommended
+
    /// to result in a rejected address:
+
    /// - [`event::Accept::LimitExceeded`]
+
    /// - [`event::Accept::HostLimited`]
+
    ///
+
    /// # State Transition
+
    ///
+
    /// This does not transition any session states, and simply inspects the
+
    /// rate limiter and [`IpAddr`] properties.
+
    pub fn accept(
+
        &mut self,
+
        command::Accept { ip }: command::Accept,
+
        now: LocalTime,
+
    ) -> event::Accept {
+
        // Always accept localhost connections, even if we already reached
+
        // our inbound connection limit.
+
        if ip.is_loopback() || ip.is_unspecified() {
+
            return event::Accept::LocalHost { ip };
+
        }
+

+
        if self.has_reached_inbound_limit() {
+
            return event::Accept::LimitExceeded {
+
                ip,
+
                current_inbound: self.sessions.connected_inbound(),
+
            };
+
        }
+

+
        if self.has_reached_ip_limit(&ip, now) {
+
            return event::Accept::HostLimited { ip };
+
        }
+

+
        event::Accept::Accepted { ip }
+
    }
+

+
    /// Mark a connection, with the given node, as attempted.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// Transitions the state of the existing session to `Attempted`.
+
    pub fn attempted(&mut self, command::Attempt { node }: command::Attempt) -> event::Attempted {
+
        self.sessions
+
            .session_to_attempted(&node)
+
            .map(event::Attempted::attempt)
+
            .unwrap_or(event::Attempted::missing(node))
+
    }
+

+
    /// Make an outbound connection to another node.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// A new session will only be created for the given node if a session does
+
    /// already exist.
+
    pub fn connect(
+
        &mut self,
+
        command::Connect {
+
            node,
+
            addr,
+
            connection_type,
+
        }: command::Connect,
+
        now: LocalTime,
+
    ) -> event::Connect {
+
        if self.is_disconnected(&node) {
+
            return event::Connect::disconnected(node);
+
        }
+
        if self.is_connecting(&node) {
+
            return event::Connect::already_connecting(node);
+
        }
+
        match self.sessions.get_connected(&node) {
+
            Some(session) => event::Connect::already_connected(session.clone()),
+
            None => {
+
                let record_ip = match addr.host {
+
                    HostName::Ip(ip) => (!address::is_local(&ip)).then_some(ip),
+
                    _ => None,
+
                };
+
                self.sessions.outbound(node, addr, connection_type, now);
+
                event::Connect::establish(node, connection_type, record_ip)
+
            }
+
        }
+
    }
+

+
    /// Mark a connection as connected to another node.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// The transition of the connection depends on the kind of the incoming
+
    /// connection.
+
    ///
+
    /// ## Inbound
+
    ///
+
    /// The connection transitions to connected regardless of what state of the
+
    /// session was in before and if the session did not exist.
+
    ///
+
    /// If the session existed, before the transition, then it is marked as
+
    /// inbound.
+
    ///
+
    /// ## Outbound
+
    ///
+
    /// The connection transitions to connected regardless of what state of the
+
    /// session was in before, however, it must have had an existing session before.
+
    pub fn connected(&mut self, connected: command::Connected, now: LocalTime) -> event::Connected {
+
        match connected {
+
            command::Connected::Inbound {
+
                node,
+
                addr,
+
                connection_type,
+
            } => {
+
                // In this scenario, it's possible that our peer is persistent, and
+
                // disconnected. We get an inbound connection before we attempt a re-connection,
+
                // and therefore we treat it as a regular inbound connection.
+
                //
+
                // It's also possible that a disconnection hasn't gone through yet and our
+
                // peer is still in connected state here, while a new inbound connection from
+
                // that same peer is made. This results in a new connection from a peer that is
+
                // already connected from the perspective of the service. This appears to be
+
                // a bug in the underlying networking library.
+
                match self.sessions.session_to_connected(
+
                    &node,
+
                    now,
+
                    Some(Link::Inbound),
+
                    connection_type,
+
                ) {
+
                    None => {
+
                        let session = self.sessions.inbound(node, addr, connection_type, now);
+
                        event::Connected::established(session)
+
                    }
+
                    Some(session) => event::Connected::established(session),
+
                }
+
            }
+
            // TODO(finto): why was the address never used? Or did I miss something
+
            command::Connected::Outbound {
+
                node,
+
                addr: _,
+
                connection_type,
+
            } => {
+
                // Transitions the session to connected no matter what state it is in
+
                match self.sessions.session_to_connected(
+
                    &node,
+
                    now,
+
                    Some(Link::Outbound),
+
                    connection_type,
+
                ) {
+
                    None => event::Connected::missing(node),
+
                    Some(session) => event::Connected::established(session),
+
                }
+
            }
+
        }
+
    }
+

+
    /// Disconnect a node.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// The [`ConnectionType`] decides how a disconnected node should be
+
    /// treated.
+
    ///
+
    /// ## `Ephemeral`
+
    ///
+
    /// If the connection is ephemeral, then the session for that connection is
+
    /// removed, and the severity of the reason is recorded.
+
    ///
+
    /// The severity can then be used for penalizing a node.
+
    ///
+
    /// ## `Persistent`
+
    ///
+
    /// If the connection is persistent, then the session will remain, and be
+
    /// marked as disconnected. The connection should then be retried after the
+
    /// returned delay.
+
    ///
+
    /// [`ConnectionType`]: session::ConnectionType
+
    pub fn disconnected(
+
        &mut self,
+
        command::Disconnect {
+
            node,
+
            link,
+
            since,
+
            connection_type,
+
        }: command::Disconnect,
+
        reason: &DisconnectReason,
+
    ) -> event::Disconnected {
+
        let Some(session) = self.sessions.get_session(&node) else {
+
            return event::Disconnected::missing(node);
+
        };
+
        if matches!(session.state(), session::State::Disconnected(_)) {
+
            return event::Disconnected::already_disconnected(node);
+
        }
+
        if *session.link() != link {
+
            return event::Disconnected::conflict(&session, link);
+
        }
+

+
        match connection_type {
+
            session::ConnectionType::Ephemeral => {
+
                let severity = self.reason_severity(reason, since);
+
                self.sessions
+
                    .remove_session(&node)
+
                    .map(|session| event::Disconnected::severed(session, severity))
+
                    .unwrap_or(event::Disconnected::missing(node))
+
            }
+
            session::ConnectionType::Persistent => {
+
                let delay = self.reconnection_delay(session.attempts());
+
                let retry_at = since + delay;
+
                self.sessions
+
                    .session_to_disconnected(&node, since, retry_at)
+
                    .map(|session| event::Disconnected::retry(session, delay, retry_at))
+
                    .unwrap_or(event::Disconnected::missing(node))
+
            }
+
        }
+
    }
+

+
    /// Reconnect the node.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// The session must be in the disconnected state, and transitions to the
+
    /// initial state.
+
    pub fn reconnect(
+
        &mut self,
+
        command::Reconnect { node }: command::Reconnect,
+
    ) -> event::Reconnect {
+
        self.sessions
+
            .session_to_initial(&node)
+
            .map(event::Reconnect::reconnecting)
+
            .unwrap_or(event::Reconnect::missing(node))
+
    }
+

+
    /// Mark connected nodes as stable.
+
    ///
+
    /// If the connected session has lasted longer than the configured stable
+
    /// threshold duration, then the session will be marked as stable, and the
+
    /// attempts counter is reset.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// This does not change the session's state.
+
    pub fn stabilise(&mut self, now: LocalTime) -> Vec<session::Session<session::Connected>> {
+
        self.sessions
+
            .connected_mut()
+
            .sessions()
+
            .fold(Vec::new(), |mut stabilised, session| {
+
                // Only stabilise sessions that are not already marked as stable
+
                if !session.is_stable() {
+
                    let stable = session
+
                        .stabilise(now, self.config.stale_connection)
+
                        .then_some(session.clone());
+
                    stabilised.extend(stable);
+
                    stabilised
+
                } else {
+
                    stabilised
+
                }
+
            })
+
    }
+

+
    /// Ping any inactive connections to see if they are alive.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// This does not change the sessions' state.
+
    pub fn ping<'a>(
+
        &'a mut self,
+
        mut ping: impl FnMut() -> message::Ping + 'a,
+
        now: LocalTime,
+
    ) -> impl Iterator<Item = event::Ping> + 'a {
+
        let keep_alive = self.config.keep_alive;
+
        self.sessions
+
            .inactive(now, keep_alive)
+
            .map(move |(_, session)| event::Ping {
+
                session: session.clone(),
+
                ping: session.ping(ping(), now),
+
            })
+
    }
+

+
    /// Process a incoming message from a node.
+
    ///
+
    /// The [`Payload`] of the message may alter the state of the session.
+
    ///
+
    /// If the node is marked as disconnected, then the message is dropped from
+
    /// affecting the node's session.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// Since a message must come from a connected node, the session will
+
    /// transition from its initial or attempted state to connected.
+
    ///
+
    /// [`Payload`]: command::Payload
+
    pub fn handle_message(
+
        &mut self,
+
        command::Message {
+
            node,
+
            payload,
+
            connection_type,
+
        }: command::Message,
+
        now: LocalTime,
+
    ) -> event::HandledMessage {
+
        if self.sessions.is_diconnected(&node) {
+
            return event::HandledMessage::Disconnected { node };
+
        }
+
        let outbound_limit = RateLimit::from(self.config.limits.outbound);
+
        let inbound_limit = RateLimit::from(self.config.limits.inbound);
+
        let result =
+
            self.sessions
+
                .while_connecting(&node, None, connection_type, now, |connected| {
+
                    let limit: RateLimit = match connected.link() {
+
                        Link::Outbound => outbound_limit,
+
                        Link::Inbound => inbound_limit,
+
                    };
+
                    if self.limiter.limit(
+
                        connected.address().clone().into(),
+
                        Some(&connected.node()),
+
                        &limit,
+
                        now,
+
                    ) {
+
                        return event::HandledMessage::RateLimited { node };
+
                    }
+
                    match payload {
+
                        Some(command::Payload::Subscribe(subscription)) => {
+
                            connected.set_subscription(subscription);
+
                            event::HandledMessage::Subscribed {
+
                                session: connected.clone(),
+
                            }
+
                        }
+
                        Some(command::Payload::Pong(pong)) => {
+
                            let pinged = connected.pinged(pong);
+
                            event::HandledMessage::Pinged {
+
                                session: connected.clone(),
+
                                pinged,
+
                            }
+
                        }
+
                        None => event::HandledMessage::Connected {
+
                            session: connected.clone(),
+
                        },
+
                    }
+
                });
+
        result.unwrap_or(event::HandledMessage::MissingSession { node })
+
    }
+

+
    /// Add a repository to the given node's subscription.
+
    ///
+
    /// Returns `true` if the session existed and the repository was added to
+
    /// the subscription successfully.
+
    pub fn subscribe_to(&mut self, node: &NodeId, rid: &RepoId) -> session::SubscribeTo {
+
        self.sessions.subscribe_to(node, rid)
+
    }
+
}
+

+
impl Connections {
+
    /// The [`Sessions`] that are currently being managed.
+
    pub fn sessions(&self) -> &Sessions {
+
        &self.sessions
+
    }
+

+
    /// Returns `true` is the session exists for the given node.
+
    pub fn has_session(&self, node: &NodeId) -> bool {
+
        self.sessions.has_session_for(node)
+
    }
+

+
    /// Returns the number of outbound connections that are in a "connecting"
+
    /// state. That is, they are either attempting to connect or have already
+
    /// connected.
+
    pub fn number_of_outbound_connections(&self) -> usize {
+
        self.sessions.number_of_outbound_connections()
+
    }
+

+
    /// Returns the number of inbound connections that are in a "connecting"
+
    /// state. That is, they are either attempting to connect or have already
+
    /// connected.
+
    pub fn number_of_inbound_connections(&self) -> usize {
+
        self.sessions.number_of_inbound_connections()
+
    }
+

+
    /// Return the [`Session`] for the given [`NodeId`], if it exists.
+
    /// Note that the session can be in any [`State`].
+
    ///
+
    /// [`Session`]: session::Session
+
    /// [`State`]: session::State
+
    pub fn session_for(&self, node: &NodeId) -> Option<session::Session<session::State>> {
+
        self.sessions.get_session(node)
+
    }
+

+
    /// Return the connected [`Session`] for the given [`NodeId`], if it exists.
+
    ///
+
    /// [`Session`]: session::Session
+
    pub fn get_connected(&self, node: &NodeId) -> Option<&session::Session<session::Connected>> {
+
        self.sessions.get_connected(node)
+
    }
+

+
    /// Return an `Iterator` of all unresponsive, connected [`Session`]s.
+
    ///
+
    /// A session is considered unresponsive, if it has be inactive after the
+
    /// configured stale connection duration.
+
    ///
+
    /// [`Session`]: session::Session
+
    pub fn unresponsive(
+
        &self,
+
        now: &LocalTime,
+
    ) -> impl Iterator<Item = (&NodeId, &session::Session<session::Connected>)> {
+
        self.sessions
+
            .unresponsive(*now, self.config.stale_connection)
+
    }
+

+
    fn has_reached_inbound_limit(&self) -> bool {
+
        self.sessions.connected_inbound() >= self.config.inbound_limit
+
    }
+

+
    fn has_reached_ip_limit(&mut self, ip: &IpAddr, now: LocalTime) -> bool {
+
        let addr = HostName::from(*ip);
+
        self.limiter
+
            .limit(addr, None, &self.config.limits.inbound, now)
+
    }
+

+
    fn reason_severity(&self, reason: &DisconnectReason, now: LocalTime) -> Severity {
+
        match reason {
+
            DisconnectReason::Dial(_)
+
            | DisconnectReason::Fetch(_)
+
            | DisconnectReason::Connection(_) => {
+
                if self.is_online(now) {
+
                    // If we're "online", there's something wrong with this
+
                    // peer connection specifically.
+
                    Severity::Medium
+
                } else {
+
                    Severity::Low
+
                }
+
            }
+
            DisconnectReason::Session(e) => e.severity(),
+
            DisconnectReason::Command
+
            | DisconnectReason::Conflict
+
            | DisconnectReason::SelfConnection => Severity::Low,
+
        }
+
    }
+

+
    /// Try to guess whether we're online or not.
+
    fn is_online(&self, now: LocalTime) -> bool {
+
        self.sessions
+
            .connected()
+
            .sessions()
+
            .filter(|s| s.address().is_routable() && *s.last_active() >= now - self.idle())
+
            .count()
+
            > 0
+
    }
+

+
    fn idle(&self) -> LocalDuration {
+
        self.config.idle
+
    }
+

+
    fn reconnection_delay(&self, attempts: Attempts) -> LocalDuration {
+
        let attempts = u32::try_from(usize::from(attempts)).unwrap_or(u32::MAX);
+
        LocalDuration::from_secs(2u64.saturating_pow(attempts)).clamp(
+
            self.config.reconnection_delay.min_delta,
+
            self.config.reconnection_delay.max_delta,
+
        )
+
    }
+

+
    fn is_connecting(&self, node: &NodeId) -> bool {
+
        self.sessions.is_initial(node) || self.sessions.is_attempted(node)
+
    }
+

+
    fn is_disconnected(&self, node: &NodeId) -> bool {
+
        self.sessions.is_diconnected(node)
+
    }
+
}
added crates/radicle-protocol/src/connections/state/command.rs
@@ -0,0 +1,104 @@
+
use std::net::IpAddr;
+

+
use localtime::LocalTime;
+
use radicle::node::{Address, Link, NodeId};
+

+
use crate::connections::session;
+
use crate::connections::session::ConnectionType;
+
use crate::service::message;
+
use crate::service::ZeroBytes;
+

+
/// Check whether the incoming [`IpAddr`] should be accepted for connection.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Accept {
+
    pub ip: IpAddr,
+
}
+

+
/// Mark a connection as attempted.
+
pub struct Attempt {
+
    /// The node that is being attempted.
+
    pub node: NodeId,
+
}
+

+
/// Make an outbound connection to a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Connect {
+
    /// The node that is being connected to.
+
    pub node: NodeId,
+
    /// The found address of the node that is being contacted.
+
    pub addr: Address,
+
    /// Mark the session with the given [`ConnectionType`].
+
    pub connection_type: ConnectionType,
+
}
+

+
/// Mark the node as connected.
+
pub enum Connected {
+
    /// The connected node is made through an inbound connection.
+
    Inbound {
+
        /// The node that is now connected.
+
        node: NodeId,
+
        /// The address the node is connected via.
+
        addr: Address,
+
        /// Mark the session with the given [`ConnectionType`].
+
        connection_type: ConnectionType,
+
    },
+
    /// The connected node is made through an outbound connection.
+
    Outbound {
+
        /// The node that is now connected.
+
        node: NodeId,
+
        /// The address the node is connected via.
+
        addr: Address,
+
        /// Mark the session with the given [`ConnectionType`].
+
        connection_type: ConnectionType,
+
    },
+
}
+

+
/// Either mark the node as disconnected, or remove its session.
+
#[derive(Debug)]
+
pub struct Disconnect {
+
    /// The node being disconnected.
+
    pub node: NodeId,
+
    /// The link of the disconnection.
+
    pub link: Link,
+
    /// When did the disconnection occur.
+
    pub since: LocalTime,
+
    /// Decides whether the session is disconnected or removed.
+
    pub connection_type: ConnectionType,
+
}
+

+
/// Mark the node as initial, if it was disconnected.
+
#[derive(Debug)]
+
pub struct Reconnect {
+
    pub node: NodeId,
+
}
+

+
/// Handle an incoming message from the given node.
+
pub struct Message {
+
    /// The node sending the message.
+
    pub node: NodeId,
+
    /// The payload that is part of the incoming message.
+
    ///
+
    /// Not all messages are required for changing the state of the connection's
+
    /// state, so it is optional.
+
    pub payload: Option<Payload>,
+
    /// Mark the session with the given [`ConnectionType`].
+
    pub connection_type: ConnectionType,
+
}
+

+
/// The payload of an incoming message.
+
pub enum Payload {
+
    /// The message describes the node's subscription payload.
+
    Subscribe(message::Subscribe),
+
    /// The message was a "pong" in response to this node's "ping".
+
    Pong(session::Pong),
+
}
+

+
impl Payload {
+
    pub fn pong(zeroes: ZeroBytes, now: LocalTime) -> Self {
+
        Self::Pong(session::Pong { now, zeroes })
+
    }
+

+
    pub fn subscribe(subscription: message::Subscribe) -> Self {
+
        Self::Subscribe(subscription)
+
    }
+
}
added crates/radicle-protocol/src/connections/state/event.rs
@@ -0,0 +1,262 @@
+
use std::net::IpAddr;
+

+
use localtime::{LocalDuration, LocalTime};
+
use radicle::node::{Link, NodeId, Severity};
+

+
use crate::connections::session;
+
use crate::connections::session::{ConnectionType, Pinged, Session};
+
use crate::service::message;
+

+
/// The result of checking an address for accepting an inbound connection.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Accept {
+
    /// The inbound limit for the node has been reached.
+
    ///
+
    /// It is recommended that the incoming connection is rejected.
+
    LimitExceeded {
+
        /// The [`IpAddr`] that made the attempt.
+
        ip: IpAddr,
+
        /// The current inbound size.
+
        current_inbound: usize,
+
    },
+
    /// The address has been rate limited.
+
    ///
+
    /// It is recommended that the incoming connection is rejected.
+
    HostLimited {
+
        /// The [`IpAddr`] that made the attempt, and is being rate limited.
+
        ip: IpAddr,
+
    },
+
    /// The [`IpAddr`] is likely a localhost connection.
+
    ///
+
    /// It is recommended that this is accepted for local area networks.
+
    LocalHost { ip: IpAddr },
+
    /// The [`IpAddr`] should be accepted by the system, and the connection allowed.
+
    Accepted { ip: IpAddr },
+
}
+

+
/// The result of a connection attempt from a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Attempted {
+
    /// The connection was transitioned to attempted.
+
    ConnectionAttempt {
+
        session: Box<Session<session::Attempted>>,
+
    },
+
    /// The session did not exist for this node, and it was expected to.
+
    MissingSession { node: NodeId },
+
}
+

+
impl Attempted {
+
    pub(super) fn attempt(session: Session<session::Attempted>) -> Self {
+
        Self::ConnectionAttempt {
+
            session: Box::new(session),
+
        }
+
    }
+

+
    pub(super) fn missing(node: NodeId) -> Self {
+
        Self::MissingSession { node }
+
    }
+
}
+

+
/// The result when making an outbound connection to a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Connect {
+
    /// The node is already being connected to, but has not transitioned to
+
    /// fully-connected.
+
    AlreadyConnecting { node: NodeId },
+
    /// The node already has a connected session.
+
    AlreadyConnected {
+
        session: Box<session::Session<session::Connected>>,
+
    },
+
    /// The node is already in a disconnected state, and requires a call to
+
    /// reconnect to transition it to initial.
+
    Disconnected { node: NodeId },
+
    /// The caller should establish the outbound connection.
+
    Establish {
+
        /// The node to establish the connection with.
+
        node: NodeId,
+
        /// The session was given this [`ConnectionType`].
+
        connection_type: ConnectionType,
+
        /// If this is `Some`, then the [`IpAddr`] should be recorded by the
+
        /// local node.
+
        record_ip: Option<IpAddr>,
+
    },
+
}
+

+
impl Connect {
+
    pub(super) fn already_connecting(node: NodeId) -> Self {
+
        Self::AlreadyConnecting { node }
+
    }
+

+
    pub(super) fn already_connected(session: session::Session<session::Connected>) -> Self {
+
        Self::AlreadyConnected {
+
            session: Box::new(session),
+
        }
+
    }
+

+
    pub(super) fn disconnected(node: NodeId) -> Self {
+
        Self::Disconnected { node }
+
    }
+

+
    pub(super) fn establish(
+
        node: NodeId,
+
        connection_type: ConnectionType,
+
        record_ip: Option<IpAddr>,
+
    ) -> Self {
+
        Self::Establish {
+
            node,
+
            connection_type,
+
            record_ip,
+
        }
+
    }
+
}
+

+
/// The result when a node is marked a connected.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Connected {
+
    /// The connection was marked as connected.
+
    Established {
+
        session: Box<session::Session<session::Connected>>,
+
    },
+
    /// An existing session was expected for the node, but there was none.
+
    MissingSession { node: NodeId },
+
}
+

+
impl Connected {
+
    pub(super) fn established(session: session::Session<session::Connected>) -> Self {
+
        Self::Established {
+
            session: Box::new(session),
+
        }
+
    }
+

+
    pub(super) fn missing(node: NodeId) -> Self {
+
        Self::MissingSession { node }
+
    }
+
}
+

+
/// The result when a node is disconnected.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Disconnected {
+
    /// The session was marked as disconnected and a reconnection should be
+
    /// tried.
+
    Retry {
+
        /// The session that is now marked as disconnected.
+
        session: session::Session<session::Disconnected>,
+
        /// The delay to wait until the reconnection.
+
        delay: LocalDuration,
+
        /// The time for when the reconnection should happen.
+
        retry_at: LocalTime,
+
    },
+
    /// The session was removed, and the severity of the disconnection is
+
    /// recorded.
+
    Severed {
+
        /// The session that was removed.
+
        session: session::Session<session::State>,
+
        /// The severity of the reason for disconnection.
+
        ///
+
        /// Can be used for penalizing the node for bad behavior.
+
        severity: Severity,
+
    },
+
    /// An existing session was expected for the node, but there was none.
+
    MissingSession { node: NodeId },
+
    /// The node was already marked as disconnected.
+
    AlreadyDisconnected { node: NodeId },
+
    /// The reported link of the disconnect did not match the existing link of
+
    /// the session.
+
    LinkConflict {
+
        node: NodeId,
+
        /// The link that was found in the existing session.
+
        found: Link,
+
        /// The link that was expected from the call to disconnect.
+
        expected: Link,
+
    },
+
}
+

+
impl Disconnected {
+
    pub(super) fn retry(
+
        session: session::Session<session::Disconnected>,
+
        delay: LocalDuration,
+
        retry_at: LocalTime,
+
    ) -> Self {
+
        Self::Retry {
+
            session,
+
            delay,
+
            retry_at,
+
        }
+
    }
+

+
    pub(super) fn severed(session: session::Session<session::State>, severity: Severity) -> Self {
+
        Self::Severed { session, severity }
+
    }
+

+
    pub(super) fn already_disconnected(node: NodeId) -> Self {
+
        Self::AlreadyDisconnected { node }
+
    }
+

+
    pub(super) fn conflict<S>(session: &session::Session<S>, expected: Link) -> Self {
+
        Self::LinkConflict {
+
            node: session.node(),
+
            found: *session.link(),
+
            expected,
+
        }
+
    }
+

+
    pub(super) fn missing(node: NodeId) -> Self {
+
        Self::MissingSession { node }
+
    }
+
}
+

+
/// The result when a node is being reconnected to.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Reconnect {
+
    /// The connection was marked as initial, transitioning from a disconnected
+
    /// state.
+
    Reconnecting {
+
        session: Box<session::Session<session::Initial>>,
+
    },
+
    /// An existing session was expected for the node, but there was none.
+
    MissingSession { node: NodeId },
+
}
+

+
impl Reconnect {
+
    pub(super) fn reconnecting(session: session::Session<session::Initial>) -> Self {
+
        Self::Reconnecting {
+
            session: Box::new(session),
+
        }
+
    }
+

+
    pub(super) fn missing(node: NodeId) -> Self {
+
        Self::MissingSession { node }
+
    }
+
}
+

+
/// The result of handling an incoming message from a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum HandledMessage {
+
    /// The node was in a disconnected state, so the message was dropped.
+
    Disconnected { node: NodeId },
+
    /// The node was rate limited, so the message was dropped.
+
    RateLimited { node: NodeId },
+
    /// The node's subscription was updated, and is in a connected state.
+
    Subscribed {
+
        session: session::Session<session::Connected>,
+
    },
+
    /// The node's pong was received, and is in a connected state.
+
    Pinged {
+
        session: session::Session<session::Connected>,
+
        pinged: Option<Pinged>,
+
    },
+
    /// There was no message to process, and the node is in a connected state.
+
    Connected {
+
        session: session::Session<session::Connected>,
+
    },
+
    /// An existing session was expected for the node, but there was none.
+
    MissingSession { node: NodeId },
+
}
+

+
/// The result of pinging a connected session.
+
pub struct Ping {
+
    /// The session that was being pinged.
+
    pub session: session::Session<session::Connected>,
+
    /// The ping message.
+
    pub ping: message::Ping,
+
}