Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Service: drop unresponsive peers
Slack Coder committed 3 years ago
commit 14690d19513d144f54db64bf1232e8ca3e22cada
parent 240825387d260b8415838af8987ec5870298403e
7 files changed +278 -10
modified radicle-node/src/service.rs
@@ -33,12 +33,12 @@ use crate::node;
use crate::service::config::ProjectTracking;
use crate::service::message::{Address, Announcement, AnnouncementMessage};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
-
use crate::service::peer::{SessionError, SessionState};
+
use crate::service::peer::{PingState, SessionError, SessionState};
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, WriteStorage};

pub use crate::service::config::{Config, Network};
-
pub use crate::service::message::{Envelope, Message};
+
pub use crate::service::message::{Envelope, Message, ZeroBytes};
pub use crate::service::peer::Session;

use self::gossip::Gossip;
@@ -53,6 +53,11 @@ pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
+

+
/// Duration to wait on an unresponsive peer before dropping its connection.
+
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_secs(60);
+

+
/// Duration without any received message after which a negotiated peer is sent a ping.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_secs(30);
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);

/// Network node identifier.
@@ -362,6 +367,8 @@ where
        if now - self.last_idle >= IDLE_INTERVAL {
            debug!("Running 'idle' task...");

+
            self.keep_alive(&now);
+
            self.disconnect_unresponsive_peers(&now);
            self.maintain_connections();
            self.reactor.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
@@ -485,7 +492,7 @@ where
        let peer = self
            .sessions
            .entry(ip)
-
            .or_insert_with(|| Session::new(*addr, Link::Outbound, persistent));
+
            .or_insert_with(|| Session::new(*addr, Link::Outbound, persistent, &self.rng));

        peer.attempted();
    }
@@ -522,7 +529,12 @@ where
        } else {
            self.sessions.insert(
                ip,
-
                Session::new(addr, Link::Inbound, self.config.is_persistent(&address)),
+
                Session::new(
+
                    addr,
+
                    Link::Inbound,
+
                    self.config.is_persistent(&address),
+
                    &self.rng,
+
                ),
            );
        }
    }
@@ -724,13 +736,14 @@ where
        } else {
            return Err(SessionError::NotFound(remote.ip()));
        };
+
        peer.last_active = self.clock.local_time();

        if envelope.magic != self.config.network.magic() {
            return Err(SessionError::WrongMagic(envelope.magic));
        }
        debug!("Received {:?} from {}", &envelope.msg, peer.ip());

-
        match (&peer.state, envelope.msg) {
+
        match (&mut peer.state, envelope.msg) {
            (
                SessionState::Initial,
                Message::Initialize {
@@ -764,6 +777,7 @@ where
                    since: self.clock.local_time(),
                    addrs,
                    git,
+
                    ping: Default::default(),
                };
            }
            (SessionState::Initial, _) => {
@@ -812,6 +826,19 @@ where
                );
                return Err(SessionError::Misbehavior);
            }
+
            (SessionState::Negotiated { .. }, Message::Ping { ponglen, .. }) => {
+
                let resp = Message::Pong {
+
                    zeroes: ZeroBytes::new(ponglen),
+
                };
+
                self.reactor.write(peer.addr, resp);
+
            }
+
            (SessionState::Negotiated { ping, .. }, Message::Pong { zeroes }) => {
+
                if let PingState::AwaitingResponse(ponglen) = *ping {
+
                    if (ponglen as usize) == zeroes.len() {
+
                        *ping = PingState::Ok;
+
                    }
+
                }
+
            }
            (SessionState::Disconnected { .. }, msg) => {
                debug!("Ignoring {:?} from disconnected peer {}", msg, peer.ip());
            }
@@ -883,6 +910,29 @@ where
        Ok(())
    }

+
    /// Ask the reactor to disconnect every negotiated session that has gone stale.
    ///
    /// A session counts as stale when its `last_active` time is strictly earlier than
    /// `now - STALE_CONNECTION_TIMEOUT`. Each such peer is disconnected with
    /// `DisconnectReason::Error(SessionError::Timeout)`.
    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
+
        // Lazily select the negotiated sessions whose last activity is too old.
        let stale = self
+
            .sessions
+
            .negotiated()
+
            .filter(|(_, _, session)| session.last_active < *now - STALE_CONNECTION_TIMEOUT);
