Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol/connections: refactor config
Fintan Halpenny committed 3 months ago
commit e632277d522c99fd5331414e7584b095c48b02f3
parent 1dc8d26821ee1ee39fa84c926a67a9bd687d08dd
3 files changed +173 -52
modified crates/radicle-protocol/src/connections.rs
@@ -1,46 +1,8 @@
+
pub mod config;
+
pub use config::Config;
+

pub mod state;

pub mod session;
pub use session::State;
pub use session::{Attempts, Pinged, Session, Sessions};
-

-
use localtime::LocalDuration;
-
use radicle::node::config::RateLimits;
-

-
/// Minimum amount of time to wait before reconnecting to a peer.
-
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
-
/// Maximum amount of time to wait before reconnecting to a peer.
-
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
-

-
#[derive(Debug)]
-
pub struct Config {
-
    /// Duration for a connection to be considered idle.
-
    pub idle: LocalDuration,
-
    /// Duration to wait until a ping is sent to a connection.
-
    pub keep_alive: LocalDuration,
-
    /// Duration to wait until a connection is considered stale.
-
    pub stale_connection: LocalDuration,
-
    /// Allowed number of inbound connections
-
    pub inbound_limit: usize,
-
    /// The number of outbound peers that we want to reach.
-
    pub target_outbound_peers: usize,
-
    pub limits: RateLimits,
-
    pub reconnection_delay: ReconnectionDelay,
-
}
-

-
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
-
pub struct ReconnectionDelay {
-
    /// The minimum amount of time to wait before attempting a re-connection.
-
    pub min_delta: LocalDuration,
-
    /// The maximum amount of time to wait before attempting a re-connection.
-
    pub max_delta: LocalDuration,
-
}
-

-
impl Default for ReconnectionDelay {
-
    fn default() -> Self {
-
        Self {
-
            min_delta: MIN_RECONNECTION_DELTA,
-
            max_delta: MAX_RECONNECTION_DELTA,
-
        }
-
    }
-
}
added crates/radicle-protocol/src/connections/config.rs
@@ -0,0 +1,160 @@
+
//! Configuration parameter for the [`Connections`] state management.
+
//!
+
//! [`Connections`]: crate::connections::state::Connections
+

+
use localtime::LocalDuration;
+

+
// TODO(finto): these are realistically only used here. I think that components
+
// should define their own configuration values, that eventually compose into
+
// the final larger configuration. I think this would result in a more useful
+
// layout of the config, e.g. connections.inbound.rateLimit,
+
// connections.outbound.rateLimit, connections.duration.idle, etc.
+
use radicle::node::config::{RateLimit, RateLimits};
+

+
/// How often to run the "idle" task.
+
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
+
/// How much time should pass after a peer was last active for a *ping* to be sent.
+
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
+
/// Duration to wait on an unresponsive peer before dropping its connection.
+
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
+
/// Minimum amount of time to wait before reconnecting to a peer.
+
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
+
/// Maximum amount of time to wait before reconnecting to a peer.
+
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
+
/// Target number of peers to maintain connections to.
+
pub const TARGET_OUTBOUND_PEERS: usize = 8;
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Config {
+
    /// Configurations for connection durations, such as idleness, keep alive,
+
    /// reconnection delays, etc.
+
    pub durations: Durations,
+
    /// Configurations for managing outbound connections.
+
    pub outbound: Outbound,
+
    /// Configurations for managing outbound connections.
+
    pub inbound: Inbound,
+
}
+

