Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
Sans-IO Connection Management
Open fintohaps opened 4 months ago

This patch defines the connection session management in a sans-IO fashion, and replaces the old logic in Service.

It is stacked on patches/806043519e7439bebe26a66b247025ef5a7f8ef0.

16 files changed +6373 -713 9cdd08ab d46587a9
modified crates/radicle-node/src/runtime/handle.rs
@@ -293,8 +293,8 @@ impl radicle::node::Handle for Handle {
        let query: Arc<QueryState> = Arc::new(move |state| {
            let sessions = state
                .sessions()
-
                .values()
-
                .map(radicle::node::Session::from)
+
                .iter()
+
                .map(|(_, s)| radicle::node::Session::from(s))
                .collect();
            sender.send(sessions).ok();

@@ -312,7 +312,10 @@ impl radicle::node::Handle for Handle {
    fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        let query: Arc<QueryState> = Arc::new(move |state| {
-
            let session = state.sessions().get(&nid).map(radicle::node::Session::from);
+
            let session = state
+
                .sessions()
+
                .get_session(&nid)
+
                .map(radicle::node::Session::from);
            sender.send(session).ok();

            Ok(())
modified crates/radicle-node/src/tests.rs
@@ -23,6 +23,8 @@ use radicle::storage::RefUpdate;
use radicle::test::arbitrary::gen;
use radicle::test::storage::MockRepository;
use radicle_protocol::bounded::BoundedVec;
+
use radicle_protocol::connections::config::KEEP_ALIVE_DELTA;
+
use radicle_protocol::connections::config::STALE_CONNECTION_TIMEOUT;

use crate::collections::{RandomMap, RandomSet};
use crate::identity::RepoId;
@@ -129,7 +131,7 @@ fn test_disconnecting_unresponsive_peer() {
    let bob = Peer::new("bob", [9, 9, 9, 9]);

    alice.connect_to(&bob);
-
    assert_eq!(1, alice.sessions().connected().count(), "bob connects");
+
    assert_eq!(1, alice.sessions().connected().len(), "bob connects");
    alice.elapse(STALE_CONNECTION_TIMEOUT + LocalDuration::from_secs(1));
    alice
        .outbox()
@@ -173,7 +175,7 @@ fn test_connection_kept_alive() {
        ConnectOptions::default(),
    ));
    sim.run_while([&mut alice, &mut bob], |s| !s.is_settled());
-
    assert_eq!(1, alice.sessions().connected().count(), "bob connects");
+
    assert_eq!(1, alice.sessions().connected().len(), "bob connects");

    let mut elapsed: LocalDuration = LocalDuration::from_secs(0);
    let step: LocalDuration = STALE_CONNECTION_TIMEOUT / 10;
@@ -185,8 +187,16 @@ fn test_connection_kept_alive() {
        elapsed = elapsed + step;
    }

-
    assert_eq!(1, alice.sessions().len(), "alice remains connected to Bob");
-
    assert_eq!(1, bob.sessions().len(), "bob remains connected to Alice");
+
    assert_eq!(
+
        1,
+
        alice.sessions().connected().len(),
+
        "alice remains connected to Bob"
+
    );
+
    assert_eq!(
+
        1,
+
        bob.sessions().connected().len(),
+
        "bob remains connected to Alice"
+
    );
}

#[test]
@@ -202,7 +212,8 @@ fn test_outbound_connection() {
        .service
        .sessions()
        .connected()
-
        .map(|(id, _)| *id)
+
        .node_ids()
+
        .copied()
        .collect::<Vec<_>>();

    assert!(peers.contains(&eve.id()));
@@ -222,7 +233,8 @@ fn test_inbound_connection() {
        .service
        .sessions()
        .connected()
-
        .map(|(id, _)| *id)
+
        .node_ids()
+
        .copied()
        .collect::<Vec<_>>();

    assert!(peers.contains(&eve.id()));
@@ -1215,7 +1227,8 @@ fn test_persistent_peer_reconnect_attempt() {
    let ips = alice
        .sessions()
        .connected()
-
        .map(|(id, _)| *id)
+
        .node_ids()
+
        .copied()
        .collect::<Vec<_>>();
    assert!(ips.contains(&bob.id()));
    assert!(ips.contains(&eve.id()));
@@ -1304,7 +1317,7 @@ fn test_maintain_connections() {
    }
    assert_eq!(
        connected.len(),
-
        alice.sessions().len(),
+
        alice.sessions().connected().len(),
        "alice should be connected to the first set of peers"
    );
    // We now import the other addresses.
@@ -2016,19 +2029,19 @@ fn test_announcement_message_amplification() {
        });

        // Ensure nodes are all connected, otherwise skip this test run.
-
        if alice.sessions().connected().count() != 4 {
+
        if alice.sessions().connected().len() != 4 {
            continue;
        }
-
        if bob.sessions().connected().count() != 4 {
+
        if bob.sessions().connected().len() != 4 {
            continue;
        }
-
        if eve.sessions().connected().count() != 4 {
+
        if eve.sessions().connected().len() != 4 {
            continue;
        }
-
        if zod.sessions().connected().count() != 4 {
+
        if zod.sessions().connected().len() != 4 {
            continue;
        }
-
        if tom.sessions().connected().count() != 4 {
+
        if tom.sessions().connected().len() != 4 {
            continue;
        }

added crates/radicle-protocol/src/connections.rs
@@ -0,0 +1,8 @@
+
pub mod config;
+
pub use config::Config;
+

+
pub mod state;
+

+
pub mod session;
+
pub use session::State;
+
pub use session::{Attempts, Pinged, Session, Sessions};
added crates/radicle-protocol/src/connections/config.rs
@@ -0,0 +1,160 @@
+
//! Configuration parameter for the [`Connections`] state management.
+
//!
+
//! [`Connections`]: crate::connections::state::Connections
+

+
use localtime::LocalDuration;
+

+
// TODO(finto): these are realistically only used here. I think that components
+
// should define their own configuration values, that eventually compose into
+
// the final larger configuration. I think this would result in a more useful
+
// layout of the config, e.g. connections.inbound.rateLimit,
+
// connections.outbound.rateLimit, connections.duration.idle, etc.
+
use radicle::node::config::{RateLimit, RateLimits};
+

+
/// How often to run the "idle" task.
+
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
+
/// How much time should pass after a peer was last active for a *ping* to be sent.
+
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
+
/// Duration to wait on an unresponsive peer before dropping its connection.
+
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
+
/// Minimum amount of time to wait before reconnecting to a peer.
+
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
+
/// Maximum amount of time to wait before reconnecting to a peer.
+
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
+
/// Target number of peers to maintain connections to.
+
pub const TARGET_OUTBOUND_PEERS: usize = 8;
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Config {
+
    /// Configurations for connection durations, such as idleness, keep alive,
+
    /// reconnection delays, etc.
+
    pub durations: Durations,
+
    /// Configurations for managing outbound connections.
+
    pub outbound: Outbound,
+
    /// Configurations for managing outbound connections.
+
    pub inbound: Inbound,
+
}
+

+
impl Config {
+
    /// The duration for a connection to be considered "idle".
+
    pub fn idle(&self) -> LocalDuration {
+
        self.durations.idle
+
    }
+

+
    /// How much time should pass after a peer was last active for a *ping* to be sent.
+
    pub fn keep_alive(&self) -> LocalDuration {
+
        self.durations.keep_alive
+
    }
+

+
    /// Duration to wait on an unresponsive peer before dropping its connection.
+
    pub fn stale(&self) -> LocalDuration {
+
        self.durations.stale
+
    }
+

+
    /// Target number of peers to maintain connections to.
+
    pub fn outbound_target(&self) -> usize {
+
        self.outbound.target
+
    }
+

+
    /// Maximum number of allowed inbound connections.
+
    pub fn max_inbound(&self) -> usize {
+
        self.inbound.maximum
+
    }
+

+
    /// The rate limits for an inbound connection.
+
    pub fn inbound_rate_limit(&self) -> RateLimit {
+
        self.inbound.rate_limit
+
    }
+

+
    /// The rate limits for an outbound connection.
+
    pub fn outbound_rate_limit(&self) -> RateLimit {
+
        self.outbound.rate_limit
+
    }
+

+
    /// The minimum and maximum durations before attempting reconnecting to a
+
    /// node.
+
    pub fn reconnection_delay(&self) -> ReconnectionDelay {
+
        self.durations.reconnection_delay
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Durations {
+
    /// Duration for a connection to be considered idle.
+
    pub idle: LocalDuration,
+
    /// Duration to wait until a ping is sent to a connection.
+
    pub keep_alive: LocalDuration,
+
    /// Duration to wait until a connection is considered stale.
+
    pub stale: LocalDuration,
+
    /// Configure the minimum and maximum delay durations for attempting
+
    /// reconnections.
+
    pub reconnection_delay: ReconnectionDelay,
+
}
+

+
impl Default for Durations {
+
    fn default() -> Self {
+
        Self {
+
            idle: IDLE_INTERVAL,
+
            keep_alive: KEEP_ALIVE_DELTA,
+
            stale: STALE_CONNECTION_TIMEOUT,
+
            reconnection_delay: ReconnectionDelay::default(),
+
        }
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Outbound {
+
    /// Rate limiting of inbound connection actions.
+
    pub rate_limit: RateLimit,
+
    /// Target number of outbound connections that we want to reach.
+
    pub target: usize,
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Inbound {
+
    /// Rate limiting of inbound connection actions.
+
    pub rate_limit: RateLimit,
+
    /// The maximum number of inbound connections allowed.
+
    pub maximum: usize,
+
}
+

+
impl From<RateLimit> for Inbound {
+
    fn from(limit: RateLimit) -> Self {
+
        let maximum = limit.capacity;
+
        Self {
+
            rate_limit: limit,
+
            maximum,
+
        }
+
    }
+
}
+

+
pub struct Limits {
+
    /// The rate limits for each direction of connection.
+
    ///
+
    /// This applies to rate limiting incoming connections to accept, and the
+
    /// incoming protocol messages.
+
    pub rates: RateLimits,
+
    /// Allowed maximum number of inbound connections.
+
    pub max_inbound: usize,
+
}
+

+
pub struct Reconnection {
+
    pub delay: ReconnectionDelay,
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct ReconnectionDelay {
+
    /// The minimum amount of time to wait before attempting a reconnection.
+
    pub min_delta: LocalDuration,
+
    /// The maximum amount of time to wait before attempting a reconnection.
+
    pub max_delta: LocalDuration,
+
}
+

+
impl Default for ReconnectionDelay {
+
    fn default() -> Self {
+
        Self {
+
            min_delta: MIN_RECONNECTION_DELTA,
+
            max_delta: MAX_RECONNECTION_DELTA,
+
        }
+
    }
+
}
added crates/radicle-protocol/src/connections/session.rs
@@ -0,0 +1,1001 @@
+
//! State management for node connection states.
+
//!
+
//! # Session
+
//!
+
//! The main type to describe a node connection is [`Session`]. It has a single
+
//! generic parameter that describes the current state the session is in, which
+
//! is one of:
+
//!   - [`Initial`]
+
//!   - [`Attempted`]
+
//!   - [`Connected`]
+
//!   - [`Disconnected`]
+
//!
+
//! Or, if a collection of sessions in various states is required, then
+
//! [`State`] enumerates all of them.
+
//!
+
//! The are two main ways to construct a [`Session`]:
+
//! - [`Session::outbound`]
+
//! - [`Session::inbound`]
+
//!
+
//! # Sessions
+
//!
+
//! The [`Sessions`] type keeps track of what the current state a [`NodeId`] is
+
//! in, with its corresponding [`Session`].
+
//!
+
//! A given [`NodeId`] must only appear in one state at a given time, if a
+
//! session for it exists.
+

+
mod iter;
+
use iter::SessionsViewMut;
+
pub use iter::{SessionsIter, SessionsView};
+

+
use std::collections::{HashMap, VecDeque};
+
use std::fmt;
+

+
use localtime::{LocalDuration, LocalTime};
+
use radicle::node::{Address, Link, NodeId, PingState};
+
use radicle::prelude::RepoId;
+

+
use crate::service::{message, ZeroBytes, MAX_LATENCIES};
+

+
/// Enumeration of the various session states.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum State {
+
    Initial(Initial),
+
    Attempted(Attempted),
+
    Connected(Connected),
+
    Disconnected(Disconnected),
+
}
+

+
impl From<Initial> for State {
+
    fn from(value: Initial) -> Self {
+
        Self::Initial(value)
+
    }
+
}
+

+
impl From<Attempted> for State {
+
    fn from(value: Attempted) -> Self {
+
        Self::Attempted(value)
+
    }
+
}
+

+
impl From<Connected> for State {
+
    fn from(value: Connected) -> Self {
+
        Self::Connected(value)
+
    }
+
}
+

+
impl From<Disconnected> for State {
+
    fn from(value: Disconnected) -> Self {
+
        Self::Disconnected(value)
+
    }
+
}
+

+
impl From<State> for radicle::node::State {
+
    fn from(state: State) -> Self {
+
        match state {
+
            State::Initial(initial) => Self::from(initial),
+
            State::Attempted(attempted) => Self::from(attempted),
+
            State::Connected(connected) => Self::from(connected),
+
            State::Disconnected(disconnected) => Self::from(disconnected),
+
        }
+
    }
+
}
+

+
impl From<Initial> for radicle::node::State {
+
    fn from(_: Initial) -> Self {
+
        Self::Initial
+
    }
+
}
+

+
impl From<Attempted> for radicle::node::State {
+
    fn from(_: Attempted) -> Self {
+
        Self::Attempted
+
    }
+
}
+

+
impl From<Disconnected> for radicle::node::State {
+
    fn from(Disconnected { since, retry_at }: Disconnected) -> Self {
+
        Self::Disconnected { since, retry_at }
+
    }
+
}
+

+
impl From<Connected> for radicle::node::State {
+
    fn from(
+
        Connected {
+
            since,
+
            ping,
+
            latencies,
+
            stable,
+
        }: Connected,
+
    ) -> Self {
+
        Self::Connected {
+
            since,
+
            ping,
+
            latencies,
+
            stable,
+
        }
+
    }
+
}
+

+
/// Keeps track of multiple node sessions and their connection lifecycle.
+
///
+
/// Each node has one [`Session`], and can be in of the following states:
+
/// - [`Initial`]
+
/// - [`Attempted`]
+
/// - [`Connected`]
+
/// - [`Disconnected`]
+
///
+
/// # State Transitions
+
///
+
/// It is ensured that a given [`NodeId`] can only be in, at most, one state at
+
/// a given time.
+
///
+
/// A [`Session::outbound`] starts in the [`Initial`] state, and can then move
+
/// to [`Attempted`], [`Connected`], or [`Disconnected`].
+
///
+
/// A [`Session::inbound`] starts in the [`Connected`] state immediately – since
+
/// the connection was established by the incoming node. It can then move to
+
/// [`Disconnected`].
+
///
+
/// A [`Disconnected`] session can be reconnected to, which transitions it to
+
/// the [`Initial`] state, restarting the lifecycle.
+
#[derive(Debug, Default)]
+
pub struct Sessions {
+
    initial: HashMap<NodeId, Session<Initial>>,
+
    attempted: HashMap<NodeId, Session<Attempted>>,
+
    disconnected: HashMap<NodeId, Session<Disconnected>>,
+
    connected: HashMap<NodeId, Session<Connected>>,
+
}
+

+
impl<'a> IntoIterator for &'a Sessions {
+
    type Item = (&'a NodeId, Session<State>);
+
    type IntoIter = SessionsIter<'a>;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.iter()
+
    }
+
}
+

+
impl Sessions {
+
    /// Construct a new [`Sessions`] state.
+
    pub fn new() -> Self {
+
        Self {
+
            initial: HashMap::new(),
+
            attempted: HashMap::new(),
+
            disconnected: HashMap::new(),
+
            connected: HashMap::new(),
+
        }
+
    }
+

+
    /// Get an iterator over all the sessions, see [`SessionsIter`] for more
+
    /// information.
+
    pub fn iter<'a>(&'a self) -> SessionsIter<'a> {
+
        SessionsIter {
+
            initial: self.initial.iter(),
+
            attempted: self.attempted.iter(),
+
            disconnected: self.disconnected.iter(),
+
            connected: self.connected.iter(),
+
        }
+
    }
+

+
    /// Get the number of sessions that are connected and have an [inbound]
+
    /// link.
+
    ///
+
    /// [inbound]: Link::Inbound
+
    pub fn connected_inbound(&self) -> usize {
+
        self.connected
+
            .values()
+
            .filter(|session| session.link().is_inbound())
+
            .count()
+
    }
+

+
    /// Get the number of sessions that are connected and have an [outbound]
+
    /// link.
+
    ///
+
    /// [outbound]: Link::Outbound
+
    pub fn connected_outbound(&self) -> usize {
+
        self.connected
+
            .values()
+
            .filter(|session| session.link().is_outbound())
+
            .count()
+
    }
+

+
    /// Checks that an existing [`Session`] exists for the given [`NodeId`].
+
    pub fn has_session_for(&self, node: &NodeId) -> bool {
+
        self.initial.contains_key(node)
+
            || self.attempted.contains_key(node)
+
            || self.disconnected.contains_key(node)
+
            || self.connected.contains_key(node)
+
    }
+

+
    /// Get all [`Session`]s that are in the [`Connected`] state, along with
+
    /// their [`NodeId`]s.
+
    pub fn connected(&self) -> SessionsView<'_, Connected> {
+
        SessionsView {
+
            inner: &self.connected,
+
        }
+
    }
+

+
    /// Get all [`Session`]s that are in the [`Initial`] state, along with
+
    /// their [`NodeId`]s.
+
    pub fn initial(&self) -> SessionsView<'_, Initial> {
+
        SessionsView {
+
            inner: &self.initial,
+
        }
+
    }
+

+
    /// Get all [`Session`]s that are in the [`Attempted`] state, along with
+
    /// their [`NodeId`]s.
+
    pub fn attempted(&self) -> SessionsView<'_, Attempted> {
+
        SessionsView {
+
            inner: &self.attempted,
+
        }
+
    }
+

+
    /// Get all [`Session`]s that are in the [`Disconnected`] state, along with
+
    /// their [`NodeId`]s.
+
    pub fn disconnected(&self) -> SessionsView<'_, Disconnected> {
+
        SessionsView {
+
            inner: &self.disconnected,
+
        }
+
    }
+

+
    /// Check if the given [`NodeId`] has a connected session.
+
    pub fn is_connected(&self, node: &NodeId) -> bool {
+
        self.connected.contains_key(node)
+
    }
+

+
    /// Check if the given [`NodeId`] has a disconnected session.
+
    pub fn is_diconnected(&self, node: &NodeId) -> bool {
+
        self.disconnected.contains_key(node)
+
    }
+

+
    /// Check if the given [`NodeId`] has an initial session.
+
    pub fn is_initial(&self, node: &NodeId) -> bool {
+
        self.initial.contains_key(node)
+
    }
+

+
    /// Check if the given [`NodeId`] has an attempted session.
+
    pub fn is_attempted(&self, node: &NodeId) -> bool {
+
        self.attempted.contains_key(node)
+
    }
+

+
    /// Get a [`Session`], for the given [`NodeId`], that can be in any [`State`].
+
    pub fn get_session(&self, node: &NodeId) -> Option<Session<State>> {
+
        self.initial
+
            .get(node)
+
            .cloned()
+
            .map(|s| s.into_any_state())
+
            .or_else(|| {
+
                self.attempted
+
                    .get(node)
+
                    .cloned()
+
                    .map(|s| s.into_any_state())
+
            })
+
            .or_else(|| {
+
                self.disconnected
+
                    .get(node)
+
                    .cloned()
+
                    .map(|s| s.into_any_state())
+
            })
+
            .or_else(|| {
+
                self.connected
+
                    .get(node)
+
                    .cloned()
+
                    .map(|s| s.into_any_state())
+
            })
+
    }
+

+
    /// Get the [`Session`], for the given [`NodeId`], that is expected to be in
+
    /// the [`Connected`] state.
+
    pub fn get_connected(&self, node: &NodeId) -> Option<&Session<Connected>> {
+
        self.connected.get(node)
+
    }
+

+
    /// Get a mutable iterator of the [`Sessions`]s that are in the
+
    /// [`Connected`] state, along with their [`NodeId`]s.
+
    pub(super) fn connected_mut(&mut self) -> SessionsViewMut<'_, Connected> {
+
        SessionsViewMut {
+
            inner: &mut self.connected,
+
        }
+
    }
+

+
    pub(super) fn unresponsive(
+
        &self,
+
        now: LocalTime,
+
        stale_connection: LocalDuration,
+
    ) -> impl Iterator<Item = (&NodeId, &Session<Connected>)> {
+
        self.connected()
+
            .into_iter()
+
            .filter(move |(_, session)| session.is_inactive(&now, stale_connection))
+
    }
+

+
    pub(super) fn inactive(
+
        &mut self,
+
        now: LocalTime,
+
        keep_alive: LocalDuration,
+
    ) -> impl Iterator<Item = (&NodeId, &mut Session<Connected>)> {
+
        self.connected_mut()
+
            .into_iter()
+
            .filter(move |(_, session)| session.is_inactive(&now, keep_alive))
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the [`Initial`] state.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    ///
+
    /// This is used when reconnecting a disconnected session, that needs to be
+
    /// kept as a persistent connection.
+
    pub(super) fn session_to_initial(&mut self, node: &NodeId) -> Option<Session<Initial>> {
+
        let s = self.disconnected.remove(node)?.into_initial();
+
        self.initial.insert(*node, s.clone());
+
        Some(s)
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the
+
    /// [`Attempted`] state.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    pub(super) fn session_to_attempted(&mut self, node: &NodeId) -> Option<Session<Attempted>> {
+
        let s = self.initial.remove(node)?.into_attempted();
+
        self.attempted.insert(*node, s.clone());
+
        Some(s)
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the
+
    /// [`Disconnected`] state.
+
    ///
+
    /// The time this [`Session`] was disconnected is marked by `since`, and if
+
    /// the connection should be retried then a `retry_at` value should be
+
    /// provided.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    pub(super) fn session_to_disconnected(
+
        &mut self,
+
        node: &NodeId,
+
        since: LocalTime,
+
        retry_at: LocalTime,
+
    ) -> Option<Session<Disconnected>> {
+
        match self.remove_session(node) {
+
            None => None,
+
            Some(session) => {
+
                let s = session.into_disconnected(since, retry_at);
+
                self.disconnected.insert(*node, s.clone());
+
                Some(s)
+
            }
+
        }
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the
+
    /// [`Connected`] state.
+
    ///
+
    /// The [`Session`] is last active given by the time given for `now`, the
+
    /// type of [`Link`] is also marked by the provided value, and also keep
+
    /// track of whether the session should be persisted.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    pub(super) fn session_to_connected(
+
        &mut self,
+
        node: &NodeId,
+
        now: LocalTime,
+
        link: Option<Link>,
+
        connection_type: ConnectionType,
+
    ) -> Option<Session<Connected>> {
+
        let s = self.remove_session(node)?;
+
        let link = link.unwrap_or(s.link);
+
        let state = match s.state {
+
            State::Connected(connected) => connected,
+
            State::Initial(_) | State::Attempted(_) | State::Disconnected(_) => Connected::new(now),
+
        };
+
        let s = Session {
+
            state,
+
            id: s.id,
+
            addr: s.addr,
+
            link,
+
            connection_type,
+
            last_active: now,
+
            subscription: s.subscription,
+
            attempts: s.attempts,
+
        };
+
        self.connected.insert(*node, s.clone());
+
        Some(s)
+
    }
+

+
    pub(super) fn subscribe_to(&mut self, node: &NodeId, rid: &RepoId) -> SubscribeTo {
+
        if let Some(session) = self.connected.get_mut(node) {
+
            return session.subscribe_to(rid);
+
        }
+

+
        if let Some(session) = self.disconnected.get_mut(node) {
+
            return session.subscribe_to(rid);
+
        }
+

+
        if let Some(session) = self.attempted.get_mut(node) {
+
            return session.subscribe_to(rid);
+
        }
+

+
        if let Some(session) = self.initial.get_mut(node) {
+
            return session.subscribe_to(rid);
+
        }
+

+
        SubscribeTo::Missing { node: *node }
+
    }
+

+
    pub(super) fn remove_session(&mut self, node: &NodeId) -> Option<Session<State>> {
+
        self.initial
+
            .remove(node)
+
            .map(|s| s.into_any_state())
+
            .or_else(|| self.attempted.remove(node).map(|s| s.into_any_state()))
+
            .or_else(|| self.disconnected.remove(node).map(|s| s.into_any_state()))
+
            .or_else(|| self.connected.remove(node).map(|s| s.into_any_state()))
+
    }
+

+
    pub(super) fn outbound(
+
        &mut self,
+
        node: NodeId,
+
        addr: Address,
+
        connection_type: ConnectionType,
+
        now: LocalTime,
+
    ) -> Session<Initial> {
+
        let session = Session::outbound(node, addr, connection_type, now);
+
        self.initial.insert(node, session.clone());
+
        session
+
    }
+

+
    pub(super) fn inbound(
+
        &mut self,
+
        node: NodeId,
+
        addr: Address,
+
        connection_type: ConnectionType,
+
        now: LocalTime,
+
    ) -> Session<Connected> {
+
        let session = Session::inbound(node, addr, connection_type, now);
+
        self.connected.insert(node, session.clone());
+
        session
+
    }
+

+
    pub(super) fn number_of_outbound_connections(&self) -> usize {
+
        let attempted = self
+
            .attempted
+
            .iter()
+
            .filter(|(_, s)| s.link.is_outbound())
+
            .count();
+
        let connected = self
+
            .connected
+
            .iter()
+
            .filter(|(_, s)| s.link.is_outbound())
+
            .count();
+
        attempted + connected
+
    }
+

+
    pub(super) fn number_of_inbound_connections(&self) -> usize {
+
        let attempted = self
+
            .attempted
+
            .iter()
+
            .filter(|(_, s)| s.link.is_inbound())
+
            .count();
+
        let connected = self
+
            .connected
+
            .iter()
+
            .filter(|(_, s)| s.link.is_inbound())
+
            .count();
+
        attempted + connected
+
    }
+

+
    pub(super) fn while_connecting<F, T>(
+
        &mut self,
+
        node: &NodeId,
+
        link: Option<Link>,
+
        connection_type: ConnectionType,
+
        now: LocalTime,
+
        f: F,
+
    ) -> Option<T>
+
    where
+
        F: FnOnce(&mut Session<Connected>) -> T,
+
    {
+
        let s = self.remove_session(node)?;
+
        let link = link.unwrap_or(s.link);
+
        let state = match s.state {
+
            State::Connected(connected) => connected,
+
            State::Initial(_) | State::Attempted(_) | State::Disconnected(_) => Connected::new(now),
+
        };
+
        let mut s = Session {
+
            id: s.id,
+
            addr: s.addr,
+
            link,
+
            connection_type,
+
            last_active: now,
+
            subscription: s.subscription,
+
            attempts: s.attempts,
+
            state,
+
        };
+
        let result = f(&mut s);
+
        self.connected.insert(*node, s.clone());
+
        Some(result)
+
    }
+
}
+

+
/// Number of attempts made for connecting to a node.
+
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+
pub struct Attempts {
+
    attempts: usize,
+
}
+

+
impl fmt::Display for Attempts {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        write!(f, "{}", self.attempts)
+
    }
+
}
+

+
impl PartialEq<usize> for Attempts {
+
    fn eq(&self, other: &usize) -> bool {
+
        self.attempts == *other
+
    }
+
}
+

+
impl PartialOrd<usize> for Attempts {
+
    fn partial_cmp(&self, other: &usize) -> Option<std::cmp::Ordering> {
+
        self.attempts.partial_cmp(other)
+
    }
+
}
+

+
impl Attempts {
+
    pub fn attempted(self) -> Self {
+
        Self {
+
            attempts: self.attempts + 1,
+
        }
+
    }
+

+
    pub fn attempts(&self) -> usize {
+
        self.attempts
+
    }
+

+
    fn reset(&mut self) {
+
        self.attempts = 0;
+
    }
+
}
+

+
impl From<&Attempts> for usize {
+
    fn from(Attempts { attempts }: &Attempts) -> Self {
+
        *attempts
+
    }
+
}
+

+
impl From<Attempts> for usize {
+
    fn from(Attempts { attempts }: Attempts) -> Self {
+
        attempts
+
    }
+
}
+

+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct Session<S> {
+
    /// The [`NodeId`] of the session.
+
    id: NodeId,
+
    /// The public protocol [`Address`] for the session.
+
    addr: Address,
+
    /// The [`Link`] direction for the session.
+
    link: Link,
+
    /// Keep track of whether the session should be persisted. That is, if it is
+
    /// disconnected, reconnection attempts should be made.
+
    connection_type: ConnectionType,
+
    /// Last time a message was received from the peer.
+
    last_active: LocalTime,
+
    /// The peer's subscription containing the [`RepoId`]'s that this node is
+
    /// interested in.
+
    subscription: Option<message::Subscribe>,
+
    /// Number of attempts over the lifetime of the connection.
+
    ///
+
    /// The tracking of attempts is preserved through the state transitions of
+
    /// the session, and are reset to 0 when the connection is considered
+
    /// stable.
+
    attempts: Attempts,
+
    /// The state the session is in. Can be in the following states:
+
    ///   - [`Initial`]
+
    ///   - [`Attempted`]
+
    ///   - [`Disconnected`]
+
    ///   - [`Connected`]
+
    ///
+
    /// Or the enumeration of all of the above via [`State`].
+
    state: S,
+
}
+

+
/// A [`Session`] connection type describes how the session should be treated
+
/// when the session becomes disconnected.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub enum ConnectionType {
+
    /// The connection is ephemeral and the session should be removed on
+
    /// disconnection.
+
    Ephemeral,
+
    /// The connection is persistent and the session should be marked as
+
    /// disconnected, and a reconnection attempt should be made.
+
    Persistent,
+
}
+

+
impl ConnectionType {
+
    fn as_str(&self) -> &'static str {
+
        match self {
+
            ConnectionType::Ephemeral => "ephemeral",
+
            ConnectionType::Persistent => "persistent",
+
        }
+
    }
+
}
+

+
impl fmt::Display for ConnectionType {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        f.write_str(self.as_str())
+
    }
+
}
+

+
/// The result of modifying a node's subscription.
+
pub enum SubscribeTo {
+
    /// No subscription has been set for the node yet.
+
    NoSubscription,
+
    /// The subscription was successful.
+
    Subscribed,
+
    /// The node was not found when attempting to modify the subscription.
+
    Missing { node: NodeId },
+
}
+

+
impl From<Session<State>> for radicle::node::Session {
+
    fn from(session: Session<State>) -> Self {
+
        Self {
+
            nid: session.id,
+
            link: session.link,
+
            addr: session.addr,
+
            state: session.state.into(),
+
        }
+
    }
+
}
+

+
impl<S> fmt::Display for Session<S>
+
where
+
    S: ToString,
+
{
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        let mut attrs = Vec::new();
+
        let state = self.state.to_string();
+

+
        if self.link.is_inbound() {
+
            attrs.push("inbound");
+
        } else {
+
            attrs.push("outbound");
+
        }
+
        attrs.push(self.connection_type.as_str());
+
        attrs.push(state.as_str());
+

+
        write!(f, "{} [{}]", self.id, attrs.join(" "))
+
    }
+
}
+

+
impl<S> Session<S> {
+
    /// Return the node's identifier.
+
    pub fn node(&self) -> NodeId {
+
        self.id
+
    }
+

+
    /// Return the state metadata of the session.
+
    pub fn state(&self) -> &S {
+
        &self.state
+
    }
+

+
    /// Return the number of attempts that have been made for connection.
+
    pub fn attempts(&self) -> Attempts {
+
        self.attempts
+
    }
+

+
    /// Return the [`Address`] of the node.
+
    pub fn address(&self) -> &Address {
+
        &self.addr
+
    }
+

+
    /// Returns `true` if the session is subscribed to the given [`RepoId`].
+
    pub fn is_subscribed_to(&self, rid: &RepoId) -> bool {
+
        self.subscription
+
            .as_ref()
+
            .map(|s| s.filter.contains(rid))
+
            .unwrap_or(false)
+
    }
+

+
    /// Returns when the session was last active.
+
    ///
+
    /// The last active time is updated when the connection performs some
+
    /// activity, e.g. receiving a message.
+
    pub fn last_active(&self) -> &LocalTime {
+
        &self.last_active
+
    }
+

+
    /// Return the type of [`Link`] for the session connection.
+
    pub fn link(&self) -> &Link {
+
        &self.link
+
    }
+

+
    /// Returns `true` if the session is a persistent connection.
+
    pub fn persistent(&self) -> bool {
+
        matches!(self.connection_type, ConnectionType::Persistent)
+
    }
+

+
    /// Set the [`message::Subscribe`] of this [`Session`].
+
    pub(super) fn set_subscription(&mut self, subscription: message::Subscribe) {
+
        self.subscription = Some(subscription);
+
    }
+

+
    /// Subscribe to the given [`RepoId`], if the [`message::Subscribe`] has
+
    /// been set.
+
    fn subscribe_to(&mut self, rid: &RepoId) -> SubscribeTo {
+
        if let Some(ref mut sub) = self.subscription {
+
            sub.filter.insert(rid);
+
            return SubscribeTo::Subscribed;
+
        }
+
        SubscribeTo::NoSubscription
+
    }
+

+
    fn into_disconnected(self, since: LocalTime, retry_at: LocalTime) -> Session<Disconnected> {
+
        self.transition(Disconnected { since, retry_at })
+
    }
+

+
    #[allow(unused)]
+
    fn seen(&mut self, since: LocalTime) {
+
        self.last_active = since;
+
    }
+

+
    fn into_any_state<T>(self) -> Session<T>
+
    where
+
        T: From<S>,
+
    {
+
        self.map(|state| state.into())
+
    }
+

+
    fn transition<T>(self, next: T) -> Session<T> {
+
        self.map(|_| next)
+
    }
+

+
    fn map<T, F>(self, f: F) -> Session<T>
+
    where
+
        F: FnOnce(S) -> T,
+
    {
+
        Session {
+
            id: self.id,
+
            addr: self.addr,
+
            link: self.link,
+
            connection_type: self.connection_type,
+
            last_active: self.last_active,
+
            subscription: self.subscription,
+
            attempts: self.attempts,
+
            state: f(self.state),
+
        }
+
    }
+
}
+

+
/// The session is in an initial state, with no extra metadata.
+
///
+
/// An initial state indicates that it is going to attempt a connection, whether
+
/// through a fresh connection or a reconnection.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+
pub struct Initial;
+

+
impl Session<Initial> {
+
    /// Construct a [`Session`] with the [`Initial`] state, and a [`Link`] that
+
    /// is [`Outbound`].
+
    ///
+
    /// The session begins with no subscription, and no attempts made.
+
    ///
+
    /// [`Outbound`]: Link::Outbound
+
    pub fn outbound(
+
        id: NodeId,
+
        addr: Address,
+
        connection_type: ConnectionType,
+
        last_active: LocalTime,
+
    ) -> Self {
+
        Self {
+
            id,
+
            addr,
+
            link: Link::Outbound,
+
            connection_type,
+
            state: Initial,
+
            last_active,
+
            subscription: None,
+
            attempts: Attempts::default(),
+
        }
+
    }
+

+
    /// Transition the [`Session`] to an [`Attempted`] state, incrementing the
+
    /// number of attempts made.
+
    fn into_attempted(mut self) -> Session<Attempted> {
+
        self.attempts = self.attempts.attempted();
+
        self.transition(Attempted)
+
    }
+
}
+

+
/// The session is in an attempted state, with no extra metadata.
+
///
+
/// An attempted state indicates that at least one attempt was made to connect.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct Attempted;
+

+
/// The session is in an disconnected state.
+
///
+
/// A disconnected state indicates that the session was connected at one point,
+
/// and a reconnection should be made.
+
///
+
/// # Metadata
+
///
+
/// [`Session::is_stable`] reports when a connection is considered stable.
+
///
+
/// [`Session::is_inactive`] reports when a connection is considered inactive.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct Connected {
+
    /// Connected since this time.
+
    since: LocalTime,
+
    /// Ping state.
+
    ping: PingState,
+
    /// Measured latencies for this peer.
+
    latencies: VecDeque<LocalDuration>,
+
    /// Whether the connection is stable.
+
    stable: bool,
+
}
+

+
impl fmt::Display for Connected {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        f.write_str("connected")
+
    }
+
}
+

+
impl Connected {
+
    /// Create a new [`Connected`] state, where `since` is the time of
+
    /// connection.
+
    fn new(since: LocalTime) -> Self {
+
        Self {
+
            since,
+
            ping: PingState::default(),
+
            latencies: VecDeque::default(),
+
            stable: false,
+
        }
+
    }
+
}
+

+
/// A received pong message for a connected session.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Pong {
+
    pub now: LocalTime,
+
    pub zeroes: ZeroBytes,
+
}
+

+
/// The result of a connected session receiving a pong message.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Pinged {
+
    /// The recorded latency of the received pong.
+
    pub latency: LocalDuration,
+
}
+

+
impl Session<Connected> {
+
    /// Construct a [`Session`] with the [`Connected`] state, and a [`Link`] that
+
    /// is [`Inbound`].
+
    ///
+
    /// The session begins with no subscription, and no attempts made.
+
    ///
+
    /// [`Inbound`]: Link::Inbound
+
    pub fn inbound(
+
        id: NodeId,
+
        addr: Address,
+
        connection_type: ConnectionType,
+
        now: LocalTime,
+
    ) -> Self {
+
        Self {
+
            id,
+
            addr,
+
            link: Link::Inbound,
+
            connection_type,
+
            last_active: now,
+
            subscription: None,
+
            state: Connected::new(now),
+
            attempts: Attempts::default(),
+
        }
+
    }
+

+
    /// Returns true if the connection is considered stable.
+
    ///
+
    /// A stable connection is one which has a connected time that is before the
+
    /// current time, and the duration of the connection exceeds a configured
+
    /// threshold.
+
    pub fn is_stable(&self) -> bool {
+
        self.state.stable
+
    }
+

+
    /// Checks if the [`Session`] is inactive, i.e. the time passed is greater
+
    /// than the `delta`.
+
    pub fn is_inactive(&self, now: &LocalTime, delta: LocalDuration) -> bool {
+
        *now - self.last_active >= delta
+
    }
+

+
    /// A ping was sent to connected nodes, and this session is now awaiting a response.
+
    pub(super) fn ping(&mut self, ping: message::Ping, since: LocalTime) -> message::Ping {
+
        self.state.ping = PingState::AwaitingResponse {
+
            len: ping.ponglen,
+
            since,
+
        };
+
        ping
+
    }
+

+
    /// A pong was received from a connected node.
+
    ///
+
    /// The current session must be awaiting a response from a sent ping. If so,
+
    /// it checks that this is the corresponding pong, and records the latency
+
    /// between the sent ping and the received pong.
+
    pub(super) fn pinged(&mut self, Pong { zeroes, now }: Pong) -> Option<Pinged> {
+
        if let PingState::AwaitingResponse {
+
            len: ponglen,
+
            since,
+
        } = self.state.ping
+
        {
+
            if (ponglen as usize) == zeroes.len() {
+
                self.state.ping = PingState::Ok;
+
                let latency = now - since;
+
                self.state.latencies.push_back(latency);
+
                // TODO(finto): MAX_LATENCIES should likely be configured
+
                // somewhere else
+
                if self.state.latencies.len() > MAX_LATENCIES {
+
                    self.state.latencies.pop_front();
+
                }
+
                return Some(Pinged { latency });
+
            }
+
        }
+
        None
+
    }
+

+
    /// Checks the idleness of a connection, marking its connectivity as stable,
+
    /// and reset its attempt counter.
+
    ///
+
    /// A stable connection is one which has a connected time that is before the
+
    /// current time, and the duration of the connection exceeds a configured
+
    /// threshold.
+
    pub(super) fn stabilise(&mut self, now: LocalTime, stable_threshold: LocalDuration) -> bool {
+
        let Connected {
+
            since,
+
            ref mut stable,
+
            ..
+
        } = self.state;
+
        if now >= since && now.duration_since(since) >= stable_threshold {
+
            *stable = true;
+
            self.attempts.reset();
+
            true
+
        } else {
+
            false
+
        }
+
    }
+
}
+

+
/// The session is in an disconnected state.
+
///
+
/// A disconnected state indicates that the session was connected at one point,
+
/// and a reconnection should be made.
+
///
+
/// # Metadata
+
///
+
/// [`Session::should_retry_at`] reports when a reconnection should occur.
+
///
+
/// [`Session::disconnected_since`] reports how long the session has been disconnected.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct Disconnected {
+
    /// Since when has this peer been disconnected.
+
    since: LocalTime,
+
    /// When to retry the connection.
+
    retry_at: LocalTime,
+
}
+

+
impl Session<Disconnected> {
+
    /// Returns when the session should attempt a reconnection.
+
    pub fn should_retry_at(&self) -> &LocalTime {
+
        &self.state.retry_at
+
    }
+

+
    /// Returns when the session was recorded as disconnected.
+
    pub fn disconnected_since(&self) -> &LocalTime {
+
        &self.state.since
+
    }
+

+
    /// Transition the [`Session`] to an [`Initial`] state.
+
    fn into_initial(self) -> Session<Initial> {
+
        self.transition(Initial)
+
    }
+
}
added crates/radicle-protocol/src/connections/session/iter.rs
@@ -0,0 +1,106 @@
+
use std::collections::hash_map;
+
use std::collections::HashMap;
+

+
use radicle::node::NodeId;
+

+
use super::State;
+
use super::{Attempted, Connected, Disconnected, Initial, Session};
+

+
/// Provides an [`Iterator`] over all the sessions.
+
///
+
/// The order of the sessions are in:
+
/// - [`Connected`]
+
/// - [`Attempted`]
+
/// - [`Initial`]
+
/// - [`Disconnected`]
+
pub struct SessionsIter<'a> {
+
    pub(super) initial: hash_map::Iter<'a, NodeId, Session<Initial>>,
+
    pub(super) attempted: hash_map::Iter<'a, NodeId, Session<Attempted>>,
+
    pub(super) disconnected: hash_map::Iter<'a, NodeId, Session<Disconnected>>,
+
    pub(super) connected: hash_map::Iter<'a, NodeId, Session<Connected>>,
+
}
+

