Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: follow up on session renaming
Slack Coder committed 3 years ago
commit a8895aee80324a24f46f7448e88f75ecd207f2b6
parent c11f5b13bfd73bec3f0f2749aa0ca1b89018eae3
5 files changed +158 -155
modified radicle-node/src/service.rs
@@ -1,9 +1,9 @@
pub mod config;
pub mod filter;
pub mod message;
-
pub mod peer;
pub mod reactor;
pub mod routing;
+
pub mod session;

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
@@ -32,13 +32,12 @@ use crate::node;
use crate::service::config::ProjectTracking;
use crate::service::message::{Address, Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
-
use crate::service::peer::{PingState, SessionError, SessionState};
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, WriteStorage};

pub use crate::service::config::{Config, Network};
pub use crate::service::message::{Message, ZeroBytes};
-
pub use crate::service::peer::Session;
+
pub use crate::service::session::Session;

use self::gossip::Gossip;
use self::message::InventoryAnnouncement;
@@ -550,7 +549,7 @@ where
        debug!("Disconnected from {} ({})", ip, reason);

        if let Some(peer) = self.sessions.get_mut(&ip) {
-
            peer.state = SessionState::Disconnected { since };
+
            peer.state = session::State::Disconnected { since };

            // Attempt to re-connect to persistent peers.
            if self.config.is_persistent(&address) && peer.attempts() < MAX_CONNECTION_ATTEMPTS {
@@ -579,7 +578,7 @@ where

    pub fn received_message(&mut self, addr: &net::SocketAddr, message: Message) {
        match self.handle_message(addr, message) {
-
            Err(SessionError::NotFound(ip)) => {
+
            Err(session::Error::NotFound(ip)) => {
                error!("Session not found for {ip}");
            }
            Err(err) => {
@@ -602,9 +601,9 @@ where
        &mut self,
        session: &NodeId,
        announcement: &Announcement,
-
    ) -> Result<bool, peer::SessionError> {
+
    ) -> Result<bool, session::Error> {
        if !announcement.verify() {
-
            return Err(SessionError::Misbehavior);
+
            return Err(session::Error::Misbehavior);
        }
        let Announcement { node, message, .. } = announcement;
        let now = self.clock.local_time();
@@ -614,7 +613,7 @@ where

        // Don't allow messages from too far in the future.
        if timestamp.saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
-
            return Err(SessionError::InvalidTimestamp(timestamp));
+
            return Err(session::Error::InvalidTimestamp(timestamp));
        }

        match message {
@@ -632,7 +631,7 @@ where
                    if let Error::Fetch(storage::FetchError::Verify(err)) = err {
                        // Disconnect the peer if it is the signer of this message.
                        if node == session {
-
                            return Err(peer::SessionError::VerificationFailed(err));
+
                            return Err(session::Error::VerificationFailed(err));
                        }
                    }
                    // There's not much we can do if the peer sending us this message isn't the
@@ -741,21 +740,21 @@ where
        &mut self,
        remote: &net::SocketAddr,
        message: Message,
-
    ) -> Result<(), peer::SessionError> {
+
    ) -> Result<(), session::Error> {
        let peer_ip = remote.ip();
        let peer = if let Some(peer) = self.sessions.get_mut(&peer_ip) {
            peer
        } else {
-
            return Err(SessionError::NotFound(remote.ip()));
+
            return Err(session::Error::NotFound(remote.ip()));
        };
        peer.last_active = self.clock.local_time();

        debug!("Received {:?} from {}", &message, peer.ip());

        match (&mut peer.state, message) {
-
            (SessionState::Initial, Message::Initialize { id, version, addrs }) => {
+
            (session::State::Initial, Message::Initialize { id, version, addrs }) => {
                if version != PROTOCOL_VERSION {
-
                    return Err(SessionError::WrongVersion(version));
+
                    return Err(session::Error::WrongVersion(version));
                }
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
                // extra "acknowledgment" message sent when the `Initialize` is well received.
@@ -773,22 +772,22 @@ where
                // Nb. we don't set the peer timestamp here, since it is going to be
                // set after the first message is received only. Setting it here would
                // mean that messages received right after the handshake could be ignored.
-
                peer.state = SessionState::Negotiated {
+
                peer.state = session::State::Negotiated {
                    id,
                    since: self.clock.local_time(),
                    addrs,
                    ping: Default::default(),
                };
            }
-
            (SessionState::Initial, _) => {
+
            (session::State::Initial, _) => {
                debug!(
                    "Disconnecting peer {} for sending us a message before handshake",
                    peer.ip()
                );
-
                return Err(SessionError::Misbehavior);
+
                return Err(session::Error::Misbehavior);
            }
            // Process a peer announcement.
-
            (SessionState::Negotiated { id, .. }, Message::Announcement(ann)) => {
+
            (session::State::Negotiated { id, .. }, Message::Announcement(ann)) => {
                let id = *id;

                // Returning true here means that the message should be relayed.
@@ -809,7 +808,7 @@ where
                    return Ok(());
                }
            }
-
            (SessionState::Negotiated { .. }, Message::Subscribe(subscribe)) => {
+
            (session::State::Negotiated { .. }, Message::Subscribe(subscribe)) => {
                for msg in self
                    .gossip
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
@@ -818,14 +817,14 @@ where
                }
                peer.subscribe = Some(subscribe);
            }
-
            (SessionState::Negotiated { .. }, Message::Initialize { .. }) => {
+
            (session::State::Negotiated { .. }, Message::Initialize { .. }) => {
                debug!(
                    "Disconnecting peer {} for sending us a redundant handshake message",
                    peer.ip()
                );
-
                return Err(SessionError::Misbehavior);
+
                return Err(session::Error::Misbehavior);
            }
-
            (SessionState::Negotiated { .. }, Message::Ping(Ping { ponglen, .. })) => {
+
            (session::State::Negotiated { .. }, Message::Ping(Ping { ponglen, .. })) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
@@ -837,14 +836,14 @@ where
                    },
                );
            }
-
            (SessionState::Negotiated { ping, .. }, Message::Pong { zeroes }) => {
-
                if let PingState::AwaitingResponse(ponglen) = *ping {
+
            (session::State::Negotiated { ping, .. }, Message::Pong { zeroes }) => {
+
                if let session::PingState::AwaitingResponse(ponglen) = *ping {
                    if (ponglen as usize) == zeroes.len() {
-
                        *ping = PingState::Ok;
+
                        *ping = session::PingState::Ok;
                    }
                }
            }
-
            (SessionState::Disconnected { .. }, msg) => {
+
            (session::State::Disconnected { .. }, msg) => {
                debug!("Ignoring {:?} from disconnected peer {}", msg, peer.ip());
            }
        }
@@ -916,8 +915,10 @@ where
            .negotiated()
            .filter(|(_, _, session)| session.last_active < *now - STALE_CONNECTION_TIMEOUT);
        for (_, _, session) in stale {
-
            self.reactor
-
                .disconnect(session.addr, DisconnectReason::Error(SessionError::Timeout));
+
            self.reactor.disconnect(
+
                session.addr,
+
                DisconnectReason::Error(session::Error::Timeout),
+
            );
        }
    }

@@ -995,7 +996,7 @@ where
#[derive(Debug)]
pub enum DisconnectReason {
    User,
-
    Error(SessionError),
+
    Error(session::Error),
}

impl DisconnectReason {
@@ -1113,7 +1114,7 @@ impl Sessions {

    pub fn by_id(&self, id: &NodeId) -> Option<&Session> {
        self.0.values().find(|p| {
-
            if let SessionState::Negotiated { id: _id, .. } = &p.state {
+
            if let session::State::Negotiated { id: _id, .. } = &p.state {
                _id == id
            } else {
                false
@@ -1126,7 +1127,7 @@ impl Sessions {
        self.0
            .iter()
            .filter_map(move |(ip, sess)| match &sess.state {
-
                SessionState::Negotiated { id, .. } => Some((ip, id, sess)),
+
                session::State::Negotiated { id, .. } => Some((ip, id, sess)),
                _ => None,
            })
    }
deleted radicle-node/src/service/peer.rs
@@ -1,122 +0,0 @@
-
use crate::service::message::*;
-
use crate::service::*;
-

-
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
-
pub enum PingState {
-
    #[default]
-
    /// The peer has not been sent a ping.
-
    None,
-
    /// A ping has been sent and is waiting on the peer's response.
-
    AwaitingResponse(u16),
-
    /// The peer was successfully pinged.
-
    Ok,
-
}
-

-
#[derive(Debug, Default, Clone)]
-
#[allow(clippy::large_enum_variant)]
-
pub enum SessionState {
-
    /// Initial peer state. For outgoing peers this
-
    /// means we've attempted a connection. For incoming
-
    /// peers, this means they've successfully connected
-
    /// to us.
-
    #[default]
-
    Initial,
-
    /// State after successful handshake.
-
    Negotiated {
-
        /// The peer's unique identifier.
-
        id: NodeId,
-
        since: LocalTime,
-
        /// Addresses this peer is reachable on.
-
        addrs: Vec<Address>,
-
        ping: PingState,
-
    },
-
    /// When a peer is disconnected.
-
    Disconnected { since: LocalTime },
-
}
-

-
#[derive(thiserror::Error, Debug)]
-
pub enum SessionError {
-
    #[error("wrong protocol version in message: {0}")]
-
    WrongVersion(u32),
-
    #[error("invalid announcement timestamp: {0}")]
-
    InvalidTimestamp(u64),
-
    #[error("session not found for address `{0}`")]
-
    NotFound(net::IpAddr),
-
    #[error("verification failed on fetch: {0}")]
-
    VerificationFailed(#[from] storage::VerifyError),
-
    #[error("peer misbehaved")]
-
    Misbehavior,
-
    #[error("peer timed out")]
-
    Timeout,
-
}
-

-
/// A peer session. Each connected peer will have one session.
-
#[derive(Debug, Clone)]
-
pub struct Session {
-
    /// Peer address.
-
    pub addr: net::SocketAddr,
-
    /// Connection direction.
-
    pub link: Link,
-
    /// Whether we should attempt to re-connect
-
    /// to this peer upon disconnection.
-
    pub persistent: bool,
-
    /// Peer connection state.
-
    pub state: SessionState,
-
    /// Peer subscription.
-
    pub subscribe: Option<Subscribe>,
-
    /// Last time a message was received from the peer.
-
    pub last_active: LocalTime,
-

-
    /// Connection attempts. For persistent peers, Tracks
-
    /// how many times we've attempted to connect. We reset this to zero
-
    /// upon successful connection.
-
    attempts: usize,
-

-
    /// Source of entropy.
-
    rng: Rng,
-
}
-

-
impl Session {
-
    pub fn new(addr: net::SocketAddr, link: Link, persistent: bool, rng: Rng) -> Self {
-
        Self {
-
            addr,
-
            state: SessionState::default(),
-
            link,
-
            subscribe: None,
-
            persistent,
-
            last_active: LocalTime::default(),
-
            attempts: 0,
-
            rng,
-
        }
-
    }
-

-
    pub fn ip(&self) -> IpAddr {
-
        self.addr.ip()
-
    }
-

-
    pub fn is_negotiated(&self) -> bool {
-
        matches!(self.state, SessionState::Negotiated { .. })
-
    }
-

-
    pub fn attempts(&self) -> usize {
-
        self.attempts
-
    }
-

-
    pub fn attempted(&mut self) {
-
        self.attempts += 1;
-
    }
-

-
    pub fn connected(&mut self, _link: Link) {
-
        self.attempts = 0;
-
    }
-

-
    pub fn ping(&mut self, reactor: &mut Reactor) -> Result<(), SessionError> {
-
        if let SessionState::Negotiated { ping, .. } = &mut self.state {
-
            let msg = message::Ping::new(&mut self.rng);
-
            *ping = PingState::AwaitingResponse(msg.ponglen);
-

-
            reactor.write(self.addr, Message::Ping(msg));
-
        }
-
        Ok(())
-
    }
-
}
modified radicle-node/src/service/reactor.rs
@@ -4,7 +4,7 @@ use std::net;
use log::*;

use crate::prelude::*;
-
use crate::service::peer::Session;
+
use crate::service::session::Session;

use super::message::{Announcement, AnnouncementMessage};

added radicle-node/src/service/session.rs
@@ -0,0 +1,125 @@
+
use crate::service::message;
+
use crate::service::message::Message;
+
use crate::service::net;
+
use crate::service::storage;
+
use crate::service::{Link, LocalTime, NodeId, Reactor, Rng};
+

+
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
+
pub enum PingState {
+
    #[default]
+
    /// The peer has not been sent a ping.
+
    None,
+
    /// A ping has been sent and is waiting on the peer's response.
+
    AwaitingResponse(u16),
+
    /// The peer was successfully pinged.
+
    Ok,
+
}
+

+
#[derive(Debug, Default, Clone)]
+
#[allow(clippy::large_enum_variant)]
+
pub enum State {
+
    /// Initial peer state. For outgoing peers this
+
    /// means we've attempted a connection. For incoming
+
    /// peers, this means they've successfully connected
+
    /// to us.
+
    #[default]
+
    Initial,
+
    /// State after successful handshake.
+
    Negotiated {
+
        /// The peer's unique identifier.
+
        id: NodeId,
+
        since: LocalTime,
+
        /// Addresses this peer is reachable on.
+
        addrs: Vec<message::Address>,
+
        ping: PingState,
+
    },
+
    /// When a peer is disconnected.
+
    Disconnected { since: LocalTime },
+
}
+

+
#[derive(thiserror::Error, Debug)]
+
pub enum Error {
+
    #[error("wrong protocol version in message: {0}")]
+
    WrongVersion(u32),
+
    #[error("invalid announcement timestamp: {0}")]
+
    InvalidTimestamp(u64),
+
    #[error("session not found for address `{0}`")]
+
    NotFound(net::IpAddr),
+
    #[error("verification failed on fetch: {0}")]
+
    VerificationFailed(#[from] storage::VerifyError),
+
    #[error("peer misbehaved")]
+
    Misbehavior,
+
    #[error("peer timed out")]
+
    Timeout,
+
}
+

+
/// A peer session. Each connected peer will have one session.
+
#[derive(Debug, Clone)]
+
pub struct Session {
+
    /// Peer address.
+
    pub addr: net::SocketAddr,
+
    /// Connection direction.
+
    pub link: Link,
+
    /// Whether we should attempt to re-connect
+
    /// to this peer upon disconnection.
+
    pub persistent: bool,
+
    /// Peer connection state.
+
    pub state: State,
+
    /// Peer subscription.
+
    pub subscribe: Option<message::Subscribe>,
+
    /// Last time a message was received from the peer.
+
    pub last_active: LocalTime,
+

+
    /// Connection attempts. For persistent peers, Tracks
+
    /// how many times we've attempted to connect. We reset this to zero
+
    /// upon successful connection.
+
    attempts: usize,
+

+
    /// Source of entropy.
+
    rng: Rng,
+
}
+

+
impl Session {
+
    pub fn new(addr: net::SocketAddr, link: Link, persistent: bool, rng: Rng) -> Self {
+
        Self {
+
            addr,
+
            state: State::default(),
+
            link,
+
            subscribe: None,
+
            persistent,
+
            last_active: LocalTime::default(),
+
            attempts: 0,
+
            rng,
+
        }
+
    }
+

+
    pub fn ip(&self) -> net::IpAddr {
+
        self.addr.ip()
+
    }
+

+
    pub fn is_negotiated(&self) -> bool {
+
        matches!(self.state, State::Negotiated { .. })
+
    }
+

+
    pub fn attempts(&self) -> usize {
+
        self.attempts
+
    }
+

+
    pub fn attempted(&mut self) {
+
        self.attempts += 1;
+
    }
+

+
    pub fn connected(&mut self, _link: Link) {
+
        self.attempts = 0;
+
    }
+

+
    pub fn ping(&mut self, reactor: &mut Reactor) -> Result<(), Error> {
+
        if let State::Negotiated { ping, .. } = &mut self.state {
+
            let msg = message::Ping::new(&mut self.rng);
+
            *ping = PingState::AwaitingResponse(msg.ponglen);
+

+
            reactor.write(self.addr, Message::Ping(msg));
+
        }
+
        Ok(())
+
    }
+
}
modified radicle-node/src/test/tests.rs
@@ -10,7 +10,6 @@ use crate::prelude::{LocalDuration, Timestamp};
use crate::service::config::*;
use crate::service::filter::Filter;
use crate::service::message::*;
-
use crate::service::peer::*;
use crate::service::reactor::Io;
use crate::service::ServiceState as _;
use crate::service::*;
@@ -280,7 +279,7 @@ fn test_inventory_relay_bad_timestamp() {
    );
    assert_matches!(
        alice.outbox().next(),
-
        Some(Io::Disconnect(addr, DisconnectReason::Error(SessionError::InvalidTimestamp(t))))
+
        Some(Io::Disconnect(addr, DisconnectReason::Error(session::Error::InvalidTimestamp(t))))
        if addr == bob.addr() && t == timestamp
    );
}