Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Rename `Peer` to `Session`
Alexis Sellier committed 3 years ago
commit 1fa85f0da3d3b545f830b5f48b2e79ec7b34fec4
parent 51f2d4e3fc17736740db6b7ea1f7d5fc31e90ed7
3 files changed +61 -59
modified radicle-node/src/service.rs
@@ -27,7 +27,7 @@ use crate::git::Url;
use crate::identity::{Doc, Id};
use crate::service::config::ProjectTracking;
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
-
use crate::service::peer::{Peer, PeerError, PeerState};
+
use crate::service::peer::{Session, SessionError, SessionState};
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, WriteStorage};

@@ -138,8 +138,8 @@ pub enum CommandError {}

#[derive(Debug)]
pub struct Service<S, T, G> {
-
    /// Peers currently or recently connected.
-
    peers: Peers,
+
    /// Peer sessions, currently or recently connected.
+
    sessions: Sessions,
    /// Service state that isn't peer-specific.
    context: Context<S, T, G>,
    /// Whether our local inventory no long represents what we have announced to the network.
@@ -169,7 +169,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service

        Self {
            context: Context::new(config, clock, storage, addrmgr, signer, rng.clone()),
-
            peers: Peers::new(rng),
+
            sessions: Sessions::new(rng),
            out_of_sync: false,
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
@@ -180,17 +180,17 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
    }