+
impl<'a> Iterator for SessionsIter<'a> {
+
    type Item = (&'a NodeId, Session<State>);
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        self.connected
+
            .next()
+
            .map(|(n, s)| (n, s.clone().into_any_state()))
+
            .or_else(|| {
+
                self.attempted
+
                    .next()
+
                    .map(|(n, s)| (n, s.clone().into_any_state()))
+
            })
+
            .or_else(|| {
+
                self.initial
+
                    .next()
+
                    .map(|(n, s)| (n, s.clone().into_any_state()))
+
            })
+
            .or_else(|| {
+
                self.disconnected
+
                    .next()
+
                    .map(|(n, s)| (n, s.clone().into_any_state()))
+
            })
+
    }
+
}
+

+
/// A view into sessions of a particular state.
+
///
+
/// - [`SessionsView::into_iter`]: to iterate over both the [`NodeId`] and [`Session`]s.
+
/// - [`SessionsView::node_ids`]: to iterate over just the [`NodeId`]s.
+
/// - [`SessionsView::sessions`]: to iterate over just the [`Session`]s.
+
pub struct SessionsView<'a, S> {
+
    pub(super) inner: &'a HashMap<NodeId, Session<S>>,
+
}
+

+
impl<'a, S> SessionsView<'a, S> {
+
    /// Return an iterator over the [`NodeId`]s of these sessions.
+
    pub fn node_ids(self) -> hash_map::Keys<'a, NodeId, Session<S>> {
+
        self.inner.keys()
+
    }
+

+
    /// Return an iterator over the [`Session`]s.
+
    pub fn sessions(self) -> hash_map::Values<'a, NodeId, Session<S>> {
+
        self.inner.values()
+
    }
+

+
    /// Returns the number of sessions.
+
    pub fn len(&self) -> usize {
+
        self.inner.len()
+
    }
+

+
    /// Returns `true` if there are no sessions.
+
    pub fn is_empty(&self) -> bool {
+
        self.inner.is_empty()
+
    }
+
}
+

+
impl<'a, S> IntoIterator for SessionsView<'a, S> {
+
    type Item = (&'a NodeId, &'a Session<S>);
+
    type IntoIter = hash_map::Iter<'a, NodeId, Session<S>>;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.inner.iter()
+
    }
+
}
+

+
pub(in crate::connections) struct SessionsViewMut<'a, S> {
+
    pub inner: &'a mut HashMap<NodeId, Session<S>>,
+
}
+

+
impl<'a, S> SessionsViewMut<'a, S> {
+
    /// Return an iterator over the [`Session`]s.
+
    pub fn sessions(self) -> hash_map::ValuesMut<'a, NodeId, Session<S>> {
+
        self.inner.values_mut()
+
    }
+
}
+

+
impl<'a, S> IntoIterator for SessionsViewMut<'a, S> {
+
    type Item = (&'a NodeId, &'a mut Session<S>);
+
    type IntoIter = hash_map::IterMut<'a, NodeId, Session<S>>;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.inner.iter_mut()
+
    }
+
}
added crates/radicle-protocol/src/connections/state.rs
@@ -0,0 +1,619 @@
+
pub mod command;
+
pub mod event;
+

+
#[cfg(test)]
+
mod test;
+

+
use std::net::IpAddr;
+

+
use localtime::{LocalDuration, LocalTime};
+
use radicle::node::config::RateLimit;
+
use radicle::node::{address, Severity};
+
use radicle::node::{HostName, Link, NodeId};
+
use radicle::prelude::RepoId;
+

+
use crate::connections::session;
+
use crate::connections::session::Sessions;
+
use crate::connections::Config;
+
use crate::service::limiter::RateLimiter;
+
use crate::service::{message, DisconnectReason};
+

+
use super::Attempts;
+

+
/// Manage the state of node connections for a running node.
+
///
+
/// Note the following terminology:
+
///
+
/// - Outbound connection is one that is originating from this node to another node.
+
/// - Inbound connection is one that is coming from another node to this node.
+
///
+
/// These [`Sessions`] are categorized into one of the four following states.
+
///
+
/// # Initial
+
///
+
/// - [`Connections::connect`]
+
/// - [`Connections::reconnect`]
+
///
+
/// A connection is in the initial state when the running node has attempted to
+
/// make an outbound connection to another node.
+
///
+
/// It can also be in an initial state when a disconnected node is being
+
/// reconnected, and thus goes back to the initial state.
+
///
+
/// # Attempted
+
///
+
/// - [`Connections::attempted`]
+
///
+
/// A connection is in the attempted state when it was previously in the initial
+
/// state, and an attempt to make a connection was made.
+
///
+
/// # Connected
+
///
+
/// - [`Connections::connected`]
+
///
+
/// A connection is considered connected in one of two cases.
+
///
+
/// The attempted outbound connection was established. In this case, there must
+
/// have been a session to transition to being connected.
+
///
+
/// If the connection is inbound then the connection is simply marked as
+
/// connected, regardless of the state of a previous connection.
+
///
+
/// # Disconnected
+
///
+
/// - [`Connections::disconnected`]
+
///
+
/// A connection is marked as disconnected only if it is considered a persisted
+
/// peer (see [`ConnectionType`]). If this is the case, then a reconnection
+
/// attempt should be made after an appropriate delay.
+
///
+
/// If the connection is not considered for persistence, then it will be removed
+
/// from the [`Sessions`], and may be penalized for the severity of its
+
/// disconnection reason.
+
///
+
/// [`ConnectionType`]: session::ConnectionType
+
#[derive(Debug)]
+
pub struct Connections {
+
    /// [`NodeId`] of the running node.
+
    local: NodeId,
+
    /// The state of the connection lifecycle for each node in the network.
+
    sessions: Sessions,
+
    /// Rate limiter of IP hosts.
+
    limiter: RateLimiter,
+
    /// Configuration for managing connections.
+
    config: Config,
+
}
+

+
impl Connections {
+
    /// Construct a new [`Connections`] with the provided [`Config`] and [`RateLimiter`].
+
    ///
+
    /// The state will start with no [`Sessions`], to begin.
+
    pub fn new(local: NodeId, config: Config, limiter: RateLimiter) -> Self {
+
        Self {
+
            local,
+
            sessions: Sessions::default(),
+
            limiter,
+
            config,
+
        }
+
    }
+

+
    /// Return the [`Config`] the [`Connections`] were initialized with.
+
    pub fn config(&self) -> &Config {
+
        &self.config
+
    }
+
}
+

+
impl Connections {
+
    // TODO(finto): we could enforce that only an accepted `IpAddr` is allowed
+
    // for calling to `connected` – which also helps reinforce that they are
+
    // interconnected.
+
    /// Perform checks on whether an incoming IP address should be accepted for
+
    /// connecting to.
+
    ///
+
    /// The caller can decide based on the resulting [`event::Accept`] whether
+
    /// to accept the connection. However, the following events are recommended
+
    /// to result in a rejected address:
+
    /// - [`event::Accept::LimitExceeded`]
+
    /// - [`event::Accept::HostLimited`]
+
    ///
+
    /// # State Transition
+
    ///
+
    /// This does not transition any session states, and simply inspects the
+
    /// rate limiter and [`IpAddr`] properties.
+
    pub fn accept(
+
        &mut self,
+
        command::Accept { ip }: command::Accept,
+
        now: LocalTime,
+
    ) -> event::Accept {
+
        // Always accept localhost connections, even if we already reached
+
        // our inbound connection limit.
+
        if ip.is_loopback() || ip.is_unspecified() {
+
            return event::Accept::LocalHost { ip };
+
        }
+

+
        if self.has_reached_inbound_limit() {
+
            return event::Accept::LimitExceeded {
+
                ip,
+
                current_inbound: self.sessions.connected_inbound(),
+
            };
+
        }
+

+
        if self.has_reached_ip_limit(&ip, now) {
+
            return event::Accept::HostLimited { ip };
+
        }
+

+
        event::Accept::Accepted { ip }
+
    }
+

+
    /// Mark a connection, with the given node, as attempted.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// Transitions the state of the existing session to `Attempted`.
+
    pub fn attempted(&mut self, command::Attempt { node }: command::Attempt) -> event::Attempted {
+
        if let Some(event) =
+
            self.guard_self_session(&node, event::Attempted::SelfConnection { node })
+
        {
+
            return event;
+
        }
+
        self.sessions
+
            .session_to_attempted(&node)
+
            .map(event::Attempted::attempt)
+
            .unwrap_or(event::Attempted::missing(node))
+
    }
+

+
    /// Make an outbound connection to another node.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// A new session will only be created for the given node if a session does
+
    /// already exist.
+
    pub fn connect(
+
        &mut self,
+
        command::Connect {
+
            node,
+
            addr,
+
            connection_type,
+
        }: command::Connect,
+
        now: LocalTime,
+
    ) -> event::Connect {
+
        if self.is_disconnected(&node) {
+
            return event::Connect::disconnected(node);
+
        }
+
        if let Some(event) = self.guard_self_session(&node, event::Connect::SelfConnection { node })
+
        {
+
            return event;
+
        }
+
        if self.is_connecting(&node) {
+
            return event::Connect::already_connecting(node);
+
        }
+
        match self.sessions.get_connected(&node) {
+
            Some(session) => event::Connect::already_connected(session.clone()),
+
            None => {
+
                let record_ip = match addr.host {
+
                    HostName::Ip(ip) => (!address::is_local(&ip)).then_some(ip),
+
                    _ => None,
+
                };
+
                self.sessions.outbound(node, addr, connection_type, now);
+
                event::Connect::establish(node, connection_type, record_ip)
+
            }
+
        }
+
    }
+

+
    /// Mark a connection as connected to another node.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// The transition of the connection depends on the kind of the incoming
+
    /// connection.
+
    ///
+
    /// ## Inbound
+
    ///
+
    /// The connection transitions to connected regardless of what state of the
+
    /// session was in before and if the session did not exist.
+
    ///
+
    /// If the session existed, before the transition, then it is marked as
+
    /// inbound.
+
    ///
+
    /// ## Outbound
+
    ///
+
    /// The connection transitions to connected regardless of what state of the
+
    /// session was in before, however, it must have had an existing session before.
+
    pub fn connected(&mut self, connected: command::Connected, now: LocalTime) -> event::Connected {
+
        match connected {
+
            command::Connected::Inbound {
+
                node,
+
                addr,
+
                connection_type,
+
            } => {
+
                if let Some(event) =
+
                    self.guard_self_session(&node, event::Connected::SelfConnection { node })
+
                {
+
                    return event;
+
                }
+
                // In this scenario, it's possible that our peer is persistent, and
+
                // disconnected. We get an inbound connection before we attempt a re-connection,
+
                // and therefore we treat it as a regular inbound connection.
+
                //
+
                // It's also possible that a disconnection hasn't gone through yet and our
+
                // peer is still in connected state here, while a new inbound connection from
+
                // that same peer is made. This results in a new connection from a peer that is
+
                // already connected from the perspective of the service. This appears to be
+
                // a bug in the underlying networking library.
+
                match self.sessions.session_to_connected(
+
                    &node,
+
                    now,
+
                    Some(Link::Inbound),
+
                    connection_type,
+
                ) {
+
                    None => {
+
                        let session = self.sessions.inbound(node, addr, connection_type, now);
+
                        event::Connected::established(session)
+
                    }
+
                    Some(session) => {
+
                        // TODO(finto): the address was never used in the
+
                        // previous code in the case of the link being outbound
+
                        // – it assumes that the session's address is the same
+
                        debug_assert_eq!(session.address(), &addr);
+
                        event::Connected::established(session)
+
                    }
+
                }
+
            }
+
            command::Connected::Outbound {
+
                node,
+
                addr,
+
                connection_type,
+
            } => {
+
                if let Some(event) =
+
                    self.guard_self_session(&node, event::Connected::SelfConnection { node })
+
                {
+
                    return event;
+
                }
+
                // Transitions the session to connected no matter what state it is in
+
                match self.sessions.session_to_connected(
+
                    &node,
+
                    now,
+
                    Some(Link::Outbound),
+
                    connection_type,
+
                ) {
+
                    None => event::Connected::missing(node),
+
                    Some(session) => {
+
                        // TODO(finto): the address was never used in the
+
                        // previous code in the case of the link being outbound
+
                        // – it assumes that the session's address is the same
+
                        debug_assert_eq!(session.address(), &addr);
+
                        event::Connected::established(session)
+
                    }
+
                }
+
            }
+
        }
+
    }
+

+
    /// Disconnect a node.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// The [`ConnectionType`] decides how a disconnected node should be
+
    /// treated.
+
    ///
+
    /// ## `Ephemeral`
+
    ///
+
    /// If the connection is ephemeral, then the session for that connection is
+
    /// removed, and the severity of the reason is recorded.
+
    ///
+
    /// The severity can then be used for penalizing a node.
+
    ///
+
    /// ## `Persistent`
+
    ///
+
    /// If the connection is persistent, then the session will remain, and be
+
    /// marked as disconnected. The connection should then be retried after the
+
    /// returned delay.
+
    ///
+
    /// [`ConnectionType`]: session::ConnectionType
+
    pub fn disconnected(
+
        &mut self,
+
        command::Disconnect {
+
            node,
+
            link,
+
            since,
+
            connection_type,
+
        }: command::Disconnect,
+
        reason: &DisconnectReason,
+
    ) -> event::Disconnected {
+
        if let Some(event) =
+
            self.guard_self_session(&node, event::Disconnected::SelfConnection { node })
+
        {
+
            return event;
+
        }
+
        let Some(session) = self.sessions.get_session(&node) else {
+
            return event::Disconnected::missing(node);
+
        };
+
        if matches!(session.state(), session::State::Disconnected(_)) {
+
            return event::Disconnected::already_disconnected(node);
+
        }
+
        if *session.link() != link {
+
            return event::Disconnected::conflict(&session, link);
+
        }
+

+
        match connection_type {
+
            session::ConnectionType::Ephemeral => {
+
                let severity = self.reason_severity(reason, since);
+
                self.sessions
+
                    .remove_session(&node)
+
                    .map(|session| event::Disconnected::severed(session, severity))
+
                    .unwrap_or(event::Disconnected::missing(node))
+
            }
+
            session::ConnectionType::Persistent => {
+
                let delay = self.reconnection_delay(session.attempts());
+
                let retry_at = since + delay;
+
                self.sessions
+
                    .session_to_disconnected(&node, since, retry_at)
+
                    .map(|session| event::Disconnected::retry(session, delay, retry_at))
+
                    .unwrap_or(event::Disconnected::missing(node))
+
            }
+
        }
+
    }
+

+
    /// Reconnect the node.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// The session must be in the disconnected state, and transitions to the
+
    /// initial state.
+
    pub fn reconnect(
+
        &mut self,
+
        command::Reconnect { node }: command::Reconnect,
+
    ) -> event::Reconnect {
+
        if let Some(event) =
+
            self.guard_self_session(&node, event::Reconnect::SelfConnection { node })
+
        {
+
            return event;
+
        }
+
        self.sessions
+
            .session_to_initial(&node)
+
            .map(event::Reconnect::reconnecting)
+
            .unwrap_or(event::Reconnect::missing(node))
+
    }
+

+
    /// Mark connected nodes as stable.
+
    ///
+
    /// If the connected session has lasted longer than the configured stable
+
    /// threshold duration, then the session will be marked as stable, and the
+
    /// attempts counter is reset.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// This does not change the session's state.
+
    pub fn stabilise(&mut self, now: LocalTime) -> Vec<session::Session<session::Connected>> {
+
        self.sessions
+
            .connected_mut()
+
            .sessions()
+
            .fold(Vec::new(), |mut stabilised, session| {
+
                // Only stabilise sessions that are not already marked as stable
+
                if !session.is_stable() {
+
                    let stable = session
+
                        .stabilise(now, self.config.stale())
+
                        .then_some(session.clone());
+
                    stabilised.extend(stable);
+
                    stabilised
+
                } else {
+
                    stabilised
+
                }
+
            })
+
    }
+

+
    /// Ping any inactive connections to see if they are alive.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// This does not change the sessions' state.
+
    pub fn ping<'a>(
+
        &'a mut self,
+
        mut ping: impl FnMut() -> message::Ping + 'a,
+
        now: LocalTime,
+
    ) -> impl Iterator<Item = event::Ping> + 'a {