+
        for (_, _, session) in stale {
+
            self.reactor
+
                .disconnect(session.addr, DisconnectReason::Error(SessionError::Timeout));
+
        }
+
    }
+

+
    /// Ensure connection health by pinging connected peers.
    ///
    /// Only negotiated sessions whose `last_active` time is strictly earlier than
    /// `now - KEEP_ALIVE_DELTA` are pinged.
+
    fn keep_alive(&mut self, now: &LocalTime) {
+
        let inactive_sessions = self
+
            .sessions
+
            .negotiated_mut()
+
            .filter(|(_, session)| session.last_active < *now - KEEP_ALIVE_DELTA)
+
            .map(|(_, session)| session);
+
        for session in inactive_sessions {
+
            // Best effort: the `Result` returned by `ping` is deliberately discarded.
            session.ping(&mut self.reactor, &self.rng).ok();
+
        }
+
    }
+

    fn maintain_connections(&mut self) {
        // TODO: Connect to all potential seeds.
        if self.sessions.len() < TARGET_OUTBOUND_PEERS {
@@ -1080,6 +1130,11 @@ impl Sessions {
                _ => None,
            })
    }
+

+
    /// Iterator over mutable fully negotiated peers.
    ///
    /// Yields `(address, session)` pairs for every entry whose session reports
    /// `is_negotiated()`.
+
    pub fn negotiated_mut(&mut self) -> impl Iterator<Item = (&IpAddr, &mut Session)> {
+
        self.0.iter_mut().filter(move |(_, p)| p.is_negotiated())
+
    }
}

impl Deref for Sessions {
modified radicle-node/src/service/message.rs
@@ -315,6 +315,23 @@ pub enum Message {
    /// Gossip announcement. These messages are relayed to peers, and filtered
    /// using [`Message::Subscribe`].
    Announcement(Announcement),
+

+
    /// Ask a connected peer for a Pong.
+
    ///
+
    /// Used to check whether the remote peer is responsive, or as a side-effect-free way to
+
    /// keep a connection alive.
+
    Ping {
+
        /// The desired response length
+
        ponglen: u16,
+
        /// The ping payload.
+
        zeroes: ZeroBytes,
+
    },
+

+
    /// Response to `Ping` message.
+
    Pong {
+
        /// The pong payload.
+
        zeroes: ZeroBytes,
+
    },
}

impl Message {
@@ -373,10 +390,29 @@ impl fmt::Debug for Message {
            Self::Announcement(Announcement { node, message, .. }) => {
                write!(f, "Announcement({}, {:?})", node, message)
            }
+
            Self::Ping { ponglen, zeroes } => write!(f, "Ping({ponglen}, {:?})", zeroes),
+
            Self::Pong { zeroes } => write!(f, "Pong({:?})", zeroes),
        }
    }
}

+
/// A run of zero-valued bytes, represented only by its length.
///
/// No byte buffer is stored: the wrapped `u16` is the number of zeroes.
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct ZeroBytes(u16);
+

+
impl ZeroBytes {
+
    /// Create a payload of `arg` zero bytes.
    pub fn new(arg: u16) -> Self {
+
        ZeroBytes(arg)
+
    }
+

+
    /// Whether the payload contains no bytes at all.
    pub fn is_empty(&self) -> bool {
+
        self.0 == 0
+
    }
+

+
    /// Number of zero bytes in the payload.
    pub fn len(&self) -> usize {
+
        self.0.into()
+
    }
+
}
+