+
impl Config {
+
    /// The duration for a connection to be considered "idle".
+
    pub fn idle(&self) -> LocalDuration {
+
        self.durations.idle
+
    }
+

+
    /// How much time should pass after a peer was last active for a *ping* to be sent.
+
    pub fn keep_alive(&self) -> LocalDuration {
+
        self.durations.keep_alive
+
    }
+

+
    /// Duration to wait on an unresponsive peer before dropping its connection.
+
    pub fn stale(&self) -> LocalDuration {
+
        self.durations.stale
+
    }
+

+
    /// Target number of peers to maintain connections to.
+
    pub fn outbound_target(&self) -> usize {
+
        self.outbound.target
+
    }
+

+
    /// Maximum number of allowed inbound connections.
+
    pub fn max_inbound(&self) -> usize {
+
        self.inbound.maximum
+
    }
+

+
    /// The rate limits for an inbound connection.
+
    pub fn inbound_rate_limit(&self) -> RateLimit {
+
        self.inbound.rate_limit
+
    }
+

+
    /// The rate limits for an outbound connection.
+
    pub fn outbound_rate_limit(&self) -> RateLimit {
+
        self.outbound.rate_limit
+
    }
+

+
    /// The minimum and maximum durations before attempting reconnecting to a
+
    /// node.
+
    pub fn reconnection_delay(&self) -> ReconnectionDelay {
+
        self.durations.reconnection_delay
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Durations {
+
    /// Duration for a connection to be considered idle.
+
    pub idle: LocalDuration,
+
    /// Duration to wait until a ping is sent to a connection.
+
    pub keep_alive: LocalDuration,
+
    /// Duration to wait until a connection is considered stale.
+
    pub stale: LocalDuration,
+
    /// Configure the minimum and maximum delay durations for attempting
+
    /// reconnections.
+
    pub reconnection_delay: ReconnectionDelay,
+
}
+

+
impl Default for Durations {
+
    fn default() -> Self {
+
        Self {
+
            idle: IDLE_INTERVAL,
+
            keep_alive: KEEP_ALIVE_DELTA,
+
            stale: STALE_CONNECTION_TIMEOUT,
+
            reconnection_delay: ReconnectionDelay::default(),
+
        }
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Outbound {
+
    /// Rate limiting of inbound connection actions.
+
    pub rate_limit: RateLimit,
+
    /// Target number of outbound connections that we want to reach.
+
    pub target: usize,
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Inbound {
+
    /// Rate limiting of inbound connection actions.
+
    pub rate_limit: RateLimit,
+
    /// The maximum number of inbound connections allowed.
+
    pub maximum: usize,
+
}
+

+
impl From<RateLimit> for Inbound {
+
    fn from(limit: RateLimit) -> Self {
+
        let maximum = limit.capacity;
+
        Self {
+
            rate_limit: limit,
+
            maximum,
+
        }
+
    }
+
}
+

+
pub struct Limits {
+
    /// The rate limits for each direction of connection.
+
    ///
+
    /// This applies to rate limiting incoming connections to accept, and the
+
    /// incoming protocol messages.
+
    pub rates: RateLimits,
+
    /// Allowed maximum number of inbound connections.
+
    pub max_inbound: usize,
+
}
+

+
pub struct Reconnection {
+
    pub delay: ReconnectionDelay,
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct ReconnectionDelay {
+
    /// The minimum amount of time to wait before attempting a reconnection.
+
    pub min_delta: LocalDuration,
+
    /// The maximum amount of time to wait before attempting a reconnection.
+
    pub max_delta: LocalDuration,
+
}
+

+
impl Default for ReconnectionDelay {
+
    fn default() -> Self {
+
        Self {
+
            min_delta: MIN_RECONNECTION_DELTA,
+
            max_delta: MAX_RECONNECTION_DELTA,
+
        }
+
    }
+
}
modified crates/radicle-protocol/src/connections/state.rs
@@ -346,7 +346,7 @@ impl Connections {
                // Only stabilise sessions that are not already marked as stable
                if !session.is_stable() {
                    let stable = session
-
                        .stabilise(now, self.config.stale_connection)
+
                        .stabilise(now, self.config.stale())
                        .then_some(session.clone());
                    stabilised.extend(stable);
                    stabilised
@@ -366,7 +366,7 @@ impl Connections {
        mut ping: impl FnMut() -> message::Ping + 'a,
        now: LocalTime,
    ) -> impl Iterator<Item = event::Ping> + 'a {
-
        let keep_alive = self.config.keep_alive;
+
        let keep_alive = self.config.keep_alive();
        self.sessions
            .inactive(now, keep_alive)
            .map(move |(_, session)| event::Ping {
@@ -400,8 +400,8 @@ impl Connections {
        if self.sessions.is_diconnected(&node) {
            return event::HandledMessage::Disconnected { node };
        }
-
        let outbound_limit = RateLimit::from(self.config.limits.outbound);
-
        let inbound_limit = RateLimit::from(self.config.limits.inbound);
+
        let outbound_limit = self.config.outbound.rate_limit;
+
        let inbound_limit = self.config.inbound.rate_limit;
        let result =
            self.sessions
                .while_connecting(&node, None, connection_type, now, |connected| {
@@ -499,18 +499,17 @@ impl Connections {
        &self,
        now: &LocalTime,
    ) -> impl Iterator<Item = (&NodeId, &session::Session<session::Connected>)> {
-
        self.sessions
-
            .unresponsive(*now, self.config.stale_connection)
+
        self.sessions.unresponsive(*now, self.config.stale())
    }

    fn has_reached_inbound_limit(&self) -> bool {
-
        self.sessions.connected_inbound() >= self.config.inbound_limit
+
        self.sessions.connected_inbound() >= self.config.inbound.maximum
    }

    fn has_reached_ip_limit(&mut self, ip: &IpAddr, now: LocalTime) -> bool {
        let addr = HostName::from(*ip);
        self.limiter
-
            .limit(addr, None, &self.config.limits.inbound, now)
+
            .limit(addr, None, &self.config.inbound.rate_limit, now)
    }

    fn reason_severity(&self, reason: &DisconnectReason, now: LocalTime) -> Severity {
@@ -544,14 +543,14 @@ impl Connections {
    }

    fn idle(&self) -> LocalDuration {
-
        self.config.idle
+
        self.config.idle()
    }

    fn reconnection_delay(&self, attempts: Attempts) -> LocalDuration {
        let attempts = u32::try_from(usize::from(attempts)).unwrap_or(u32::MAX);
        LocalDuration::from_secs(2u64.saturating_pow(attempts)).clamp(
-
            self.config.reconnection_delay.min_delta,
-
            self.config.reconnection_delay.max_delta,
+
            self.config.reconnection_delay().min_delta,
+
            self.config.reconnection_delay().max_delta,
        )
    }