+
        let keep_alive = self.config.keep_alive();
+
        self.sessions
+
            .inactive(now, keep_alive)
+
            .map(move |(_, session)| event::Ping {
+
                session: session.clone(),
+
                ping: session.ping(ping(), now),
+
            })
+
    }
+

+
    /// Process a incoming message from a node.
+
    ///
+
    /// The [`Payload`] of the message may alter the state of the session.
+
    ///
+
    /// If the node is marked as disconnected, then the message is dropped from
+
    /// affecting the node's session.
+
    ///
+
    /// # State Transition
+
    ///
+
    /// Since a message must come from a connected node, the session will
+
    /// transition from its initial or attempted state to connected.
+
    ///
+
    /// [`Payload`]: command::Payload
+
    pub fn handle_message(
+
        &mut self,
+
        command::Message {
+
            node,
+
            payload,
+
            connection_type,
+
        }: command::Message,
+
        now: LocalTime,
+
    ) -> event::HandledMessage {
+
        if let Some(event) =
+
            self.guard_self_session(&node, event::HandledMessage::SelfConnection { node })
+
        {
+
            return event;
+
        }
+
        if self.sessions.is_diconnected(&node) {
+
            return event::HandledMessage::Disconnected { node };
+
        }
+
        let outbound_limit = self.config.outbound.rate_limit;
+
        let inbound_limit = self.config.inbound.rate_limit;
+
        let result =
+
            self.sessions
+
                .while_connecting(&node, None, connection_type, now, |connected| {
+
                    let limit: RateLimit = match connected.link() {
+
                        Link::Outbound => outbound_limit,
+
                        Link::Inbound => inbound_limit,
+
                    };
+
                    if self.limiter.limit(
+
                        connected.address().clone().into(),
+
                        Some(&connected.node()),
+
                        &limit,
+
                        now,
+
                    ) {
+
                        return event::HandledMessage::RateLimited { node };
+
                    }
+
                    match payload {
+
                        Some(command::Payload::Subscribe(subscription)) => {
+
                            connected.set_subscription(subscription);
+
                            event::HandledMessage::Subscribed {
+
                                session: connected.clone(),
+
                            }
+
                        }
+
                        Some(command::Payload::Pong(pong)) => {
+
                            let pinged = connected.pinged(pong);
+
                            event::HandledMessage::Pinged {
+
                                session: connected.clone(),
+
                                pinged,
+
                            }
+
                        }
+
                        None => event::HandledMessage::Connected {
+
                            session: connected.clone(),
+
                        },
+
                    }
+
                });
+
        result.unwrap_or(event::HandledMessage::MissingSession { node })
+
    }
+

+
    /// Add a repository to the given node's subscription.
+
    ///
+
    /// Returns `true` if the session existed and the repository was added to
+
    /// the subscription successfully.
+
    pub fn subscribe_to(&mut self, node: &NodeId, rid: &RepoId) -> session::SubscribeTo {
+
        self.sessions.subscribe_to(node, rid)
+
    }
+
}
+

+
impl Connections {
+
    /// The [`Sessions`] that are currently being managed.
+
    pub fn sessions(&self) -> &Sessions {
+
        &self.sessions
+
    }
+

+
    /// Returns `true` is the session exists for the given node.
+
    pub fn has_session(&self, node: &NodeId) -> bool {
+
        self.sessions.has_session_for(node)
+
    }
+

+
    /// Returns the number of outbound connections that are in a "connecting"
+
    /// state. That is, they are either attempting to connect or have already
+
    /// connected.
+
    pub fn number_of_outbound_connections(&self) -> usize {
+
        self.sessions.number_of_outbound_connections()
+
    }
+

+
    /// Returns the number of inbound connections that are in a "connecting"
+
    /// state. That is, they are either attempting to connect or have already
+
    /// connected.
+
    pub fn number_of_inbound_connections(&self) -> usize {
+
        self.sessions.number_of_inbound_connections()
+
    }
+

+
    /// Return the [`Session`] for the given [`NodeId`], if it exists.
+
    /// Note that the session can be in any [`State`].
+
    ///
+
    /// [`Session`]: session::Session
+
    /// [`State`]: session::State
+
    pub fn session_for(&self, node: &NodeId) -> Option<session::Session<session::State>> {
+
        self.sessions.get_session(node)
+
    }
+

+
    /// Return the connected [`Session`] for the given [`NodeId`], if it exists.
+
    ///
+
    /// [`Session`]: session::Session
+
    pub fn get_connected(&self, node: &NodeId) -> Option<&session::Session<session::Connected>> {
+
        self.sessions.get_connected(node)
+
    }
+

+
    /// Return an `Iterator` of all unresponsive, connected [`Session`]s.
+
    ///
+
    /// A session is considered unresponsive, if it has be inactive after the
+
    /// configured stale connection duration.
+
    ///
+
    /// [`Session`]: session::Session
+
    pub fn unresponsive(
+
        &self,
+
        now: &LocalTime,
+
    ) -> impl Iterator<Item = (&NodeId, &session::Session<session::Connected>)> {
+
        self.sessions.unresponsive(*now, self.config.stale())
+
    }
+

+
    fn guard_self_session<T>(&self, node: &NodeId, event: T) -> Option<T> {
+
        (&self.local == node).then_some(event)
+
    }
+

+
    fn has_reached_inbound_limit(&self) -> bool {
+
        self.sessions.connected_inbound() >= self.config.inbound.maximum
+
    }
+

+
    fn has_reached_ip_limit(&mut self, ip: &IpAddr, now: LocalTime) -> bool {
+
        let addr = HostName::from(*ip);
+
        self.limiter
+
            .limit(addr, None, &self.config.inbound.rate_limit, now)
+
    }
+

+
    fn reason_severity(&self, reason: &DisconnectReason, now: LocalTime) -> Severity {
+
        match reason {
+
            DisconnectReason::Dial(_)
+
            | DisconnectReason::Fetch(_)
+
            | DisconnectReason::Connection(_) => {
+
                if self.is_online(now) {
+
                    // If we're "online", there's something wrong with this
+
                    // peer connection specifically.
+
                    Severity::Medium
+
                } else {
+
                    Severity::Low
+
                }
+
            }
+
            DisconnectReason::Session(e) => e.severity(),
+
            DisconnectReason::Command
+
            | DisconnectReason::Conflict
+
            | DisconnectReason::SelfConnection => Severity::Low,
+
        }
+
    }
+

+
    /// Try to guess whether we're online or not.
+
    fn is_online(&self, now: LocalTime) -> bool {
+
        self.sessions
+
            .connected()
+
            .sessions()
+
            .filter(|s| s.address().is_routable() && *s.last_active() >= now - self.idle())
+
            .count()
+
            > 0
+
    }
+

+
    fn idle(&self) -> LocalDuration {
+
        self.config.idle()
+
    }
+

+
    fn reconnection_delay(&self, attempts: Attempts) -> LocalDuration {
+
        let attempts = u32::try_from(usize::from(attempts)).unwrap_or(u32::MAX);
+
        LocalDuration::from_secs(2u64.saturating_pow(attempts)).clamp(
+
            self.config.reconnection_delay().min_delta,
+
            self.config.reconnection_delay().max_delta,
+
        )
+
    }
+

+
    fn is_connecting(&self, node: &NodeId) -> bool {
+
        self.sessions.is_initial(node) || self.sessions.is_attempted(node)
+
    }
+

+
    fn is_disconnected(&self, node: &NodeId) -> bool {
+
        self.sessions.is_diconnected(node)
+
    }
+
}
added crates/radicle-protocol/src/connections/state/command.rs
@@ -0,0 +1,104 @@
+
use std::net::IpAddr;
+

+
use localtime::LocalTime;
+
use radicle::node::{Address, Link, NodeId};
+

+
use crate::connections::session;
+
use crate::connections::session::ConnectionType;
+
use crate::service::message;
+
use crate::service::ZeroBytes;
+

+
/// Check whether the incoming [`IpAddr`] should be accepted for connection.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Accept {
+
    pub ip: IpAddr,
+
}
+

+
/// Mark a connection as attempted.
+
pub struct Attempt {
+
    /// The node that is being attempted.
+
    pub node: NodeId,
+
}
+

+
/// Make an outbound connection to a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Connect {
+
    /// The node that is being connected to.
+
    pub node: NodeId,
+
    /// The found address of the node that is being contacted.
+
    pub addr: Address,
+
    /// Mark the session with the given [`ConnectionType`].
+
    pub connection_type: ConnectionType,
+
}
+

+
/// Mark the node as connected.
+
pub enum Connected {
+
    /// The connected node is made through an inbound connection.
+
    Inbound {
+
        /// The node that is now connected.
+
        node: NodeId,
+
        /// The address the node is connected via.
+
        addr: Address,
+
        /// Mark the session with the given [`ConnectionType`].
+
        connection_type: ConnectionType,
+
    },
+
    /// The connected node is made through an outbound connection.
+
    Outbound {
+
        /// The node that is now connected.
+
        node: NodeId,
+
        /// The address the node is connected via.
+
        addr: Address,
+
        /// Mark the session with the given [`ConnectionType`].
+
        connection_type: ConnectionType,
+
    },
+
}
+

+
/// Either mark the node as disconnected, or remove its session.
+
#[derive(Debug)]
+
pub struct Disconnect {
+
    /// The node being disconnected.
+
    pub node: NodeId,
+
    /// The link of the disconnection.
+
    pub link: Link,
+
    /// When did the disconnection occur.
+
    pub since: LocalTime,
+
    /// Decides whether the session is disconnected or removed.
+
    pub connection_type: ConnectionType,
+
}
+

+
/// Mark the node as initial, if it was disconnected.
+
#[derive(Debug)]
+
pub struct Reconnect {
+
    pub node: NodeId,
+
}
+

+
/// Handle an incoming message from the given node.
+
pub struct Message {
+
    /// The node sending the message.
+
    pub node: NodeId,
+
    /// The payload that is part of the incoming message.
+
    ///
+
    /// Not all messages are required for changing the state of the connection's
+
    /// state, so it is optional.
+
    pub payload: Option<Payload>,
+
    /// Mark the session with the given [`ConnectionType`].
+
    pub connection_type: ConnectionType,
+
}
+

+
/// The payload of an incoming message.
+
pub enum Payload {
+
    /// The message describes the node's subscription payload.
+
    Subscribe(message::Subscribe),
+
    /// The message was a "pong" in response to this node's "ping".
+
    Pong(session::Pong),
+
}
+

+
impl Payload {
+
    pub fn pong(zeroes: ZeroBytes, now: LocalTime) -> Self {
+
        Self::Pong(session::Pong { now, zeroes })
+
    }
+

+
    pub fn subscribe(subscription: message::Subscribe) -> Self {
+
        Self::Subscribe(subscription)
+
    }
+
}
added crates/radicle-protocol/src/connections/state/event.rs
@@ -0,0 +1,274 @@
+
use std::net::IpAddr;
+

+
use localtime::{LocalDuration, LocalTime};
+
use radicle::node::{Link, NodeId, Severity};
+

+
use crate::connections::session;
+
use crate::connections::session::{ConnectionType, Pinged, Session};
+
use crate::service::message;
+

+
/// The result of checking an address for accepting an inbound connection.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Accept {
+
    /// The inbound limit for the node has been reached.
+
    ///
+
    /// It is recommended that the incoming connection is rejected.
+
    LimitExceeded {
+
        /// The [`IpAddr`] that made the attempt.
+
        ip: IpAddr,
+
        /// The current inbound size.
+
        current_inbound: usize,
+
    },
+
    /// The address has been rate limited.
+
    ///
+
    /// It is recommended that the incoming connection is rejected.
+
    HostLimited {
+
        /// The [`IpAddr`] that made the attempt, and is being rate limited.
+
        ip: IpAddr,
+
    },
+
    /// The [`IpAddr`] is likely a localhost connection.
+
    ///
+
    /// It is recommended that this is accepted for local area networks.
+
    LocalHost { ip: IpAddr },
+
    /// The [`IpAddr`] should be accepted by the system, and the connection allowed.
+
    Accepted { ip: IpAddr },
+
}
+

+
/// The result of a connection attempt from a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Attempted {
+
    /// The connection was transitioned to attempted.
+
    ConnectionAttempt {
+
        session: Box<Session<session::Attempted>>,
+
    },
+
    /// The session did not exist for this node, and it was expected to.
+
    MissingSession { node: NodeId },
+
    /// Attempted to connect to the local node.
+
    SelfConnection { node: NodeId },
+
}
+

+
impl Attempted {
+
    pub(super) fn attempt(session: Session<session::Attempted>) -> Self {
+
        Self::ConnectionAttempt {
+
            session: Box::new(session),
+
        }
+
    }
+

+
    pub(super) fn missing(node: NodeId) -> Self {
+
        Self::MissingSession { node }
+
    }
+
}
+

+
/// The result when making an outbound connection to a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Connect {
+
    /// The node is already being connected to, but has not transitioned to
+
    /// fully-connected.
+
    AlreadyConnecting { node: NodeId },
+
    /// The node already has a connected session.
+
    AlreadyConnected {
+
        session: Box<session::Session<session::Connected>>,
+
    },
+
    /// The node is already in a disconnected state, and requires a call to
+
    /// reconnect to transition it to initial.
+
    Disconnected { node: NodeId },
+
    /// The caller should establish the outbound connection.
+
    Establish {
+
        /// The node to establish the connection with.
+
        node: NodeId,
+
        /// The session was given this [`ConnectionType`].
+
        connection_type: ConnectionType,
+
        /// If this is `Some`, then the [`IpAddr`] should be recorded by the
+
        /// local node.
+
        record_ip: Option<IpAddr>,
+
    },
+
    /// Attempted to connect to the local node.
+
    SelfConnection { node: NodeId },
+
}
+

+
impl Connect {
+
    pub(super) fn already_connecting(node: NodeId) -> Self {
+
        Self::AlreadyConnecting { node }
+
    }
+

+
    pub(super) fn already_connected(session: session::Session<session::Connected>) -> Self {
+
        Self::AlreadyConnected {
+
            session: Box::new(session),
+
        }
+
    }
+

+
    pub(super) fn disconnected(node: NodeId) -> Self {
+
        Self::Disconnected { node }
+
    }
+

+
    pub(super) fn establish(
+
        node: NodeId,
+
        connection_type: ConnectionType,
+
        record_ip: Option<IpAddr>,
+
    ) -> Self {
+
        Self::Establish {
+
            node,
+
            connection_type,
+
            record_ip,
+
        }
+
    }
+
}
+

+
/// The result when a node is marked a connected.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Connected {
+
    /// The connection was marked as connected.
+
    Established {
+
        session: Box<session::Session<session::Connected>>,
+
    },
+
    /// An existing session was expected for the node, but there was none.
+
    MissingSession { node: NodeId },
+
    /// Connection came from the local node.
+
    SelfConnection { node: NodeId },
+
}
+

+
impl Connected {
+
    pub(super) fn established(session: session::Session<session::Connected>) -> Self {
+
        Self::Established {
+
            session: Box::new(session),
+
        }
+
    }
+

+
    pub(super) fn missing(node: NodeId) -> Self {
+
        Self::MissingSession { node }
+
    }
+
}
+

+
/// The result when a node is disconnected.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Disconnected {
+
    /// The session was marked as disconnected and a reconnection should be
+
    /// tried.
+
    Retry {
+
        /// The session that is now marked as disconnected.
+
        session: session::Session<session::Disconnected>,
+
        /// The delay to wait until the reconnection.
+
        delay: LocalDuration,
+
        /// The time for when the reconnection should happen.
+
        retry_at: LocalTime,
+
    },
+
    /// The session was removed, and the severity of the disconnection is
+
    /// recorded.
+
    Severed {
+
        /// The session that was removed.
+
        session: session::Session<session::State>,
+
        /// The severity of the reason for disconnection.
+
        ///
+
        /// Can be used for penalizing the node for bad behavior.
+
        severity: Severity,
+
    },
+
    /// An existing session was expected for the node, but there was none.
+
    MissingSession { node: NodeId },
+
    /// The node was already marked as disconnected.
+
    AlreadyDisconnected { node: NodeId },
+
    /// The reported link of the disconnect did not match the existing link of
+
    /// the session.
+
    LinkConflict {
+
        node: NodeId,
+
        /// The link that was found in the existing session.
+
        found: Link,
+
        /// The link that was expected from the call to disconnect.
+
        expected: Link,
+
    },
+
    /// Attempted to disconnect from the local node.
+
    SelfConnection { node: NodeId },
+
}
+

+
impl Disconnected {
+
    pub(super) fn retry(
+
        session: session::Session<session::Disconnected>,
+
        delay: LocalDuration,
+
        retry_at: LocalTime,
+
    ) -> Self {
+
        Self::Retry {
+
            session,
+
            delay,
+
            retry_at,
+
        }
+
    }
+

+
    pub(super) fn severed(session: session::Session<session::State>, severity: Severity) -> Self {
+
        Self::Severed { session, severity }
+
    }
+

+
    pub(super) fn already_disconnected(node: NodeId) -> Self {
+
        Self::AlreadyDisconnected { node }
+
    }
+

+
    pub(super) fn conflict<S>(session: &session::Session<S>, expected: Link) -> Self {
+
        Self::LinkConflict {
+
            node: session.node(),
+
            found: *session.link(),
+
            expected,
+
        }
+
    }
+

+
    pub(super) fn missing(node: NodeId) -> Self {
+
        Self::MissingSession { node }
+
    }
+
}
+

+
/// The result when a node is being reconnected to.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Reconnect {
+
    /// The connection was marked as initial, transitioning from a disconnected
+
    /// state.
+
    Reconnecting {
+
        session: Box<session::Session<session::Initial>>,
+
    },
+
    /// An existing session was expected for the node, but there was none.
+
    MissingSession { node: NodeId },
+
    /// Attempted to reconnect to the local node.
+
    SelfConnection { node: NodeId },
+
}
+

+
impl Reconnect {
+
    pub(super) fn reconnecting(session: session::Session<session::Initial>) -> Self {
+
        Self::Reconnecting {
+
            session: Box::new(session),
+
        }
+
    }
+

+
    pub(super) fn missing(node: NodeId) -> Self {
+
        Self::MissingSession { node }
+
    }
+
}
+

+
/// The result of handling an incoming message from a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum HandledMessage {
+
    /// The node was in a disconnected state, so the message was dropped.
+
    Disconnected { node: NodeId },
+
    /// The node was rate limited, so the message was dropped.
+
    RateLimited { node: NodeId },
+
    /// The node's subscription was updated, and is in a connected state.
+
    Subscribed {
+
        session: session::Session<session::Connected>,
+
    },
+
    /// The node's pong was received, and is in a connected state.
+
    Pinged {
+
        session: session::Session<session::Connected>,
+
        pinged: Option<Pinged>,
+
    },
+
    /// There was no message to process, and the node is in a connected state.
+
    Connected {
+
        session: session::Session<session::Connected>,
+
    },
+
    /// An existing session was expected for the node, but there was none.
+
    MissingSession { node: NodeId },
+
    /// Message originated from the local node.
+
    SelfConnection { node: NodeId },
+
}
+

+
/// The result of pinging a connected session.
+
pub struct Ping {
+
    /// The session that was being pinged.
+
    pub session: session::Session<session::Connected>,
+
    /// The ping message.
+
    pub ping: message::Ping,
+
}
added crates/radicle-protocol/src/connections/state/test.rs
@@ -0,0 +1,3139 @@
+
//! Property-Based Tests for Connection State Management
+

+
mod arbitrary;
+
use arbitrary::{ArbitraryTime, NonLocalNode, RoutableAddress, TestCommand};
+

+
mod invariants;
+
use invariants::{check_invariants, get_session_state, is_invalid_transition, SessionState};
+

+
use std::collections::{HashMap, HashSet};
+
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
+

+
use localtime::{LocalDuration, LocalTime};
+
use qcheck::{Arbitrary, Gen, TestResult};
+
use qcheck_macros::quickcheck;
+
use radicle::crypto;
+
use radicle::node::config::{RateLimit, RateLimits};
+
use radicle::node::{Address, HostName, Link, NodeId, Timestamp};
+
use radicle::prelude::RepoId;
+

+
use crate::connections::config;
+
use crate::connections::config::{
+
    ReconnectionDelay, MAX_RECONNECTION_DELTA, MIN_RECONNECTION_DELTA,
+
};
+
use crate::connections::session::{ConnectionType, Pong};
+
use crate::connections::state::{command, event, Connections};
+
use crate::connections::{Attempts, Config};
+
use crate::service::filter::Filter;
+
use crate::service::limiter::RateLimiter;
+
use crate::service::DisconnectReason;
+
use crate::service::{message, MAX_LATENCIES};
+

+
// =============================================================================
+
// Test Helpers
+
// =============================================================================
+

+
fn test_config() -> Config {
+
    let durations = config::Durations {
+
        idle: LocalDuration::from_secs(60),
+
        keep_alive: LocalDuration::from_secs(30),
+
        stale: LocalDuration::from_secs(120),
+
        reconnection_delay: ReconnectionDelay::default(),
+
    };
+
    let limits = RateLimits::default();
+
    let inbound = config::Inbound {
+
        rate_limit: limits.inbound.into(),
+
        maximum: 10,
+
    };
+
    let outbound = config::Outbound {
+
        rate_limit: limits.outbound.into(),
+
        target: 8,
+
    };
+
    Config {
+
        durations,
+
        inbound,
+
        outbound,
+
    }
+
}
+

+
fn new_connections(local: NodeId) -> Connections {
+
    Connections::new(local, test_config(), RateLimiter::default())
+
}
+

+
fn test_config_low_limits() -> Config {
+
    let durations = config::Durations {
+
        idle: LocalDuration::from_secs(60),
+
        keep_alive: LocalDuration::from_secs(30),
+
        stale: LocalDuration::from_secs(120),
+
        reconnection_delay: ReconnectionDelay::default(),
+
    };
+
    let inbound = config::Inbound {
+
        rate_limit: RateLimit {
+
            capacity: 1,
+
            fill_rate: 0.0,
+
        }, // 1 token, no refill
+
        maximum: 10,
+
    };
+
    let outbound = config::Outbound {
+
        rate_limit: RateLimit {
+
            capacity: 1,
+
            fill_rate: 0.0,
+
        },
+
        target: 8,
+
    };
+
    Config {
+
        durations,
+
        inbound,
+
        outbound,
+
    }
+
}
+