#[cfg(test)]
mod tests {
    use super::*;
modified radicle-node/src/service/peer.rs
@@ -1,5 +1,19 @@
use crate::service::message::*;
use crate::service::*;
+
use crate::wire;
+

+
use std::mem::size_of;
+

+
/// Progress of the ping/pong exchange with a peer.
///
/// The default state is [`PingState::None`].
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
+
pub enum PingState {
+
    #[default]
+
    /// The peer has not been sent a ping.
+
    None,
+
    /// A ping has been sent and is waiting on the peer's response.
    ///
    /// The payload is the pong length (`ponglen`) that was requested in the ping.
+
    AwaitingResponse(u16),
+
    /// The peer was successfully pinged.
+
    Ok,
+
}

#[derive(Debug, Default, Clone)]
#[allow(clippy::large_enum_variant)]
@@ -18,6 +32,7 @@ pub enum SessionState {
        /// Addresses this peer is reachable on.
        addrs: Vec<Address>,
        git: Url,
+
        ping: PingState,
    },
    /// When a peer is disconnected.
    Disconnected { since: LocalTime },
@@ -37,6 +52,8 @@ pub enum SessionError {
    VerificationFailed(#[from] storage::VerifyError),
    #[error("peer misbehaved")]
    Misbehavior,
+
    #[error("peer timed out")]
+
    Timeout,
}

/// A peer session. Each connected peer will have one session.
@@ -53,22 +70,29 @@ pub struct Session {
    pub state: SessionState,
    /// Peer subscription.
    pub subscribe: Option<Subscribe>,
+
    /// Last time a message was received from the peer.
+
    pub last_active: LocalTime,

    /// Connection attempts. For persistent peers, tracks
    /// how many times we've attempted to connect. We reset this to zero
    /// upon successful connection.
    attempts: usize,
+

+
    /// Source of entropy.
+
    rng: Rng,
}

impl Session {
-
    pub fn new(addr: net::SocketAddr, link: Link, persistent: bool) -> Self {
+
    pub fn new(addr: net::SocketAddr, link: Link, persistent: bool, rng: &Rng) -> Self {
        Self {
            addr,
            state: SessionState::default(),
            link,
            subscribe: None,
            persistent,
+
            last_active: LocalTime::default(),
            attempts: 0,
+
            rng: rng.clone(),
        }
    }

@@ -91,4 +115,20 @@ impl Session {
    pub fn connected(&mut self, _link: Link) {
        self.attempts = 0;
    }
+

+
    pub fn ping(&mut self, reactor: &mut Reactor, rng: &Rng) -> Result<(), SessionError> {
+
        if let SessionState::Negotiated { ping, .. } = &mut self.state {
+
            let ponglen = rng.u16(0..wire::message::MAX_PAYLOAD_SIZE_BYTES);
+
            let msg = Message::Ping {
+
                ponglen,
+
                zeroes: message::ZeroBytes::new(
+
                    rng.u16(0..(wire::message::MAX_PAYLOAD_SIZE_BYTES - (size_of::<u16>() as u16))),
+
                ),
+
            };
+
            reactor.write(self.addr, msg);
+

+
            *ping = PingState::AwaitingResponse(ponglen);
+
        }
+
        Ok(())
+
    }
}
modified radicle-node/src/test/arbitrary.rs
@@ -8,7 +8,7 @@ use crate::prelude::{Id, NodeId, Refs, Timestamp};
use crate::service::filter::{Filter, FILTER_SIZE_L, FILTER_SIZE_M, FILTER_SIZE_S};
use crate::service::message::{
    Address, Announcement, Envelope, InventoryAnnouncement, Message, NodeAnnouncement,
-
    RefsAnnouncement, Subscribe,
+
    RefsAnnouncement, Subscribe, ZeroBytes,
};
use crate::wire::message::MessageType;

@@ -45,6 +45,8 @@ impl Arbitrary for Message {
                MessageType::NodeAnnouncement,
                MessageType::RefsAnnouncement,
                MessageType::Subscribe,
+
                MessageType::Ping,
+
                MessageType::Pong,
            ])
            .unwrap();

@@ -93,6 +95,13 @@ impl Arbitrary for Message {
                since: Timestamp::arbitrary(g),
                until: Timestamp::arbitrary(g),
            }),
+
            MessageType::Ping => Self::Ping {
+
                ponglen: u16::arbitrary(g),
+
                zeroes: ZeroBytes::arbitrary(g),
+
            },
+
            MessageType::Pong => Self::Pong {
+
                zeroes: ZeroBytes::arbitrary(g),
+
            },
            _ => unreachable!(),
        }
    }
@@ -115,3 +124,9 @@ impl Arbitrary for Address {
        }
    }
}
+

+
// Property-test generator: a `ZeroBytes` built from an arbitrary `u16` length.
impl Arbitrary for ZeroBytes {
+
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
+
        ZeroBytes::new(u16::arbitrary(g))
+
    }
+
}
modified radicle-node/src/test/peer.rs
@@ -22,7 +22,7 @@ use crate::storage::WriteStorage;
use crate::test::arbitrary;
use crate::test::signer::MockSigner;
use crate::test::simulator;
-
use crate::{Link, LocalTime};
+
use crate::{Link, LocalDuration, LocalTime};

