Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Track peer latency
cloudhead committed 2 years ago
commit d6d90c6fdcd052dcbe44a856acab59f24d482fd1
parent 664cf20c92548d7f2fff6191c783c0b2938e6d5b
3 files changed +39 -10
modified radicle-node/src/service.rs
@@ -78,6 +78,8 @@ pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How much time should pass after a peer was last active for a *ping* to be sent.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
+
/// Maximum number of latency values to keep for a session.
+
pub const MAX_LATENCIES: usize = 16;
/// Maximum time difference between the local time, and an announcement timestamp.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
@@ -1653,10 +1655,24 @@ where
                    },
                );
            }
-
            (session::State::Connected { ping, .. }, Message::Pong { zeroes }) => {
-
                if let session::PingState::AwaitingResponse(ponglen) = *ping {
+
            (
+
                session::State::Connected {
+
                    ping, latencies, ..
+
                },
+
                Message::Pong { zeroes },
+
            ) => {
+
                if let session::PingState::AwaitingResponse {
+
                    len: ponglen,
+
                    since,
+
                } = *ping
+
                {
                    if (ponglen as usize) == zeroes.len() {
                        *ping = session::PingState::Ok;
+
                        // Keep track of peer latency.
+
                        latencies.push_back(self.clock - since);
+
                        if latencies.len() > MAX_LATENCIES {
+
                            latencies.pop_front();
+
                        }
                    }
                }
            }
@@ -2036,7 +2052,7 @@ where
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
            .map(|(_, session)| session);
        for session in inactive_sessions {
-
            session.ping(&mut self.outbox).ok();
+
            session.ping(self.clock, &mut self.outbox).ok();
        }
    }

modified radicle-node/src/service/session.rs
@@ -1,4 +1,4 @@
-
use std::collections::HashSet;
+
use std::collections::{HashSet, VecDeque};
use std::fmt;

use crate::node::config::Limits;
@@ -119,6 +119,7 @@ impl Session {
                since: time,
                ping: PingState::default(),
                fetching: HashSet::default(),
+
                latencies: VecDeque::default(),
            },
            link: Link::Inbound,
            subscribe: None,
@@ -212,6 +213,7 @@ impl Session {
            since,
            ping: PingState::default(),
            fetching: HashSet::default(),
+
            latencies: VecDeque::default(),
        };
    }

@@ -231,11 +233,13 @@ impl Session {
        self.state = State::Initial;
    }

-
    pub fn ping(&mut self, reactor: &mut Outbox) -> Result<(), Error> {
+
    pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
        if let State::Connected { ping, .. } = &mut self.state {
            let msg = message::Ping::new(&mut self.rng);
-
            *ping = PingState::AwaitingResponse(msg.ponglen);
-

+
            *ping = PingState::AwaitingResponse {
+
                len: msg.ponglen,
+
                since,
+
            };
            reactor.write(self, Message::Ping(msg));
        }
        Ok(())
modified radicle/src/node.rs
@@ -11,7 +11,7 @@ pub mod policy;
pub mod routing;
pub mod seed;

-
use std::collections::{BTreeSet, HashMap, HashSet};
+
use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
use std::io::{BufRead, BufReader};
use std::ops::{ControlFlow, Deref};
use std::os::unix::net::UnixStream;
@@ -21,7 +21,7 @@ use std::{fmt, io, net, thread, time};

use amplify::WrapperMut;
use cyphernet::addr::NetAddr;
-
use localtime::LocalTime;
+
use localtime::{LocalDuration, LocalTime};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json as json;
@@ -73,7 +73,12 @@ pub enum PingState {
    /// The peer has not been sent a ping.
    None,
    /// A ping has been sent and is waiting on the peer's response.
-
    AwaitingResponse(u16),
+
    AwaitingResponse {
+
        /// Length of pong payload expected.
+
        len: u16,
+
        /// Since when are we waiting.
+
        since: LocalTime,
+
    },
    /// The peer was successfully pinged.
    Ok,
}
@@ -97,6 +102,9 @@ pub enum State {
        ping: PingState,
        /// Ongoing fetches.
        fetching: HashSet<RepoId>,
+
        /// Measured latencies for this peer.
+
        #[serde(skip)]
+
        latencies: VecDeque<LocalDuration>,
    },
    /// When a peer is disconnected.
    #[serde(rename_all = "camelCase")]
@@ -1177,6 +1185,7 @@ mod test {
                since: LocalTime::now(),
                ping: Default::default(),
                fetching: Default::default(),
+
                latencies: VecDeque::default(),
            }))
            .unwrap(),
        )