+
fn new_connections_with_low_limits(local: NodeId) -> Connections {
+
    Connections::new(local, test_config_low_limits(), RateLimiter::default())
+
}
+

+
fn apply_command(connections: &mut Connections, cmd: TestCommand, time: &mut LocalTime) {
+
    *time = *time + LocalDuration::from_secs(1);
+
    let now = *time;
+

+
    match cmd {
+
        TestCommand::Accept { ip } => {
+
            connections.accept(command::Accept { ip }, now);
+
        }
+
        TestCommand::Connect {
+
            node,
+
            addr,
+
            connection_type,
+
        } => {
+
            connections.connect(
+
                command::Connect {
+
                    node,
+
                    addr,
+
                    connection_type,
+
                },
+
                now,
+
            );
+
        }
+
        TestCommand::Attempt { node } => {
+
            connections.attempted(command::Attempt { node });
+
        }
+
        TestCommand::ConnectedInbound {
+
            node,
+
            addr,
+
            connection_type,
+
        } => {
+
            connections.connected(
+
                command::Connected::Inbound {
+
                    node,
+
                    addr,
+
                    connection_type,
+
                },
+
                now,
+
            );
+
        }
+
        TestCommand::ConnectedOutbound {
+
            node,
+
            addr,
+
            connection_type,
+
        } => {
+
            connections.connected(
+
                command::Connected::Outbound {
+
                    node,
+
                    addr,
+
                    connection_type,
+
                },
+
                now,
+
            );
+
        }
+
        TestCommand::Disconnect {
+
            node,
+
            link,
+
            connection_type,
+
        } => {
+
            connections.disconnected(
+
                command::Disconnect {
+
                    node,
+
                    link,
+
                    since: now,
+
                    connection_type,
+
                },
+
                &DisconnectReason::Command,
+
            );
+
        }
+
        TestCommand::Reconnect { node } => {
+
            connections.reconnect(command::Reconnect { node });
+
        }
+
        TestCommand::Message {
+
            node,
+
            connection_type,
+
        } => {
+
            connections.handle_message(
+
                command::Message {
+
                    node,
+
                    payload: None,
+
                    connection_type,
+
                },
+
                now,
+
            );
+
        }
+
    }
+
}
+

+
// =============================================================================
+
// Session Uniqueness Properties
+
// =============================================================================
+

+
/// Single Session Per Node
+
///
+
/// After any sequence of commands, no node appears in more than one state collection.
+
///
+
/// ∀ node ∈ NodeId:
+
///   |{s ∈ initial | s.node = node}| +
+
///   |{s ∈ attempted | s.node = node}| +
+
///   |{s ∈ connected | s.node = node}| +
+
///   |{s ∈ disconnected | s.node = node}| ≤ 1
+
#[quickcheck]
+
fn prop_single_session_per_node(commands: Vec<TestCommand>) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut time = LocalTime::from_secs(1577836800);
+

+
    for cmd in commands {
+
        apply_command(&mut connections, cmd, &mut time);
+
    }
+

+
    match invariants::check_single_session_per_node(connections.sessions()) {
+
        Ok(()) => TestResult::passed(),
+
        Err(e) => TestResult::error(e.to_string()),
+
    }
+
}
+

+
/// Local Node Exclusion
+
///
+
/// The local node should never exist in any session collection.
+
///
+
/// ∀ state ∈ {Initial, Attempted, Connected, Disconnected}:
+
///  local_node ∉ sessions[state].keys()
+
#[quickcheck]
+
fn prop_local_node_exclusion(commands: Vec<TestCommand>) {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut time = LocalTime::from_secs(1577836800);
+

+
    for cmd in commands {
+
        apply_command(&mut connections, cmd, &mut time);
+
    }
+

+
    assert!(!connections.has_session(&local));
+
}
+

+
/// Session Existence Consistency
+
///
+
/// has_session_for(node) is true iff exactly one state check returns true.
+
///
+
/// has_session_for(node) ⟺
+
///  (is_initial(node) ⊕ is_attempted(node) ⊕ is_connected(node) ⊕ is_disconnected(node))
+
#[quickcheck]
+
fn prop_session_existence_consistency(commands: Vec<TestCommand>) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut time = LocalTime::from_secs(1577836800);
+

+
    for cmd in commands {
+
        apply_command(&mut connections, cmd, &mut time);
+
    }
+

+
    match invariants::check_session_existence_consistency(connections.sessions()) {
+
        Ok(()) => TestResult::passed(),
+
        Err(e) => TestResult::error(e.to_string()),
+
    }
+
}
+

+
// =============================================================================
+
// State Transition Properties
+
// =============================================================================
+

+
/// All State Transitions Are Valid
+
///
+
/// No command sequence produces an invalid state transition.
+
#[quickcheck]
+
fn prop_valid_transitions(commands: Vec<TestCommand>) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut time = LocalTime::from_secs(1577836800);
+

+
    // Track previous state for each node
+
    let mut previous_states: HashMap<NodeId, SessionState> = HashMap::new();
+

+
    for (i, cmd) in commands.iter().enumerate() {
+
        apply_command(&mut connections, cmd.clone(), &mut time);
+

+
        // Check all nodes we're tracking
+
        let mut to_remove = Vec::new();
+
        for (node, prev_state) in previous_states.iter() {
+
            match get_session_state(connections.sessions(), node) {
+
                Some(current) => {
+
                    if *prev_state != current && is_invalid_transition(*prev_state, current) {
+
                        return TestResult::error(format!(
+
                            "Invalid transition at command {}: {:?} -> {:?} for node {:?}",
+
                            i, prev_state, current, node
+
                        ));
+
                    }
+
                }
+
                None => {
+
                    // Session was removed (valid for ephemeral)
+
                    to_remove.push(*node);
+
                }
+
            }
+
        }
+

+
        // Remove tracked nodes that no longer exist
+
        for node in to_remove {
+
            previous_states.remove(&node);
+
        }
+

+
        // Update states for all current sessions
+
        for (node, _) in connections.sessions().iter() {
+
            if let Some(state) = get_session_state(connections.sessions(), node) {
+
                previous_states.insert(*node, state);
+
            }
+
        }
+
    }
+

+
    TestResult::passed()
+
}
+

+
// =============================================================================
+
// Command Safety Properties
+
// =============================================================================
+

+
/// Connect Idempotency for Connected Sessions
+
///
+
/// Calling connect on an already-connected node returns AlreadyConnected.
+
///
+
/// ∀ node ∈ connected.keys():
+
///  let state_before = sessions.clone()
+
///  connect(node) = AlreadyConnected
+
///  sessions = state_before
+
#[quickcheck]
+
fn prop_connect_idempotency(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    let event::Connected::Established { session } = connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) else {
+
        return TestResult::error("Expected Established");
+
    };
+

+
    assert_eq!(
+
        connections.connect(
+
            command::Connect {
+
                node,
+
                addr,
+
                connection_type: ConnectionType::Persistent,
+
            },
+
            now,
+
        ),
+
        event::Connect::AlreadyConnected { session }
+
    );
+
    TestResult::passed()
+
}
+

+
/// Connect Blocked for Disconnected Sessions
+
///
+
/// Calling connect on a disconnected node returns Disconnected.
+
///
+
/// ∀ node ∈ disconnected.keys():
+
///  connect(node) = Disconnected { node }
+
#[quickcheck]
+
fn prop_connect_blocked_for_disconnected(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    let event::Disconnected::Retry { .. } = connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    ) else {
+
        return TestResult::error("Expected Retry");
+
    };
+

+
    assert!(connections.sessions().is_diconnected(&node));
+
    assert_eq!(
+
        connections.connect(
+
            command::Connect {
+
                node,
+
                addr,
+
                connection_type: ConnectionType::Persistent,
+
            },
+
            now,
+
        ),
+
        event::Connect::Disconnected { node }
+
    );
+
    TestResult::passed()
+
}
+

+
/// Connect Blocked for Connecting Sessions
+
///
+
/// Calling connect on Initial/Attempted returns AlreadyConnecting.
+
///
+
/// ∀ node ∈ (initial.keys() ∪ attempted.keys()):
+
///  connect(node) = AlreadyConnecting { node }
+
#[quickcheck]
+
fn prop_connect_blocked_for_connecting(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    assert_eq!(
+
        connections.connect(
+
            command::Connect {
+
                node,
+
                addr,
+
                connection_type: ConnectionType::Persistent,
+
            },
+
            now,
+
        ),
+
        event::Connect::AlreadyConnecting { node }
+
    );
+
    TestResult::passed()
+
}
+

+
/// Missing Session Handling
+
///
+
/// Commands requiring existing session return MissingSession when none exists.
+
///
+
/// ∀ node ∉ sessions.keys():
+
///  attempt(node) = MissingSession { node }
+
///  ∧ disconnect(node) = MissingSession { node }
+
///  ∧ reconnect(node) = MissingSession { node }
+
///  ∧ connected_outbound(node) = MissingSession { node }
+
#[quickcheck]
+
fn prop_missing_session_handling(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Attempt on missing session
+
    assert_eq!(
+
        connections.attempted(command::Attempt { node }),
+
        event::Attempted::MissingSession { node }
+
    );
+

+
    // Disconnect on missing session
+
    assert_eq!(
+
        connections.disconnected(
+
            command::Disconnect {
+
                node,
+
                link: Link::Inbound,
+
                since: now,
+
                connection_type: ConnectionType::Persistent
+
            },
+
            &DisconnectReason::Command
+
        ),
+
        event::Disconnected::MissingSession { node }
+
    );
+

+
    // Reconnect on missing session
+
    assert_eq!(
+
        connections.reconnect(command::Reconnect { node }),
+
        event::Reconnect::MissingSession { node }
+
    );
+

+
    // Connected Outbound on missing session
+
    assert_eq!(
+
        connections.connected(
+
            command::Connected::Outbound {
+
                node,
+
                addr,
+
                connection_type: ConnectionType::Persistent
+
            },
+
            now
+
        ),
+
        event::Connected::MissingSession { node }
+
    );
+

+
    TestResult::passed()
+
}
+

+
// =============================================================================
+
// Link Direction Properties
+
// =============================================================================
+

+
/// Outbound Link for Outbound Connections
+
///
+
/// Sessions created via connect have Link::Outbound.
+
///
+
/// ∀ session created via connect():
+
///  session.link = Link::Outbound
+
#[quickcheck]
+
fn prop_outbound_link(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    match connections.connected(
+
        command::Connected::Outbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::Established { session } => {
+
            if *session.link() == Link::Outbound {
+
                TestResult::passed()
+
            } else {
+
                TestResult::error(format!("Expected Outbound, got {:?}", session.link()))
+
            }
+
        }
+
        other => TestResult::error(format!("Expected Established, got {:?}", other)),
+
    }
+
}
+

+
/// Inbound Link for Inbound Connections
+
///
+
/// Sessions created via Connected::Inbound have Link::Inbound.
+
///
+
/// ∀ session created via Connected::Inbound:
+
///  session.link = Link::Inbound
+
#[quickcheck]
+
fn prop_inbound_link(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    match connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::Established { session } => {
+
            assert_eq!(*session.link(), Link::Inbound);
+
            TestResult::passed()
+
        }
+
        other => TestResult::error(format!("Expected Established, got {:?}", other)),
+
    }
+
}
+

+
/// Link Conflict Detection (Inbound session, Outbound disconnect)
+
///
+
/// Disconnect with mismatched link returns LinkConflict.
+
///
+
/// ∀ session, link where session.link ≠ link:
+
///   disconnect(session.node, link) = LinkConflict {
+
///     node: session.node,
+
///     found: session.link,
+
///     expected: link
+
///   }
+
#[quickcheck]
+
fn prop_link_conflict_inbound_session(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Create Inbound session
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    // Disconnect with wrong link (Outbound)
+
    match connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Outbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    ) {
+
        event::Disconnected::LinkConflict {
+
            found, expected, ..
+
        } => {
+
            if found == Link::Inbound && expected == Link::Outbound {
+
                TestResult::passed()
+
            } else {
+
                TestResult::error(format!(
+
                    "Unexpected conflict: found={:?}, expected={:?}",
+
                    found, expected
+
                ))
+
            }
+
        }
+
        other => TestResult::error(format!("Expected LinkConflict, got {:?}", other)),
+
    }
+
}
+

+
/// Link Conflict Detection (Outbound session, Inbound disconnect)
+
///
+
/// Disconnect with mismatched link returns LinkConflict.
+
///
+
/// ∀ session, link where session.link ≠ link:
+
///   disconnect(session.node, link) = LinkConflict {
+
///     node: session.node,
+
///     found: session.link,
+
///     expected: link
+
///   }
+
#[quickcheck]
+
fn prop_link_conflict_outbound_session(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Create Outbound session
+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    connections.connected(
+
        command::Connected::Outbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    // Disconnect with wrong link (Inbound)
+
    match connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    ) {
+
        event::Disconnected::LinkConflict {
+
            found, expected, ..
+
        } => {
+
            if found == Link::Outbound && expected == Link::Inbound {
+
                TestResult::passed()
+
            } else {
+
                TestResult::error(format!(
+
                    "Unexpected conflict: found={:?}, expected={:?}",
+
                    found, expected
+
                ))
+
            }
+
        }
+
        other => TestResult::error(format!("Expected LinkConflict, got {:?}", other)),
+
    }
+
}
+

+
/// Link Count Consistency
+
///
+
/// connected_inbound() and connected_outbound() match actual counts.
+
///
+
/// connected_inbound() = |{s ∈ connected | s.link = Link::Inbound}|
+
/// connected_outbound() = |{s ∈ connected | s.link = Link::Outbound}|
+
#[quickcheck]
+
fn prop_link_counts(commands: Vec<TestCommand>) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut time = LocalTime::from_secs(1577836800);
+

+
    for cmd in commands {
+
        apply_command(&mut connections, cmd, &mut time);
+
    }
+

+
    match invariants::check_link_count_consistency(connections.sessions()) {
+
        Ok(()) => TestResult::passed(),
+
        Err(e) => TestResult::error(e.to_string()),
+
    }
+
}
+

+
// =============================================================================
+
// Connection Type Properties
+
// =============================================================================
+

+
/// Ephemeral Disconnection Removes Session
+
///
+
/// Disconnecting an ephemeral session removes it entirely.
+
///
+
/// ∀ session where session.connection_type = Ephemeral:
+
///  disconnect(session) → session ∉ sessions
+
#[quickcheck]
+
fn prop_ephemeral_removes(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Ephemeral,
+
        },
+
        now,
+
    );
+

+
    match connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Ephemeral,
+
        },
+
        &DisconnectReason::Command,
+
    ) {
+
        event::Disconnected::Severed { .. } => {
+
            if connections.has_session(&node) {
+
                TestResult::error("Session should be removed after ephemeral disconnect")
+
            } else {
+
                TestResult::passed()
+
            }
+
        }
+
        other => TestResult::error(format!("Expected Severed, got {:?}", other)),
+
    }
+
}
+

+
/// Persistent Disconnection Preserves Session
+
///
+
/// Disconnecting a persistent session transitions to Disconnected state.
+
///
+
/// ∀ session where session.connection_type = Persistent:
+
///  disconnect(session) → session ∈ disconnected
+
#[quickcheck]
+
fn prop_persistent_preserves(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    match connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    ) {
+
        event::Disconnected::Retry { .. } => {
+
            if connections.sessions().is_diconnected(&node) {
+
                TestResult::passed()
+
            } else {
+
                TestResult::error("Session should be in Disconnected state")
+
            }
+
        }
+
        other => TestResult::error(format!("Expected Retry, got {:?}", other)),
+
    }
+
}
+

+
/// Persistent Sessions Have Retry Time
+
///
+
/// Disconnected persistent sessions have retry_at > disconnect time.
+
///
+
/// ∀ session ∈ disconnected:
+
///  session.retry_at.is_some()
+
#[quickcheck]
+
fn prop_has_retry_time(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    match connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    ) {
+
        event::Disconnected::Retry { retry_at, .. } => {
+
            if retry_at > now {
+
                TestResult::passed()
+
            } else {
+
                TestResult::error(format!(
+
                    "retry_at ({:?}) should be > now ({:?})",
+
                    retry_at, now
+
                ))
+
            }
+
        }
+
        other => TestResult::error(format!("Expected Retry, got {:?}", other)),
+
    }
+
}
+

+
// =============================================================================
+
// Attempt Counter Properties
+
// =============================================================================
+

+
/// Attempt Monotonicity During Connection Phase
+
///
+
/// The attempt counter never decreases during Initial → Attempted → Connected.
+
///
+
/// ∀ transitions Initial → Attempted → Connected:
+
///  attempts_before ≤ attempts_after
+
#[quickcheck]
+
fn prop_attempt_monotonic(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut attempts: Vec<Attempts> = Vec::new();
+

+
    // Initial state
+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    match connections.session_for(&node) {
+
        Some(s) => attempts.push(s.attempts()),
+
        None => return TestResult::error("Session should exist after connect"),
+
    }
+

+
    // Attempted state
+
    match connections.attempted(command::Attempt { node }) {
+
        event::Attempted::ConnectionAttempt { session } => {
+
            attempts.push(session.attempts());
+
        }
+
        other => return TestResult::error(format!("Expected ConnectionAttempt, got {:?}", other)),
+
    }
+

+
    // Connected state
+
    match connections.connected(
+
        command::Connected::Outbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::Established { session } => {
+
            attempts.push(session.attempts());
+
        }
+
        other => return TestResult::error(format!("Expected Established, got {:?}", other)),
+
    }
+

+
    // Verify we have all 3 data points
+
    assert_eq!(attempts.len(), 3);
+

+
    // Verify monotonicity
+
    for window in attempts.windows(2) {
+
        if window[1] < window[0] {
+
            return TestResult::error(format!(
+
                "Attempt count decreased: {} -> {}",
+
                window[0], window[1]
+
            ));
+
        }
+
    }
+

+
    TestResult::passed()
+
}
+

+
/// Attempt Increment on Attempt Command
+
///
+
/// The Attempt command increments the attempt counter by exactly 1.
+
///
+
/// ∀ session in Initial:
+
///   let attempts_before = session.attempts
+
///   attempt(session.node)
+
///   let attempts_after = session.attempts
+
///   attempts_after = attempts_before + 1
+
#[quickcheck]
+
fn prop_attempt_increments(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    let before = match connections.session_for(&node) {
+
        Some(s) => s.attempts(),
+
        None => return TestResult::error("Session should exist after connect"),
+
    };
+

+
    match connections.attempted(command::Attempt { node }) {
+
        event::Attempted::ConnectionAttempt { session } => {
+
            let after = session.attempts();
+
            if after == before.attempted() {
+
                TestResult::passed()
+
            } else {
+
                TestResult::error(format!(
+
                    "Expected attempts={}, got {}",
+
                    before.attempted(),
+
                    after
+
                ))
+
            }
+
        }
+
        other => TestResult::error(format!("Expected ConnectionAttempt, got {:?}", other)),
+
    }
+
}
+

+
/// Attempt Preservation Through Disconnection
+
///
+
/// The attempt count is preserved when transitioning to Disconnected.
+
///
+
/// ∀ session transitioning to Disconnected:
+
///  disconnected.attempts = original.attempts
+
#[quickcheck]
+
fn prop_attempt_preserved_on_disconnect(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    connections.attempted(command::Attempt { node });
+
    connections.connected(
+
        command::Connected::Outbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    let before = match connections.sessions().get_connected(&node) {
+
        Some(session) => session.attempts(),
+
        None => return TestResult::error("Session should be connected"),
+
    };
+

+
    connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Outbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    );
+

+
    let after = match connections.session_for(&node) {
+
        Some(session) => session.attempts(),
+
        None => return TestResult::error("Session should exist after disconnect"),
+
    };
+

+
    if before == after {
+
        TestResult::passed()
+
    } else {
+
        TestResult::error(format!(
+
            "Attempts changed through disconnect: {} -> {}",
+
            before, after
+
        ))
+
    }
+
}
+

+
/// Attempt Reset on Stabilization
+
///
+
/// When a session is stabilised, its attempt counter is reset to zero.
+
///
+
/// ∀ session where stabilise(session) = true:
+
///  session.attempts = 0
+
#[quickcheck]
+
fn prop_attempt_reset_on_stabilise(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Build up some attempts
+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    connections.attempted(command::Attempt { node });
+
    connections.connected(
+
        command::Connected::Outbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    // Verify we have attempts > 0
+
    let before = match connections.sessions().get_connected(&node) {
+
        Some(session) => session.attempts(),
+
        None => return TestResult::error("Session should be connected"),
+
    };
+

+
    if before == 0 {
+
        return TestResult::error("Expected attempts > 0 before stabilise");
+
    }
+

+
    let later = now + connections.config().stale() + LocalDuration::from_secs(1);
+

+
    let stabilised = connections.stabilise(later);
+

+
    // Verify this session was stabilised
+
    assert!(stabilised.iter().any(|s| s.node() == node));
+

+
    // Verify attempts reset
+
    let after = match connections.sessions().get_connected(&node) {
+
        Some(session) => session.attempts(),
+
        None => return TestResult::error("Session should still be connected"),
+
    };
+

+
    if after == 0 {
+
        TestResult::passed()
+
    } else {
+
        TestResult::error(format!(
+
            "Attempts should be 0 after stabilise, got {}",
+
            after
+
        ))
+
    }
+
}
+

+
// =============================================================================
+
// Rate Limiting Properties
+
// =============================================================================
+

+
/// Inbound Limit Enforcement
+
///
+
/// When inbound connections reach the limit, accept returns LimitExceeded.
+
///
+
/// connected_inbound() ≥ inbound_limit ∧ ¬ip.is_loopback() ∧ ¬ip.is_unspecified()
+
///  → accept(ip) = LimitExceeded
+
#[test]
+
fn prop_inbound_limit() {
+
    const INBOUND_LIMIT: u8 = 2;
+

+
    let local = NonLocalNode::local_node();
+
    let config = {
+
        let mut config = test_config();
+
        config.inbound.maximum = INBOUND_LIMIT as usize;
+
        config
+
    };
+
    let mut connections = Connections::new(local, config, RateLimiter::default());
+
    let now = LocalTime::from_secs(1577836800);
+
    let mut g = Gen::new(100);
+

+
    // Fill up to the inbound limit
+
    for i in 0..INBOUND_LIMIT {
+
        let node = NodeId::from(crypto::PublicKey::from([i + 10; 32]));
+
        let addr = Address::arbitrary(&mut g);
+
        connections.connected(
+
            command::Connected::Inbound {
+
                node,
+
                addr,
+
                connection_type: ConnectionType::Ephemeral,
+
            },
+
            now,
+
        );
+
    }
+

+
    // Next accept should be limited
+
    let ip = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8));
+
    assert!(
+
        matches!(
+
            connections.accept(command::Accept { ip }, now),
+
            event::Accept::LimitExceeded { .. }
+
        ),
+
        "Accept should return LimitExceeded when at inbound limit"
+
    );
+
}
+

+
/// Localhost Always Accepted
+
///
+
/// Localhost and unspecified IPs are always accepted regardless of limits.
+
///
+
/// ip.is_loopback() ∨ ip.is_unspecified() → accept(ip) = LocalHost
+
#[test]
+
fn prop_localhost_accepted() {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let now = LocalTime::from_secs(1577836800);
+

+
    let localhost_ips = [
+
        IpAddr::V4(Ipv4Addr::LOCALHOST),
+
        IpAddr::V6(Ipv6Addr::LOCALHOST),
+
        IpAddr::V4(Ipv4Addr::UNSPECIFIED),
+
        IpAddr::V6(Ipv6Addr::UNSPECIFIED),
+
    ];
+

+
    for ip in localhost_ips {
+
        assert!(
+
            matches!(
+
                connections.accept(command::Accept { ip }, now),
+
                event::Accept::LocalHost { .. }
+
            ),
+
            "Expected LocalHost for {:?}",
+
            ip
+
        );
+
    }
+
}
+

+
/// Host Rate Limiting
+
///
+
/// IPs that exceed the rate limit return HostLimited.
+
///
+
/// rate_limited(ip) → accept(ip) = HostLimited { ip }
+
#[test]
+
fn prop_host_rate_limited() {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections_with_low_limits(local);
+
    let now = LocalTime::from_secs(1577836800);
+
    let ip = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8));
+

+
    // First accept consumes the single token
+
    assert!(
+
        matches!(
+
            connections.accept(command::Accept { ip }, now),
+
            event::Accept::Accepted { .. }
+
        ),
+
        "First accept should succeed"
+
    );
+

+
    // Second accept should be rate limited (no tokens, no refill)
+
    assert_eq!(
+
        connections.accept(command::Accept { ip }, now),
+
        event::Accept::HostLimited { ip }
+
    );
+
}
+

+
/// Message Rate Limiting
+
///
+
/// Messages from rate-limited nodes return RateLimited.
+
///
+
/// ∀ message from rate_limited node:
+
///  handle_message(message) = RateLimited { node }
+
#[quickcheck]
+
fn prop_message_rate_limited(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections_with_low_limits(local);
+

+
    // Establish a connected session
+
    match connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::Established { .. } => {}
+
        other => return TestResult::error(format!("Expected Established, got {:?}", other)),
+
    }
+

+
    // First message consumes the single token
+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: None,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::HandledMessage::Connected { .. } => {}
+
        other => {
+
            return TestResult::error(format!("First message should succeed, got {:?}", other))
+
        }
+
    }
+

+
    // Second message should be rate limited
+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: None,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::HandledMessage::RateLimited { node: n } if n == node => TestResult::passed(),
+
        other => TestResult::error(format!("Expected RateLimited for {node}, got {:?}", other)),
+
    }
+
}
+

+
// =============================================================================
+
// Timing Properties
+
// =============================================================================
+

+
/// Reconnection Delay Bounds
+
///
+
/// Reconnection delay is always within configured min/max bounds.
+
///
+
/// ∀ delay returned by disconnect:
+
///  min_delta ≤ delay ≤ max_delta
+
#[quickcheck]
+
fn prop_delay_bounds(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    match connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    ) {
+
        event::Disconnected::Retry { delay, .. } => {
+
            if delay < MIN_RECONNECTION_DELTA {
+
                TestResult::error(format!(
+
                    "Delay {:?} is below minimum {:?}",
+
                    delay, MIN_RECONNECTION_DELTA
+
                ))
+
            } else if delay > MAX_RECONNECTION_DELTA {
+
                TestResult::error(format!(
+
                    "Delay {:?} is above maximum {:?}",
+
                    delay, MAX_RECONNECTION_DELTA
+
                ))
+
            } else {
+
                TestResult::passed()
+
            }
+
        }
+
        other => TestResult::error(format!("Expected Retry, got {:?}", other)),
+
    }
