Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Use deserializer when decoding messages
Alexis Sellier committed 3 years ago
commit fdfe1d508b44f6fa5687b4bf82ac1bce25f553c8
parent 17e3c975fb16fcf980953645b465c1550bf20fdf
3 files changed +60 -14
modified radicle-node/src/deserializer.rs
@@ -13,6 +13,12 @@ pub struct Deserializer<D = Message> {
    item: PhantomData<D>,
}

+
impl Default for Deserializer<Message> {
+
    fn default() -> Self {
+
        Self::new(wire::Size::MAX as usize + 1)
+
    }
+
}
+

impl<D> From<Vec<u8>> for Deserializer<D> {
    fn from(unparsed: Vec<u8>) -> Self {
        Self {
@@ -51,6 +57,16 @@ impl<D: wire::Decode> Deserializer<D> {
            Err(err) => Err(err),
        }
    }
+

+
    /// Drain the unparsed buffer.
+
    pub fn unparsed(&mut self) -> impl ExactSizeIterator<Item = u8> + '_ {
+
        self.unparsed.drain(..)
+
    }
+

+
    /// Return whether there are unparsed bytes.
+
    pub fn is_empty(&self) -> bool {
+
        self.unparsed.is_empty()
+
    }
}

impl<D: wire::Decode> io::Write for Deserializer<D> {
@@ -78,9 +94,38 @@ mod test {
    use super::*;
    use qcheck_macros::quickcheck;

+
    use crate::test::assert_matches;
+

    const MSG_HELLO: &[u8] = &[5, b'h', b'e', b'l', b'l', b'o'];
    const MSG_BYE: &[u8] = &[3, b'b', b'y', b'e'];

+
    #[test]
+
    fn test_decode_next() {
+
        let mut decoder = Deserializer::<String>::new(8);
+

+
        decoder.input(&[3, b'b']);
+
        assert_matches!(decoder.deserialize_next(), Ok(None));
+
        assert_eq!(decoder.unparsed.len(), 2);
+

+
        decoder.input(&[b'y']);
+
        assert_matches!(decoder.deserialize_next(), Ok(None));
+
        assert_eq!(decoder.unparsed.len(), 3);
+

+
        decoder.input(&[b'e']);
+
        assert_matches!(decoder.deserialize_next(), Ok(Some(s)) if s.as_str() == "bye");
+
        assert_eq!(decoder.unparsed.len(), 0);
+
        assert!(decoder.is_empty());
+
    }
+

+
    #[test]
+
    fn test_unparsed() {
+
        let mut decoder = Deserializer::<String>::new(8);
+

+
        decoder.input(&[3, b'b', b'y']);
+
        assert_eq!(decoder.unparsed().collect::<Vec<_>>(), vec![3, b'b', b'y']);
+
        assert!(decoder.is_empty());
+
    }
+

    #[quickcheck]
    fn prop_decode_next(chunk_size: usize) {
        let mut bytes = vec![];
modified radicle-node/src/wire/message.rs
@@ -365,7 +365,7 @@ impl wire::Encode for ZeroBytes {
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
        let mut n = (self.len() as u16).encode(writer)?;
        for _ in 0..self.len() {
-
            n += 0u8.encode(writer)?
+
            n += 0u8.encode(writer)?;
        }
        Ok(n)
    }
modified radicle-node/src/wire/protocol.rs
@@ -26,9 +26,10 @@ use radicle::node::{routing, NodeId};
use radicle::storage::WriteStorage;

use crate::crypto::Signer;
+
use crate::prelude::Deserializer;
use crate::service::reactor::{Fetch, Io};
use crate::service::{session, DisconnectReason, Message, Service};
-
use crate::wire::{Decode, Encode, Error};
+
use crate::wire::{Encode, Error};
use crate::worker;
use crate::worker::{Task, TaskResult};
use crate::Link;
@@ -79,7 +80,7 @@ enum Peer {
    Connected {
        link: Link,
        id: NodeId,
-
        inbox: VecDeque<u8>,
+
        inbox: Deserializer<Message>,
    },
    /// The peer was scheduled for disconnection. Once the transport is handed over
    /// by the reactor, we can consider it disconnected.
@@ -93,7 +94,7 @@ enum Peer {
        fetch: Fetch,
        link: Link,
        id: NodeId,
-
        inbox: VecDeque<u8>,
+
        inbox: Vec<u8>,
    },
    /// The peer is now upgraded and we are in control of the socket.
    Upgraded { link: Link, id: NodeId },
@@ -151,7 +152,7 @@ impl Peer {
            *self = Self::Connected {
                link,
                id,
-
                inbox: VecDeque::new(),
+
                inbox: Deserializer::default(),
            };
            link
        } else if let Self::Outbound { id: expected } = self {
@@ -161,7 +162,7 @@ impl Peer {
            *self = Self::Connected {
                link,
                id,
-
                inbox: VecDeque::new(),
+
                inbox: Deserializer::default(),
            };
            link
        } else {
@@ -195,7 +196,7 @@ impl Peer {
                fetch,
                id: *id,
                link: *link,
-
                inbox: inbox.clone(),
+
                inbox: inbox.unparsed().collect(),
            };
        } else {
            panic!("Peer::upgrading: session is not fully connected");
@@ -232,7 +233,7 @@ impl Peer {
            *self = Self::Connected {
                id: *id,
                link: *link,
-
                inbox: VecDeque::new(),
+
                inbox: Deserializer::default(),
            };
        } else {
            panic!("Peer::downgrade: can't downgrade if not in upgraded state");
@@ -508,12 +509,12 @@ where
            }
            SessionEvent::Data(data) => {
                if let Some(Peer::Connected { id, inbox, .. }) = self.peers.get_mut(&fd) {
-
                    inbox.extend(data);
+
                    inbox.input(&data);

                    loop {
-
                        match Message::decode(inbox) {
-
                            Ok(msg) => self.service.received_message(*id, msg),
-
                            Err(err) if err.is_eof() => {
+
                        match inbox.deserialize_next() {
+
                            Ok(Some(msg)) => self.service.received_message(*id, msg),
+
                            Ok(None) => {
                                // Buffer is empty, or message isn't complete.
                                break;
                            }
@@ -522,7 +523,7 @@ where

                                if let Error::UnknownMessageType(t) = e {
                                    let mut leftover = t.to_be_bytes().to_vec();
-
                                    leftover.extend(inbox.drain(..));
+
                                    leftover.extend(inbox.unparsed());

                                    if let Ok(header) =
                                        str::from_utf8(&leftover[..worker::pktline::HEADER_LEN])
@@ -538,7 +539,7 @@ where
                                }

                                if !inbox.is_empty() {
-
                                    log::debug!(target: "wire", "Dropping read buffer with {} bytes", inbox.len());
+
                                    log::debug!(target: "wire", "Dropping read buffer for {id} with {} bytes", inbox.unparsed().count());
                                }
                                self.disconnect(
                                    fd,