Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Prefix gossip messages with varint length
cloudhead committed 2 years ago
commit 0358d7bcf084518261957d7c53774804e92f759e
parent 87870b521aa8c1af94e632963c8ad2810fb3d4df
4 files changed +94 -23
modified radicle-node/src/wire.rs
@@ -59,6 +59,8 @@ pub enum Error {
    UnknownAddressType(u8),
    #[error("unknown message type `{0}`")]
    UnknownMessageType(u16),
+
    #[error("unexpected bytes")]
+
    UnexpectedBytes,
}

impl Error {
@@ -94,8 +96,12 @@ pub fn serialize<T: Encode + ?Sized>(data: &T) -> Vec<u8> {
/// Decode an object from a vector.
pub fn deserialize<T: Decode>(data: &[u8]) -> Result<T, Error> {
    let mut cursor = io::Cursor::new(data);
+
    let obj = T::decode(&mut cursor)?;

-
    T::decode(&mut cursor)
+
    if cursor.position() as usize != cursor.get_ref().len() {
+
        return Err(Error::UnexpectedBytes);
+
    }
+
    Ok(obj)
}

impl Encode for u8 {
modified radicle-node/src/wire/frame.rs
@@ -16,6 +16,7 @@ const CONTROL_CLOSE: u8 = 1;
const CONTROL_EOF: u8 = 2;

/// Protocol version.
+
#[derive(Debug, PartialEq, Eq)]
pub struct Version([u8; 4]);

impl wire::Encode for Version {
@@ -182,6 +183,7 @@ impl TryFrom<u8> for StreamKind {
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// |                     Data                                   ...| Data (variable size)
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
#[derive(Debug, PartialEq, Eq)]
pub struct Frame {
    /// The protocol version.
    pub version: Version,
@@ -226,6 +228,7 @@ impl Frame {
}

/// Frame payload.
+
#[derive(Debug, PartialEq, Eq)]
pub enum FrameData {
    /// Control frame payload.
    Control(Control),
@@ -236,6 +239,7 @@ pub enum FrameData {
}

/// A control message sent over a control stream.
+
#[derive(Debug, PartialEq, Eq)]
pub enum Control {
    /// Open a new stream.
    Open {
@@ -315,19 +319,22 @@ impl wire::Decode for Frame {
                Ok(frame)
            }
            Ok(StreamKind::Gossip) => {
-
                let msg = Message::decode(reader)?;
+
                let data = varint::payload::decode(reader)?;
+
                let mut cursor = io::Cursor::new(data);
+
                let msg = Message::decode(&mut cursor)?;
                let frame = Frame {
                    version,
                    stream,
                    data: FrameData::Gossip(msg),
                };
+

+
                // Nb. If there is data after the `Message` that is not decoded,
+
                // it is simply dropped here.
+

                Ok(frame)
            }
            Ok(StreamKind::Git { .. }) => {
-
                let size = VarInt::decode(reader)?;
-
                let mut data = vec![0; *size as usize];
-
                reader.read_exact(&mut data[..])?;
-

+
                let data = varint::payload::decode(reader)?;
                Ok(Frame::git(stream, data))
            }
            Err(n) => Err(wire::Error::InvalidStreamKind(n)),
@@ -341,24 +348,12 @@ impl wire::Encode for Frame {

        n += self.version.encode(writer)?;
        n += self.stream.encode(writer)?;
+
        n += match &self.data {
+
            FrameData::Control(ctrl) => ctrl.encode(writer)?,
+
            FrameData::Git(data) => varint::payload::encode(data, writer)?,
+
            FrameData::Gossip(msg) => varint::payload::encode(&wire::serialize(msg), writer)?,
+
        };

-
        match &self.data {
-
            FrameData::Control(ctrl) => {
-
                n += ctrl.encode(writer)?;
-
            }
-
            FrameData::Gossip(msg) => {
-
                n += msg.encode(writer)?;
-
            }
-
            FrameData::Git(data) => {
-
                let len = data.len();
-
                let size = VarInt::new(len as u64)
-
                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
-
                n += size.encode(writer)?;
-

-
                writer.write_all(data.as_slice())?;
-
                n += len;
-
            }
-
        }
        Ok(n)
    }
}
modified radicle-node/src/wire/protocol.rs
@@ -937,3 +937,44 @@ fn session<G: Signer + Ecdh<Pk = NodeId>>(
    );
    WireSession::with(proxy, noise)
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+
    use crate::service::{Message, ZeroBytes};
+
    use crate::wire;
+
    use crate::wire::varint;
+

+
    #[test]
+
    fn test_message_with_extension() {
+
        use crate::deserializer;
+

+
        let mut stream = Vec::new();
+
        let pong = Message::Pong {
+
            zeroes: ZeroBytes::new(42),
+
        };
+
        frame::PROTOCOL_VERSION.encode(&mut stream).unwrap();
+
        frame::StreamId::gossip(Link::Outbound)
+
            .encode(&mut stream)
+
            .unwrap();
+

+
        // Serialize gossip message with some extension fields.
+
        let mut gossip = wire::serialize(&pong);
+
        String::from("extra").encode(&mut gossip).unwrap();
+
        48u8.encode(&mut gossip).unwrap();
+

+
        // Encode gossip message using the varint-prefix format into the stream.
+
        varint::payload::encode(&gossip, &mut stream).unwrap();
+

+
        let mut de = deserializer::Deserializer::<Frame>::new(1024);
+
        de.input(&stream);
+

+
        // The "pong" message decodes successfully, even though there is trailing data.
+
        assert_eq!(
+
            de.deserialize_next().unwrap().unwrap(),
+
            Frame::gossip(Link::Outbound, pong)
+
        );
+
        assert!(de.deserialize_next().unwrap().is_none());
+
        assert!(de.is_empty());
+
    }
+
}
modified radicle-node/src/wire/varint.rs
@@ -146,6 +146,35 @@ impl Encode for VarInt {
    }
}

+
/// Encoding and decoding varint-prefixed payloads.
+
pub mod payload {
+
    use super::*;
+

+
    /// Encode varint-prefixed data payload.
+
    pub fn encode<W: io::Write + ?Sized>(payload: &[u8], writer: &mut W) -> io::Result<usize> {
+
        let mut n = 0;
+
        let len = payload.len();
+
        let varint =
+
            VarInt::new(len as u64).map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
+

+
        n += varint.encode(writer)?; // The length of the payload length.
+
        n += len; // The length of the data payload itself.
+

+
        writer.write_all(payload)?;
+

+
        Ok(n)
+
    }
+

+
    /// Decode varint-prefixed data payload.
+
    pub fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Vec<u8>, wire::Error> {
+
        let size = VarInt::decode(reader)?;
+
        let mut data = vec![0; *size as usize];
+
        reader.read_exact(&mut data[..])?;
+

+
        Ok(data)
+
    }
+
}
+

#[cfg(test)]
mod test {
    use super::*;