+
}
+

+
/// Exponential Backoff
+
///
+
/// Reconnection delays are increasing across reconnection cycles.
+
#[quickcheck]
+
fn prop_exponential_backoff(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut delays: Vec<LocalDuration> = Vec::new();
+

+
    for _ in 0..5 {
+
        connections.connect(
+
            command::Connect {
+
                node,
+
                addr: addr.clone(),
+
                connection_type: ConnectionType::Persistent,
+
            },
+
            now,
+
        );
+
        connections.attempted(command::Attempt { node });
+
        connections.connected(
+
            command::Connected::Outbound {
+
                node,
+
                addr: addr.clone(),
+
                connection_type: ConnectionType::Persistent,
+
            },
+
            now,
+
        );
+

+
        match connections.disconnected(
+
            command::Disconnect {
+
                node,
+
                link: Link::Outbound,
+
                since: now,
+
                connection_type: ConnectionType::Persistent,
+
            },
+
            &DisconnectReason::Command,
+
        ) {
+
            event::Disconnected::Retry { delay, .. } => delays.push(delay),
+
            other => return TestResult::error(format!("Expected Retry, got {:?}", other)),
+
        }
+

+
        connections.reconnect(command::Reconnect { node });
+
    }
+

+
    // Verify we collected all delays
+
    if delays.len() != 5 {
+
        return TestResult::error(format!("Expected 5 delays, got {}", delays.len()));
+
    }
+

+
    // Verify increasing
+
    for window in delays.windows(2) {
+
        if window[1] < window[0] {
+
            return TestResult::error(format!(
+
                "Delay decreased: {:?} -> {:?}",
+
                window[0], window[1]
+
            ));
+
        }
+
    }
+

+
    TestResult::passed()
+
}
+

+
/// Last Active Update on Connection
+
///
+
/// last_active is set when a session transitions to Connected.
+
///
+
/// ∀ connection at time t:
+
///  session.last_active = t
+
#[quickcheck]
+
fn prop_last_active_on_connect(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    match connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::Established { session } => {
+
            assert_eq!(*session.last_active(), now);
+
            TestResult::passed()
+
        }
+
        other => TestResult::error(format!("Expected Established, got {:?}", other)),
+
    }
+
}
+

+
/// Last Active Update on Message
+
///
+
/// last_active is updated when a session receives a message.
+
///
+
/// ∀ connection at time t:
+
///  session.last_active = t
+
#[quickcheck]
+
fn prop_last_active_on_message(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(connect_time): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        connect_time,
+
    );
+

+
    let message_time = connect_time + LocalDuration::from_secs(10);
+

+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: None,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        message_time,
+
    ) {
+
        event::HandledMessage::Connected { session } => {
+
            assert_eq!(*session.last_active(), message_time);
+
            TestResult::passed()
+
        }
+
        other => TestResult::error(format!("Expected Connected, got {:?}", other)),
+
    }
+
}
+

+
/// Inactivity Detection
+
///
+
/// is_inactive returns true iff time since last activity exceeds threshold.
+
#[quickcheck]
+
fn prop_inactivity_detection(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    let session = match connections.sessions().get_connected(&node) {
+
        Some(s) => s,
+
        None => return TestResult::error("Session should be connected"),
+
    };
+

+
    let delta = LocalDuration::from_secs(60);
+

+
    // Before threshold: not inactive
+
    let before_threshold = now + connections.config().idle() - LocalDuration::from_secs(1);
+
    assert!(!session.is_inactive(&before_threshold, delta));
+

+
    // At threshold: inactive
+
    let at_threshold = now + delta;
+
    assert!(session.is_inactive(&at_threshold, delta));
+

+
    // After threshold: inactive
+
    let after_threshold = now + connections.config().idle();
+
    assert!(session.is_inactive(&after_threshold, delta));
+
    TestResult::passed()
+
}
+

+
/// Stability Threshold
+
///
+
/// A session becomes stable only after connected for longer than the stability threshold.
+
///
+
/// session.stable = true ⟺ (now - session.since ≥ stable_threshold)
+
#[quickcheck]
+
fn prop_stability_threshold(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    let before_threshold = now + connections.config().stale() - LocalDuration::from_secs(1);
+
    connections.stabilise(before_threshold);
+

+
    let session = match connections.sessions().get_connected(&node) {
+
        Some(s) => s,
+
        None => return TestResult::error("Session should be connected"),
+
    };
+
    assert!(!session.is_stable());
+

+
    let after_threshold = now + connections.config().stale();
+
    connections.stabilise(after_threshold);
+

+
    let session = match connections.sessions().get_connected(&node) {
+
        Some(s) => s,
+
        None => return TestResult::error("Session should be connected"),
+
    };
+
    assert!(session.is_stable());
+

+
    TestResult::passed()
+
}
+

+
// =============================================================================
+
// Subscription Properties
+
// =============================================================================
+

+
/// Subscription Persistence Across States
+
///
+
/// Subscription data is preserved through state transitions.
+
///
+
/// ∀ state transition:
+
///  session_before.subscribe = session_after.subscribe
+
#[quickcheck]
+
fn prop_subscription_persistence_through_disconnect(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
    rid: RepoId,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Connect with Persistent type
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    // Set subscription with the repo ID
+
    let mut filter = Filter::empty();
+
    filter.insert(&rid);
+
    let subscription = message::Subscribe {
+
        filter,
+
        since: Timestamp::from(now),
+
        until: Timestamp::MAX,
+
    };
+

+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Subscribe(subscription)),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::HandledMessage::Subscribed { session } => {
+
            assert!(session.is_subscribed_to(&rid));
+
        }
+
        other => return TestResult::error(format!("Expected Subscribed, got {:?}", other)),
+
    }
+

+
    // Disconnect
+
    connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    );
+

+
    // Verify subscription is preserved in disconnected state
+
    match connections.session_for(&node) {
+
        Some(session) => {
+
            assert!(session.is_subscribed_to(&rid));
+
            TestResult::passed()
+
        }
+
        None => TestResult::error("Session should exist after persistent disconnect"),
+
    }
+
}
+

+
/// Subscription Persistence Across States
+
///
+
/// Subscription data is preserved through state transitions.
+
///
+
/// ∀ state transition:
+
///  session_before.subscribe = session_after.subscribe
+
#[quickcheck]
+
fn prop_subscription_persistence(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
    rid: RepoId,
+
    commands: Vec<TestCommand>,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Connect with Persistent type
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    // Set subscription with the repo ID
+
    let mut filter = Filter::empty();
+
    filter.insert(&rid);
+
    let subscription = message::Subscribe {
+
        filter,
+
        since: Timestamp::from(now),
+
        until: Timestamp::MAX,
+
    };
+

+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Subscribe(subscription)),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::HandledMessage::Subscribed { session } => {
+
            if !session.is_subscribed_to(&rid) {
+
                return TestResult::error("Subscription should be set");
+
            }
+
        }
+
        other => return TestResult::error(format!("Expected Subscribed, got {:?}", other)),
+
    }
+

+
    let mut time = now;
+

+
    for cmd in commands {
+
        // Track if this command might replace our session
+
        let is_inbound_for_node = matches!(
+
            &cmd,
+
            TestCommand::ConnectedInbound { node: n, .. } if *n == node
+
        );
+
        let is_ephemeral_disconnect_for_node = matches!(
+
            &cmd,
+
            TestCommand::Disconnect {
+
                node: n,
+
                connection_type: ConnectionType::Ephemeral,
+
                ..
+
            } if *n == node
+
        );
+

+
        apply_command(&mut connections, cmd, &mut time);
+

+
        // If session was replaced by inbound or removed by ephemeral disconnect, stop checking
+
        if is_inbound_for_node || is_ephemeral_disconnect_for_node {
+
            continue;
+
        }
+

+
        // If session still exists, verify subscription is preserved
+
        if let Some(session) = connections.session_for(&node) {
+
            assert!(session.is_subscribed_to(&rid));
+
        }
+
    }
+

+
    // Final check if session exists
+
    if let Some(session) = connections.session_for(&node) {
+
        assert!(session.is_subscribed_to(&rid));
+
    }
+

+
    TestResult::passed()
+
}
+

+
/// Subscribe Returns Success Only for Existing Connected Sessions
+
///
+
/// subscribe returns Subscribed only if the session exists and is connected.
+
///
+
/// subscribe(node, subscription) = true ⟺ has_session_for(node)
+
#[quickcheck]
+
fn prop_subscribe_requires_connected_session(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    let subscription = message::Subscribe {
+
        filter: Filter::default(),
+
        since: Timestamp::from(now),
+
        until: Timestamp::MAX,
+
    };
+

+
    // Subscribe on missing session should fail
+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Subscribe(subscription.clone())),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::HandledMessage::MissingSession { .. } => {}
+
        other => {
+
            return TestResult::error(format!(
+
                "Expected MissingSession for missing session, got {:?}",
+
                other
+
            ))
+
        }
+
    }
+

+
    // Connect the session
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    // Subscribe on connected session should succeed
+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Subscribe(subscription.clone())),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::HandledMessage::Subscribed { .. } => {}
+
        other => {
+
            return TestResult::error(format!(
+
                "Expected Subscribed for connected session, got {:?}",
+
                other
+
            ))
+
        }
+
    }
+

+
    // Disconnect the session
+
    connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    );
+

+
    // Subscribe on disconnected session should fail
+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Subscribe(subscription)),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::HandledMessage::Disconnected { .. } => TestResult::passed(),
+
        other => TestResult::error(format!(
+
            "Expected Disconnected for disconnected session, got {:?}",
+
            other
+
        )),
+
    }
+
}
+

+
// =============================================================================
+
// Ping/Pong Properties
+
// =============================================================================
+

+
/// Pong Only in Connected State
+
///
+
/// Pong processing only succeeds for connected sessions.
+
///
+
/// pinged(node, pong) = Ok(_) ⟺ node ∈ connected.keys()
+
#[quickcheck]
+
fn prop_pong_only_connected(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    let pong = Pong {
+
        now,
+
        zeroes: message::ZeroBytes::new(10),
+
    };
+

+
    // Pong on missing session
+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Pong(pong.clone())),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::HandledMessage::MissingSession { .. } => {}
+
        other => {
+
            return TestResult::error(format!(
+
                "Expected MissingSession for missing session, got {:?}",
+
                other
+
            ))
+
        }
+
    }
+

+
    // Connect and set up ping state
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    // Ping the session to set up AwaitingResponse state
+
    let later = now + LocalDuration::from_secs(60);
+
    let ponglen = 10u16;
+
    let mut ping_called = false;
+
    for event in connections.ping(
+
        || {
+
            ping_called = true;
+
            message::Ping {
+
                ponglen,
+
                zeroes: message::ZeroBytes::new(0),
+
            }
+
        },
+
        later,
+
    ) {
+
        // Consume the iterator to trigger pings
+
        let _ = event;
+
    }
+
    assert!(ping_called);
+

+
    // Valid pong on connected session should succeed
+
    let valid_pong = Pong {
+
        now: later,
+
        zeroes: message::ZeroBytes::new(ponglen),
+
    };
+

+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Pong(valid_pong)),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        later,
+
    ) {
+
        event::HandledMessage::Pinged {
+
            pinged: Some(_), ..
+
        } => {}
+
        other => {
+
            return TestResult::error(format!(
+
                "Expected Pinged with Some for connected session, got {:?}",
+
                other
+
            ))
+
        }
+
    }
+

+
    // Disconnect the session
+
    connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: later,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    );
+

+
    // Pong on disconnected session should fail
+
    let pong = Pong {
+
        now: later,
+
        zeroes: message::ZeroBytes::new(10),
+
    };
+

+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Pong(pong)),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        later,
+
    ) {
+
        event::HandledMessage::Disconnected { .. } => TestResult::passed(),
+
        other => TestResult::error(format!(
+
            "Expected Disconnected for disconnected session, got {:?}",
+
            other
+
        )),
+
    }
+
}
+

+
/// Latency Recording
+
///
+
/// Successful pong responses record latency
+
///
+
/// ∀ successful pong:
+
///  session.latencies.push_back(latency)
+
///  ∧ |session.latencies| ≤ MAX_LATENCIES
+
#[quickcheck]
+
fn prop_latency_bounded(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    let ponglen = 10u16;
+
    let mut successful_pongs = 0;
+

+
    // Send more pongs than MAX_LATENCIES to verify bounded storage
+
    for i in 0..(MAX_LATENCIES + 5) {
+
        let ping_time = now + LocalDuration::from_secs(60 * (i as u64 + 1));
+

+
        // Ping to set up AwaitingResponse
+
        for _ in connections.ping(
+
            || message::Ping {
+
                ponglen,
+
                zeroes: message::ZeroBytes::new(0),
+
            },
+
            ping_time,
+
        ) {}
+

+
        // Pong with valid response
+
        let pong_time = ping_time + LocalDuration::from_secs(1);
+
        let pong = Pong {
+
            now: pong_time,
+
            zeroes: message::ZeroBytes::new(ponglen),
+
        };
+

+
        match connections.handle_message(
+
            command::Message {
+
                node,
+
                payload: Some(command::Payload::Pong(pong)),
+
                connection_type: ConnectionType::Persistent,
+
            },
+
            pong_time,
+
        ) {
+
            event::HandledMessage::Pinged {
+
                pinged: Some(pinged),
+
                ..
+
            } => {
+
                successful_pongs += 1;
+
                // Verify latency is recorded correctly
+
                assert_eq!(pinged.latency, LocalDuration::from_secs(1));
+
            }
+
            other => {
+
                return TestResult::error(format!("Expected Pinged with latency, got {:?}", other))
+
            }
+
        }
+
    }
+

+
    assert_eq!(successful_pongs, MAX_LATENCIES + 5);
+
    TestResult::passed()
+
}
+

+
/// Ping State Transition
+
///
+
/// After ping, session enters AwaitingResponse state until valid pong.
+
///
+
/// after ping(): session.ping = AwaitingResponse { len, since }
+
/// after valid pong(): session.ping = Ok
+
#[quickcheck]
+
fn prop_ping_state_transition(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    let ponglen = 10u16;
+

+
    // Before ping: pong should return None (no AwaitingResponse)
+
    let pong = Pong {
+
        now,
+
        zeroes: message::ZeroBytes::new(ponglen),
+
    };
+

+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Pong(pong)),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::HandledMessage::Pinged { pinged: None, .. } => {}
+
        other => {
+
            return TestResult::error(format!(
+
                "Expected Pinged with None before ping, got {:?}",
+
                other
+
            ))
+
        }
+
    }
+

+
    // Ping to enter AwaitingResponse
+
    let ping_time = now + LocalDuration::from_secs(60);
+
    for _ in connections.ping(
+
        || message::Ping {
+
            ponglen,
+
            zeroes: message::ZeroBytes::new(0),
+
        },
+
        ping_time,
+
    ) {}
+

+
    // Invalid pong (wrong length) should return None
+
    let invalid_pong = Pong {
+
        now: ping_time,
+
        zeroes: message::ZeroBytes::new(ponglen + 1), // Wrong length
+
    };
+

+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Pong(invalid_pong)),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        ping_time,
+
    ) {
+
        event::HandledMessage::Pinged { pinged: None, .. } => {}
+
        other => {
+
            return TestResult::error(format!(
+
                "Expected Pinged with None for invalid pong, got {:?}",
+
                other
+
            ))
+
        }
+
    }
+

+
    // Need to ping again since state may have changed
+
    let ping_time2 = ping_time + LocalDuration::from_secs(60);
+
    for _ in connections.ping(
+
        || message::Ping {
+
            ponglen,
+
            zeroes: message::ZeroBytes::new(0),
+
        },
+
        ping_time2,
+
    ) {}
+

+
    // Valid pong should return Some and reset state
+
    let valid_pong = Pong {
+
        now: ping_time2,
+
        zeroes: message::ZeroBytes::new(ponglen),
+
    };
+

+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Pong(valid_pong)),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        ping_time2,
+
    ) {
+
        event::HandledMessage::Pinged {
+
            pinged: Some(_), ..
+
        } => {}
+
        other => {
+
            return TestResult::error(format!(
+
                "Expected Pinged with Some for valid pong, got {:?}",
+
                other
+
            ))
+
        }
+
    }
+

+
    // After valid pong: back to Ok state, pong should return None
+
    let final_pong = Pong {
+
        now: ping_time2,
+
        zeroes: message::ZeroBytes::new(ponglen),
+
    };
+

+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: Some(command::Payload::Pong(final_pong)),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        ping_time2,
+
    ) {
+
        event::HandledMessage::Pinged { pinged: None, .. } => TestResult::passed(),
+
        other => TestResult::error(format!(
+
            "Expected Pinged with None after valid pong (back to Ok), got {:?}",
+
            other
+
        )),
+
    }
+
}
+

+
// =============================================================================
+
// Iterator Properties
+
// =============================================================================
+

+
/// Iterator Completeness
+
///
+
/// Iterating over sessions yields exactly all sessions across all states.
+
///
+
/// |sessions.iter()| = |initial| + |attempted| + |connected| + |disconnected|
+
#[quickcheck]
+
fn prop_iterator_complete(commands: Vec<TestCommand>) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut time = LocalTime::from_secs(1577836800);
+

+
    for cmd in commands {
+
        apply_command(&mut connections, cmd, &mut time);
+
    }
+

+
    let sessions = connections.sessions();
+
    let iter_count = sessions.iter().count();
+

+
    let mut state_count = 0;
+
    for (node, _) in sessions.iter() {
+
        let in_state = sessions.is_initial(node) as usize
+
            + sessions.is_attempted(node) as usize
+
            + sessions.get_connected(node).is_some() as usize
+
            + sessions.is_diconnected(node) as usize;
+

+
        assert_eq!(in_state, 1);
+
        state_count += 1;
+
    }
+

+
    assert_eq!(iter_count, state_count);
+
    TestResult::passed()
+
}
+

+
/// Connected Iterator Correctness
+
///
+
/// connected() iterator yields exactly all connected sessions.
+
///
+
/// sessions.connected().count() = |connected|
+
/// ∧ ∀ session in sessions.connected(): session ∈ connected
+
#[quickcheck]
+
fn prop_connected_iterator(commands: Vec<TestCommand>) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut time = LocalTime::from_secs(1577836800);
+

+
    for cmd in commands {
+
        apply_command(&mut connections, cmd, &mut time);
+
    }
+

+
    let sessions = connections.sessions();
+
    let iter_count = sessions.connected().sessions().count();
+

+
    let manual_count = sessions
+
        .iter()
+
        .filter(|(node, _)| sessions.get_connected(node).is_some())
+
        .count();
+

+
    assert_eq!(iter_count, manual_count);
+
    TestResult::passed()
+
}
+

+
/// Unresponsive Filter Correctness
+
///
+
/// unresponsive returns only connected sessions that are inactive.
+
///
+
/// ∀ session in unresponsive(now, threshold):
+
///  session ∈ connected ∧ session.is_inactive(now, threshold)
+
#[quickcheck]
+
fn prop_unresponsive_filter(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Connect the session
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    let stale_connection = connections.config().stale();
+

+
    // Before stale_connection threshold: not unresponsive
+
    let before_threshold = now + stale_connection - LocalDuration::from_secs(1);
+
    let unresponsive_before: Vec<_> = connections.unresponsive(&before_threshold).collect();
+
    assert!(!unresponsive_before.iter().any(|(n, _)| **n == node));
+

+
    // At/after stale_connection threshold: unresponsive
+
    let after_threshold = now + stale_connection + LocalDuration::from_secs(1);
+
    let unresponsive_after: Vec<_> = connections.unresponsive(&after_threshold).collect();
+
    assert!(unresponsive_after.iter().any(|(n, _)| **n == node));
+

+
    // Verify all returned sessions are actually connected and inactive
+
    for (nid, session) in unresponsive_after {
+
        if connections.sessions().get_connected(nid).is_none() {
+
            return TestResult::error(format!("Unresponsive session {:?} is not connected", nid));
+
        }
+
        if !session.is_inactive(&after_threshold, stale_connection) {
+
            return TestResult::error(format!("Unresponsive session {:?} is not inactive", nid));
+
        }
+
    }
+

+
    TestResult::passed()
+
}
+

+
// =============================================================================
+
// State Machine Model Properties
+
// =============================================================================
+

+
/// Deterministic Transitions
+
///
+
/// Given the same state and command, the resulting state is always the same.
+
///
+
/// ∀ state S, command C:
+
///  apply(S, C) always produces the same result
+
#[quickcheck]
+
fn prop_deterministic_transitions(commands: Vec<TestCommand>) -> TestResult {
+
    let local = NonLocalNode::local_node();
+

+
    let mut connections1 = new_connections(local);
+
    let mut connections2 = new_connections(local);
+
    let mut time1 = LocalTime::from_secs(1577836800);
+
    let mut time2 = LocalTime::from_secs(1577836800);
+

+
    for cmd in commands {
+
        apply_command(&mut connections1, cmd.clone(), &mut time1);
+
        apply_command(&mut connections2, cmd, &mut time2);
+

+
        // Verify session sets match
+
        let nodes1: HashSet<_> = connections1.sessions().iter().map(|(n, _)| *n).collect();
+
        let nodes2: HashSet<_> = connections2.sessions().iter().map(|(n, _)| *n).collect();
+

+
        if nodes1 != nodes2 {
+
            return TestResult::error("Session sets differ after identical commands");
+
        }
+

+
        // Verify states match for each node
+
        for node in nodes1 {
+
            let s1 = connections1.sessions();
+
            let s2 = connections2.sessions();
+

+
            let state1 = (
+
                s1.is_initial(&node),
+
                s1.is_attempted(&node),
+
                s1.get_connected(&node).is_some(),
+
                s1.is_diconnected(&node),
+
            );
+
            let state2 = (
+
                s2.is_initial(&node),
+
                s2.is_attempted(&node),
+
                s2.get_connected(&node).is_some(),
+
                s2.is_diconnected(&node),
+
            );
+

+
            if state1 != state2 {
+
                return TestResult::error(format!(
+
                    "State differs for node {:?}: {:?} vs {:?}",
+
                    node, state1, state2
+
                ));
+
            }
+
        }
+
    }
+

+
    TestResult::passed()
+
}
+

+
/// No State Loss
+
///
+
/// A session cannot disappear except through Disconnect(Ephemeral).
+
///
+
/// session ∈ sessions at time t ∧ session ∉ sessions at time t+1
+
///  → ∃ Disconnect(Ephemeral) for session.node between t and t+1
+
///     ∨ ∃ Connected(Inbound) that replaced session
+
#[quickcheck]
+
fn prop_no_state_loss(commands: Vec<TestCommand>) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut time = LocalTime::from_secs(1577836800);
+

+
    // Track which nodes have sessions
+
    let mut had_session: HashSet<NodeId> = HashSet::new();
+

+
    for cmd in commands {
+
        // Record nodes that have sessions before command
+
        had_session.extend(connections.sessions().iter().map(|(n, _)| n));
+

+
        // Track if this command is an ephemeral disconnect or inbound connect
+
        let is_ephemeral_disconnect = matches!(
+
            &cmd,
+
            TestCommand::Disconnect {
+
                connection_type: ConnectionType::Ephemeral,
+
                ..
+
            }
+
        );
+
        let inbound_node = match &cmd {
+
            TestCommand::ConnectedInbound { node, .. } => Some(*node),
+
            _ => None,
+
        };
+

+
        apply_command(&mut connections, cmd, &mut time);
+

+
        // Check for disappeared sessions
+
        for node in had_session.iter() {
+
            if !connections.has_session(node) {
+
                // Session disappeared - must be due to ephemeral disconnect
+
                // or it was overwritten by inbound (which keeps the session)
+
                if !is_ephemeral_disconnect && inbound_node != Some(*node) {
+
                    return TestResult::error(format!(
+
                        "Session {:?} disappeared without ephemeral disconnect or inbound overwrite",
+
                        node
+
                    ));
+
                }
+
            }
+
        }
+

+
        // Update tracked sessions
+
        had_session.clear();
+
        had_session.extend(connections.sessions().iter().map(|(n, _)| n));
+
    }
+

+
    TestResult::passed()
+
}
+

+
/// Command Reversibility (Partial)
+
///
+
/// Reconnect reverses disconnect in terms of session state (Disconnected → Initial).
+
///
+
/// reconnect(node) reverses disconnect(node)
+
///  only in terms of session existence, not exact state
+
#[quickcheck]
+
fn prop_reconnect_reverses_disconnect(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Connect and establish session
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    assert!(connections.sessions().get_connected(&node).is_some());
+

+
    connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    );
+
    assert!(connections.sessions().is_diconnected(&node));
+

+
    // Reconnect should bring session back to Initial
+
    match connections.reconnect(command::Reconnect { node }) {
+
        event::Reconnect::Reconnecting { .. } => {}
+
        other => {
+
            return TestResult::error(format!("Expected Reconnecting, got {:?}", other));
+
        }
+
    }
+
    assert!(connections.sessions().is_initial(&node));
+
    assert!(connections.has_session(&node));
+

+
    TestResult::passed()
+
}
+

+
// =============================================================================
+
// Inbound Special Cases
+
// =============================================================================
+

+
/// Inbound Creates Session if Missing
+
///
+
/// Connected::Inbound creates a new connected session if none exists.
+
///
+
/// node ∉ sessions.keys() ∧ Connected::Inbound(node)
+
///  → node ∈ connected.keys()
+
#[quickcheck]
+
fn prop_inbound_creates(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    match connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::Established { session } => {
+
            assert_eq!(session.node(), node);
+
            assert!(connections.sessions().get_connected(&node).is_some());
+
            TestResult::passed()
+
        }
+
        other => TestResult::error(format!("Expected Established, got {:?}", other)),
+
    }
+
}
+

+
/// Inbound Overwrites Disconnected State
+
///
+
/// Connected::Inbound transitions disconnected session to Connected.
+
///
+
/// ∀ existing session state:
+
///  Connected::Inbound(node) → node ∈ connected.keys()
+
#[quickcheck]
+
fn prop_inbound_overwrites_disconnected(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Create a disconnected session
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    );
+
    assert!(connections.sessions().is_diconnected(&node));
+

+
    // Inbound should overwrite
+
    match connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::Established { .. } => {
+
            assert!(connections.sessions().get_connected(&node).is_some());
+
            TestResult::passed()
+
        }
+
        other => TestResult::error(format!("Expected Established, got {:?}", other)),
+
    }
+
}
+

+
/// Inbound Overwrites Initial State
+
///
+
/// Connected::Inbound transitions initial session to Connected.
+
///
+
/// ∀ existing session state:
+
///  Connected::Inbound(node) → node ∈ connected.keys()
+
#[quickcheck]
+
fn prop_inbound_overwrites_initial(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Create an initial session via connect
+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    assert!(connections.sessions().is_initial(&node));
+

+
    // Inbound should overwrite
+
    match connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::Established { .. } => {
+
            assert!(connections.sessions().get_connected(&node).is_some());
+
            TestResult::passed()
+
        }
+
        other => TestResult::error(format!("Expected Established, got {:?}", other)),
+
    }