/// Service instantiation used for testing.
pub type Service<S, G> = service::Service<routing::Table, address::Book, S, G>;
@@ -251,6 +251,11 @@ where
        );
    }

+
    /// Advance this peer's clock by `duration`, then wake the service.
    pub fn elapse(&mut self, duration: LocalDuration) {
+
        self.clock().elapse(duration);
+
        self.service.wake();
+
    }
+

    /// Drain outgoing messages sent from this peer to the remote address.
    pub fn messages(&mut self, remote: &net::SocketAddr) -> impl Iterator<Item = Message> {
        let mut msgs = Vec::new();
modified radicle-node/src/test/tests.rs
@@ -37,6 +37,70 @@ use crate::{client, git, identity, rad, service, test};
// You may then run the test with eg. `cargo test -- --nocapture` to always show output.

#[test]
+
// A `Ping` received from a connected peer is answered with a `Pong` whose payload has
// exactly the requested `ponglen`, independent of the size of the ping's own payload.
fn test_ping_response() {
+
    let mut alice = Peer::new("alice", [8, 8, 8, 8], MockStorage::empty());
+
    let bob = Peer::new("bob", [9, 9, 9, 9], MockStorage::empty());
+

+
    alice.connect_to(&bob);
+
    alice.receive(
+
        &bob.addr(),
+
        Message::Ping {
+
            ponglen: 21,
+
            zeroes: ZeroBytes::new(42),
+
        },
+
    );
+
    assert_matches!(
+
        alice.messages(&bob.addr()).next(),
+
        Some(Message::Pong { zeroes }) if zeroes.len() == 21,
+
        "respond with correctly formatted pong",
+
    );
+
}
+

+
#[test]
+
// After connecting, alice's clock is advanced past `STALE_CONNECTION_TIMEOUT` without her
// receiving anything further from bob; her outbox must then contain a disconnect for him.
fn test_disconnecting_unresponsive_peer() {
+
    let mut alice = Peer::new("alice", [8, 8, 8, 8], MockStorage::empty());
+
    let bob = Peer::new("bob", [9, 9, 9, 9], MockStorage::empty());
+

+
    alice.connect_to(&bob);
+
    assert_eq!(1, alice.sessions().negotiated().count(), "bob connects");
+
    alice.elapse(STALE_CONNECTION_TIMEOUT + LocalDuration::from_secs(1));
+
    alice
+
        .outbox()
+
        .find(|m| matches!(m, &Io::Disconnect(addr, _) if addr == bob.addr()))
+
        .expect("disconnect an unresponsive bob");
+
}
+

+
#[test]
+
// Two simulated peers that keep exchanging messages must still hold their session after
// more than `STALE_CONNECTION_TIMEOUT` has elapsed on both clocks.
fn test_connection_kept_alive() {
+
    let mut alice = Peer::new("alice", [8, 8, 8, 8], MockStorage::empty());
+
    let mut bob = Peer::new("bob", [9, 9, 9, 9], MockStorage::empty());
+

+
    let mut sim = Simulation::new(
+
        LocalTime::now(),
+
        alice.rng.clone(),
+
        simulator::Options::default(),
+
    )
+
    .initialize([&mut alice, &mut bob]);
+

+
    alice.command(service::Command::Connect(bob.addr()));
+
    sim.run_while([&mut alice, &mut bob], |s| !s.is_settled());
+
    assert_eq!(1, alice.sessions().negotiated().count(), "bob connects");
+

+
    // Advance both clocks in steps of a tenth of the timeout, letting the simulation
    // settle after each step, until the total exceeds the stale-connection timeout.
    let mut elapsed: LocalDuration = LocalDuration::from_secs(0);
+
    let step: LocalDuration = STALE_CONNECTION_TIMEOUT / 10;
+
    while elapsed < STALE_CONNECTION_TIMEOUT + step {
+
        alice.elapse(step);
+
        bob.elapse(step);
+
        sim.run_while([&mut alice, &mut bob], |s| !s.is_settled());
+

+
        elapsed = elapsed + step;
+
    }
+

+
    assert_eq!(1, alice.sessions().len(), "alice remains connected to Bob");
+
    assert_eq!(1, bob.sessions().len(), "bob remains connected to Alice");
+
}
+

+
#[test]
fn test_outbound_connection() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8], MockStorage::empty());
    let bob = Peer::new("bob", [9, 9, 9, 9], MockStorage::empty());
