Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol(connections): pong command
Fintan Halpenny committed 10 months ago
commit 8eaaae769407c100476a8d01310b028bcd2be373
parent db6337e73043b1ef35e28548ee556d566cf5fda0
3 files changed +72 -4
modified crates/radicle-protocol/src/connections.rs
@@ -8,7 +8,7 @@ pub mod events;

use radicle::node::config::RateLimits;
use radicle::prelude::RepoId;
-
use session::{HasAttempts as _, Session, Sessions};
+
use session::{HasAttempts as _, Pinged, Session, Sessions};
mod session;

use std::collections::HashSet;
@@ -80,6 +80,10 @@ pub enum CommandEvent {
        node: NodeId,
        rid: RepoId,
    },
+
    Pinged {
+
        node: NodeId,
+
        pinged: Option<Pinged>,
+
    },
}

impl Connections {
@@ -90,6 +94,7 @@ impl Connections {
            commands::Command::Disconnect(disconnect) => self.disconnected(disconnect),
            commands::Command::Subscribe(subscribe) => self.subscribed(subscribe),
            commands::Command::SubscribeTo(subscribe) => self.subscribed_to(subscribe),
+
            commands::Command::Pong(pong) => self.pinged(pong),
        }
    }

@@ -150,6 +155,13 @@ impl Connections {
            CommandEvent::MissingSession { node }
        }
    }
+

+
    fn pinged(&mut self, commands::Pong { node, pong }: commands::Pong) -> CommandEvent {
+
        self.sessions.pinged(&node, pong).map_or_else(
+
            |session::Missing| CommandEvent::MissingSession { node },
+
            |pinged| CommandEvent::Pinged { node, pinged },
+
        )
+
    }
}

impl Connections {
modified crates/radicle-protocol/src/connections/commands.rs
@@ -6,12 +6,15 @@ use radicle::{

use crate::service::message;

+
use super::session;
+

pub enum Command {
    Attempt(Attempt),
    Connect(Connect),
    Disconnect(Disconnect),
    Subscribe(Subscribe),
    SubscribeTo(SubscribeTo),
+
    Pong(Pong),
}

impl From<Attempt> for Command {
@@ -44,6 +47,12 @@ impl From<SubscribeTo> for Command {
    }
}

+
impl From<Pong> for Command {
+
    fn from(v: Pong) -> Self {
+
        Self::Pong(v)
+
    }
+
}
+

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Attempt {
    pub node: NodeId,
@@ -75,3 +84,9 @@ pub struct SubscribeTo {
    pub node: NodeId,
    pub rid: RepoId,
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Pong {
+
    pub node: NodeId,
+
    pub pong: session::Pong,
+
}
modified crates/radicle-protocol/src/connections/session.rs
@@ -6,7 +6,7 @@ use radicle::{
    prelude::RepoId,
};

-
use crate::service::message;
+
use crate::service::{message, ZeroBytes, MAX_LATENCIES};

/// Time after which a connection is considered stable.
#[allow(unused)]
@@ -55,6 +55,9 @@ impl HasAttempts for State {
    }
}

+
/// Marker type for when a [`NodeId`] is missing from [`Sessions`].
+
pub struct Missing;
+

pub struct Sessions {
    initial: HashMap<NodeId, Session<Initial>>,
    attempted: HashMap<NodeId, Session<Attempted>>,
@@ -251,6 +254,11 @@ impl Sessions {
        false
    }

+
    pub fn pinged(&mut self, node: &NodeId, pong: Pong) -> Result<Option<Pinged>, Missing> {
+
        let session = self.connected.get_mut(node).ok_or(Missing)?;
+
        Ok(session.pinged(pong))
+
    }
+

    fn remove_session(&mut self, node: &NodeId) -> Option<Session<State>> {
        self.initial
            .remove(node)
@@ -537,9 +545,21 @@ impl Connected {
    }
}

+
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Ping {
-
    since: LocalTime,
-
    rng: fastrand::Rng,
+
    pub since: LocalTime,
+
    pub rng: fastrand::Rng,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Pong {
+
    pub now: LocalTime,
+
    pub zeroes: ZeroBytes,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Pinged {
+
    pub latency: LocalDuration,
}

impl Session<Connected> {
@@ -570,6 +590,27 @@ impl Session<Connected> {
        msg
    }

+
    pub fn pinged(&mut self, Pong { zeroes, now }: Pong) -> Option<Pinged> {
+
        if let PingState::AwaitingResponse {
+
            len: ponglen,
+
            since,
+
        } = self.state.ping
+
        {
+
            if (ponglen as usize) == zeroes.len() {
+
                self.state.ping = PingState::Ok;
+
                let latency = now - since;
+
                self.state.latencies.push_back(latency);
+
                // TODO(finto): MAX_LATENCIES should likely be configured
+
                // somewhere else
+
                if self.state.latencies.len() > MAX_LATENCIES {
+
                    self.state.latencies.pop_front();
+
                }
+
                return Some(Pinged { latency });
+
            }
+
        }
+
        None
+
    }
+

    pub fn idle(&mut self, now: LocalTime, stable_threshold: LocalDuration) {
        let Connected {
            since,