+
}
+

+
/// Inbound Overwrites Attempted State
+
///
+
/// Connected::Inbound transitions attempted session to Connected.
+
///
+
/// ∀ existing session state:
+
///  Connected::Inbound(node) → node ∈ connected.keys()
+
#[quickcheck]
+
fn prop_inbound_overwrites_attempted(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Create an attempted session
+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    connections.attempted(command::Attempt { node });
+
    assert!(connections.sessions().is_attempted(&node));
+

+
    // Inbound should overwrite
+
    match connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::Established { .. } => {
+
            assert!(connections.sessions().get_connected(&node).is_some());
+
            TestResult::passed()
+
        }
+
        other => TestResult::error(format!("Expected Established, got {:?}", other)),
+
    }
+
}
+

+
/// Outbound Requires Existing Session
+
///
+
/// Connected::Outbound fails if no session exists.
+
///
+
/// node ∉ sessions.keys() ∧ Connected::Outbound(node)
+
///  → result = MissingSession { node }
+
#[quickcheck]
+
fn prop_outbound_requires_session(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    match connections.connected(
+
        command::Connected::Outbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connected::MissingSession { node: n } if n == node => TestResult::passed(),
+
        other => TestResult::error(format!(
+
            "Expected MissingSession for {node}, got {:?}",
+
            other
+
        )),
+
    }
+
}
+

+
// =============================================================================
+
// Address Properties
+
// =============================================================================
+

+
/// Address Preservation
+
///
+
/// Session address is preserved through state transitions.
+
///
+
/// ∀ state transition:
+
///  session_before.addr = session_after.addr
+
#[quickcheck]
+
fn prop_address_preservation(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
    commands: Vec<TestCommand>,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    let expected_addr = addr.clone();
+

+
    // Create session via connect
+
    connections.connect(
+
        command::Connect {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    let mut time = now;
+

+
    for cmd in commands {
+
        // Track if this command might replace our session
+
        let is_inbound_for_node = matches!(
+
            &cmd,
+
            TestCommand::ConnectedInbound { node: n, .. } if *n == node
+
        );
+
        let is_ephemeral_disconnect_for_node = matches!(
+
            &cmd,
+
            TestCommand::Disconnect {
+
                node: n,
+
                connection_type: ConnectionType::Ephemeral,
+
                ..
+
            } if *n == node
+
        );
+

+
        apply_command(&mut connections, cmd, &mut time);
+

+
        // If session was replaced by inbound or removed by ephemeral disconnect, stop checking
+
        if is_inbound_for_node || is_ephemeral_disconnect_for_node {
+
            continue;
+
        }
+

+
        // If session still exists, verify address is preserved
+
        if let Some(session) = connections.session_for(&node) {
+
            assert_eq!(*session.address(), expected_addr);
+
        }
+
    }
+

+
    // Final check if session exists
+
    if let Some(session) = connections.session_for(&node) {
+
        assert_eq!(*session.address(), expected_addr);
+
    }
+

+
    TestResult::passed()
+
}
+

+
/// Record IP for Routable Addresses
+
///
+
/// connect signals to record IP only for non-local IP addresses.
+
///
+
/// connect(node, addr) = Establish { record_ip: Some(ip) }
+
///  ⟺ addr.host = Ip(ip) ∧ ¬is_local(ip)
+
#[quickcheck]
+
fn prop_record_ip_for_routable(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    match connections.connect(
+
        command::Connect {
+
            node,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connect::Establish { record_ip, .. } => match record_ip {
+
            Some(_) => TestResult::passed(),
+
            None => TestResult::error("Expected record_ip for routable address"),
+
        },
+
        other => TestResult::error(format!("Expected Establish, got {:?}", other)),
+
    }
+
}
+

+
/// Record IP is None for non-IP addresses
+
///
+
/// connect signals record_ip=None for DNS hostnames.
+
#[quickcheck]
+
fn prop_no_record_ip_for_dns(
+
    NonLocalNode(node): NonLocalNode,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    let addr = Address::from(cyphernet::addr::NetAddr {
+
        host: HostName::Dns(String::from("seed.radicle.example.com")),
+
        port: 8080,
+
    });
+

+
    match connections.connect(
+
        command::Connect {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    ) {
+
        event::Connect::Establish {
+
            record_ip: None, ..
+
        } => TestResult::passed(),
+
        event::Connect::Establish {
+
            record_ip: Some(ip),
+
            ..
+
        } => TestResult::error(format!(
+
            "Expected record_ip=None for DNS address, got {:?}",
+
            ip
+
        )),
+
        other => TestResult::error(format!("Expected Establish, got {:?}", other)),
+
    }
+
}
+

+
/// Record IP is None for localhost addresses.
+
#[quickcheck]
+
fn prop_no_record_ip_for_localhost(
+
    NonLocalNode(node): NonLocalNode,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    let localhost_ips = [
+
        IpAddr::V4(Ipv4Addr::LOCALHOST),
+
        IpAddr::V6(Ipv6Addr::LOCALHOST),
+
    ];
+

+
    for ip in localhost_ips {
+
        let addr = Address::from(cyphernet::addr::NetAddr {
+
            host: HostName::Ip(ip),
+
            port: 8080,
+
        });
+

+
        match connections.connect(
+
            command::Connect {
+
                node,
+
                addr,
+
                connection_type: ConnectionType::Persistent,
+
            },
+
            now,
+
        ) {
+
            event::Connect::Establish {
+
                record_ip: None, ..
+
            } => {}
+
            event::Connect::Establish {
+
                record_ip: Some(recorded),
+
                ..
+
            } => {
+
                return TestResult::error(format!(
+
                    "Expected record_ip=None for localhost {:?}, got {:?}",
+
                    ip, recorded
+
                ));
+
            }
+
            other => {
+
                return TestResult::error(format!(
+
                    "Expected Establish for {:?}, got {:?}",
+
                    ip, other
+
                ))
+
            }
+
        }
+

+
        connections.disconnected(
+
            command::Disconnect {
+
                node,
+
                link: Link::Outbound,
+
                since: now,
+
                connection_type: ConnectionType::Ephemeral,
+
            },
+
            &DisconnectReason::Command,
+
        );
+
    }
+

+
    TestResult::passed()
+
}
+

+
// =============================================================================
+
// Additional Properties
+
// =============================================================================
+

+
/// Empty State Initial Condition
+
///
+
/// New Connections instance has empty sessions.
+
#[test]
+
fn prop_empty_initial() {
+
    let local = NonLocalNode::local_node();
+
    let connections = new_connections(local);
+

+
    assert_eq!(
+
        connections.sessions().iter().count(),
+
        0,
+
        "Sessions should be empty"
+
    );
+
    assert_eq!(
+
        connections.sessions().connected().sessions().count(),
+
        0,
+
        "Connected sessions should be empty"
+
    );
+
    assert_eq!(
+
        connections.sessions().connected_inbound(),
+
        0,
+
        "Inbound count should be 0"
+
    );
+
    assert_eq!(
+
        connections.sessions().connected_outbound(),
+
        0,
+
        "Outbound count should be 0"
+
    );
+
}
+

+
/// Double Disconnect Prevention
+
///
+
/// Disconnecting an already disconnected session returns AlreadyDisconnected.
+
#[quickcheck]
+
fn prop_double_disconnect(
+
    NonLocalNode(node): NonLocalNode,
+
    addr: Address,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    // First disconnect
+
    connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    );
+

+
    // Second disconnect should return AlreadyDisconnected
+
    match connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    ) {
+
        event::Disconnected::AlreadyDisconnected { node: n } if n == node => TestResult::passed(),
+
        other => TestResult::error(format!(
+
            "Expected AlreadyDisconnected for {node}, got {:?}",
+
            other
+
        )),
+
    }
+
}
+

+
/// Number of Connections Calculation
+
///
+
/// number_of_outbound_connections counts only Attempted and Connected with outbound links.
+
#[quickcheck]
+
fn prop_number_of_outbound_connections(
+
    NonLocalNode(node1): NonLocalNode,
+
    NonLocalNode(node2): NonLocalNode,
+
    NonLocalNode(node3): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    // Ensure distinct nodes
+
    if node1 == node2 || node2 == node3 || node1 == node3 {
+
        return TestResult::discard();
+
    }
+

+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Initial state: 0 outbound
+
    assert_eq!(connections.number_of_outbound_connections(), 0);
+

+
    // Initial connections are not counted
+
    connections.connect(
+
        command::Connect {
+
            node: node1,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    assert_eq!(connections.number_of_outbound_connections(), 0);
+

+
    connections.attempted(command::Attempt { node: node1 });
+
    assert_eq!(connections.number_of_outbound_connections(), 1);
+

+
    // Add Connected (outbound) - should count
+
    connections.connected(
+
        command::Connected::Outbound {
+
            node: node1,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    assert_eq!(connections.number_of_outbound_connections(), 1);
+

+
    // Add Connected (inbound) - should NOT count
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node: node2,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+
    assert_eq!(connections.number_of_outbound_connections(), 1);
+

+
    // Disconnect outbound to Disconnected - should NOT count
+
    connections.disconnected(
+
        command::Disconnect {
+
            node: node1,
+
            link: Link::Outbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    );
+
    assert_eq!(connections.number_of_outbound_connections(), 0);
+

+
    TestResult::passed()
+
}
+

+
/// Message Handling for Disconnected Nodes
+
///
+
/// Messages from disconnected nodes return Disconnected and don't modify state.
+
#[quickcheck]
+
fn prop_message_from_disconnected(
+
    NonLocalNode(node): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    // Connect then disconnect
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    connections.disconnected(
+
        command::Disconnect {
+
            node,
+
            link: Link::Inbound,
+
            since: now,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        &DisconnectReason::Command,
+
    );
+
    assert!(connections.sessions().is_diconnected(&node));
+

+
    // Message to disconnected node
+
    let later = now + LocalDuration::from_secs(10);
+
    match connections.handle_message(
+
        command::Message {
+
            node,
+
            payload: None,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        later,
+
    ) {
+
        event::HandledMessage::Disconnected { node: n } if n == node => {}
+
        other => {
+
            return TestResult::error(format!(
+
                "Expected Disconnected for message to disconnected node, got {:?}",
+
                other
+
            ))
+
        }
+
    }
+

+
    // State should not have changed
+
    assert!(connections.sessions().is_diconnected(&node));
+
    TestResult::passed()
+
}
+

+
/// Stabilization Batch Correctness
+
///
+
/// stabilise returns exactly the sessions that transition to stable, not all stable sessions.
+
#[quickcheck]
+
fn prop_stabilise_returns_newly_stable(
+
    NonLocalNode(node1): NonLocalNode,
+
    NonLocalNode(node2): NonLocalNode,
+
    RoutableAddress(addr): RoutableAddress,
+
    ArbitraryTime(now): ArbitraryTime,
+
) -> TestResult {
+
    if node1 == node2 {
+
        return TestResult::discard();
+
    }
+

+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+

+
    let stale_connection = connections.config().stale();
+

+
    // Connect first session
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node: node1,
+
            addr: addr.clone(),
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        now,
+
    );
+

+
    // Stabilise first session
+
    let after_threshold = now + stale_connection + LocalDuration::from_secs(1);
+
    let stabilised1 = connections.stabilise(after_threshold);
+
    assert_eq!(stabilised1.len(), 1);
+
    assert_eq!(stabilised1[0].node(), node1);
+

+
    // Connect second session at later time
+
    let later = after_threshold + LocalDuration::from_secs(1);
+
    connections.connected(
+
        command::Connected::Inbound {
+
            node: node2,
+
            addr,
+
            connection_type: ConnectionType::Persistent,
+
        },
+
        later,
+
    );
+

+
    // Stabilise again - first session is already stable, should not be returned
+
    let much_later = later + stale_connection + LocalDuration::from_secs(1);
+
    let stabilised2 = connections.stabilise(much_later);
+
    assert_eq!(stabilised2.len(), 1);
+
    assert_eq!(stabilised2[0].node(), node2);
+

+
    // Stabilise again - both already stable, should return empty
+
    let even_later = much_later + LocalDuration::from_secs(1);
+
    let stabilised3 = connections.stabilise(even_later);
+
    assert!(stabilised3.is_empty());
+

+
    TestResult::passed()
+
}
+

+
// =============================================================================
+
// Comprehensive Invariant Test
+
// =============================================================================
+

+
/// All invariants hold after any command sequence.
+
#[quickcheck]
+
fn prop_all_invariants(commands: Vec<TestCommand>) -> TestResult {
+
    let local = NonLocalNode::local_node();
+
    let mut connections = new_connections(local);
+
    let mut time = LocalTime::from_secs(1577836800);
+

+
    for (i, cmd) in commands.iter().enumerate() {
+
        apply_command(&mut connections, cmd.clone(), &mut time);
+

+
        if let Err(e) = check_invariants(&connections, &local) {
+
            return TestResult::error(format!("Invariant violated after command {}: {}", i, e));
+
        }
+
    }
+

+
    TestResult::passed()
+
}
added crates/radicle-protocol/src/connections/state/test/arbitrary.rs
@@ -0,0 +1,351 @@
+
//! Arbitrary implementations for property-based testing of connections.
+

+
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
+

+
use localtime::LocalTime;
+
use qcheck::{Arbitrary, Gen};
+
use radicle::crypto;
+
use radicle::node::{address, Address, HostName, Link, NodeId};
+

+
use crate::connections::session::ConnectionType;
+

+
// =============================================================================
+
// Generation Functions (for types we don't own)
+
// =============================================================================
+

+
pub fn link(g: &mut Gen) -> Link {
+
    if bool::arbitrary(g) {
+
        Link::Inbound
+
    } else {
+
        Link::Outbound
+
    }
+
}
+

+
pub fn local_time(g: &mut Gen) -> LocalTime {
+
    // Generate time between year 2020 and 2030
+
    let secs = u64::arbitrary(g) % (10 * 365 * 24 * 60 * 60);
+
    LocalTime::from_secs(1577836800 + secs)
+
}
+

+
pub fn routable_ip(g: &mut Gen) -> IpAddr {
+
    loop {
+
        let ip: IpAddr = if bool::arbitrary(g) {
+
            IpAddr::V4(Ipv4Addr::from(u32::arbitrary(g)))
+
        } else {
+
            let octets: [u8; 16] = Arbitrary::arbitrary(g);
+
            IpAddr::V6(Ipv6Addr::from(octets))
+
        };
+
        if !ip.is_loopback() && !ip.is_unspecified() {
+
            return ip;
+
        }
+
    }
+
}
+

+
// =============================================================================
+
// Newtype Wrappers for Quickcheck Integration
+
// =============================================================================
+

+
/// Newtype for LocalTime that implements Arbitrary.
+
#[derive(Clone, Debug)]
+
pub struct ArbitraryTime(pub LocalTime);
+

+
impl Arbitrary for ArbitraryTime {
+
    fn arbitrary(g: &mut Gen) -> Self {
+
        ArbitraryTime(local_time(g))
+
    }
+

+
    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
+
        // Shrink toward epoch (1577836800 = 2020-01-01)
+
        let secs = self.0.as_secs();
+
        let base = 1577836800u64;
+
        if secs > base {
+
            Box::new(std::iter::once(ArbitraryTime(LocalTime::from_secs(base))))
+
        } else {
+
            Box::new(std::iter::empty())
+
        }
+
    }
+
}
+

+
/// Newtype for Link that implements Arbitrary.
+
#[derive(Clone, Debug)]
+
pub struct ArbitraryLink(pub Link);
+

+
impl Arbitrary for ArbitraryLink {
+
    fn arbitrary(g: &mut Gen) -> Self {
+
        ArbitraryLink(link(g))
+
    }
+

+
    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
+
        // Shrink Outbound to Inbound
+
        match self.0 {
+
            Link::Outbound => Box::new(std::iter::once(ArbitraryLink(Link::Inbound))),
+
            Link::Inbound => Box::new(std::iter::empty()),
+
        }
+
    }
+
}
+

+
/// Newtype for NodeId that is never equal to the test local node.
+
#[derive(Clone, Debug)]
+
pub struct NonLocalNode(pub NodeId);
+

+
impl NonLocalNode {
+
    pub(super) fn local_node() -> NodeId {
+
        NodeId::from(crypto::PublicKey::from([1u8; 32]))
+
    }
+
}
+

+
impl Arbitrary for NonLocalNode {
+
    fn arbitrary(g: &mut Gen) -> Self {
+
        let local = Self::local_node();
+
        loop {
+
            let node = NodeId::arbitrary(g);
+
            if node != local {
+
                return NonLocalNode(node);
+
            }
+
        }
+
    }
+

+
    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
+
        let local = Self::local_node();
+
        Box::new(
+
            self.0
+
                .shrink()
+
                .filter(move |n| *n != local)
+
                .map(NonLocalNode),
+
        )
+
    }
+
}
+

+
/// Newtype for Address with a routable IP.
+
#[derive(Clone, Debug)]
+
pub struct RoutableAddress(pub Address);
+

+
impl Arbitrary for RoutableAddress {
+
    fn arbitrary(g: &mut Gen) -> Self {
+
        loop {
+
            let ip: IpAddr = if bool::arbitrary(g) {
+
                IpAddr::V4(Ipv4Addr::from(u32::arbitrary(g)))
+
            } else {
+
                let octets: [u8; 16] = Arbitrary::arbitrary(g);
+
                IpAddr::V6(Ipv6Addr::from(octets))
+
            };
+
            if address::is_routable(&ip) {
+
                let port = u16::arbitrary(g);
+
                let addr = Address::from(cyphernet::addr::NetAddr {
+
                    host: HostName::Ip(ip),
+
                    port,
+
                });
+
                return RoutableAddress(addr);
+
            }
+
        }
+
    }
+

+
    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
+
        // Shrinking while maintaining routability is complex; skip it
+
        Box::new(std::iter::empty())
+
    }
+
}
+

+
// =============================================================================
+
// ConnectionType Arbitrary
+
// =============================================================================
+

+
impl Arbitrary for ConnectionType {
+
    fn arbitrary(g: &mut Gen) -> Self {
+
        if bool::arbitrary(g) {
+
            ConnectionType::Ephemeral
+
        } else {
+
            ConnectionType::Persistent
+
        }
+
    }
+

+
    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
+
        // Shrink Persistent to Ephemeral
+
        match self {
+
            ConnectionType::Persistent => Box::new(std::iter::once(ConnectionType::Ephemeral)),
+
            ConnectionType::Ephemeral => Box::new(std::iter::empty()),
+
        }
+
    }
+
}
+

+
// =============================================================================
+
// Test Command
+
// =============================================================================
+

+
/// A command that can be applied to the Connections state machine.
+
#[derive(Clone, Debug)]
+
pub enum TestCommand {
+
    Accept {
+
        ip: IpAddr,
+
    },
+
    Connect {
+
        node: NodeId,
+
        addr: Address,
+
        connection_type: ConnectionType,
+
    },
+
    Attempt {
+
        node: NodeId,
+
    },
+
    ConnectedInbound {
+
        node: NodeId,
+
        addr: Address,
+
        connection_type: ConnectionType,
+
    },
+
    ConnectedOutbound {
+
        node: NodeId,
+
        addr: Address,
+
        connection_type: ConnectionType,
+
    },
+
    Disconnect {
+
        node: NodeId,
+
        link: Link,
+
        connection_type: ConnectionType,
+
    },
+
    Reconnect {
+
        node: NodeId,
+
    },
+
    Message {
+
        node: NodeId,
+
        connection_type: ConnectionType,
+
    },
+
}
+

+
impl Arbitrary for TestCommand {
+
    fn arbitrary(g: &mut Gen) -> Self {
+
        let choice = u8::arbitrary(g) % 8;
+

+
        match choice {
+
            0 => TestCommand::Accept { ip: routable_ip(g) },
+
            1 => TestCommand::Connect {
+
                node: NodeId::arbitrary(g),
+
                addr: Address::arbitrary(g),
+
                connection_type: ConnectionType::arbitrary(g),
+
            },
+
            2 => TestCommand::Attempt {
+
                node: NodeId::arbitrary(g),
+
            },
+
            3 => TestCommand::ConnectedInbound {
+
                node: NodeId::arbitrary(g),
+
                addr: Address::arbitrary(g),
+
                connection_type: ConnectionType::arbitrary(g),
+
            },
+
            4 => TestCommand::ConnectedOutbound {
+
                node: NodeId::arbitrary(g),
+
                addr: Address::arbitrary(g),
+
                connection_type: ConnectionType::arbitrary(g),
+
            },
+
            5 => TestCommand::Disconnect {
+
                node: NodeId::arbitrary(g),
+
                link: ArbitraryLink::arbitrary(g).0,
+
                connection_type: ConnectionType::arbitrary(g),
+
            },
+
            6 => TestCommand::Reconnect {
+
                node: NodeId::arbitrary(g),
+
            },
+
            _ => TestCommand::Message {
+
                node: NodeId::arbitrary(g),
+
                connection_type: ConnectionType::arbitrary(g),
+
            },
+
        }
+
    }
+

+
    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
+
        match self {
+
            TestCommand::Connect {
+
                node,
+
                addr,
+
                connection_type,
+
            } => {
+
                let node = *node;
+
                let addr = addr.clone();
+
                let ct = *connection_type;
+

+
                // Shrink node, then try simpler command
+
                let node_shrinks = node.shrink().map(move |n| TestCommand::Connect {
+
                    node: n,
+
                    addr: addr.clone(),
+
                    connection_type: ct,
+
                });
+
                let simpler = std::iter::once(TestCommand::Attempt { node });
+

+
                Box::new(node_shrinks.chain(simpler))
+
            }
+
            TestCommand::ConnectedInbound {
+
                node,
+
                addr,
+
                connection_type,
+
            } => {
+
                let node = *node;
+
                let addr = addr.clone();
+
                let ct = *connection_type;
+

+
                let node_shrinks = node.shrink().map(move |n| TestCommand::ConnectedInbound {
+
                    node: n,
+
                    addr: addr.clone(),
+
                    connection_type: ct,
+
                });
+
                let simpler = std::iter::once(TestCommand::Attempt { node });
+

+
                Box::new(node_shrinks.chain(simpler))
+
            }
+
            TestCommand::ConnectedOutbound {
+
                node,
+
                addr,
+
                connection_type,
+
            } => {
+
                let node = *node;
+
                let addr = addr.clone();
+
                let ct = *connection_type;
+

+
                let node_shrinks = node.shrink().map(move |n| TestCommand::ConnectedOutbound {
+
                    node: n,
+
                    addr: addr.clone(),
+
                    connection_type: ct,
+
                });
+
                let simpler = std::iter::once(TestCommand::Attempt { node });
+

+
                Box::new(node_shrinks.chain(simpler))
+
            }
+
            TestCommand::Disconnect {
+
                node,
+
                link,
+
                connection_type,
+
            } => {
+
                let node = *node;
+
                let link = *link;
+
                let ct = *connection_type;
+

+
                let node_shrinks = node.shrink().map(move |n| TestCommand::Disconnect {
+
                    node: n,
+
                    link,
+
                    connection_type: ct,
+
                });
+

+
                Box::new(node_shrinks)
+
            }
+
            TestCommand::Attempt { node } => {
+
                let node_shrinks = node.shrink().map(|n| TestCommand::Attempt { node: n });
+
                Box::new(node_shrinks)
+
            }
+
            TestCommand::Reconnect { node } => {
+
                let node_shrinks = node.shrink().map(|n| TestCommand::Reconnect { node: n });
+
                Box::new(node_shrinks)
+
            }
+
            TestCommand::Message {
+
                node,
+
                connection_type,
+
            } => {
+
                let node = *node;
+
                let ct = *connection_type;
+

+
                let node_shrinks = node.shrink().map(move |n| TestCommand::Message {
+
                    node: n,
+
                    connection_type: ct,
+
                });
+

+
                Box::new(node_shrinks)
+
            }
+
            TestCommand::Accept { .. } => Box::new(std::iter::empty()),
+
        }
+
    }
+
}
added crates/radicle-protocol/src/connections/state/test/invariants.rs
@@ -0,0 +1,182 @@
+
//! Invariant checking functions for connection state management.
+

+
use std::collections::HashSet;
+

+
use radicle::node::{Link, NodeId};
+

+
use crate::connections::session::Sessions;
+
use crate::connections::state::Connections;
+

+
// =============================================================================
+
// Error Types
+
// =============================================================================
+

+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub enum InvariantViolation {
+
    /// A node appears in multiple state collections
+
    DuplicateSession { node: NodeId },
+
    /// The local node appears in a session collection
+
    LocalNodeInSession { node: NodeId },
+
    /// Session existence check is inconsistent with state checks
+
    SessionExistenceInconsistent { node: NodeId },
+
    /// Link count mismatch
+
    LinkCountMismatch {
+
        link: Link,
+
        counted: usize,
+
        reported: usize,
+
    },
+
}
+

+
impl std::fmt::Display for InvariantViolation {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        match self {
+
            Self::DuplicateSession { node } => {
+
                write!(f, "Node {:?} appears in multiple states", node)
+
            }
+
            Self::LocalNodeInSession { node } => {
+
                write!(f, "Local node {:?} found in sessions", node)
+
            }
+
            Self::SessionExistenceInconsistent { node } => {
+
                write!(f, "Session existence inconsistent for node {:?}", node)
+
            }
+
            Self::LinkCountMismatch {
+
                link,
+
                counted,
+
                reported,
+
            } => {
+
                write!(
+
                    f,
+
                    "{:?} count mismatch: counted={}, reported={}",
+
                    link, counted, reported
+
                )
+
            }
+
        }
+
    }
+
}
+

+
impl std::error::Error for InvariantViolation {}
+

+
// =============================================================================
+
// Invariant Checking Functions
+
// =============================================================================
+

+
/// Check all core invariants on a Connections instance.
+
pub fn check_invariants(
+
    connections: &Connections,
+
    local: &NodeId,
+
) -> Result<(), InvariantViolation> {
+
    let sessions = connections.sessions();
+
    check_single_session_per_node(sessions)?;
+
    check_local_node_exclusion(sessions, local)?;
+
    check_session_existence_consistency(sessions)?;
+
    check_link_count_consistency(sessions)?;
+
    Ok(())
+
}
+

+
/// A node should only appear in the sessions exactly once, or not at all.
+
pub fn check_single_session_per_node(sessions: &Sessions) -> Result<(), InvariantViolation> {
+
    let mut seen_nodes: HashSet<NodeId> = HashSet::new();
+
    for (node, _) in sessions.iter() {
+
        if !seen_nodes.insert(*node) {
+
            return Err(InvariantViolation::DuplicateSession { node: *node });
+
        }
+
    }
+
    Ok(())
+
}
+

+
/// The local node should never appear in any session collection.
+
pub fn check_local_node_exclusion(
+
    sessions: &Sessions,
+
    local: &NodeId,
+
) -> Result<(), InvariantViolation> {
+
    if sessions.has_session_for(local) {
+
        return Err(InvariantViolation::LocalNodeInSession { node: *local });
+
    }
+
    Ok(())
+
}
+

+
/// For every session, the corresponding node should appear exactly once.
+
pub fn check_session_existence_consistency(sessions: &Sessions) -> Result<(), InvariantViolation> {
+
    for (node, _) in sessions.iter() {
+
        let has_session = sessions.has_session_for(node);
+
        let state_count = sessions.is_initial(node) as u8
+
            + sessions.is_attempted(node) as u8
+
            + sessions.get_connected(node).is_some() as u8
+
            + sessions.is_diconnected(node) as u8;
+

+
        if has_session && state_count != 1 {
+
            return Err(InvariantViolation::SessionExistenceInconsistent { node: *node });
+
        }
+
        if !has_session && state_count != 0 {
+
            return Err(InvariantViolation::SessionExistenceInconsistent { node: *node });
+
        }
+
    }
+
    Ok(())
+
}
+

+
/// For every connected session, the computed link counts should match.
+
pub fn check_link_count_consistency(sessions: &Sessions) -> Result<(), InvariantViolation> {
+
    let mut inbound_count = 0;
+
    let mut outbound_count = 0;
+

+
    for session in sessions.connected().sessions() {
+
        match session.link() {
+
            Link::Inbound => inbound_count += 1,
+
            Link::Outbound => outbound_count += 1,
+
        }
+
    }
+

+
    if inbound_count != sessions.connected_inbound() {
+
        return Err(InvariantViolation::LinkCountMismatch {
+
            link: Link::Inbound,
+
            counted: inbound_count,
+
            reported: sessions.connected_inbound(),
+
        });
+
    }
+
    if outbound_count != sessions.connected_outbound() {
+
        return Err(InvariantViolation::LinkCountMismatch {
+
            link: Link::Outbound,
+
            counted: outbound_count,
+
            reported: sessions.connected_outbound(),
+
        });
+
    }
+
    Ok(())
+
}
+

+
// =============================================================================
+
// State Transition Oracle
+
// =============================================================================
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum SessionState {
+
    Initial,
+
    Attempted,
+
    Connected,
+
    Disconnected,
+
}
+

+
/// Check if a state transition is explicitly invalid.
+
pub fn is_invalid_transition(from: SessionState, to: SessionState) -> bool {
+
    matches!(
+
        (from, to),
+
        (SessionState::Attempted, SessionState::Initial)
+
            | (SessionState::Connected, SessionState::Initial)
+
            | (SessionState::Connected, SessionState::Attempted)
+
            | (SessionState::Disconnected, SessionState::Attempted)
+
    )
+
}
+

+
/// Determine the current state of a session.
+
pub fn get_session_state(sessions: &Sessions, node: &NodeId) -> Option<SessionState> {
+
    if sessions.is_initial(node) {
+
        Some(SessionState::Initial)
+
    } else if sessions.is_attempted(node) {
+
        Some(SessionState::Attempted)
+
    } else if sessions.get_connected(node).is_some() {
+
        Some(SessionState::Connected)
+
    } else if sessions.is_diconnected(node) {
+
        Some(SessionState::Disconnected)
+
    } else {
+
        None // Session doesn't exist
+
    }
+
}
modified crates/radicle-protocol/src/lib.rs
@@ -1,4 +1,5 @@
pub mod bounded;
+
pub mod connections;
pub mod deserializer;
pub mod fetcher;
pub mod service;
modified crates/radicle-protocol/src/service.rs
@@ -9,10 +9,8 @@ pub mod limiter;
pub mod message;
pub mod session;

-
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::net::IpAddr;
-
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, net, time};