    pub fn disconnect(&mut self, remote: &IpAddr, reason: DisconnectReason) {
-
        if let Some(addr) = self.peers.get(remote).map(|p| p.addr) {
+
        if let Some(addr) = self.sessions.get(remote).map(|p| p.addr) {
            self.context.disconnect(addr, reason);
        }
    }

-
    pub fn seeds(&self, id: &Id) -> Box<dyn Iterator<Item = (&NodeId, &Peer)> + '_> {
+
    pub fn seeds(&self, id: &Id) -> Box<dyn Iterator<Item = (&NodeId, &Session)> + '_> {
        if let Some(peers) = self.routing.get(id) {
            Box::new(
                peers
                    .iter()
-
                    .filter_map(|id| self.peers.by_id(id).map(|p| (id, p))),
+
                    .filter_map(|id| self.sessions.by_id(id).map(|p| (id, p))),
            )
        } else {
            Box::new(std::iter::empty())
@@ -236,8 +236,8 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
    }

    /// Get the connected peers.
-
    pub fn peers(&self) -> &Peers {
-
        &self.peers
+
    pub fn peers(&self) -> &Sessions {
+
        &self.sessions
    }

    /// Get the current inventory.
@@ -427,7 +427,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
                let node = self.node_id();
                let repo = self.storage.repository(&id).unwrap();
                let remote = repo.remote(&node).unwrap();
-
                let peers = self.peers.negotiated().map(|(_, p)| p);
+
                let peers = self.sessions.negotiated().map(|(_, p)| p);
                let refs = remote.refs.into();
                let message = RefsAnnouncement { id, refs };
                let signature = message.sign(&self.signer);
@@ -448,9 +448,9 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
        let ip = addr.ip();
        let persistent = self.context.config.is_persistent(addr);
        let peer = self
-
            .peers
+
            .sessions
            .entry(ip)
-
            .or_insert_with(|| Peer::new(*addr, Link::Outbound, persistent));
+
            .or_insert_with(|| Session::new(*addr, Link::Outbound, persistent));

        peer.attempted();
    }
@@ -472,14 +472,14 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
            // TODO: Refactor this so that we don't create messages if the peer isn't found.
            let messages = self.handshake_messages();

-
            if let Some(peer) = self.peers.get_mut(&ip) {
+
            if let Some(peer) = self.sessions.get_mut(&ip) {
                self.context.write_all(peer.addr, messages);
                peer.connected();
            }
        } else {
-
            self.peers.insert(
+
            self.sessions.insert(
                ip,
-
                Peer::new(
+
                Session::new(
                    addr,
                    Link::Inbound,
                    self.context.config.is_persistent(&addr),
@@ -498,8 +498,8 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service

        debug!("Disconnected from {} ({})", ip, reason);

-
        if let Some(peer) = self.peers.get_mut(&ip) {
-
            peer.state = PeerState::Disconnected { since };
+
        if let Some(peer) = self.sessions.get_mut(&ip) {
+
            peer.state = SessionState::Disconnected { since };

            // Attempt to re-connect to persistent peers.
            if self.context.config.is_persistent(addr) && peer.attempts() < MAX_CONNECTION_ATTEMPTS
@@ -529,7 +529,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service

    pub fn received_message(&mut self, addr: &std::net::SocketAddr, msg: Envelope) {
        let peer_ip = addr.ip();
-
        let peer = if let Some(peer) = self.peers.get_mut(&peer_ip) {
+
        let peer = if let Some(peer) = self.sessions.get_mut(&peer_ip) {
            peer
        } else {
            return;
@@ -551,7 +551,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service

        if let Some(msg) = relay {
            let negotiated = self
-
                .peers
+
                .sessions
                .negotiated()
                .filter(|(ip, _)| **ip != peer_ip)
                .map(|(_, p)| p);
@@ -568,7 +568,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
        let inv = Message::inventory(self.context.inventory_announcement()?, &self.context.signer);

-
        for addr in self.peers.negotiated().map(|(_, p)| p.addr) {
+
        for addr in self.sessions.negotiated().map(|(_, p)| p.addr) {
            self.context.write(addr, inv.clone());
        }
        Ok(())
@@ -580,8 +580,8 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service

    fn maintain_connections(&mut self) {
        // TODO: Connect to all potential seeds.
-
        if self.peers.len() < TARGET_OUTBOUND_PEERS {
-
            let delta = TARGET_OUTBOUND_PEERS - self.peers.len();
+
        if self.sessions.len() < TARGET_OUTBOUND_PEERS {
+
            let delta = TARGET_OUTBOUND_PEERS - self.sessions.len();

            for _ in 0..delta {
                // TODO: Connect to random peer.
@@ -607,7 +607,7 @@ impl<S, T, G> DerefMut for Service<S, T, G> {
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    User,
-
    Error(PeerError),
+
    Error(SessionError),
}

impl DisconnectReason {
@@ -823,14 +823,14 @@ impl<S, T, G> Context<S, T, G> {
    }

    /// Broadcast a message to a list of peers.
-
    fn broadcast<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Peer>) {
+
    fn broadcast<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Session>) {
        for peer in peers {
            self.write(peer.addr, msg.clone());
        }
    }

    /// Relay a message to interested peers.
-
    fn relay<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Peer>) {
+
    fn relay<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Session>) {
        if let Message::RefsAnnouncement { message, .. } = &msg {
            let id = message.id.clone();
            let peers = peers.into_iter().filter(|p| {
@@ -851,16 +851,16 @@ impl<S, T, G> Context<S, T, G> {

#[derive(Debug)]
/// Holds currently (or recently) connected peers.
-
pub struct Peers(AddressBook<IpAddr, Peer>);
+
pub struct Sessions(AddressBook<IpAddr, Session>);

-
impl Peers {
+
impl Sessions {
    pub fn new(rng: Rng) -> Self {
        Self(AddressBook::new(rng))
    }

-
    pub fn by_id(&self, id: &NodeId) -> Option<&Peer> {
+
    pub fn by_id(&self, id: &NodeId) -> Option<&Session> {
        self.0.values().find(|p| {
-
            if let PeerState::Negotiated { id: _id, .. } = &p.state {
+
            if let SessionState::Negotiated { id: _id, .. } = &p.state {
                _id == id
            } else {
                false
@@ -869,20 +869,20 @@ impl Peers {
    }

    /// Iterator over fully negotiated peers.
-
    pub fn negotiated(&self) -> impl Iterator<Item = (&IpAddr, &Peer)> + Clone {
+
    pub fn negotiated(&self) -> impl Iterator<Item = (&IpAddr, &Session)> + Clone {
        self.0.iter().filter(move |(_, p)| p.is_negotiated())
    }
}

-
impl Deref for Peers {
-
    type Target = AddressBook<IpAddr, Peer>;
+
impl Deref for Sessions {
+
    type Target = AddressBook<IpAddr, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

-
impl DerefMut for Peers {
+
impl DerefMut for Sessions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
modified radicle-node/src/service/peer.rs
@@ -3,7 +3,7 @@ use crate::service::*;

#[derive(Debug, Default)]
#[allow(clippy::large_enum_variant)]
-
pub enum PeerState {
+
pub enum SessionState {
    /// Initial peer state. For outgoing peers this
    /// means we've attempted a connection. For incoming
    /// peers, this means they've successfully connected
@@ -24,7 +24,7 @@ pub enum PeerState {
}

#[derive(thiserror::Error, Debug, Clone)]
-
pub enum PeerError {
+
pub enum SessionError {
    #[error("wrong network constant in message: {0}")]
    WrongMagic(u32),
    #[error("wrong protocol version in message: {0}")]
@@ -35,8 +35,9 @@ pub enum PeerError {
    Misbehavior,
}

+
/// A peer session. Each connected peer will have one session.
#[derive(Debug)]
-
pub struct Peer {
+
pub struct Session {
    /// Peer address.
    pub addr: net::SocketAddr,
    /// Connection direction.
@@ -45,7 +46,7 @@ pub struct Peer {
    /// to this peer upon disconnection.
    pub persistent: bool,
    /// Peer connection state.
-
    pub state: PeerState,
+
    pub state: SessionState,
    /// Last known peer time.
    pub timestamp: Timestamp,
    /// Peer subscription.
@@ -57,11 +58,11 @@ pub struct Peer {
    attempts: usize,
}

-
impl Peer {
+
impl Session {
    pub fn new(addr: net::SocketAddr, link: Link, persistent: bool) -> Self {
        Self {
            addr,
-
            state: PeerState::default(),
+
            state: SessionState::default(),
            link,
            timestamp: Timestamp::default(),
            subscribe: None,
@@ -75,7 +76,7 @@ impl Peer {
    }

    pub fn is_negotiated(&self) -> bool {
-
        matches!(self.state, PeerState::Negotiated { .. })
+
        matches!(self.state, SessionState::Negotiated { .. })
    }

    pub fn attempts(&self) -> usize {
@@ -94,19 +95,19 @@ impl Peer {
        &mut self,
        envelope: Envelope,
        ctx: &mut Context<S, T, G>,
-
    ) -> Result<Option<Message>, PeerError>
+
    ) -> Result<Option<Message>, SessionError>
    where
        T: storage::WriteStorage<'r>,
        G: crypto::Signer,
    {
        if envelope.magic != ctx.config.network.magic() {
-
            return Err(PeerError::WrongMagic(envelope.magic));
+
            return Err(SessionError::WrongMagic(envelope.magic));
        }
        debug!("Received {:?} from {}", &envelope.msg, self.ip());

        match (&self.state, envelope.msg) {
            (
-
                PeerState::Initial,
+
                SessionState::Initial,
                Message::Initialize {
                    id,
                    timestamp,
@@ -118,10 +119,10 @@ impl Peer {
                let now = ctx.timestamp();

                if timestamp.abs_diff(now) > MAX_TIME_DELTA.as_secs() {
-
                    return Err(PeerError::InvalidTimestamp(timestamp));
+
                    return Err(SessionError::InvalidTimestamp(timestamp));
                }
                if version != PROTOCOL_VERSION {
-
                    return Err(PeerError::WrongVersion(version));
+
                    return Err(SessionError::WrongVersion(version));
                }
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
                // extra "acknowledgment" message sent when the `Initialize` is well received.
@@ -131,34 +132,35 @@ impl Peer {
                // Nb. we don't set the peer timestamp here, since it is going to be
                // set after the first message is received only. Setting it here would
                // mean that messages received right after the handshake could be ignored.
-
                self.state = PeerState::Negotiated {
+
                self.state = SessionState::Negotiated {
                    id,
                    since: ctx.clock.local_time(),
                    addrs,
                    git,
                };
            }
-
            (PeerState::Initial, _) => {
+
            (SessionState::Initial, _) => {
                debug!(
                    "Disconnecting peer {} for sending us a message before handshake",
                    self.ip()
                );
-
                return Err(PeerError::Misbehavior);
+
                return Err(SessionError::Misbehavior);
            }
            (
-
                PeerState::Negotiated { git, .. },
+
                SessionState::Negotiated { git, .. },
                Message::InventoryAnnouncement {
                    node,
                    message,
                    signature,
                },
            ) => {
+
                // FIXME: This is wrong, we are comparing timestamps of different peers.
                let now = ctx.clock.local_time();
                let last = self.timestamp;

                // Don't allow messages from too far in the past or future.
                if message.timestamp.abs_diff(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
-
                    return Err(PeerError::InvalidTimestamp(message.timestamp));
+
                    return Err(SessionError::InvalidTimestamp(message.timestamp));
                }
                // Discard inventory messages we've already seen, otherwise update
                // out last seen time.
@@ -179,7 +181,7 @@ impl Peer {
            }
            // Process a peer inventory update announcement by (maybe) fetching.
            (
-
                PeerState::Negotiated { git, .. },
+
                SessionState::Negotiated { git, .. },
                Message::RefsAnnouncement {
                    node,
                    message,
@@ -209,11 +211,11 @@ impl Peer {
                        }
                    }
                } else {
-
                    return Err(PeerError::Misbehavior);
+
                    return Err(SessionError::Misbehavior);
                }
            }
            (
-
                PeerState::Negotiated { .. },
+
                SessionState::Negotiated { .. },
                Message::NodeAnnouncement {
                    node,
                    message,
@@ -221,21 +223,21 @@ impl Peer {
                },
            ) => {
                if !message.verify(&node, &signature) {
-
                    return Err(PeerError::Misbehavior);
+
                    return Err(SessionError::Misbehavior);
                }
                log::warn!("Node announcement handling is not implemented");
            }
-
            (PeerState::Negotiated { .. }, Message::Subscribe(subscribe)) => {
+
            (SessionState::Negotiated { .. }, Message::Subscribe(subscribe)) => {
                self.subscribe = Some(subscribe);
            }
-
            (PeerState::Negotiated { .. }, Message::Initialize { .. }) => {
+
            (SessionState::Negotiated { .. }, Message::Initialize { .. }) => {
                debug!(
                    "Disconnecting peer {} for sending us a redundant handshake message",
                    self.ip()
                );
-
                return Err(PeerError::Misbehavior);
+
                return Err(SessionError::Misbehavior);
            }
-
            (PeerState::Disconnected { .. }, msg) => {
+
            (SessionState::Disconnected { .. }, msg) => {
                debug!("Ignoring {:?} from disconnected peer {}", msg, self.ip());
            }
        }
modified radicle-node/src/test/tests.rs
@@ -226,7 +226,7 @@ fn test_inventory_relay_bad_timestamp() {
    );
    assert_matches!(
        alice.outbox().next(),
-
        Some(Io::Disconnect(addr, DisconnectReason::Error(PeerError::InvalidTimestamp(t))))
+
        Some(Io::Disconnect(addr, DisconnectReason::Error(SessionError::InvalidTimestamp(t))))
        if addr == bob.addr() && t == timestamp
    );
}