Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: peer connections
◌ CI pending Fintan Halpenny committed 10 months ago
commit ba174752c6a62186f0e79e8759d65ac9edc166e5
parent ca9e9a27961c8d55024877b54c24abfd8c105f43
1 pending (1 total) View logs
7 files changed +1215 -0
added crates/radicle-protocol/src/connections.rs
@@ -0,0 +1,399 @@
+
// TODO(finto): command should be something else, perhaps `input`?
+
pub mod command;
+
pub use command::{Connect, Disconnect};
+

+
pub mod commands;
+
pub mod effects;
+
pub mod events;
+

+
use radicle::node::config::RateLimits;
+
use session::{HasAttempts as _, Session, Sessions};
+
mod session;
+

+
use std::collections::HashSet;
+
use std::net::IpAddr;
+

+
use localtime::{LocalDuration, LocalTime};
+
use radicle::node::address;
+
use radicle::node::{Address, HostName, Link, NodeId, Severity};
+

+
use crate::service::limiter::RateLimiter;
+
use crate::service::DisconnectReason;
+

+
/// Minimum amount of time to wait before reconnecting to a peer.
+
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
+
/// Maximum amount of time to wait before reconnecting to a peer.
+
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
+

+
pub struct Connections {
+
    /// The state of the connection lifecycle for each node in the network.
+
    sessions: Sessions,
+
    /// Keep track of which node connections are meant to be persistent.
+
    persistent: HashSet<NodeId>,
+
    /// Keep track of banned IP addresses.
+
    banned: HashSet<IpAddr>,
+
    /// Rate limiter of IP hosts.
+
    limiter: RateLimiter,
+
    /// Configuration for managing connections.
+
    config: Config,
+
}
+