@@ -26,18 +24,19 @@ use radicle::identity::Doc;
use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
-
use radicle::node::address::{AddressBook, AddressType, KnownAddress};
+
use radicle::node::address::{AddressType, KnownAddress};
use radicle::node::config::{PeerConfig, RateLimit};
use radicle::node::device::Device;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
-
use radicle::node::{ConnectOptions, Penalty, Severity};
+
use radicle::node::{ConnectOptions, Penalty};
use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::RepositoryError;
use radicle_fetch::policy::SeedingPolicy;

+
use crate::connections;
use crate::fetcher;
use crate::fetcher::service::FetcherService;
use crate::fetcher::FetcherState;
@@ -50,9 +49,7 @@ use radicle::identity::RepoId;
use radicle::node::events::Emitter;
use radicle::node::routing;
use radicle::node::routing::InsertResult;
-
use radicle::node::{
-
    Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt,
-
};
+
use radicle::node::{Address, Alias, Features, FetchResult, Seed, Seeds, SyncStatus, SyncedAt};
use radicle::prelude::*;
use radicle::storage;
use radicle::storage::{refs::RefsAt, Namespaces, ReadStorage};
@@ -65,7 +62,6 @@ use radicle::node::PROTOCOL_VERSION;
use crate::bounded::BoundedVec;
use crate::service::filter::Filter;
pub use crate::service::message::{Message, ZeroBytes};
-
pub use crate::service::session::{QueuedFetch, Session};
use crate::worker::FetchError;
use radicle::node::events::{Event, Events};
use radicle::node::{Config, NodeId};
@@ -77,8 +73,6 @@ use self::limiter::RateLimiter;
use self::message::InventoryAnnouncement;
use self::policy::NamespacesError;