@@ -596,8 +660,6 @@ fn test_persistent_peer_reconnect() {

#[test]
fn test_push_and_pull() {
-
    logger::init(log::Level::Debug);
-

    let tempdir = tempfile::tempdir().unwrap();

    let storage_alice = Storage::open(tempdir.path().join("alice").join("storage")).unwrap();
modified radicle-node/src/wire/message.rs
@@ -7,6 +7,11 @@ use crate::prelude::*;
use crate::service::message::*;
use crate::wire;

+
use std::mem::size_of;
+

+
/// The maximum supported message payload size in bytes: `u16::MAX` minus the size of a
/// [`MessageType`] value.
+
pub const MAX_PAYLOAD_SIZE_BYTES: u16 = u16::MAX - (size_of::<MessageType>() as u16);
+

/// Message type.
#[repr(u16)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -16,6 +21,8 @@ pub enum MessageType {
    InventoryAnnouncement = 4,
    RefsAnnouncement = 6,
    Subscribe = 8,
+
    Ping = 10,
+
    Pong = 12,
}

impl From<MessageType> for u16 {
@@ -34,6 +41,8 @@ impl TryFrom<u16> for MessageType {
            4 => Ok(MessageType::InventoryAnnouncement),
            6 => Ok(MessageType::RefsAnnouncement),
            8 => Ok(MessageType::Subscribe),
+
            10 => Ok(MessageType::Ping),
+
            12 => Ok(MessageType::Pong),
            _ => Err(other),
        }
    }
@@ -49,6 +58,8 @@ impl Message {
                AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
                AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
            },
+
            Self::Ping { .. } => MessageType::Ping,
+
            Self::Pong { .. } => MessageType::Pong,
        }
        .into()
    }
@@ -188,6 +199,13 @@ impl wire::Encode for Message {
                n += message.encode(writer)?;
                n += signature.encode(writer)?;
            }
+
            Self::Ping { ponglen, zeroes } => {
+
                n += ponglen.encode(writer)?;
+
                n += zeroes.encode(writer)?;
+
            }
+
            Self::Pong { zeroes } => {
+
                n += zeroes.encode(writer)?;
+
            }
        }
        Ok(n)
    }
@@ -258,6 +276,15 @@ impl wire::Decode for Message {
                }
                .into())
            }
+
            Ok(MessageType::Ping) => {
+
                let ponglen = u16::decode(reader)?;
+
                let zeroes = ZeroBytes::decode(reader)?;
+
                Ok(Self::Ping { ponglen, zeroes })
+
            }
+
            Ok(MessageType::Pong) => {
+
                let zeroes = ZeroBytes::decode(reader)?;
+
                Ok(Self::Pong { zeroes })
+
            }
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
        }
    }
@@ -335,6 +362,26 @@ impl wire::Decode for Address {
    }
}

+
// Wire format: the number of zeroes as a `u16`, followed by that many `0u8` bytes.
impl wire::Encode for ZeroBytes {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        // Length prefix first; `n` accumulates the values returned by each `encode` call.
        let mut n = (self.len() as u16).encode(writer)?;
+
        for _ in 0..self.len() {
+
            n += 0u8.encode(writer)?
+
        }
+
        Ok(n)
+
    }
+
}
+

+
// Reads a `u16` count, then consumes that many bytes from the reader.
impl wire::Decode for ZeroBytes {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let zeroes = u16::decode(reader)?;
+
        for _ in 0..zeroes {
+
            // The byte values are discarded; they are not checked to actually be zero.
            _ = u8::decode(reader)?;
+
        }
+
        Ok(ZeroBytes::new(zeroes))
+
    }
+
}
+

#[cfg(test)]
mod tests {
    use super::*;
@@ -378,6 +425,14 @@ mod tests {
    }

    #[quickcheck]
+
    // Round-trip property: serializing a `ZeroBytes` and deserializing the result yields
    // a value equal to the original.
    fn prop_zero_bytes_encode_decode(zeroes: ZeroBytes) {
+
        assert_eq!(
+
            wire::deserialize::<ZeroBytes>(&wire::serialize(&zeroes)).unwrap(),
+
            zeroes
+
        );
+
    }
+

+
    #[quickcheck]
    fn prop_addr(addr: Address) {
        assert_eq!(
            wire::deserialize::<Address>(&wire::serialize(&addr)).unwrap(),