+
pub struct Config {
+
    /// Duration for a connection to be considered idle.
+
    idle: LocalDuration,
+
    /// Allowed number of inbound connections
+
    inbound_limit: usize,
+
    limits: RateLimits,
+
    reconnection_delay: ReconnectionDelay,
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct ReconnectionDelay {
+
    /// The minimum amount of time to wait before attempting a re-connection.
+
    pub min_delta: LocalDuration,
+
    /// The maximum amount of time to wait before attempting a re-connection.
+
    pub max_delta: LocalDuration,
+
}
+

+
impl Default for ReconnectionDelay {
+
    fn default() -> Self {
+
        Self {
+
            min_delta: MIN_RECONNECTION_DELTA,
+
            max_delta: MAX_RECONNECTION_DELTA,
+
        }
+
    }
+
}
+

+
pub enum CommandEvent {
+
    MissingSession { node: NodeId },
+
    Attempted(Session<session::Attempted>),
+
    Connected(Session<session::Connected>),
+
    Disconnected(Session<session::Disconnected>),
+
}
+

+
impl Connections {
+
    pub fn handle_command(&mut self, command: commands::Command) -> CommandEvent {
+
        match command {
+
            commands::Command::Attempt(attempt) => self.attempted(attempt),
+
            commands::Command::Connect(connect) => self.connected(connect),
+
            commands::Command::Disconnect(disconnect) => self.disconnected(disconnect),
+
        }
+
    }
+

+
    fn attempted(&mut self, commands::Attempt { node }: commands::Attempt) -> CommandEvent {
+
        self.sessions
+
            .session_to_attempted(&node)
+
            .map(CommandEvent::Attempted)
+
            .unwrap_or(CommandEvent::MissingSession { node })
+
    }
+

+
    fn connected(
+
        &mut self,
+
        commands::Connect {
+
            node,
+
            now,
+
            link,
+
            persistent,
+
        }: commands::Connect,
+
    ) -> CommandEvent {
+
        self.sessions
+
            .session_to_connected(&node, now, link, persistent)
+
            .map(CommandEvent::Connected)
+
            .unwrap_or(CommandEvent::MissingSession { node })
+
    }
+

+
    fn disconnected(
+
        &mut self,
+
        commands::Disconnect {
+
            node,
+
            since,
+
            retry_at,
+
        }: commands::Disconnect,
+
    ) -> CommandEvent {
+
        self.sessions
+
            .session_to_disconnected(&node, since, retry_at)
+
            .map(CommandEvent::Disconnected)
+
            .unwrap_or(CommandEvent::MissingSession { node })
+
    }
+
}
+

+
impl Connections {
+
    pub fn sessions(&self) -> &Sessions {
+
        &self.sessions
+
    }
+

+
    pub fn accept(&mut self, ip: IpAddr, now: LocalTime) -> AcceptResult {
+
        let mut result = AcceptResult::default();
+
        // Always accept localhost connections, even if we already reached
+
        // our inbound connection limit.
+
        if ip.is_loopback() || ip.is_unspecified() {
+
            result.local_host(ip);
+
            return result;
+
        }
+

+
        if self.has_reached_inbound_limit() {
+
            result.inbound_limit_exceeded(ip, self.sessions.connected_inbound());
+
            return result;
+
        }
+

+
        if self.is_ip_banned(&ip) {
+
            result.ip_banned(ip);
+
            return result;
+
        }
+

+
        if self.has_reached_ip_limit(&ip, now) {
+
            result.host_limited(ip);
+
            return result;
+
        }
+

+
        result.accepted(ip);
+
        result
+
    }
+

+
    pub fn connect(&self, connect: Connect) -> ConnectResult {
+
        let mut result = ConnectResult::default();
+
        match connect {
+
            Connect::Inbound(command::Inbound {
+
                node,
+
                clock,
+
                persistent,
+
            }) => match self.sessions.get_connected(&node) {
+
                Some(session) => result.already_connected(session.clone(), Link::Inbound),
+
                None => {
+
                    result.connect(node, clock, Link::Inbound, persistent);
+
                    result.send_initial_messages(node, Link::Inbound);
+
                }
+
            },
+
            Connect::Outbound(command::Outbound {
+
                node,
+
                addr,
+
                persistent,
+
                clock,
+
            }) => match self.sessions.get_connected(&node) {
+
                Some(session) => {
+
                    result.already_connected(session.clone(), Link::Outbound);
+
                    result.send_initial_messages(node, *session.link());
+
                }
+
                None => {
+
                    if let HostName::Ip(ip) = addr.host {
+
                        if !address::is_local(&ip) {
+
                            result.record_ip(node, ip, clock);
+
                        }
+
                    }
+
                    result.connect(node, clock, Link::Outbound, persistent);
+
                    result.send_initial_messages(node, Link::Outbound);
+
                }
+
            },
+
        }
+
        result
+
    }
+

+
    pub fn disconnect(&self, disconnect: Disconnect) -> DisconnectResult {
+
        let mut result = DisconnectResult::default();
+
        let Disconnect {
+
            node,
+
            link,
+
            reason,
+
            since,
+
        } = disconnect;
+
        let is_persistent = self.is_persistent(&node);
+
        let Some(session) = self.sessions.get_session(&node) else {
+
            result.already_disconnected(node);
+
            return result;
+
        };
+
        if *session.link() != link {
+
            result.link_conflict(node, *session.link(), link);
+
            return result;
+
        }
+

+
        if is_persistent {
+
            let delay = self.reconnection_delay(session.attempts().as_u32());
+
            let retry_at = since + delay;
+
            result.retry_connection(node, since, retry_at);
+
            result.disconnect(node, since, Some(retry_at));
+
        } else {
+
            let severity = self.reason_severity(reason, since);
+
            result.record_severity(node, session.address().clone(), severity);
+
            result.disconnect(node, since, None);
+
            if link.is_outbound() {
+
                result.maintain_connnections();
+
            }
+
        }
+
        result
+
    }
+

+
    fn reason_severity(&self, reason: DisconnectReason, now: LocalTime) -> Severity {
+
        match reason {
+
            DisconnectReason::Dial(_)
+
            | DisconnectReason::Fetch(_)
+
            | DisconnectReason::Connection(_) => {
+
                if self.is_online(now) {
+
                    // If we're "online", there's something wrong with this
+
                    // peer connection specifically.
+
                    Severity::Medium
+
                } else {
+
                    Severity::Low
+
                }
+
            }
+
            DisconnectReason::Session(e) => e.severity(),
+
            DisconnectReason::Command
+
            | DisconnectReason::Conflict
+
            | DisconnectReason::SelfConnection => Severity::Low,
+
        }
+
    }
+

+
    /// Try to guess whether we're online or not.
+
    fn is_online(&self, now: LocalTime) -> bool {
+
        self.sessions
+
            .connected()
+
            .filter(|(_, s)| s.address().is_routable() && *s.last_active() >= now - self.idle())
+
            .count()
+
            > 0
+
    }
+

+
    fn idle(&self) -> LocalDuration {
+
        self.config.idle
+
    }
+

+
    fn is_persistent(&self, node: &NodeId) -> bool {
+
        self.persistent.contains(node)
+
    }
+

+
    fn is_ip_banned(&self, ip: &IpAddr) -> bool {
+
        self.banned.contains(ip)
+
    }
+

+
    // TODO: limit is harshing my buzz by taking &mut self here
+
    fn has_reached_ip_limit(&mut self, ip: &IpAddr, now: LocalTime) -> bool {
+
        let addr = HostName::from(*ip);
+
        self.limiter
+
            .limit(addr, None, &self.config.limits.inbound, now)
+
    }
+

+
    fn has_reached_inbound_limit(&self) -> bool {
+
        self.sessions.connected_inbound() >= self.config.inbound_limit
+
    }
+

+
    fn reconnection_delay(&self, attempts: u32) -> LocalDuration {
+
        LocalDuration::from_secs(2u64.pow(attempts)).clamp(
+
            self.config.reconnection_delay.min_delta,
+
            self.config.reconnection_delay.max_delta,
+
        )
+
    }
+
}
+

+
#[derive(Debug, Default)]
+
pub struct AcceptResult {
+
    pub effects: Vec<effects::Accept>,
+
    pub events: Vec<events::Accept>,
+
}
+

+
impl AcceptResult {
+
    fn local_host(&mut self, ip: IpAddr) {
+
        self.effects.push(effects::Accept::LocalHost { ip });
+
    }
+

+
    fn inbound_limit_exceeded(&mut self, ip: IpAddr, connected_inbound: usize) {
+
        self.events.push(events::Accept::LimitExceeded {
+
            ip,
+
            current_inbound: connected_inbound,
+
        });
+
    }
+

+
    fn ip_banned(&mut self, ip: IpAddr) {
+
        self.events.push(events::Accept::IpBanned { ip })
+
    }
+

+
    fn host_limited(&mut self, ip: IpAddr) {
+
        self.events.push(events::Accept::HostLimited { ip })
+
    }
+

+
    fn accepted(&mut self, ip: IpAddr) {
+
        self.effects.push(effects::Accept::Accepted { ip })
+
    }
+
}
+

+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
+
pub struct ConnectResult {
+
    pub events: Vec<events::Connect>,
+
    pub effects: Vec<effects::Connect>,
+
    pub commands: Vec<commands::Connect>,
+
}
+

+
impl ConnectResult {
+
    fn already_connected(&mut self, session: Session<session::Connected>, attempted_link: Link) {
+
        self.events.push(events::Connect::AlreadyConnected {
+
            session,
+
            attempted_link,
+
        });
+
    }
+

+
    fn send_initial_messages(&mut self, node: NodeId, link: Link) {
+
        self.effects
+
            .push(effects::Connect::SendInitialMessages { node, link });
+
    }
+

+
    fn record_ip(&mut self, node: NodeId, ip: IpAddr, clock: LocalTime) {
+
        self.effects
+
            .push(effects::Connect::RecordIp { node, ip, clock });
+
    }
+

+
    fn connect(&mut self, node: NodeId, clock: LocalTime, link: Link, persistent: bool) {
+
        self.commands.push(commands::Connect {
+
            node,
+
            now: clock,
+
            link,
+
            persistent,
+
        });
+
    }
+
}
+

+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
+
pub struct DisconnectResult {
+
    pub events: Vec<events::Disconnect>,
+
    pub effects: Vec<effects::Disconnect>,
+
    pub commands: Vec<commands::Disconnect>,
+
}
+

+
impl DisconnectResult {
+
    fn already_disconnected(&mut self, node: NodeId) {
+
        self.events
+
            .push(events::Disconnect::AlreadyDisconnected { node });
+
    }
+

+
    fn link_conflict(&mut self, node: NodeId, found: Link, expected: Link) {
+
        self.events.push(events::Disconnect::LinkConflict {
+
            node,
+
            found,
+
            expected,
+
        });
+
    }
+

+
    fn retry_connection(&mut self, node: NodeId, since: LocalTime, retry_at: LocalTime) {
+
        self.effects.push(effects::Disconnect::RetryConnection {
+
            node,
+
            since,
+
            retry_at,
+
        });
+
    }
+

+
    fn maintain_connnections(&mut self) {
+
        self.effects.push(effects::Disconnect::MaintainConnections);
+
    }
+

+
    fn record_severity(&mut self, node: NodeId, address: Address, severity: Severity) {
+
        self.effects.push(effects::Disconnect::RecordServerity {
+
            node,
+
            address,
+
            severity,
+
        })
+
    }
+

+
    fn disconnect(&mut self, node: NodeId, since: LocalTime, retry_at: Option<LocalTime>) {
+
        self.commands.push(commands::Disconnect {
+
            node,
+
            since,
+
            retry_at,
+
        });
+
    }
+
}
added crates/radicle-protocol/src/connections/command.rs
@@ -0,0 +1,49 @@
+
use localtime::LocalTime;
+

+
use radicle::node::{Address, Link, NodeId};
+

+
use crate::service::DisconnectReason;
+

+
pub enum Connect {
+
    Inbound(Inbound),
+
    Outbound(Outbound),
+
}
+

+
impl Connect {
+
    pub fn inbound(node: NodeId, clock: LocalTime, persistent: bool) -> Self {
+
        Self::Inbound(Inbound {
+
            node,
+
            clock,
+
            persistent,
+
        })
+
    }
+

+
    pub fn outbound(node: NodeId, addr: Address, persistent: bool, clock: LocalTime) -> Self {
+
        Self::Outbound(Outbound {
+
            node,
+
            addr,
+
            persistent,
+
            clock,
+
        })
+
    }
+
}
+

+
pub struct Inbound {
+
    pub(super) node: NodeId,
+
    pub(super) clock: LocalTime,
+
    pub(super) persistent: bool,
+
}
+

+
pub struct Outbound {
+
    pub(super) node: NodeId,
+
    pub(super) addr: Address,
+
    pub(super) persistent: bool,
+
    pub(super) clock: LocalTime,
+
}
+

+
pub struct Disconnect {
+
    pub(super) node: NodeId,
+
    pub(super) link: Link,
+
    pub(super) reason: DisconnectReason,
+
    pub(super) since: LocalTime,
+
}
added crates/radicle-protocol/src/connections/commands.rs
@@ -0,0 +1,46 @@
+
use localtime::LocalTime;
+
use radicle::node::{Link, NodeId};
+

+
pub enum Command {
+
    Attempt(Attempt),
+
    Connect(Connect),
+
    Disconnect(Disconnect),
+
}
+

+
impl From<Attempt> for Command {
+
    fn from(v: Attempt) -> Self {
+
        Self::Attempt(v)
+
    }
+
}
+

+
impl From<Connect> for Command {
+
    fn from(v: Connect) -> Self {
+
        Self::Connect(v)
+
    }
+
}
+

+
impl From<Disconnect> for Command {
+
    fn from(v: Disconnect) -> Self {
+
        Self::Disconnect(v)
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct Attempt {
+
    pub node: NodeId,
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct Connect {
+
    pub node: NodeId,
+
    pub now: LocalTime,
+
    pub link: Link,
+
    pub persistent: bool,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Disconnect {
+
    pub node: NodeId,
+
    pub since: LocalTime,
+
    pub retry_at: Option<LocalTime>,
+
}
added crates/radicle-protocol/src/connections/effects.rs
@@ -0,0 +1,100 @@
+
//! External effects that are emitted by interacting with [`Connections`]. These
+
//! effects should be used by the rest of the system to perform side-effects.
+
//!
+
//! [`Connections`]: super::Connections.
+

+
use std::net::IpAddr;
+

+
use localtime::LocalTime;
+
use radicle::node::{Address, Link, NodeId, Severity};
+

+
/// All effects that can occur from interacting with [`Connections`].
+
///
+
/// [`Connections`]: super::Connections.
+
pub enum Effect {
+
    Accept(Accept),
+
    Connect(Connect),
+
    Disconnect(Disconnect),
+
}
+

+
impl From<Accept> for Effect {
+
    fn from(v: Accept) -> Self {
+
        Self::Accept(v)
+
    }
+
}
+

+
impl From<Connect> for Effect {
+
    fn from(v: Connect) -> Self {
+
        Self::Connect(v)
+
    }
+
}
+

+
impl From<Disconnect> for Effect {
+
    fn from(v: Disconnect) -> Self {
+
        Self::Disconnect(v)
+
    }
+
}
+

+
/// Effects that occur when checking for accepting a connection from an
+
/// [`IpAddr`].
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Accept {
+
    /// The [`IpAddr`] is likely a localhost connection.
+
    LocalHost { ip: IpAddr },
+
    /// The [`IpAddr`] should be accepted by the system, and later connected to.
+
    Accepted { ip: IpAddr },
+
}
+

+
impl Accept {
+
    /// The [`Accept::LocalHost`] should be created only if the [`IpAddr`] is
+
    /// either a loopback address or has an 'unspecified' address (see
+
    /// [`IpAddr::is_unspecified`]).
+
    pub fn local_host(ip: IpAddr) -> Option<Self> {
+
        (ip.is_loopback() || ip.is_unspecified()).then_some(Self::LocalHost { ip })
+
    }
+

+
    /// See [`Accept::Accepted`].
+
    pub fn accepted(ip: IpAddr) -> Self {
+
        Self::Accepted { ip }
+
    }
+
}
+

+
/// Effects that occur when connecting a node.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub enum Connect {
+
    /// The set of initial messages should be sent to the [`NodeId`].
+
    SendInitialMessages { node: NodeId, link: Link },
+
    /// The [`IpAddr`] of the [`NodeId`] should be recorded in an external
+
    /// database.
+
    RecordIp {
+
        node: NodeId,
+
        ip: IpAddr,
+
        clock: LocalTime,
+
    },
+
}
+

+
/// Effects that occur when disconnecting a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Disconnect {
+
    /// The connection to [`NodeId`] was disconnected, but re-connection should
+
    /// be attempted at the given point in time.
+
    RetryConnection {
+
        /// The node that should be re-connected to.
+
        node: NodeId,
+
        /// When the node was disconnected.
+
        since: LocalTime,
+
        /// When the re-connection attempt should be made.
+
        retry_at: LocalTime,
+
    },
+
    /// Record the severity of the disconnect reason.
+
    RecordServerity {
+
        /// The node that was disconnected.
+
        node: NodeId,
+
        /// The address of the node that was disconnected.
+
        address: Address,
+
        /// The severity of the disconnect reason.
+
        severity: Severity,
+
    },
+
    /// Try to maintain all connections that are meant to be persistent.
+
    MaintainConnections,
+
}
added crates/radicle-protocol/src/connections/events.rs
@@ -0,0 +1,58 @@
+
use std::net::IpAddr;
+

+
use radicle::node::{Link, NodeId};
+

+
use super::session;
+
use super::session::Session;
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Event {
+
    Accept(Accept),
+
    Connect(Box<Connect>),
+
    Disconnect(Disconnect),
+
}
+

+
impl From<Disconnect> for Event {
+
    fn from(v: Disconnect) -> Self {
+
        Self::Disconnect(v)
+
    }
+
}
+

+
impl From<Connect> for Event {
+
    fn from(v: Connect) -> Self {
+
        Self::Connect(Box::new(v))
+
    }
+
}
+

+
impl From<Accept> for Event {
+
    fn from(v: Accept) -> Self {
+
        Self::Accept(v)
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Accept {
+
    LimitExceeded { ip: IpAddr, current_inbound: usize },
+
    IpBanned { ip: IpAddr },
+
    HostLimited { ip: IpAddr },
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Connect {
+
    AlreadyConnected {
+
        session: Session<session::Connected>,
+
        attempted_link: Link,
+
    },
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub enum Disconnect {
+
    AlreadyDisconnected {
+
        node: NodeId,
+
    },
+
    LinkConflict {
+
        node: NodeId,
+
        found: Link,
+
        expected: Link,
+
    },
+
}
added crates/radicle-protocol/src/connections/session.rs
@@ -0,0 +1,562 @@
+
use std::collections::{HashMap, HashSet, VecDeque};
+

+
use localtime::{LocalDuration, LocalTime};
+
use radicle::{
+
    node::{Address, Link, NodeId, PingState},
+
    prelude::RepoId,
+
};
+

+
use crate::service::message;
+

+
/// Time after which a connection is considered stable.
+
#[allow(unused)]
+
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
+

+
#[derive(Clone, Debug)]
+
pub enum State {
+
    Initial(Initial),
+
    Attempted(Attempted),
+
    Connected(Connected),
+
    Disconnected(Disconnected),
+
}
+

+
impl From<Initial> for State {
+
    fn from(value: Initial) -> Self {
+
        Self::Initial(value)
+
    }
+
}
+

+
impl From<Attempted> for State {
+
    fn from(value: Attempted) -> Self {
+
        Self::Attempted(value)
+
    }
+
}
+

+
impl From<Connected> for State {
+
    fn from(value: Connected) -> Self {
+
        Self::Connected(value)
+
    }
+
}
+

+
impl From<Disconnected> for State {
+
    fn from(value: Disconnected) -> Self {
+
        Self::Disconnected(value)
+
    }
+
}
+

+
impl HasAttempts for State {
+
    fn attempts(&self) -> Attempts {
+
        match self {
+
            State::Initial(initial) => initial.attempts,
+
            State::Attempted(attempted) => attempted.attempts,
+
            State::Connected(connected) => connected.attempts,
+
            State::Disconnected(disconnected) => disconnected.attempts,
+
        }
+
    }
+
}
+

+
pub struct Sessions {
+
    initial: HashMap<NodeId, Session<Initial>>,
+
    attempted: HashMap<NodeId, Session<Attempted>>,
+
    disconnected: HashMap<NodeId, Session<Disconnected>>,
+
    connected: HashMap<NodeId, Session<Connected>>,
+
}
+

+
impl Sessions {
+
    /// Get the number of sessions that are connected and have an [inbound]
+
    /// link.
+
    ///
+
    /// [inbound]: Link::Inbound
+
    pub fn connected_inbound(&self) -> usize {
+
        self.connected
+
            .values()
+
            .filter(|session| session.link().is_inbound())
+
            .count()
+
    }
+

+
    /// Get the number of sessions that are connected and have an [outbound]
+
    /// link.
+
    ///
+
    /// [outbound]: Link::Outbound
+
    pub fn connected_outbound(&self) -> usize {
+
        self.connected
+
            .values()
+
            .filter(|session| session.link().is_outbound())
+
            .count()
+
    }
+

+
    /// Checks that an existing [`Session`] exists for the given [`NodeId`].
+
    pub fn has_session_for(&self, node: &NodeId) -> bool {
+
        self.initial.contains_key(node)
+
            || self.attempted.contains_key(node)
+
            || self.disconnected.contains_key(node)
+
            || self.connected.contains_key(node)
+
    }
+

+
    /// Get all [`Session`]s that are in the [`Connected`] state, along with
+
    /// their [`NodeId`]s.
+
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session<Connected>)> {
+
        self.connected.iter()
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the
+
    /// [`Attempted`] state.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    pub fn session_to_attempted(&mut self, node: &NodeId) -> Option<Session<Attempted>> {
+
        let s = self.initial.remove(node)?.into_attempted();
+
        self.attempted.insert(*node, s.clone());
+
        Some(s)
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the
+
    /// [`Disconnected`] state.
+
    ///
+
    /// The time this [`Session`] was disconnected is marked by `since`, and if
+
    /// the connection should be retried then a `retry_at` value should be
+
    /// provided.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    pub fn session_to_disconnected(
+
        &mut self,
+
        node: &NodeId,
+
        since: LocalTime,
+
        retry_at: Option<LocalTime>,
+
    ) -> Option<Session<Disconnected>> {
+
        match self.remove_session(node) {
+
            None => None,
+
            Some(session) => {
+
                let s = session.into_disconnected(since, retry_at);
+
                self.disconnected.insert(*node, s.clone());
+
                Some(s)
+
            }
+
        }
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the
+
    /// [`Connected`] state.
+
    ///
+
    /// The [`Session`] is last active given by the time given for `now`, the
+
    /// type of [`Link`] is also marked by the provided value, and also keep
+
    /// track of whether the session should be persisted.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    pub fn session_to_connected(
+
        &mut self,
+
        node: &NodeId,
+
        now: LocalTime,
+
        link: Link,
+
        persistent: bool,
+
    ) -> Option<Session<Connected>> {
+
        let s = self.remove_session(node)?;
+
        let state = match s.state {
+
            State::Initial(initial) => Connected::from_initial(initial, now),
+
            State::Attempted(attempted) => Connected::from_attempted(attempted, now),
+
            State::Connected(connected) => connected,
+
            State::Disconnected(disconnected) => Connected::from_disconnected(disconnected, now),
+
        };
+
        Some(Session {
+
            state,
+
            id: s.id,
+
            addr: s.addr,
+
            link,
+
            persistent,
+
            last_active: now,
+
            subscribe: s.subscribe,
+
        })
+
    }
+

+
    /// Transition a [`Disconnected`] [`Session`] into an [`Initial`] state,
+
    /// meaning that it should be re-connected to.
+
    ///
+
    /// If the [`NodeId`] was not in a [`Disconnected`] state then `None` is
+
    /// returned.
+
    pub fn reconnect(&mut self, node: &NodeId) -> Option<Session<Initial>> {
+
        let s = self.disconnected.remove(node)?.into_initial();
+
        self.initial.insert(*node, s.clone());
+
        Some(s)
+
    }
+

+
    /// Get a [`Session`] that can be in any [`State`].
+
    pub fn get_session(&self, node: &NodeId) -> Option<Session<State>> {
+
        self.initial
+
            .get(node)
+
            .cloned()
+
            .map(|s| s.into_any_state())
+
            .or_else(|| {
+
                self.attempted
+
                    .get(node)
+
                    .cloned()
+
                    .map(|s| s.into_any_state())
+
            })
+
            .or_else(|| {
+
                self.disconnected
+
                    .get(node)
+
                    .cloned()
+
                    .map(|s| s.into_any_state())
+
            })
+
            .or_else(|| {
+
                self.connected
+
                    .get(node)
+
                    .cloned()
+
                    .map(|s| s.into_any_state())
+
            })
+
    }
+

+
    fn remove_session(&mut self, node: &NodeId) -> Option<Session<State>> {
+
        self.initial
+
            .remove(node)
+
            .map(|s| s.into_any_state())
+
            .or_else(|| self.attempted.remove(node).map(|s| s.into_any_state()))
+
            .or_else(|| self.disconnected.remove(node).map(|s| s.into_any_state()))
+
            .or_else(|| self.connected.remove(node).map(|s| s.into_any_state()))
+
    }
+

+
    /// Get the [`Session`], for the given [`NodeId`], that is expected to be in
+
    /// the [`Connected`] state.
+
    pub fn get_connected(&self, node: &NodeId) -> Option<&Session<Connected>> {
+
        self.connected.get(node)
+
    }
+

+
    #[allow(unused)]
+
    fn inbound(&mut self, node: NodeId, addr: Address, persistent: bool, now: LocalTime) {
+
        self.connected
+
            .insert(node, Session::inbound(node, addr, persistent, now));
+
    }
+
}
+

+
pub trait HasAttempts {
+
    fn attempts(&self) -> Attempts;
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+
pub struct Attempts {
+
    /// Connection attempts. For persistent peers, Tracks
+
    /// how many times we've attempted to connect. We reset this to zero
+
    /// upon successful connection, once the connection is stable.
+
    attempts: usize,
+
}
+

+
impl Attempts {
+
    fn new(attempts: usize) -> Self {
+
        Attempts { attempts }
+
    }
+

+
    pub fn attempted(self) -> Self {
+
        Self {
+
            attempts: self.attempts + 1,
+
        }
+
    }
+

+
    pub fn reset(&mut self) {
+
        self.attempts = 0;
+
    }
+

+
    pub fn attempts(&self) -> usize {
+
        self.attempts
+
    }
+

+
    pub(super) fn as_u32(&self) -> u32 {
+
        self.attempts as u32
+
    }
+
}
+

+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct Session<S> {
+
    /// The [`NodeId`] of the session.
+
    id: NodeId,
+
    /// The public protocol [`Address`] for the session.
+
    addr: Address,
+
    /// The [`Link`] direction for the session.
+
    link: Link,
+
    /// Keep track of whether the session should be persisted. That is, if it is
+
    /// disconnected, re-connection attempts should be made.
+
    persistent: bool,
+
    /// Last time a message was received from the peer.
+
    last_active: LocalTime,
+
    /// Peer subscription.
+
    subscribe: Option<message::Subscribe>,
+
    /// The state the session is in. Can be in the following states:
+
    ///   - [`Initial`]
+
    ///   - [`Attempted`]
+
    ///   - [`Disconnected`]
+
    ///   - [`Connected`]
+
    state: S,
+
}
+

+
impl<S: HasAttempts> HasAttempts for Session<S> {
+
    fn attempts(&self) -> Attempts {
+
        self.state.attempts()
+
    }
+
}
+

+
impl<S> Session<S> {
+
    pub fn node(&self) -> NodeId {
+
        self.id
+
    }
+

+
    pub fn address(&self) -> &Address {
+
        &self.addr
+
    }
+

+
    /// Set the [`message::Subscribe`] of this [`Session`].
+
    pub fn set_subscription(&mut self, subscription: message::Subscribe) {
+
        self.subscribe = Some(subscription);
+
    }
+

+
    /// Subscribe to the given [`RepoId`], if the [`message::Subscribe`] has
+
    /// been set.
+
    pub fn subscribe_to(&mut self, rid: &RepoId) {
+
        if let Some(ref mut sub) = self.subscribe {
+
            sub.filter.insert(rid);
+
        }
+
    }
+

+
    pub fn last_active(&self) -> &LocalTime {
+
        &self.last_active
+
    }
+

+
    pub fn link(&self) -> &Link {
+
        &self.link
+
    }
+

+
    pub fn as_outbound(&mut self) {
+
        self.link = Link::Outbound;
+
    }
+

+
    pub fn into_disconnected(
+
        self,
+
        since: LocalTime,
+
        retry_at: Option<LocalTime>,
+
    ) -> Session<Disconnected>
+
    where
+
        S: HasAttempts,
+
    {
+
        self.map(|s| Disconnected {
+
            since,
+
            retry_at,
+
            attempts: s.attempts(),
+
        })
+
    }
+

+
    #[allow(unused)]
+
    fn seen(&mut self, since: LocalTime) {
+
        self.last_active = since;
+
    }
+

+
    fn into_any_state<T>(self) -> Session<T>
+
    where
+
        T: From<S>,
+
    {
+
        self.map(|state| state.into())
+
    }
+

+
    fn map<T, F>(self, f: F) -> Session<T>
+
    where
+
        F: FnOnce(S) -> T,
+
    {
+
        Session {
+
            id: self.id,
+
            addr: self.addr,
+
            link: self.link,
+
            persistent: self.persistent,
+
            last_active: self.last_active,
+
            subscribe: self.subscribe,
+
            state: f(self.state),
+
        }
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+
pub struct Initial {
+
    attempts: Attempts,
+
}
+

+
impl Initial {
+
    pub fn new() -> Self {
+
        Self::with_attempts(Attempts::new(1))
+
    }
+

+
    pub fn with_attempts(attempts: Attempts) -> Self {
+
        Self { attempts }
+
    }
+
}
+

+
impl Default for Initial {
+
    fn default() -> Self {
+
        Self::new()
+
    }
+
}
+

+
impl Session<Initial> {
+
    pub fn outbound(id: NodeId, addr: Address, persistent: bool, last_active: LocalTime) -> Self {
+
        Self {
+
            id,
+
            addr,
+
            link: Link::Outbound,
+
            persistent,
+
            state: Initial::new(),
+
            last_active,
+
            subscribe: None,
+
        }
+
    }
+

+
    /// Transition the [`Session`] to an [`Attempted`] state, incrementing the
+
    /// number of attempts made.
+
    pub fn into_attempted(self) -> Session<Attempted> {
+
        self.map(|s| Attempted::new(s.attempts.attempted()))
+
    }
+

+
    /// Transition the [`Session`] into the [`Connected`] state.
+
    pub fn into_connected(self, since: LocalTime) -> Session<Connected> {
+
        self.map(|s| Connected::new(since, s.attempts))
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Attempted {
+
    attempts: Attempts,
+
}
+

+
impl Attempted {
+
    pub fn new(attempts: Attempts) -> Self {
+
        Attempted { attempts }
+
    }
+
}
+

+
impl Session<Attempted> {
+
    /// Transition the [`Session`] into the [`Connected`] state.
+
    pub fn into_connected(self, since: LocalTime) -> Session<Connected> {
+
        self.map(|s| Connected::new(since, s.attempts))
+
    }
+
}
+

+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct Connected {
+
    /// Connected since this time.
+
    since: LocalTime,
+
    /// Ping state.
+
    ping: PingState,
+
    /// Ongoing fetches.
+
    fetching: HashSet<RepoId>,
+
    /// Measured latencies for this peer.
+
    latencies: VecDeque<LocalDuration>,
+
    /// Whether the connection is stable.
+
    stable: bool,
+
    /// Number of attempts over the lifetime of the connection. This includes if
+
    /// the connection is degraded back to an [`Initial`] state through a
+
    /// [`Session::reconnect`].
+
    attempts: Attempts,
+
}
+

+
impl HasAttempts for Connected {
+
    fn attempts(&self) -> Attempts {
+
        self.attempts
+
    }
+
}
+

+
impl Connected {
+
    /// Create a new [`Connected`] state, where `since` is the time of
+
    /// connection, and `attempts` is the number of attempted connections in the
+
    /// lifetime of the [`Session`].
+
    pub fn new(since: LocalTime, attempts: Attempts) -> Self {
+
        Self {
+
            since,
+
            ping: PingState::default(),
+
            fetching: HashSet::default(),
+
            latencies: VecDeque::default(),
+
            stable: false,
+
            attempts,
+
        }
+
    }
+

+
    /// Create a fresh [`Connected`] state, using `since` as the [`LocalTime`] for
+
    /// when this connection was made.
+
    pub fn fresh(since: LocalTime) -> Self {
+
        Self::new(since, Attempts::new(0))
+
    }
+

+
    fn from_initial(initial: Initial, since: LocalTime) -> Self {
+
        Self::new(since, initial.attempts)
+
    }
+

+
    fn from_attempted(attempted: Attempted, since: LocalTime) -> Self {
+
        Self::new(since, attempted.attempts)
+
    }
+

+
    fn from_disconnected(disconnected: Disconnected, since: LocalTime) -> Self {
+
        Self::new(since, disconnected.attempts)
+
    }
+
}
+

+
pub struct Ping {
+
    since: LocalTime,
+
    rng: fastrand::Rng,
+
}
+

+
impl Session<Connected> {
+
    pub fn inbound(id: NodeId, addr: Address, persistent: bool, now: LocalTime) -> Self {
+
        Self {
+
            id,
+
            addr,
+
            link: Link::Inbound,
+
            persistent,
+
            last_active: now,
+
            subscribe: None,
+
            state: Connected::fresh(now),
+
        }
+
    }
+

+
    /// Checks if the [`Session`] is inactive, i.e. the time passed is greater
+
    /// than the `delta`.
+
    pub fn is_inactive(&self, now: &LocalTime, delta: LocalDuration) -> bool {
+
        *now - self.last_active >= delta
+
    }
+

+
    pub fn ping(&mut self, mut ping: Ping) -> message::Ping {
+
        let msg = message::Ping::new(&mut ping.rng);
+
        self.state.ping = PingState::AwaitingResponse {
+
            len: msg.ponglen,
+
            since: ping.since,
+
        };
+
        msg
+
    }
+

+
    pub fn idle(&mut self, now: LocalTime, stable_threshold: LocalDuration) {
+
        let Connected {
+
            since,
+
            ref mut stable,
+
            ref mut attempts,
+
            ..
+
        } = self.state;
+
        if now >= since && now.duration_since(since) >= stable_threshold {
+
            *stable = true;
+
            attempts.reset();
+
        }
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Disconnected {
+
    /// Since when has this peer been disconnected.
+
    since: LocalTime,
+
    /// When to retry the connection.
+
    retry_at: Option<LocalTime>,
+
    /// Number of attempts while disconnected.
+
    attempts: Attempts,
+
}
+

+
impl Session<Disconnected> {
+
    pub fn disconnected_since(&self) -> &LocalTime {
+
        &self.state.since
+
    }
+

+
    pub fn should_retry_at(&self) -> Option<&LocalTime> {
+
        self.state.retry_at.as_ref()
+
    }
+

+
    /// Transition the [`Session`] to an [`Initial`] state.
+
    fn into_initial(self) -> Session<Initial> {
+
        self.map(|s| Initial::with_attempts(s.attempts))
+
    }
+
}
modified crates/radicle-protocol/src/lib.rs
@@ -4,6 +4,7 @@ pub mod service;
pub mod wire;
pub mod worker;

+
pub mod connections;
pub mod tasks;

/// Peer-to-peer protocol version.