-
/// How often to run the "idle" task.
-
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "gossip" task.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
@@ -87,10 +81,6 @@ pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
-
/// Duration to wait on an unresponsive peer before dropping its connection.
-
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
-
/// How much time should pass after a peer was last active for a *ping* to be sent.
-
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency values to keep for a session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum time difference between the local time, and an announcement timestamp.
@@ -111,8 +101,6 @@ pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// How long to wait for a fetch to stall before aborting, default is 3s.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);
-
/// Target number of peers to maintain connections to.
-
pub const TARGET_OUTBOUND_PEERS: usize = 8;

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
@@ -398,7 +386,7 @@ pub struct Service<D, S, G> {
    /// Policy configuration.
    policies: policy::Config<Write>,
    /// Peer sessions, currently or recently connected.
-
    sessions: Sessions,
+
    connections: connections::state::Connections,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// Who relayed what announcement to us. We keep track of this to ensure that
@@ -475,11 +463,13 @@ where
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
-
        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default(); // Updated on initialize.
        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.
+

+
        // TODO(finto): `Connections` and `Fetcher` should be configured outside
+
        // of `Service::new` so that they can be setup in a fallible environment
        let fetcher = {
            let config = fetcher::Config::new()
                .with_max_concurrency(
@@ -489,6 +479,25 @@ where
                .with_max_capacity(fetcher::MaxQueueSize::default());
            FetcherService::new(config)
        };
+
        let connections = {
+
            use connections::config::{Durations, Inbound, Outbound};
+
            let durations = Durations::default();
+
            let inbound = Inbound::from(RateLimit::from(config.limits.rate.inbound));
+
            let outbound = Outbound {
+
                rate_limit: config.limits.rate.outbound.into(),
+
                target: connections::config::TARGET_OUTBOUND_PEERS,
+
            };
+
            let connections_config = connections::Config {
+
                durations,
+
                inbound,
+
                outbound,
+
            };
+
            connections::state::Connections::new(
+
                *signer.node_id(),
+
                connections_config,
+
                RateLimiter::new(config.peers()),
+
            )
+
        };
        Self {
            config,
            storage,
@@ -501,7 +510,7 @@ where
            db,
            outbox: Outbox::default(),
            limiter,
-
            sessions,
+
            connections,
            fetcher,
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
@@ -742,12 +751,16 @@ where
        // Try to establish some connections.
        self.maintain_connections();
        // Start periodic tasks.
-
        self.outbox.wakeup(IDLE_INTERVAL);
+
        self.outbox.wakeup(self.idle_interval());
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }

+
    pub fn idle_interval(&self) -> LocalDuration {
+
        self.connections.config().idle()
+
    }
+

    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
        trace!(
            target: "service",
@@ -777,7 +790,7 @@ where
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

-
        if now - self.last_idle >= IDLE_INTERVAL {
+
        if now - self.last_idle >= self.idle_interval() {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
@@ -785,7 +798,7 @@ where
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
-
            self.outbox.wakeup(IDLE_INTERVAL);
+
            self.outbox.wakeup(self.idle_interval());
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
@@ -841,6 +854,8 @@ where
        match cmd {
            Command::Connect(nid, addr, opts) => {
                if opts.persistent {
+
                    // TODO(finto): I think these should live in the `Connections`
+
                    // as the persisted peers.
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if let Err(e) = self.connect(nid, addr) {
@@ -894,7 +909,7 @@ where
                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
-
                    self.sessions.connected().map(|(_, s)| s),
+
                    self.connections.sessions().connected().sessions(),
                );
            }
            Command::Unseed(id, resp) => {
@@ -996,18 +1011,12 @@ where
    ) {
        let session = {
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
-
            let Some(session) = self.sessions.get_mut(&from) else {
+
            let Some(session) = self.connections.get_connected(&from) else {
                if let Some(c) = channel {
                    c.send(FetchResult::Failed { reason }).ok();
                }
                return;
            };
-
            if !session.is_connected() {
-
                if let Some(c) = channel {
-
                    c.send(FetchResult::Failed { reason }).ok();
-
                }
-
                return;
-
            }
            session
        };

@@ -1168,25 +1177,21 @@ where
    /// 1. The RID was already being fetched.
    /// 2. The session was already at fetch capacity.
    pub fn dequeue_fetches(&mut self) {
-
        let sessions = self
-
            .sessions
-
            .shuffled()
-
            .map(|(k, _)| *k)
+
        let mut connected = self
+
            .sessions()
+
            .connected()
+
            .node_ids()
+
            .copied()
            .collect::<Vec<_>>();
+
        self.rng.shuffle(&mut connected);

-
        for nid in sessions {
-
            #[allow(clippy::unwrap_used)]
-
            let sess = self.sessions.get_mut(&nid).unwrap();
-
            if !sess.is_connected() {
-
                continue;
-
            }
-

+
        for node in connected {
            let Some(fetcher::QueuedFetch {
                rid,
                from,
                refs_at,
                timeout,
-
            }) = self.fetcher.dequeue(&nid)
+
            }) = self.fetcher.dequeue(&node)
            else {
                continue;
            };
@@ -1216,15 +1221,9 @@ where

    /// Inbound connection attempt.
    pub fn accepted(&mut self, ip: IpAddr) -> bool {
-
        // Always accept localhost connections, even if we already reached
-
        // our inbound connection limit.
-
        if ip.is_loopback() || ip.is_unspecified() {
-
            return true;
-
        }
-
        // Check for inbound connection limit.
-
        if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
-
            return false;
-
        }
+
        use connections::state::command;
+
        use connections::state::event;
+

        match self.db.addresses().is_ip_banned(ip) {
            Ok(banned) => {
                if banned {
@@ -1232,26 +1231,41 @@ where
                    return false;
                }
            }
-
            Err(e) => warn!(target: "service", "Failed to query ban status for {ip}: {e}"),
+
            Err(e) => {
+
                warn!(target: "service", "Failed to query ban status for {ip}: {e}");
+
                return false;
+
            }
        }
-
        let host: HostName = ip.into();
-
        let tokens = self.config.limits.rate.inbound;
-

-
        if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
-
            trace!(target: "service", "Rate limiting inbound connection from {host}..");
-
            return false;
+
        match self.connections.accept(command::Accept { ip }, self.clock) {
+
            event::Accept::LimitExceeded {
+
                ip: _,
+
                current_inbound: _,
+
            } => false,
+
            event::Accept::HostLimited { ip } => {
+
                trace!(target: "service", "Rate limiting inbound connection from {ip}..");
+
                false
+
            }
+
            // Always accept localhost connections, even if we already reached
+
            // our inbound connection limit.
+
            event::Accept::LocalHost { ip: _ } => true,
+
            event::Accept::Accepted { ip: _ } => true,
        }
-
        true
    }

    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
-
        debug!(target: "service", "Attempted connection to {nid} ({addr})");
-

-
        if let Some(sess) = self.sessions.get_mut(&nid) {
-
            sess.to_attempted();
-
        } else {
-
            #[cfg(debug_assertions)]
-
            panic!("Service::attempted: unknown session {nid}@{addr}");
+
        use connections::state::command;
+
        use connections::state::event;
+
        match self.connections.attempted(command::Attempt { node: nid }) {
+
            event::Attempted::ConnectionAttempt { session } => {
+
                debug!(target: "service", "Attempted connection nid={nid} addr={addr} link={} persistent={}", session.link(), session.persistent());
+
            }
+
            event::Attempted::MissingSession { node: _ } => {
+
                #[cfg(debug_assertions)]
+
                panic!("Service::attempted: unknown session {nid}@{addr}");
+
            }
+
            event::Attempted::SelfConnection { node } => {
+
                debug!(target: "service", "Attempted connection to this running node {node}");
+
            }
        }
    }

@@ -1262,88 +1276,105 @@ where
    }

    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
-
        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
-
        self.emitter.emit(Event::PeerConnected { nid: remote });
+
        use connections::state::command;
+
        use connections::state::event;

        let msgs = self.initial(link);
-

-
        if link.is_outbound() {
-
            if let Some(peer) = self.sessions.get_mut(&remote) {
-
                peer.to_connected(self.clock);
-
                self.outbox.write_all(peer, msgs);
+
        let connection_type = self.connection_type(&remote);
+
        let command = match link {
+
            Link::Outbound => command::Connected::Outbound {
+
                node: remote,
+
                addr: addr.clone(),
+
                connection_type,
+
            },
+
            Link::Inbound => command::Connected::Inbound {
+
                node: remote,
+
                addr: addr.clone(),
+
                connection_type,
+
            },
+
        };
+
        match self.connections.connected(command, self.clock) {
+
            event::Connected::Established { session: _ } => {
+
                info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
+
                self.emitter.emit(Event::PeerConnected { nid: remote });
+
                self.outbox.write_all(remote, msgs);
            }
-
        } else {
-
            match self.sessions.entry(remote) {
-
                Entry::Occupied(mut e) => {
-
                    // In this scenario, it's possible that our peer is persistent, and
-
                    // disconnected. We get an inbound connection before we attempt a re-connection,
-
                    // and therefore we treat it as a regular inbound connection.
-
                    //
-
                    // It's also possible that a disconnection hasn't gone through yet and our
-
                    // peer is still in connected state here, while a new inbound connection from
-
                    // that same peer is made. This results in a new connection from a peer that is
-
                    // already connected from the perspective of the service. This appears to be
-
                    // a bug in the underlying networking library.
-
                    let peer = e.get_mut();
-
                    debug!(
-
                        target: "service",
-
                        "Connecting peer {remote} already has a session open ({peer})"
-
                    );
-
                    peer.link = link;
-
                    peer.to_connected(self.clock);
-
                    self.outbox.write_all(peer, msgs);
-
                }
-
                Entry::Vacant(e) => {
-
                    if let HostName::Ip(ip) = addr.host {
-
                        if !address::is_local(&ip) {
-
                            if let Err(e) =
-
                                self.db
-
                                    .addresses_mut()
-
                                    .record_ip(&remote, ip, self.clock.into())
-
                            {
-
                                log::debug!(target: "service", "Failed to record IP address for {remote}: {e}");
-
                            }
-
                        }
-
                    }
-
                    let peer = e.insert(Session::inbound(
-
                        remote,
-
                        addr,
-
                        self.config.is_persistent(&remote),
-
                        self.rng.clone(),
-
                        self.clock,
-
                    ));
-
                    self.outbox.write_all(peer, msgs);
-
                }
+
            event::Connected::MissingSession { node } => {
+
                debug!(target: "service", "Could not transition {node} to connect since its session is missing");
+
            }
+
            event::Connected::SelfConnection { node } => {
+
                warn!(target: "service", "Connected to local node {node}");
            }
        }
    }

    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
-
        let since = self.local_time();
-
        let Some(session) = self.sessions.get_mut(&remote) else {
-
            // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
-
            // disconnection event once the transport is dropped.
-
            trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
-
            return;
+
        use connections::state::command;
+
        use connections::state::event;
+

+
        let command = command::Disconnect {
+
            node: remote,
+
            link,
+
            connection_type: self.connection_type(&remote),
+
            since: self.clock,
        };
-
        // In cases of connection conflicts, there may be disconnections of one of the two
-
        // connections. In that case we don't want the service to remove the session.
-
        if session.link != link {
-
            return;
+
        match self.connections.disconnected(command, reason) {
+
            event::Disconnected::Retry {
+
                session: _,
+
                retry_at: _,
+
                delay,
+
            } => {
+
                info!(target: "service", "Disconnected from {remote} ({reason})");
+
                self.emitter.emit(Event::PeerDisconnected {
+
                    nid: remote,
+
                    reason: reason.to_string(),
+
                });
+
                debug!(target: "service", "Reconnecting to {remote} in {delay}..");
+
                self.outbox.wakeup(delay);
+
            }
+
            event::Disconnected::Severed { session, severity } => {
+
                info!(target: "service", "Disconnected from {remote} ({reason})");
+
                self.emitter.emit(Event::PeerDisconnected {
+
                    nid: remote,
+
                    reason: reason.to_string(),
+
                });
+
                if let Err(e) =
+
                    self.db
+
                        .addresses_mut()
+
                        .disconnected(&remote, session.address(), severity)
+
                {
+
                    error!(target: "service", "Error updating address store: {e}");
+
                }
+
                // Only re-attempt outbound connections, since we don't care if an inbound connection
+
                // is dropped.
+
                if link.is_outbound() {
+
                    self.maintain_connections();
+
                }
+
            }
+
            event::Disconnected::MissingSession { node } => {
+
                debug!(target: "service", "Attempted to disconnect missing session {node}");
+
            }
+
            event::Disconnected::AlreadyDisconnected { node: _ } => {
+
                // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
+
                // disconnection event once the transport is dropped.
+
                trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
+
            }
+
            event::Disconnected::LinkConflict {
+
                node,
+
                found,
+
                expected,
+
            } => {
+
                // In cases of connection conflicts, there may be disconnections of one of the two
+
                // connections. In that case we don't want the service to remove the session.
+
                trace!(target: "service", "Conflicting sessions {node} found={found} expected={expected}");
+
            }
+
            event::Disconnected::SelfConnection { node } => {
+
                warn!(target: "service", "Disconnection came for local node {node}");
+
            }
        }

-
        info!(target: "service", "Disconnected from {remote} ({reason})");
-
        self.emitter.emit(Event::PeerDisconnected {
-
            nid: remote,
-
            reason: reason.to_string(),
-
        });
-

-
        let link = session.link;
-
        let addr = session.addr.clone();
-

        let cmd = fetcher::state::command::Cancel { from: remote };
        let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);
-

        match event {
            fetcher::state::event::Cancel::Unexpected { from } => {
                debug!(target: "service", "No fetches to cancel for {from}");
@@ -1366,53 +1397,6 @@ where
                .ok();
        }

-
        // Attempt to re-connect to persistent peers.
-
        if self.config.peer(&remote).is_some() {
-
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
-
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
-

-
            // Nb. We always try to reconnect to persistent peers, even when the error appears
-
            // to not be transient.
-
            session.to_disconnected(since, since + delay);
-

-
            debug!(target: "service", "Reconnecting to {remote} in {delay}..");
-

-
            self.outbox.wakeup(delay);
-
        } else {
-
            debug!(target: "service", "Dropping peer {remote}..");
-
            self.sessions.remove(&remote);
-

-
            let severity = match reason {
-
                DisconnectReason::Dial(_)
-
                | DisconnectReason::Fetch(_)
-
                | DisconnectReason::Connection(_) => {
-
                    if self.is_online() {
-
                        // If we're "online", there's something wrong with this
-
                        // peer connection specifically.
-
                        Severity::Medium
-
                    } else {
-
                        Severity::Low
-
                    }
-
                }
-
                DisconnectReason::Session(e) => e.severity(),
-
                DisconnectReason::Command
-
                | DisconnectReason::Conflict
-
                | DisconnectReason::SelfConnection => Severity::Low,
-
            };
-

-
            if let Err(e) = self
-
                .db
-
                .addresses_mut()
-
                .disconnected(&remote, &addr, severity)
-
            {
-
                debug!(target: "service", "Failed to update address store: {e}");
-
            }
-
            // Only re-attempt outbound connections, since we don't care if an inbound connection
-
            // is dropped.
-
            if link.is_outbound() {
-
                self.maintain_connections();
-
            }
-
        }
        self.dequeue_fetches();
    }

@@ -1543,15 +1527,18 @@ where

                // Here we handle the special case where the inventory we received is that of
                // a connected peer, as opposed to being relayed to us.
-
                if let Some(sess) = self.sessions.get_mut(announcer) {
-
                    for id in message.inventory.as_slice() {
-
                        // If we are connected to the announcer of this inventory, update the peer's
-
                        // subscription filter to include all inventory items. This way, we'll
-
                        // relay messages relating to the peer's inventory.
-
                        if let Some(sub) = &mut sess.subscribe {
-
                            sub.filter.insert(id);
-
                        }
-

+
                for id in message.inventory.as_slice() {
+
                    // If we are connected to the announcer of this inventory, update the peer's
+
                    // subscription filter to include all inventory items. This way, we'll
+
                    // relay messages relating to the peer's inventory.
+
                    let should_route = matches!(
+
                        // The logic previously would consider a missing
+
                        // subscription as ok
+
                        self.connections.subscribe_to(announcer, id),
+
                        connections::session::SubscribeTo::NoSubscription
+
                            | connections::session::SubscribeTo::Subscribed
+
                    );
+
                    if should_route {
                        // If we're seeding and connected to the announcer, and we don't have
                        // the inventory, fetch it from the announcer.
                        if self.policies.is_seeding(id).expect(
@@ -1653,7 +1640,7 @@ where
                // Refs can be relayed by peers who don't have the data in storage,
                // therefore we only check whether we are connected to the *announcer*,
                // which is required by the protocol to only announce refs it has.
-
                let Some(remote) = self.sessions.get(announcer).cloned() else {
+
                let Some(remote) = self.connections.get_connected(announcer) else {
                    trace!(
                        target: "service",
                        "Skipping fetch of {}, no sessions connected to {announcer}",
@@ -1662,7 +1649,7 @@ where
                    return Ok(relay);
                };
                // Finally, start the fetch.
-
                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT, None);
+
                self.fetch_refs_at(message.rid, remote.node(), refs, scope, FETCH_TIMEOUT, None);

                return Ok(relay);
            }
@@ -1742,58 +1729,58 @@ where
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
+
        use connections::state::command;
+
        use connections::state::command::Payload;
+
        use connections::state::event::HandledMessage;
+

        let local = self.node_id();
        let relay = self.config.is_relay();
-
        let Some(peer) = self.sessions.get_mut(remote) else {
-
            debug!(target: "service", "Session not found for {remote}");
-
            return Ok(());
-
        };
-
        peer.last_active = self.clock;

-
        let limit: RateLimit = match peer.link {
-
            Link::Outbound => self.config.limits.rate.outbound.into(),
-
            Link::Inbound => self.config.limits.rate.inbound.into(),
+
        let payload = match &message {
+
            // TODO(finto): I need to convince myself why this is always from the
+
            // sending node and not a relaying node – the previous code assumed so too.
+
            Message::Subscribe(subscribe) => Some(Payload::Subscribe(subscribe.clone())),
+
            Message::Announcement(_) => None,
+
            Message::Info(_) => None,
+
            Message::Ping(_) => None,
+
            Message::Pong { zeroes } => Some(Payload::pong(zeroes.clone(), self.clock)),
        };
-
        if self
-
            .limiter
-
            .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
-
        {
-
            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
-
            return Ok(());
-
        }
-
        message.log(log::Level::Debug, remote, Link::Inbound);
-

-
        let connected = match &mut peer.state {
-
            session::State::Disconnected { .. } => {
-
                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
+
        let command = command::Message {
+
            node: *remote,
+
            payload,
+
            connection_type: self.connection_type(remote),
+
        };
+
        let peer = match self.connections.handle_message(command, self.clock) {
+
            HandledMessage::MissingSession { node } => {
+
                debug!(target: "service", "Dropping message from unknown peer {}", node);
                return Ok(());
            }
-
            // In case of a discrepancy between the service state and the state of the underlying
-
            // wire protocol, we may receive a message from a peer that we consider not fully connected
-
            // at the service level. To remedy this, we simply transition the peer to a connected state.
-
            //
-
            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
-
            // solution to converge towards the same state.
-
            session::State::Attempted | session::State::Initial => {
-
                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
-
                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);
-

-
                peer.to_connected(self.clock);
-

-
                None
+
            HandledMessage::Disconnected { node } => {
+
                debug!(target: "service", "Ignoring message from disconnected peer {}", node);
+
                return Ok(());
+
            }
+
            HandledMessage::RateLimited { node } => {
+
                info!(target: "service", "Peer {node} reached rate limit, ignoring message");
+
                return Ok(());
+
            }
+
            HandledMessage::Connected { session } => session,
+
            HandledMessage::Subscribed { session } => session,
+
            HandledMessage::Pinged { session, pinged: _ } => session,
+
            HandledMessage::SelfConnection { node } => {
+
                warn!(target: "service", "Message sender is the local node {node}");
+
                return Ok(());
            }
-
            session::State::Connected {
-
                ping, latencies, ..
-
            } => Some((ping, latencies)),
        };

+
        message.log(log::Level::Debug, remote, Link::Inbound);
+

        trace!(target: "service", "Received message {message:?} from {remote}");

        match message {
            // Process a peer announcement.
            Message::Announcement(ann) => {
                let relayer = remote;
-
                let relayer_addr = peer.addr.clone();
+
                let relayer_addr = peer.address().clone();

                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
@@ -1834,7 +1821,7 @@ where
                            }
                            // Only send messages if we're a relay, or it's our own messages.
                            if relay || ann.node == local {
-
                                self.outbox.write(peer, ann.into());
+
                                self.outbox.write(&peer, ann.into());
                            }
                        }
                    }
@@ -1842,7 +1829,6 @@ where
                        warn!(target: "service", "Failed to query gossip messages from store: {e}");
                    }
                }
-
                peer.subscribe = Some(subscribe);
            }
            Message::Info(info) => {
                self.handle_info(*remote, &info)?;
@@ -1853,30 +1839,13 @@ where
                    return Ok(());
                }
                self.outbox.write(
-
                    peer,
+
                    &peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
-
            Message::Pong { zeroes } => {
-
                if let Some((ping, latencies)) = connected {
-
                    if let session::PingState::AwaitingResponse {
-
                        len: ponglen,
-
                        since,
-
                    } = *ping
-
                    {
-
                        if (ponglen as usize) == zeroes.len() {
-
                            *ping = session::PingState::Ok;
-
                            // Keep track of peer latency.
-
                            latencies.push_back(self.clock - since);
-
                            if latencies.len() > MAX_LATENCIES {
-
                                latencies.pop_front();
-
                            }
-
                        }
-
                    }
-
                }
-
            }
+
            Message::Pong { zeroes: _ } => {}
        }
        Ok(())
    }
@@ -1949,15 +1918,6 @@ where
        ]
    }

-
    /// Try to guess whether we're online or not.
-
    fn is_online(&self) -> bool {
-
        self.sessions
-
            .connected()
-
            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
-
            .count()
-
            > 0
-
    }
-

    /// Remove a local repository from our inventory.
    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
        let node = self.node_id();
@@ -2140,7 +2100,7 @@ where
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
        let timestamp = ann.timestamp();
-
        let peers = self.sessions.connected().map(|(_, p)| p);
+
        let peers = self.connections.sessions().connected().sessions();

        // Update our sync status for our own refs. This is useful for determining if refs were
        // updated while the node was stopped.
@@ -2157,52 +2117,98 @@ where
            }
        }

+
        // Store our announcement so that it can be retrieved from us later, just like
+
        // announcements we receive from peers.
+
        if let Err(e) = self.db.gossip_mut().announced(&ann.node, &ann) {
+
            error!(target: "service", "Error updating our gossip store with announced message: {e}");
+
        }
+

        self.outbox.announce(
            ann,
            peers.filter(|p| {
                // Only announce to peers who are allowed to view this repo.
-
                doc.is_visible_to(&p.id.into())
+
                doc.is_visible_to(&p.node().into())
            }),
-
            self.db.gossip_mut(),
        );
        Ok((refs, timestamp))
    }

    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
-
        if let Some(sess) = self.sessions.get_mut(&nid) {
-
            sess.to_initial();
-
            self.outbox.connect(nid, addr);
+
        use connections::state::command;
+
        use connections::state::event;

-
            return true;
+
        match self.connections.reconnect(command::Reconnect { node: nid }) {
+
            event::Reconnect::Reconnecting { session } => {
+
                debug_assert_eq!(nid, session.node());
+
                debug_assert_eq!(addr, *session.address());
+
                self.outbox
+
                    .connect(session.node(), session.address().clone());
+
                true
+
            }
+
            event::Reconnect::MissingSession { node } => {
+
                debug!("Reconnecting to missing session for {node}");
+
                false
+
            }
+
            event::Reconnect::SelfConnection { node } => {
+
                warn!(target: "service", "Attempted reconnect with local node {node}");
+
                false
+
            }
        }
-
        false
    }

    fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
-
        debug!(target: "service", "Connecting to {nid} ({addr})..");
-

-
        if nid == self.node_id() {
-
            return Err(ConnectError::SelfConnection);
-
        }
-
        if self.sessions.contains_key(&nid) {
-
            return Err(ConnectError::SessionExists { nid });
-
        }
-
        if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
-
            return Err(ConnectError::LimitReached { nid, addr });
-
        }
-
        let persistent = self.config.is_persistent(&nid);
-
        let timestamp: Timestamp = self.clock.into();
+
        use connections::state::command;
+
        use connections::state::event;

-
        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
-
            warn!(target: "service", "Failed to update address book with connection attempt: {e}");
+
        let command = command::Connect {
+
            node: nid,
+
            addr: addr.clone(),
+
            connection_type: self.connection_type(&nid),
+
        };
+
        match self.connections.connect(command, self.clock) {
+
            event::Connect::SelfConnection { node } => {
+
                debug!(target: "service", "Attempted connect to local node {node}");
+
                Err(ConnectError::SelfConnection)
+
            }
+
            event::Connect::AlreadyConnected { session } => {
+
                let node = session.node();
+
                trace!(target: "service", "Connected to {node} already");
+
                Err(ConnectError::SessionExists { nid: node })
+
            }
+
            event::Connect::AlreadyConnecting { node } => {
+
                trace!(target: "service", "Connecting to {node} already");
+
                Err(ConnectError::SessionExists { nid: node })
+
            }
+
            event::Connect::Disconnected { node } => {
+
                trace!(target: "service", "Attempted connect to {node} which is disconnected");
+
                Err(ConnectError::SessionExists { nid: node })
+
            }
+
            event::Connect::Establish {
+
                node,
+
                connection_type: _,
+
                record_ip,
+
            } => {
+
                if let Some(ip) = record_ip {
+
                    if let Err(e) = self
+
                        .db
+
                        .addresses_mut()
+
                        .record_ip(&node, ip, self.clock.into())
+
                    {
+
                        warn!(target: "service", "Failed to record IP address {ip} for {node}: {e}");
+
                    }
+
                }
+
                if let Err(e) = self
+
                    .db
+
                    .addresses_mut()
+
                    .attempted(&nid, &addr, self.clock.into())
+
                {
+
                    warn!(target: "service", "Failed to update the address book with connection attempt: {e}");
+
                }
+
                debug!(target: "service", "Connecting to outbound {nid} ({addr})..");
+
                self.outbox.connect(node, addr);
+
                Ok(())
+
            }
        }
-
        self.sessions.insert(
-
            nid,
-
            Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
-
        );
-
        self.outbox.connect(nid, addr);
-

-
        Ok(())
    }

    fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
@@ -2220,7 +2226,10 @@ where

                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
-
                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
+
                    let state = self
+
                        .connections
+
                        .session_for(&seed.nid)
+
                        .map(|s| node::State::from(s.state().clone()));
                    let synced = if local.at == seed.synced_at.oid {
                        SyncStatus::Synced { at: seed.synced_at }
                    } else {
@@ -2248,7 +2257,10 @@ where
                continue;
            }
            let addrs = self.db.addresses().addresses_of(&nid)?;
-
            let state = self.sessions.get(&nid).map(|s| s.state.clone());
+
            let state = self
+
                .connections
+
                .session_for(&nid)
+
                .map(|s| node::State::from(s.state().clone()));

            seeds.insert(Seed::new(nid, addrs, state, None));
        }
@@ -2289,8 +2301,10 @@ where
        // 1. Don't relay to a peer who sent us this message.
        // 2. Don't relay to the peer who signed this announcement.
        let relay_to = self
-
            .sessions
+
            .connections
+
            .sessions()
            .connected()
+
            .into_iter()
            .filter(|(id, _)| {
                relayed_by
                    .map(|relayers| !relayers.contains(id))
@@ -2347,12 +2361,22 @@ where
            return;
        }
        let msg = AnnouncementMessage::from(self.inventory.clone());
+
        let ann = msg.signed(&self.signer);
+
        // Store our announcement so that it can be retrieved from us later, just like
+
        // announcements we receive from peers.
+
        if let Err(e) = self.db.gossip_mut().announced(&ann.node, &ann) {
+
            error!(target: "service", "Error updating our gossip store with announced message: {e}");
+
        }

-
        self.outbox.announce(
-
            msg.signed(&self.signer),
-
            self.sessions.connected().map(|(_, p)| p),
-
            self.db.gossip_mut(),
-
        );
+
        // Borrow-checker prevents us from passing the borrowed sessions, while
+
        // we have a mutable borrow of outbox
+
        let peers = self
+
            .sessions()
+
            .connected()
+
            .sessions()
+
            .cloned()
+
            .collect::<Vec<_>>();
+
        self.outbox.announce(ann, peers.iter());
        self.last_inventory = timestamp;
    }

@@ -2373,19 +2397,13 @@ where
    }

    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
-
        let stale = self
-
            .sessions
-
            .connected()
-
            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
-

-
        for (_, session) in stale {
-
            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
+
        for (_, session) in self.connections.unresponsive(now) {
+
            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.node());

            // TODO: Should we switch the session state to "disconnected" even before receiving
            // an official "disconnect"? Otherwise we keep pinging until we get the disconnection.
-

            self.outbox.disconnect(
-
                session.id,
+
                session.node(),
                DisconnectReason::Session(session::Error::Timeout),
            );
        }
@@ -2393,13 +2411,13 @@ where

    /// Ensure connection health by pinging connected peers.
    fn keep_alive(&mut self, now: &LocalTime) {
-
        let inactive_sessions = self
-
            .sessions
-
            .connected_mut()
-
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
-
            .map(|(_, session)| session);
-
        for session in inactive_sessions {
-
            session.ping(self.clock, &mut self.outbox).ok();
+
        use connections::state::event::Ping;
+
        for Ping { session, ping } in self
+
            .connections
+
            .ping(|| message::Ping::new(&mut self.rng), *now)
+
        {
+
            debug!(target: "service", "Pinging {}@{}", session.node(), session.address());
+
            self.outbox.write(&session, Message::Ping(ping));
        }
    }

@@ -2413,7 +2431,7 @@ where
                    .filter(|entry| entry.version == PROTOCOL_VERSION)
                    .filter(|entry| !entry.address.banned)
                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
-
                    .filter(|entry| !self.sessions.contains_key(&entry.node))
+
                    .filter(|entry| !self.connections.has_session(&entry.node))
                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
                    .filter(|entry| &entry.node != self.nid())
                    .filter(|entry| !entry.address.addr.is_onion() || self.config.onion.is_some())
@@ -2496,18 +2514,14 @@ where

    /// Run idle task for all connections.
    fn idle_connections(&mut self) {
-
        for (_, sess) in self.sessions.iter_mut() {
-
            sess.idle(self.clock);
-

-
            if sess.is_stable() {
-
                // Mark as connected once connection is stable.
-
                if let Err(e) =
-
                    self.db
-
                        .addresses_mut()
-
                        .connected(&sess.id, &sess.addr, self.clock.into())
-
                {
-
                    warn!(target: "service", "Failed to update address book with connection: {e}");
-
                }
+
        for session in self.connections.stabilise(self.clock) {
+
            // Mark as connected once connection is stable.
+
            if let Err(e) = self.db.addresses_mut().connected(
+
                &session.node(),
+
                session.address(),
+
                self.clock.into(),
+
            ) {
+
                warn!(target: "service", "Error updating address book with connection: {e}");
            }
        }
    }
@@ -2519,14 +2533,9 @@ where
        };
        trace!(target: "service", "Maintaining connections..");

-
        let target = TARGET_OUTBOUND_PEERS;
+
        let target = self.connections.config().outbound_target();
        let now = self.clock;
-
        let outbound = self
-
            .sessions
-
            .values()
-
            .filter(|s| s.link.is_outbound())
-
            .filter(|s| s.is_connected() || s.is_connecting())
-
            .count();
+
        let outbound = self.connections.number_of_outbound_connections();
        let wanted = target.saturating_sub(outbound);

        // Don't connect to more peers than needed.
@@ -2581,27 +2590,33 @@ where
        trace!(target: "service", "Maintaining persistent peers..");

        let now = self.local_time();
-
        let mut reconnect = Vec::new();
-

-
        for (nid, session) in self.sessions.iter_mut() {
-
            if let Some(addr) = self.config.peer(nid) {
-
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
-
                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
-
                    // even a successful attempt means that we're unlikely to be able to reconnect.
-

-
                    if now >= *retry_at {
-
                        reconnect.push((*nid, addr.clone(), session.attempts()));
-
                    }
+
        let reconnect = self.connections.sessions().disconnected().into_iter().fold(
+
            Vec::new(),
+
            |mut reconnect, (node, session)| {
+
                // TODO: Try to reconnect only if the peer was attempted. A disconnect without
+
                // even a successful attempt means that we're unlikely to be able to reconnect.
+
                if now >= *session.should_retry_at() {
+
                    reconnect.push((*node, session.address().clone(), session.attempts()))
                }
-
            }
-
        }
+
                reconnect
+
            },
+
        );

+
        // TODO(finto): we don't need to iterate over this twice
        for (nid, addr, attempts) in reconnect {
            if self.reconnect(nid, addr) {
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
            }
        }
    }
+

+
    fn connection_type(&self, node: &NodeId) -> connections::session::ConnectionType {
+
        if self.config.is_persistent(node) {
+
            connections::session::ConnectionType::Persistent
+
        } else {
+
            connections::session::ConnectionType::Ephemeral
+
        }
+
    }
}

/// Gives read access to the service state.
@@ -2609,7 +2624,7 @@ pub trait ServiceState {
    /// Get the Node ID.
    fn nid(&self) -> &NodeId;
    /// Get the existing sessions.
-
    fn sessions(&self) -> &Sessions;
+
    fn sessions(&self) -> &connections::Sessions;
    /// Get fetch state.
    fn fetching(&self) -> &FetcherState;
    /// Get outbox.
@@ -2640,8 +2655,8 @@ where
        self.signer.public_key()
    }

-
    fn sessions(&self) -> &Sessions {
-
        &self.sessions
+
    fn sessions(&self) -> &connections::Sessions {
+
        self.connections.sessions()
    }

    fn fetching(&self) -> &FetcherState {
@@ -2748,67 +2763,3 @@ pub enum LookupError {
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}
-

-
#[derive(Debug, Clone)]
-
/// Holds currently (or recently) connected peers.
-
pub struct Sessions(AddressBook<NodeId, Session>);
-

-
impl Sessions {
-
    pub fn new(rng: Rng) -> Self {
-
        Self(AddressBook::new(rng))
-
    }
-

-
    /// Iterator over fully connected peers.
-
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
-
        self.0
-
            .iter()
-
            .filter_map(move |(id, sess)| match &sess.state {
-
                session::State::Connected { .. } => Some((id, sess)),
-
                _ => None,
-
            })
-
    }
-

-
    /// Iterator over connected inbound peers.
-
    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
-
        self.connected().filter(|(_, s)| s.link.is_inbound())
-
    }
-

-
    /// Iterator over outbound peers.
-
    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
-
        self.connected().filter(|(_, s)| s.link.is_outbound())
-
    }
-

-
    /// Iterator over mutable fully connected peers.
-
    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
-
        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
-
    }
-

-
    /// Iterator over disconnected peers.
-
    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
-
        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
-
    }
-

-
    /// Return whether this node has a fully established session.
-
    pub fn is_connected(&self, id: &NodeId) -> bool {
-
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
-
    }
-

-
    /// Return whether this node can be connected to.
-
    pub fn is_disconnected(&self, id: &NodeId) -> bool {
-
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
-
    }
-
}
-

-
impl Deref for Sessions {
-
    type Target = AddressBook<NodeId, Session>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
-
    }
-
}
-

-
impl DerefMut for Sessions {
-
    fn deref_mut(&mut self) -> &mut Self::Target {
-
        &mut self.0
-
    }
-
}
modified crates/radicle-protocol/src/service/io.rs
@@ -9,12 +9,12 @@ use radicle::node::Address;
use radicle::node::NodeId;
use radicle::storage::refs::RefsAt;

+
use crate::connections::session;
+
use crate::connections::session::Session;
use crate::service::message::Message;
-
use crate::service::session::Session;
use crate::service::DisconnectReason;
use crate::service::Link;

-
use super::gossip;
use super::message::{Announcement, AnnouncementMessage};

/// I/O operation to execute at the network/wire level.
@@ -61,46 +61,35 @@ impl Outbox {
        self.io.push_back(Io::Disconnect(id, reason));
    }

-
    pub fn write(&mut self, remote: &Session, msg: Message) {
+
    // TODO(finto): use a `ConnectedNode` token that is a smart constructed
+
    // `NodeId`. We can take that instead of `Session<session::Connected>`,
+
    // which can relax the borrow-checker.
+
    pub fn write(&mut self, remote: &Session<session::Connected>, msg: Message) {
        let level = match &msg {
            Message::Ping(_) | Message::Pong { .. } => log::Level::Trace,
            _ => log::Level::Debug,
        };
-
        msg.log(level, &remote.id, Link::Outbound);
+
        msg.log(level, &remote.node(), Link::Outbound);
        trace!(target: "service", "Write {:?} to {}", &msg, remote);

-
        self.io.push_back(Io::Write(remote.id, vec![msg]));
+
        self.io.push_back(Io::Write(remote.node(), vec![msg]));
    }

    /// Announce something to a peer. This is meant for our own announcement messages.
    pub fn announce<'a>(
        &mut self,
        ann: Announcement,
-
        peers: impl Iterator<Item = &'a Session>,
-
        gossip: &mut impl gossip::Store,
+
        peers: impl Iterator<Item = &'a Session<session::Connected>>,
    ) {
-
        // Store our announcement so that it can be retrieved from us later, just like
-
        // announcements we receive from peers.
-
        if let Err(e) = gossip.announced(&ann.node, &ann) {
-
            warn!(target: "service", "Failed to update gossip store with announced message: {e}");
-
        }
-

        for peer in peers {
            if let AnnouncementMessage::Refs(refs) = &ann.message {
-
                if let Some(subscribe) = &peer.subscribe {
-
                    if subscribe.filter.contains(&refs.rid) {
-
                        self.write(peer, ann.clone().into());
-
                    } else {
-
                        debug!(
-
                            target: "service",
-
                            "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
-
                            refs.rid
-
                        );
-
                    }
+
                if peer.is_subscribed_to(&refs.rid) {
+
                    self.write(peer, ann.clone().into());
                } else {
                    debug!(
                        target: "service",
-
                        "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
+
                        "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
+
                        refs.rid
                    );
                }
            } else {
@@ -109,7 +98,7 @@ impl Outbox {
        }
    }

-
    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
+
    pub fn write_all(&mut self, remote: NodeId, msgs: impl IntoIterator<Item = Message>) {
        let msgs = msgs.into_iter().collect::<Vec<_>>();

        for (ix, msg) in msgs.iter().enumerate() {
@@ -121,18 +110,21 @@ impl Outbox {
                ix + 1,
                msgs.len()
            );
-
            msg.log(log::Level::Trace, &remote.id, Link::Outbound);
+
            msg.log(log::Level::Trace, &remote, Link::Outbound);
        }
-
        self.io.push_back(Io::Write(remote.id, msgs));
+
        self.io.push_back(Io::Write(remote, msgs));
    }

    pub fn wakeup(&mut self, after: LocalDuration) {
        self.io.push_back(Io::Wakeup(after));
    }

+
    // TODO(finto): use a `ConnectedNode` token that is a smart constructed
+
    // `NodeId`. We can take that instead of `Session<session::Connected>`,
+
    // which can relax the borrow-checker.
    pub fn fetch(
        &mut self,
-
        peer: &mut Session,
+
        peer: &Session<session::Connected>,
        rid: RepoId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
@@ -152,7 +144,7 @@ impl Outbox {
        self.io.push_back(Io::Fetch {
            rid,
            refs_at,
-
            remote: peer.id,
+
            remote: peer.node(),
            timeout,
            reader_limit,
        });
@@ -162,7 +154,7 @@ impl Outbox {
    pub fn broadcast<'a>(
        &mut self,
        msg: impl Into<Message>,
-
        peers: impl IntoIterator<Item = &'a Session>,
+
        peers: impl IntoIterator<Item = &'a Session<session::Connected>>,
    ) {
        let msg = msg.into();
        for peer in peers {
@@ -171,18 +163,14 @@ impl Outbox {
    }

    /// Relay a message to interested peers.
-
    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
+
    pub fn relay<'a>(
+
        &mut self,
+
        ann: Announcement,
+
        peers: impl IntoIterator<Item = &'a Session<session::Connected>>,
+
    ) {
        if let AnnouncementMessage::Refs(msg) = &ann.message {
            let id = msg.rid;
-
            let peers = peers.into_iter().filter(|p| {
-
                if let Some(subscribe) = &p.subscribe {
-
                    subscribe.filter.contains(&id)
-
                } else {
-
                    // If the peer did not send us a `subscribe` message, we don't
-
                    // relay any messages to them.
-
                    false
-
                }
-
            });
+
            let peers = peers.into_iter().filter(|p| p.is_subscribed_to(&id));
            self.broadcast(ann, peers);
        } else {
            self.broadcast(ann, peers);
modified crates/radicle-protocol/src/service/session.rs
@@ -1,15 +1,8 @@
-
use std::collections::VecDeque;
-
use std::{fmt, time};
-

-
use crossbeam_channel as chan;
-
use radicle::node::{FetchResult, Severity};
-
use radicle::node::{Link, Timestamp};
+
use radicle::node::Severity;
+
use radicle::node::Timestamp;
pub use radicle::node::{PingState, State};
-
use radicle::storage::refs::RefsAt;

-
use crate::service::message;
-
use crate::service::message::Message;
-
use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
+
use crate::service::LocalDuration;

/// Time after which a connection is considered stable.
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
@@ -45,236 +38,3 @@ impl Error {
        }
    }
}
-

-
/// Error when trying to queue a fetch.
-
#[derive(thiserror::Error, Debug, Clone)]
-
pub enum QueueError {
-
    /// The item already exists in the queue.
-
    #[error("item is already queued")]
-
    Duplicate(QueuedFetch),
-
    /// The queue is at capacity.
-
    #[error("queue capacity reached")]
-
    CapacityReached(QueuedFetch),
-
}
-

-
impl QueueError {
-
    /// Get the inner [`QueuedFetch`].
-
    pub fn inner(&self) -> &QueuedFetch {
-
        match self {
-
            Self::Duplicate(f) => f,
-
            Self::CapacityReached(f) => f,
-
        }
-
    }
-
}
-

-
/// Fetch waiting to be processed, in the fetch queue.
-
#[derive(Debug, Clone)]
-
pub struct QueuedFetch {
-
    /// Repo being fetched.
-
    pub rid: RepoId,
-
    /// Peer being fetched from.
-
    pub from: NodeId,
-
    /// Refs being fetched.
-
    pub refs_at: Vec<RefsAt>,
-
    /// The timeout given for the fetch request.
-
    pub timeout: time::Duration,
-
    /// Result channel.
-
    pub channel: Option<chan::Sender<FetchResult>>,
-
}
-

-
impl PartialEq for QueuedFetch {
-
    fn eq(&self, other: &Self) -> bool {
-
        self.rid == other.rid
-
            && self.from == other.from
-
            && self.refs_at == other.refs_at
-
            && self.channel.is_none()
-
            && other.channel.is_none()
-
    }
-
}
-

-
/// A peer session. Each connected peer will have one session.
-
#[derive(Debug, Clone)]
-
pub struct Session {
-
    /// Peer id.
-
    pub id: NodeId,
-
    /// Peer address.
-
    pub addr: Address,
-
    /// Connection direction.
-
    pub link: Link,
-
    /// Whether we should attempt to re-connect
-
    /// to this peer upon disconnection.
-
    pub persistent: bool,
-
    /// Peer connection state.
-
    pub state: State,
-
    /// Peer subscription.
-
    pub subscribe: Option<message::Subscribe>,
-
    /// Last time a message was received from the peer.
-
    pub last_active: LocalTime,
-

-
    /// Connection attempts. For persistent peers, Tracks
-
    /// how many times we've attempted to connect. We reset this to zero
-
    /// upon successful connection, once the connection is stable.
-
    attempts: usize,
-
    /// Source of entropy.
-
    rng: Rng,
-
}
-

-
impl fmt::Display for Session {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-
        let mut attrs = Vec::new();
-
        let state = self.state.to_string();
-

-
        if self.link.is_inbound() {
-
            attrs.push("inbound");
-
        } else {
-
            attrs.push("outbound");
-
        }
-
        if self.persistent {
-
            attrs.push("persistent");
-
        }
-
        attrs.push(state.as_str());
-

-
        write!(f, "{} [{}]", self.id, attrs.join(" "))
-
    }
-
}
-

-
impl From<&Session> for radicle::node::Session {
-
    fn from(s: &Session) -> Self {
-
        Self {
-
            nid: s.id,
-
            link: if s.link.is_inbound() {
-
                radicle::node::Link::Inbound
-
            } else {
-
                radicle::node::Link::Outbound
-
            },
-
            addr: s.addr.clone(),
-
            state: s.state.clone(),
-
        }
-
    }
-
}
-

-
impl Session {
-
    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng) -> Self {
-
        Self {
-
            id,
-
            addr,
-
            state: State::Initial,
-
            link: Link::Outbound,
-
            subscribe: None,
-
            persistent,
-
            last_active: LocalTime::default(),
-
            attempts: 1,
-
            rng,
-
        }
-
    }
-

-
    pub fn inbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, time: LocalTime) -> Self {
-
        Self {
-
            id,
-
            addr,
-
            state: State::Connected {
-
                since: time,
-
                ping: PingState::default(),
-
                latencies: VecDeque::default(),
-
                stable: false,
-
            },
-
            link: Link::Inbound,
-
            subscribe: None,
-
            persistent,
-
            last_active: time,
-
            attempts: 0,
-
            rng,
-
        }
-
    }
-

-
    pub fn is_connecting(&self) -> bool {
-
        matches!(self.state, State::Attempted)
-
    }
-

-
    pub fn is_stable(&self) -> bool {
-
        matches!(self.state, State::Connected { stable: true, .. })
-
    }
-

-
    pub fn is_connected(&self) -> bool {
-
        self.state.is_connected()
-
    }
-

-
    pub fn is_disconnected(&self) -> bool {
-
        matches!(self.state, State::Disconnected { .. })
-
    }
-

-
    pub fn is_initial(&self) -> bool {
-
        matches!(self.state, State::Initial)
-
    }
-

-
    pub fn attempts(&self) -> usize {
-
        self.attempts
-
    }
-

-
    /// Run 'idle' task for session.
-
    pub fn idle(&mut self, now: LocalTime) {
-
        if let State::Connected {
-
            since,
-
            ref mut stable,
-
            ..
-
        } = self.state
-
        {
-
            if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
-
                *stable = true;
-
                // Reset number of attempts for stable connections.
-
                self.attempts = 0;
-
            }
-
        }
-
    }
-

-
    pub fn to_attempted(&mut self) {
-
        assert!(
-
            self.is_initial(),
-
            "Can only transition to 'attempted' state from 'initial' state"
-
        );
-
        self.state = State::Attempted;
-
        self.attempts += 1;
-
    }
-

-
    pub fn to_connected(&mut self, since: LocalTime) {
-
        self.last_active = since;
-

-
        if let State::Connected { .. } = &self.state {
-
            log::debug!(target: "service", "Session {} is already in 'connected' state, resetting..", self.id);
-
        };
-
        self.state = State::Connected {
-
            since,
-
            ping: PingState::default(),
-
            latencies: VecDeque::default(),
-
            stable: false,
-
        };
-
    }
-

-
    /// Move the session state to "disconnected". Returns any pending RID
-
    /// that was requested.
-
    pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
-
        self.state = State::Disconnected { since, retry_at };
-
    }
-

-
    /// Return to initial state from disconnected state. This state transition
-
    /// happens when we attempt to re-connect to a disconnected peer.
-
    pub fn to_initial(&mut self) {
-
        assert!(
-
            self.is_disconnected(),
-
            "Can only transition to 'initial' state from 'disconnected' state"
-
        );
-
        self.state = State::Initial;
-
    }
-

-
    pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
-
        if let State::Connected { ping, .. } = &mut self.state {
-
            let msg = message::Ping::new(&mut self.rng);
-
            *ping = PingState::AwaitingResponse {
-
                len: msg.ponglen,
-
                since,
-
            };
-
            reactor.write(self, Message::Ping(msg));
-
        }
-
        Ok(())
-
    }
-
}