Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Decode messages in wire layer
Alexis Sellier committed 3 years ago
commit 2b6e20b50c36db5cc15a185ad89a64d697d1e8b8
parent 8e160f170c7dbfa9e603c509d1f401e487a5806d
3 files changed +62 -62
modified node/src/protocol.rs
@@ -34,7 +34,7 @@ use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, Writ
pub use crate::protocol::config::{Config, Network};

use self::filter::Filter;
-
use self::message::{InventoryAnnouncement, NodeFeatures};
+
use self::message::{Envelope, InventoryAnnouncement, NodeFeatures};

pub const DEFAULT_PORT: u16 = 8776;
pub const PROTOCOL_VERSION: u32 = 1;
@@ -511,54 +511,35 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Protoco
        }
    }

-
    pub fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
+
    pub fn received_message(&mut self, addr: &std::net::SocketAddr, msg: Envelope) {
        let peer_ip = addr.ip();
-
        let (peer, msgs) = if let Some(peer) = self.peers.get_mut(&peer_ip) {
-
            let decoder = peer.inbox();
-
            decoder.input(bytes);
-

-
            let mut msgs = Vec::with_capacity(1);
-
            loop {
-
                match decoder.decode_next() {
-
                    Ok(Some(msg)) => msgs.push(msg),
-
                    Ok(None) => break,
-

-
                    Err(err) => {
-
                        // TODO: Disconnect peer.
-
                        error!("Invalid message received from {}: {}", peer.addr, err);
-

-
                        return;
-
                    }
-
                }
-
            }
-
            (peer, msgs)
+
        let peer = if let Some(peer) = self.peers.get_mut(&peer_ip) {
+
            peer
        } else {
            return;
        };

-
        let mut relay = Vec::new();
-
        for msg in msgs {
-
            match peer.received(msg, &mut self.context) {
-
                Ok(None) => {}
-
                Ok(Some(msg)) => {
-
                    relay.push(msg);
-
                }
-
                Err(err) => {
-
                    self.context
-
                        .disconnect(peer.addr, DisconnectReason::Error(err));
-
                    // If there's an error, stop processing messages from this peer.
-
                    // However, we still relay messages returned up to this point.
-
                    break;
-
                }
+
        let relay = match peer.received(msg, &mut self.context) {
+
            Ok(msg) => msg,
+
            Err(err) => {
+
                self.context
+
                    .disconnect(peer.addr, DisconnectReason::Error(err));
+
                // If there's an error, stop processing messages from this peer.
+
                // However, we still relay messages returned up to this point.
+
                //
+
                // FIXME: The peer should be set in a state such that we don'that
+
                // process further messages.
+
                return;
            }
-
        }
+
        };
+

+
        if let Some(msg) = relay {
+
            let negotiated = self
+
                .peers
+
                .negotiated()
+
                .filter(|(ip, _)| **ip != peer_ip)
+
                .map(|(_, p)| p);

-
        let negotiated = self
-
            .peers
-
            .negotiated()
-
            .filter(|(ip, _)| **ip != peer_ip)
-
            .map(|(_, p)| p);
-
        for msg in relay {
            self.context.relay(msg, negotiated.clone());
        }
    }
modified node/src/protocol/peer.rs
@@ -1,4 +1,3 @@
-
use crate::decoder::Decoder;
use crate::protocol::message::*;
use crate::protocol::*;

@@ -52,8 +51,6 @@ pub struct Peer {
    /// Peer subscription.
    pub subscribe: Option<Subscribe>,

-
    /// Inbox for incoming messages from peer.
-
    inbox: Decoder,
    /// Connection attempts. For persistent peers, Tracks
    /// how many times we've attempted to connect. We reset this to zero
    /// upon successful connection.
@@ -64,7 +61,6 @@ impl Peer {
    pub fn new(addr: net::SocketAddr, link: Link, persistent: bool) -> Self {
        Self {
            addr,
-
            inbox: Decoder::new(256),
            state: PeerState::default(),
            link,
            timestamp: Timestamp::default(),
@@ -82,10 +78,6 @@ impl Peer {
        matches!(self.state, PeerState::Negotiated { .. })
    }

-
    pub fn inbox(&mut self) -> &mut Decoder {
-
        &mut self.inbox
-
    }
-

    pub fn attempts(&self) -> usize {
        self.attempts
    }
modified node/src/protocol/wire.rs
@@ -1,6 +1,6 @@
-
use std::collections::BTreeMap;
+
use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
-
use std::net;
+
use std::net::{self, IpAddr};
use std::ops::{Deref, DerefMut};
use std::string::FromUtf8Error;
use std::{io, mem};
@@ -11,6 +11,7 @@ use nakamoto_net::{Link, LocalTime};

use crate::address_book;
use crate::crypto::{PublicKey, Signature, Signer};
+
use crate::decoder::Decoder;
use crate::git;
use crate::git::fmt;
use crate::hash::Digest;
@@ -387,12 +388,16 @@ impl Decode for Digest {

#[derive(Debug)]
pub struct Wire<S, T, G> {
+
    inboxes: HashMap<IpAddr, Decoder>,
    inner: protocol::Protocol<S, T, G>,
}

impl<S, T, G> Wire<S, T, G> {
    pub fn new(inner: protocol::Protocol<S, T, G>) -> Self {
-
        Self { inner }
+
        Self {
+
            inboxes: HashMap::new(),
+
            inner,
+
        }
    }
}

@@ -402,45 +407,67 @@ where
    T: WriteStorage<'r> + 'static,
    G: Signer,
{
-
    fn initialize(&mut self, time: LocalTime) {
+
    pub fn initialize(&mut self, time: LocalTime) {
        self.inner.initialize(time)
    }

-
    fn tick(&mut self, now: LocalTime) {
+
    pub fn tick(&mut self, now: LocalTime) {
        self.inner.tick(now)
    }

-
    fn wake(&mut self) {
+
    pub fn wake(&mut self) {
        self.inner.wake()
    }

-
    fn command(&mut self, cmd: protocol::Command) {
+
    pub fn command(&mut self, cmd: protocol::Command) {
        self.inner.command(cmd)
    }

-
    fn attempted(&mut self, addr: &net::SocketAddr) {
+
    pub fn attempted(&mut self, addr: &net::SocketAddr) {
        self.inner.attempted(addr)
    }

-
    fn connected(
+
    pub fn connected(
        &mut self,
        addr: std::net::SocketAddr,
        local_addr: &std::net::SocketAddr,
        link: Link,
    ) {
+
        self.inboxes.insert(addr.ip(), Decoder::new(256));
        self.inner.connected(addr, local_addr, link)
    }

-
    fn disconnected(
+
    pub fn disconnected(
        &mut self,
        addr: &std::net::SocketAddr,
        reason: nakamoto::DisconnectReason<protocol::DisconnectReason>,
    ) {
+
        self.inboxes.remove(&addr.ip());
        self.inner.disconnected(addr, reason)
    }

-
    fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
-
        self.inner.received_bytes(addr, bytes)
+
    pub fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
+
        let peer_ip = addr.ip();
+

+
        if let Some(inbox) = self.inboxes.get_mut(&peer_ip) {
+
            inbox.input(bytes);
+

+
            loop {
+
                match inbox.decode_next() {
+
                    Ok(Some(msg)) => self.inner.received_message(addr, msg),
+
                    Ok(None) => break,
+

+
                    Err(err) => {
+
                        // TODO: Disconnect peer.
+
                        log::error!("Invalid message received from {}: {}", peer_ip, err);
+

+
                        return;
+
                    }
+
                }
+
            }
+
        } else {
+
            log::debug!("Received message from unknown peer {}", peer_ip);
+
        }
    }
}