Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
protocol: Revisit encoding and decoding
Merged lorenz opened 8 months ago

Encoding

In 3c5668edd22ae4b9a085220d6be552f944ccb038 I mistakenly moved the check the length of an encoded wire::Message from its impl wire::Encode to the fn serialize, so that it would not apply only to messages (as originally intended), but to any argument of serialize.

This is not a problem for gossip messages, but catastrophic for Git streams, which tend to send a lot of data.

The fix is simple: Move the check back into impl Encode for Message.

What made this harder to spot for me was the multitude of concepts and terms involved in encoding: What’s serializing vs. encoding? Why is there this odd Frame::to_bytes?

So I decided to clean up a bit. I removed fn serialize so that the logic stays close to the data, and instead of it provide fn encode_vec in trait Encode so that we don’t add up with one-offs like Frame::to_bytes again. Also we make use of encode_vec in the tests.

There is test_encode_git_large will panic if we ever regress.

Decoding

Remove fn deserialize and instead provide a trait fn decode_exact for testing.

Refactor errors, so that we can now cleanly distinguish between Error::UnexpectedEnd which means that decoding might work given more data, and Error::Invalid, which means that no amount of additional data will decode.

While at it, cleanup the variants of Error::Invalid, and provide ControlType.

8 files changed +273 -222 c5b99db1 a568e7f4
modified crates/radicle-node/src/wire.rs
@@ -427,7 +427,7 @@ where
                        stream: task.stream,
                    },
                );
-
                self.actions.push_back(Action::Send(fd, frame.to_bytes()));
+
                self.actions.push_back(Action::Send(fd, frame.encode_vec()));
            }
        } else {
            // If the peer disconnected, we'll get here, but we still want to let the service know
@@ -479,7 +479,7 @@ where
                ChannelEvent::Eof => Frame::control(*link, frame::Control::Eof { stream }),
            };
            self.actions
-
                .push_back(reactor::Action::Send(fd, frame.to_bytes()));
+
                .push_back(reactor::Action::Send(fd, frame.encode_vec()));
        }
    }

@@ -1126,7 +1126,7 @@ where
                    self.actions.push_back(Action::Send(
                        fd,
                        Frame::<service::Message>::control(link, frame::Control::Open { stream })
-
                            .to_bytes(),
+
                            .encode_vec(),
                    ));
                }
            }
@@ -1264,7 +1264,7 @@ mod test {
        frame::StreamId::gossip(Link::Outbound).encode(&mut stream);

        // Serialize gossip message with some extension fields.
-
        let mut gossip = wire::serialize(&pong);
+
        let mut gossip = pong.encode_vec();
        String::from("extra").encode(&mut gossip);
        48u8.encode(&mut gossip);

modified crates/radicle-protocol/src/deserializer.rs
@@ -48,7 +48,7 @@ impl<const B: usize, D: wire::Decode> Deserializer<B, D> {
    }

    /// Decode and return the next message. Returns [`None`] if nothing was decoded.
-
    pub fn deserialize_next(&mut self) -> Result<Option<D>, wire::Error> {
+
    pub fn deserialize_next(&mut self) -> Result<Option<D>, wire::Invalid> {
        let mut reader = io::Cursor::new(self.unparsed.as_slice());

        match D::decode(&mut reader) {
@@ -59,7 +59,7 @@ impl<const B: usize, D: wire::Decode> Deserializer<B, D> {
                Ok(Some(msg))
            }
            Err(wire::Error::UnexpectedEnd { .. }) => Ok(None),
-
            Err(err) => Err(err),
+
            Err(wire::Error::Invalid(err)) => Err(err),
        }
    }

@@ -106,7 +106,7 @@ impl<const B: usize, D: wire::Decode> io::Write for Deserializer<B, D> {
}

impl<const B: usize, D: wire::Decode> Iterator for Deserializer<B, D> {
-
    type Item = Result<D, wire::Error>;
+
    type Item = Result<D, wire::Invalid>;

    fn next(&mut self) -> Option<Self::Item> {
        self.deserialize_next().transpose()
modified crates/radicle-protocol/src/service/gossip/store.rs
@@ -10,7 +10,7 @@ use crate::service::message::{
    Announcement, AnnouncementMessage, InventoryAnnouncement, NodeAnnouncement, RefsAnnouncement,
};
use crate::wire;
-
use crate::wire::Decode;
+
use crate::wire::{Decode as _, Encode as _};
use radicle::node::Database;
use radicle::node::NodeId;
use radicle::prelude::Timestamp;
@@ -117,17 +117,17 @@ impl Store for Database {
            AnnouncementMessage::Node(msg) => {
                stmt.bind((2, sql::Value::String(String::new())))?;
                stmt.bind((3, &GossipType::Node))?;
-
                stmt.bind((4, msg))?;
+
                stmt.bind((4, &msg.encode_vec()[..]))?;
            }
            AnnouncementMessage::Refs(msg) => {
                stmt.bind((2, &msg.rid))?;
                stmt.bind((3, &GossipType::Refs))?;
-
                stmt.bind((4, msg))?;
+
                stmt.bind((4, &msg.encode_vec()[..]))?;
            }
            AnnouncementMessage::Inventory(msg) => {
                stmt.bind((2, sql::Value::String(String::new())))?;
                stmt.bind((3, &GossipType::Inventory))?;
-
                stmt.bind((4, msg))?;
+
                stmt.bind((4, &msg.encode_vec()[..]))?;
            }
        }
        stmt.bind((5, &ann.signature))?;
@@ -231,12 +231,6 @@ impl TryFrom<&sql::Value> for NodeAnnouncement {
    }
}

-
impl sql::BindableWithIndex for &NodeAnnouncement {
-
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
-
        wire::serialize(self).bind(stmt, i)
-
    }
-
}
-

impl TryFrom<&sql::Value> for RefsAnnouncement {
    type Error = sql::Error;

@@ -254,12 +248,6 @@ impl TryFrom<&sql::Value> for RefsAnnouncement {
    }
}

-
impl sql::BindableWithIndex for &RefsAnnouncement {
-
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
-
        wire::serialize(self).bind(stmt, i)
-
    }
-
}
-

impl TryFrom<&sql::Value> for InventoryAnnouncement {
    type Error = sql::Error;

@@ -277,12 +265,6 @@ impl TryFrom<&sql::Value> for InventoryAnnouncement {
    }
}

-
impl sql::BindableWithIndex for &InventoryAnnouncement {
-
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
-
        wire::serialize(self).bind(stmt, i)
-
    }
-
}
-

impl From<wire::Error> for sql::Error {
    fn from(other: wire::Error) -> Self {
        sql::Error {
modified crates/radicle-protocol/src/service/message.rs
@@ -16,6 +16,7 @@ use crate::bounded::BoundedVec;
use crate::service::filter::Filter;
use crate::service::{Link, NodeId, Timestamp};
use crate::wire;
+
use crate::wire::Encode as _;

/// Maximum number of addresses which can be announced to other nodes.
pub const ADDRESS_LIMIT: usize = 16;
@@ -82,7 +83,7 @@ impl NodeAnnouncement {
        let mut output = [0u8; 32];

        scrypt::scrypt(
-
            wire::serialize(self).as_ref(),
+
            &self.encode_vec(),
            Announcement::POW_SALT,
            &params,
            &mut output,
@@ -265,7 +266,8 @@ impl AnnouncementMessage {
    {
        use crypto::signature::Signer as _;

-
        let msg = wire::serialize(&self);
+
        let msg = self.encode_vec();
+

        let signature = signer.sign(&msg);

        Announcement {
@@ -365,7 +367,7 @@ impl Announcement {

    /// Verify this announcement's signature.
    pub fn verify(&self) -> bool {
-
        let msg = wire::serialize(&self.message);
+
        let msg = self.message.encode_vec();
        self.node.verify(msg, &self.signature).is_ok()
    }

@@ -425,8 +427,8 @@ impl PartialOrd for Message {

impl Ord for Message {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-
        let this = wire::serialize(self);
-
        let other = wire::serialize(other);
+
        let this = self.encode_vec();
+
        let other = other.encode_vec();

        this.cmp(&other)
    }
@@ -674,13 +676,14 @@ mod tests {
    use std::str::FromStr;

    use fastrand;
+
    use localtime::LocalTime;
    use qcheck_macros::quickcheck;
    use radicle::git::raw;
+
    use radicle::test::arbitrary;
+

+
    use crate::wire::Decode as _;

    use super::*;
-
    use crate::wire::Encode;
-
    use localtime::LocalTime;
-
    use radicle::test::arbitrary;

    #[test]
    fn test_ref_remote_limit() {
@@ -709,7 +712,7 @@ mod tests {
        let mut buf = Vec::new();
        msg.encode(&mut buf);

-
        let decoded = wire::deserialize(buf.as_slice());
+
        let decoded = Message::decode_exact(buf.as_slice());
        assert!(decoded.is_ok());
        assert_eq!(msg, decoded.unwrap());
    }
@@ -728,7 +731,7 @@ mod tests {
        let mut buf: Vec<u8> = Vec::new();
        msg.encode(&mut buf);

-
        let decoded = wire::deserialize(buf.as_slice());
+
        let decoded = Message::decode_exact(buf.as_slice());
        assert!(
            decoded.is_ok(),
            "INVENTORY_LIMIT is a valid limit for decoding"
modified crates/radicle-protocol/src/wire.rs
@@ -7,6 +7,7 @@ pub use message::{AddressType, MessageType};

use std::collections::BTreeMap;
use std::convert::TryFrom;
+
use std::fmt::Debug;
use std::mem;
use std::ops::Deref;
use std::str::FromStr;
@@ -41,39 +42,50 @@ use crate::service::filter;
pub type Size = u16;

#[derive(thiserror::Error, Debug)]
-
pub enum Error {
+
pub enum Invalid {
+
    #[error("invalid Git object identifier size: expected {expected}, got {actual}")]
+
    Oid { expected: usize, actual: usize },
+
    #[error(transparent)]
+
    Bounded(#[from] crate::bounded::Error),
+
    #[error("invalid filter size: {actual}")]
+
    FilterSize { actual: usize },
    #[error("UTF-8 error: {0}")]
    FromUtf8(#[from] FromUtf8Error),
-
    #[error("invalid size: expected {expected}, got {actual}")]
-
    InvalidSize { expected: usize, actual: usize },
-
    #[error("invalid filter size: {0}")]
-
    InvalidFilterSize(usize),
-
    #[error("invalid channel type {0:x}")]
-
    InvalidStreamKind(u8),
    #[error(transparent)]
-
    InvalidRefName(#[from] fmt::Error),
+
    RefName(#[from] fmt::Error),
    #[error(transparent)]
-
    InvalidAlias(#[from] node::AliasError),
-
    #[error("invalid user agent string: {0:?}")]
-
    InvalidUserAgent(String),
-
    #[error("invalid control message with type `{0}`")]
-
    InvalidControlMessage(u8),
-
    #[error("invalid protocol version header `{0:x?}`")]
-
    InvalidProtocolVersion([u8; 4]),
+
    Alias(#[from] node::AliasError),
+
    #[error("invalid user agent string: {err}")]
+
    InvalidUserAgent { err: String },
    #[error("invalid onion address: {0}")]
-
    InvalidOnionAddr(#[from] tor::OnionAddrDecodeError),
-
    #[error("invalid timestamp: {0}")]
-
    InvalidTimestamp(u64),
-
    #[error("wrong protocol version `{0}`")]
-
    WrongProtocolVersion(u8),
-
    #[error("unknown address type `{0}`")]
-
    UnknownAddressType(u8),
-
    #[error("unknown message type `{0}`")]
-
    UnknownMessageType(u16),
-
    #[error("unknown info type `{0}`")]
-
    UnknownInfoType(u16),
-
    #[error("unexpected bytes")]
-
    UnexpectedBytes,
+
    OnionAddr(#[from] tor::OnionAddrDecodeError),
+
    #[error("invalid timestamp: {actual_millis} millis")]
+
    Timestamp { actual_millis: u64 },
+

+
    // Message types
+
    #[error("invalid control message type: {actual:x}")]
+
    ControlType { actual: u8 },
+
    #[error("invalid stream type: {actual:x}")]
+
    StreamType { actual: u8 },
+
    #[error("invalid address type: {actual:x}")]
+
    AddressType { actual: u8 },
+
    #[error("invalid message type: {actual:x}")]
+
    MessageType { actual: u16 },
+
    #[error("invalid info message type: {actual:x}")]
+
    InfoMessageType { actual: u16 },
+

+
    // Protocol version handling
+
    #[error("invalid protocol version string: {actual:x?}")]
+
    ProtocolVersion { actual: [u8; 4] },
+
    #[error("unsupported protocol version: {actual}")]
+
    ProtocolVersionUnsupported { actual: u8 },
+
}
+

+
#[derive(thiserror::Error, Debug)]
+
pub enum Error {
+
    #[error(transparent)]
+
    Invalid(#[from] Invalid),
+

    #[error("unexpected end of buffer, requested {requested} more bytes but only {available} are available")]
    UnexpectedEnd { available: usize, requested: usize },
}
@@ -94,33 +106,43 @@ impl From<bytes::TryGetError> for Error {

/// Things that can be encoded as binary.
pub trait Encode {
+
    /// Encode self by writing it to the given buffer.
    fn encode(&self, buffer: &mut impl BufMut);
+

+
    /// A convenience wrapper around [`Encode::encode`]
+
    /// that allocates a [`Vec`].
+
    fn encode_vec(&self) -> Vec<u8> {
+
        let mut buf = Vec::new();
+
        self.encode(&mut buf);
+
        buf
+
    }
}

/// Things that can be decoded from binary.
pub trait Decode: Sized {
    fn decode(buffer: &mut impl Buf) -> Result<Self, Error>;
-
}
-

-
/// Encode an object into a byte vector.
-
///
-
/// # Panics
-
///
-
/// If the encoded object exceeds [`Size::MAX`].
-
pub fn serialize<E: Encode + ?Sized>(data: &E) -> Vec<u8> {
-
    let mut buffer = Vec::new().limit(Size::MAX as usize);
-
    data.encode(&mut buffer);
-
    buffer.into_inner()
-
}

-
/// Decode an object from a slice.
-
pub fn deserialize<T: Decode>(mut data: &[u8]) -> Result<T, Error> {
-
    let result = T::decode(&mut data)?;
-

-
    if data.is_empty() {
-
        Ok(result)
-
    } else {
-
        Err(Error::UnexpectedBytes)
+
    /// A convenience wrapper around [`Decode::decode`] to decode
+
    /// from a slice exactly.
+
    ///
+
    /// # Panics
+
    ///
+
    ///  - If decoding failed because there were not enough bytes.
+
    ///  - If there are any bytes left after decoding.
+
    #[cfg(test)]
+
    fn decode_exact(mut data: &[u8]) -> Result<Self, Invalid> {
+
        match Self::decode(&mut data) {
+
            Ok(value) => {
+
                if !data.is_empty() {
+
                    panic!("{} bytes left in buffer", data.len());
+
                }
+
                Ok(value)
+
            }
+
            Err(err @ Error::UnexpectedEnd { .. }) => {
+
                panic!("{}", err);
+
            }
+
            Err(Error::Invalid(e)) => Err(e),
+
        }
    }
}

@@ -299,7 +321,7 @@ impl Decode for Refs {

        for _ in 0..len {
            let name = String::decode(buf)?;
-
            let name = git::RefString::try_from(name).map_err(Error::from)?;
+
            let name = git::RefString::try_from(name).map_err(Invalid::from)?;
            let oid = git::Oid::decode(buf)?;

            refs.insert(name, oid);
@@ -311,19 +333,21 @@ impl Decode for Refs {
impl Decode for git::RefString {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let ref_str = String::decode(buf)?;
-
        git::RefString::try_from(ref_str).map_err(Error::from)
+
        Ok(git::RefString::try_from(ref_str).map_err(Invalid::from)?)
    }
}

impl Decode for UserAgent {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
-
        String::decode(buf).and_then(|s| UserAgent::from_str(&s).map_err(Error::InvalidUserAgent))
+
        let user_agent = String::decode(buf)?;
+
        Ok(UserAgent::from_str(&user_agent).map_err(|err| Invalid::InvalidUserAgent { err })?)
    }
}

impl Decode for Alias {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
-
        String::decode(buf).and_then(|s| Alias::from_str(&s).map_err(Error::from))
+
        let alias = String::decode(buf)?;
+
        Ok(Alias::from_str(&alias).map_err(Invalid::from)?)
    }
}

@@ -341,18 +365,19 @@ where

impl Decode for git::Oid {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        const LEN_EXPECTED: usize = mem::size_of::<git::raw::Oid>();
+

        let len = Size::decode(buf)? as usize;
-
        #[allow(non_upper_case_globals)]
-
        const expected: usize = mem::size_of::<git::raw::Oid>();

-
        if len != expected {
-
            return Err(Error::InvalidSize {
-
                expected,
+
        if len != LEN_EXPECTED {
+
            return Err(Invalid::Oid {
+
                expected: LEN_EXPECTED,
                actual: len,
-
            });
+
            }
+
            .into());
        }

-
        let buf: [u8; expected] = Decode::decode(buf)?;
+
        let buf: [u8; LEN_EXPECTED] = Decode::decode(buf)?;
        let oid = git::raw::Oid::from_bytes(&buf).expect("the buffer is exactly the right size");
        let oid = git::Oid::from(oid);

@@ -407,10 +432,7 @@ where
{
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let len: usize = Size::decode(buf)? as usize;
-
        let mut items = Self::with_capacity(len).map_err(|_| Error::InvalidSize {
-
            expected: Self::max(),
-
            actual: len,
-
        })?;
+
        let mut items = Self::with_capacity(len).map_err(Invalid::from)?;

        for _ in 0..items.capacity() {
            let item = T::decode(buf)?;
@@ -427,7 +449,7 @@ impl Decode for String {

        buf.try_copy_to_slice(&mut bytes)?;

-
        let string = String::from_utf8(bytes)?;
+
        let string = String::from_utf8(bytes).map_err(Invalid::from)?;

        Ok(string)
    }
@@ -451,7 +473,7 @@ impl Decode for filter::Filter {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let size: usize = Size::decode(buf)? as usize;
        if !filter::FILTER_SIZES.contains(&size) {
-
            return Err(Error::InvalidFilterSize(size));
+
            return Err(Invalid::FilterSize { actual: size }.into());
        }

        let mut bytes = vec![0; size];
@@ -515,7 +537,7 @@ impl Decode for node::Features {
impl Decode for tor::OnionAddrV3 {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let bytes: [u8; tor::ONION_V3_RAW_LEN] = Decode::decode(buf)?;
-
        let addr = tor::OnionAddrV3::from_raw_bytes(bytes)?;
+
        let addr = tor::OnionAddrV3::from_raw_bytes(bytes).map_err(Invalid::from)?;

        Ok(addr)
    }
@@ -530,7 +552,9 @@ impl Encode for Timestamp {
impl Decode for Timestamp {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let millis = u64::decode(buf)?;
-
        let ts = Timestamp::try_from(millis).map_err(Error::InvalidTimestamp)?;
+
        let ts = Timestamp::try_from(millis).map_err(|value| Invalid::Timestamp {
+
            actual_millis: value,
+
        })?;

        Ok(ts)
    }
@@ -548,22 +572,22 @@ mod tests {

    #[quickcheck]
    fn prop_u8(input: u8) {
-
        assert_eq!(deserialize::<u8>(&serialize(&input)).unwrap(), input);
+
        assert_eq!(u8::decode_exact(&input.encode_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_u16(input: u16) {
-
        assert_eq!(deserialize::<u16>(&serialize(&input)).unwrap(), input);
+
        assert_eq!(u16::decode_exact(&input.encode_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_u32(input: u32) {
-
        assert_eq!(deserialize::<u32>(&serialize(&input)).unwrap(), input);
+
        assert_eq!(u32::decode_exact(&input.encode_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_u64(input: u64) {
-
        assert_eq!(deserialize::<u64>(&serialize(&input)).unwrap(), input);
+
        assert_eq!(u64::decode_exact(&input.encode_vec()).unwrap(), input);
    }

    #[quickcheck]
@@ -571,7 +595,7 @@ mod tests {
        if input.len() > u8::MAX as usize {
            return qcheck::TestResult::discard();
        }
-
        assert_eq!(deserialize::<String>(&serialize(&input)).unwrap(), input);
+
        assert_eq!(String::decode_exact(&input.encode_vec()).unwrap(), input);

        qcheck::TestResult::passed()
    }
@@ -579,38 +603,38 @@ mod tests {
    #[quickcheck]
    fn prop_vec(input: BoundedVec<String, 16>) {
        assert_eq!(
-
            deserialize::<BoundedVec<String, 16>>(&serialize(&input.as_slice())).unwrap(),
+
            BoundedVec::<String, 16>::decode_exact(&input.encode_vec()).unwrap(),
            input
        );
    }

    #[quickcheck]
    fn prop_pubkey(input: PublicKey) {
-
        assert_eq!(deserialize::<PublicKey>(&serialize(&input)).unwrap(), input);
+
        assert_eq!(PublicKey::decode_exact(&input.encode_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_filter(input: filter::Filter) {
        assert_eq!(
-
            deserialize::<filter::Filter>(&serialize(&input)).unwrap(),
+
            filter::Filter::decode_exact(&input.encode_vec()).unwrap(),
            input
        );
    }

    #[quickcheck]
    fn prop_id(input: RepoId) {
-
        assert_eq!(deserialize::<RepoId>(&serialize(&input)).unwrap(), input);
+
        assert_eq!(RepoId::decode_exact(&input.encode_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_refs(input: Refs) {
-
        assert_eq!(deserialize::<Refs>(&serialize(&input)).unwrap(), input);
+
        assert_eq!(Refs::decode_exact(&input.encode_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_tuple(input: (String, String)) {
        assert_eq!(
-
            deserialize::<(String, String)>(&serialize(&input)).unwrap(),
+
            <(String, String)>::decode_exact(&input.encode_vec()).unwrap(),
            input
        );
    }
@@ -620,7 +644,7 @@ mod tests {
        let signature = Signature::from(input);

        assert_eq!(
-
            deserialize::<Signature>(&serialize(&signature)).unwrap(),
+
            Signature::decode_exact(&signature.encode_vec()).unwrap(),
            signature
        );
    }
@@ -629,13 +653,13 @@ mod tests {
    fn prop_oid(input: [u8; 20]) {
        let oid = git::Oid::try_from(input.as_slice()).unwrap();

-
        assert_eq!(deserialize::<git::Oid>(&serialize(&oid)).unwrap(), oid);
+
        assert_eq!(git::Oid::decode_exact(&oid.encode_vec()).unwrap(), oid);
    }

    #[quickcheck]
    fn prop_signed_refs(input: SignedRefs<Unverified>) {
        assert_eq!(
-
            deserialize::<SignedRefs<Unverified>>(&serialize(&input)).unwrap(),
+
            SignedRefs::<Unverified>::decode_exact(&input.encode_vec()).unwrap(),
            input
        );
    }
@@ -643,7 +667,7 @@ mod tests {
    #[test]
    fn test_string() {
        assert_eq!(
-
            serialize(&String::from("hello")),
+
            String::from("hello").encode_vec(),
            vec![5, b'h', b'e', b'l', b'l', b'o']
        );
    }
@@ -651,7 +675,7 @@ mod tests {
    #[test]
    fn test_alias() {
        assert_eq!(
-
            serialize(&Alias::from_str("hello").unwrap()),
+
            Alias::from_str("hello").unwrap().encode_vec(),
            vec![5, b'h', b'e', b'l', b'l', b'o']
        );
    }
@@ -660,30 +684,30 @@ mod tests {
    fn test_filter_invalid() {
        let b = bloomy::BloomFilter::with_size(filter::FILTER_SIZE_M / 3);
        let f = filter::Filter::from(b);
-
        let bytes = serialize(&f);
+
        let bytes = f.encode_vec();

        assert_matches!(
-
            deserialize::<filter::Filter>(&bytes).unwrap_err(),
-
            Error::InvalidFilterSize(_)
+
            filter::Filter::decode_exact(&bytes).unwrap_err(),
+
            Invalid::FilterSize { .. }
        );
    }

    #[test]
    fn test_bounded_vec_limit() {
        let v: BoundedVec<u8, 2> = vec![1, 2].try_into().unwrap();
-
        let buf = serialize(&v);
+
        let buf = &v.encode_vec();

        assert_matches!(
-
            deserialize::<BoundedVec<u8, 1>>(&buf),
-
            Err(Error::InvalidSize {
+
            BoundedVec::<u8, 1>::decode_exact(buf),
+
            Err(Invalid::Bounded(crate::bounded::Error::InvalidSize {
                expected: 1,
                actual: 2
-
            }),
+
            })),
            "fail when vector is too small for buffer",
        );

        assert!(
-
            deserialize::<BoundedVec<u8, 2>>(&buf).is_ok(),
+
            BoundedVec::<u8, 2>::decode_exact(buf).is_ok(),
            "successfully decode vector of same size",
        );
    }
modified crates/radicle-protocol/src/wire/frame.rs
@@ -12,13 +12,6 @@ use crate::{wire, wire::varint, wire::varint::VarInt, PROTOCOL_VERSION};
/// by a version number.
pub const PROTOCOL_VERSION_STRING: Version = Version([b'r', b'a', b'd', PROTOCOL_VERSION]);

-
/// Control open byte.
-
const CONTROL_OPEN: u8 = 0;
-
/// Control close byte.
-
const CONTROL_CLOSE: u8 = 1;
-
/// Control EOF byte.
-
const CONTROL_EOF: u8 = 2;
-

/// Protocol version.
#[derive(Debug, PartialEq, Eq)]
pub struct Version([u8; 4]);
@@ -43,7 +36,7 @@ impl wire::Decode for Version {
        buf.try_copy_to_slice(&mut version[..])?;

        if version != PROTOCOL_VERSION_STRING.0 {
-
            return Err(wire::Error::InvalidProtocolVersion(version));
+
            return Err(wire::Invalid::ProtocolVersion { actual: version }.into());
        }
        Ok(Self(version))
    }
@@ -94,29 +87,29 @@ impl StreamId {
    }

    /// Get the kind of stream this is.
-
    pub fn kind(&self) -> Result<StreamKind, u8> {
+
    pub fn kind(&self) -> Result<StreamType, u8> {
        let id = *self.0;
        let kind = ((id >> 1) & 0b11) as u8;

-
        StreamKind::try_from(kind)
+
        StreamType::try_from(kind)
    }

    /// Create a control identifier.
    pub fn control(link: Link) -> Self {
        let link = if link.is_outbound() { 0 } else { 1 };
-
        Self(VarInt::from(((StreamKind::Control as u8) << 1) | link))
+
        Self(VarInt::from(((u8::from(StreamType::Control)) << 1) | link))
    }

    /// Create a gossip identifier.
    pub fn gossip(link: Link) -> Self {
        let link = if link.is_outbound() { 0 } else { 1 };
-
        Self(VarInt::from(((StreamKind::Gossip as u8) << 1) | link))
+
        Self(VarInt::from((u8::from(StreamType::Gossip) << 1) | link))
    }

    /// Create a git identifier.
    pub fn git(link: Link) -> Self {
        let link = if link.is_outbound() { 0 } else { 1 };
-
        Self(VarInt::from(((StreamKind::Git as u8) << 1) | link))
+
        Self(VarInt::from((u8::from(StreamType::Git) << 1) | link))
    }

    /// Get the nth identifier while preserving the stream type and initiator.
@@ -160,7 +153,7 @@ impl wire::Encode for StreamId {
/// Type of stream.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
-
pub enum StreamKind {
+
pub enum StreamType {
    /// Control stream, used to open and close streams.
    Control = 0b00,
    /// Gossip stream, used to exchange messages.
@@ -169,19 +162,25 @@ pub enum StreamKind {
    Git = 0b10,
}

-
impl TryFrom<u8> for StreamKind {
+
impl TryFrom<u8> for StreamType {
    type Error = u8;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
-
            0b00 => Ok(StreamKind::Control),
-
            0b01 => Ok(StreamKind::Gossip),
-
            0b10 => Ok(StreamKind::Git),
+
            0b00 => Ok(StreamType::Control),
+
            0b01 => Ok(StreamType::Gossip),
+
            0b10 => Ok(StreamType::Git),
            n => Err(n),
        }
    }
}

+
impl From<StreamType> for u8 {
+
    fn from(value: StreamType) -> Self {
+
        value as u8
+
    }
+
}
+

/// Protocol frame.
///
/// ```text
@@ -234,13 +233,6 @@ impl<M> Frame<M> {
    }
}

-
impl<M: wire::Encode> Frame<M> {
-
    /// Serialize frame to bytes.
-
    pub fn to_bytes(&self) -> Vec<u8> {
-
        wire::serialize(self)
-
    }
-
}
-

/// Frame payload.
#[derive(Debug, PartialEq, Eq)]
pub enum FrameData<M> {
@@ -274,23 +266,50 @@ pub enum Control {
    },
}

+
/// Type of control message.
+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+
#[repr(u8)]
+
pub enum ControlType {
+
    /// Control open byte.
+
    Open = 0,
+
    /// Control close byte.
+
    Close = 1,
+
    /// Control EOF byte.
+
    Eof = 2,
+
}
+

+
impl TryFrom<u8> for ControlType {
+
    type Error = u8;
+

+
    fn try_from(value: u8) -> Result<Self, Self::Error> {
+
        match value {
+
            0b00 => Ok(ControlType::Open),
+
            0b01 => Ok(ControlType::Close),
+
            0b10 => Ok(ControlType::Eof),
+
            n => Err(n),
+
        }
+
    }
+
}
+

+
impl From<ControlType> for u8 {
+
    fn from(value: ControlType) -> Self {
+
        value as u8
+
    }
+
}
+

impl wire::Decode for Control {
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
-
        let command = u8::decode(buf)?;
-
        match command {
-
            CONTROL_OPEN => {
-
                let stream = StreamId::decode(buf)?;
-
                Ok(Control::Open { stream })
-
            }
-
            CONTROL_CLOSE => {
-
                let stream = StreamId::decode(buf)?;
-
                Ok(Control::Close { stream })
-
            }
-
            CONTROL_EOF => {
-
                let stream = StreamId::decode(buf)?;
-
                Ok(Control::Eof { stream })
-
            }
-
            other => Err(wire::Error::InvalidControlMessage(other)),
+
        match ControlType::try_from(u8::decode(buf)?) {
+
            Ok(ControlType::Open) => Ok(Control::Open {
+
                stream: StreamId::decode(buf)?,
+
            }),
+
            Ok(ControlType::Close) => Ok(Control::Close {
+
                stream: StreamId::decode(buf)?,
+
            }),
+
            Ok(ControlType::Eof) => Ok(Control::Eof {
+
                stream: StreamId::decode(buf)?,
+
            }),
+
            Err(other) => Err(wire::Invalid::ControlType { actual: other }.into()),
        }
    }
}
@@ -299,15 +318,15 @@ impl wire::Encode for Control {
    fn encode(&self, buf: &mut impl BufMut) {
        match self {
            Self::Open { stream: id } => {
-
                CONTROL_OPEN.encode(buf);
+
                u8::from(ControlType::Open).encode(buf);
                id.encode(buf);
            }
            Self::Eof { stream: id } => {
-
                CONTROL_EOF.encode(buf);
+
                u8::from(ControlType::Eof).encode(buf);
                id.encode(buf);
            }
            Self::Close { stream: id } => {
-
                CONTROL_CLOSE.encode(buf);
+
                u8::from(ControlType::Close).encode(buf);
                id.encode(buf);
            }
        }
@@ -318,12 +337,15 @@ impl<M: wire::Decode> wire::Decode for Frame<M> {
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
        let version = Version::decode(buf)?;
        if version.number() != PROTOCOL_VERSION {
-
            return Err(wire::Error::WrongProtocolVersion(version.number()));
+
            return Err(wire::Invalid::ProtocolVersionUnsupported {
+
                actual: version.number(),
+
            }
+
            .into());
        }
        let stream = StreamId::decode(buf)?;

        match stream.kind() {
-
            Ok(StreamKind::Control) => {
+
            Ok(StreamType::Control) => {
                let ctrl = Control::decode(buf)?;
                let frame = Frame {
                    version,
@@ -332,7 +354,7 @@ impl<M: wire::Decode> wire::Decode for Frame<M> {
                };
                Ok(frame)
            }
-
            Ok(StreamKind::Gossip) => {
+
            Ok(StreamType::Gossip) => {
                let data = varint::payload::decode(buf)?;
                let mut cursor = io::Cursor::new(data);
                let msg = M::decode(&mut cursor)?;
@@ -347,11 +369,11 @@ impl<M: wire::Decode> wire::Decode for Frame<M> {

                Ok(frame)
            }
-
            Ok(StreamKind::Git) => {
+
            Ok(StreamType::Git) => {
                let data = varint::payload::decode(buf)?;
                Ok(Frame::git(stream, data))
            }
-
            Err(n) => Err(wire::Error::InvalidStreamKind(n)),
+
            Err(n) => Err(wire::Invalid::StreamType { actual: n }.into()),
        }
    }
}
@@ -363,7 +385,7 @@ impl<M: wire::Encode> wire::Encode for Frame<M> {
        match &self.data {
            FrameData::Control(ctrl) => ctrl.encode(buf),
            FrameData::Git(data) => varint::payload::encode(data, buf),
-
            FrameData::Gossip(msg) => varint::payload::encode(&wire::serialize(msg), buf),
+
            FrameData::Gossip(msg) => varint::payload::encode(&msg.encode_vec(), buf),
        }
    }
}
@@ -374,9 +396,9 @@ mod test {

    #[test]
    fn test_stream_id() {
-
        assert_eq!(StreamId(VarInt(0b000)).kind().unwrap(), StreamKind::Control);
-
        assert_eq!(StreamId(VarInt(0b010)).kind().unwrap(), StreamKind::Gossip);
-
        assert_eq!(StreamId(VarInt(0b100)).kind().unwrap(), StreamKind::Git);
+
        assert_eq!(StreamId(VarInt(0b000)).kind().unwrap(), StreamType::Control);
+
        assert_eq!(StreamId(VarInt(0b010)).kind().unwrap(), StreamType::Gossip);
+
        assert_eq!(StreamId(VarInt(0b100)).kind().unwrap(), StreamType::Git);
        assert_eq!(StreamId(VarInt(0b001)).link(), Link::Inbound);
        assert_eq!(StreamId(VarInt(0b000)).link(), Link::Outbound);
        assert_eq!(StreamId(VarInt(0b101)).link(), Link::Inbound);
@@ -390,4 +412,27 @@ mod test {
        assert_eq!(StreamId::control(Link::Inbound), StreamId(VarInt(0b001)));
        assert_eq!(StreamId::gossip(Link::Inbound), StreamId(VarInt(0b011)));
    }
+

+
    #[test]
+
    fn test_encode_git_large() {
+
        use wire::Encode as _;
+

+
        let size = u16::MAX as usize * 3;
+
        assert!(
+
            size > (wire::Size::MAX as usize * 2),
+
            "we want to test sizes that are way larger than any gossip message"
+
        );
+

+
        let a_lot_of_data = vec![0u8; size];
+

+
        let frame: Frame<Message> = Frame::git(StreamId(0u8.into()), a_lot_of_data);
+

+
        // In previous versions since 3c5668e this would panic.
+
        let bytes = frame.encode_vec();
+

+
        assert!(
+
            bytes.len() > wire::Size::MAX as usize * 2,
+
            "just making sure that whatever was encoded is still quite large"
+
        );
+
    }
}
modified crates/radicle-protocol/src/wire/message.rs
@@ -227,13 +227,15 @@ impl wire::Decode for Info {

                Ok(Self::RefsAlreadySynced { rid, at })
            }
-
            Err(other) => Err(wire::Error::UnknownInfoType(other)),
+
            Err(other) => Err(wire::Invalid::InfoMessageType { actual: other }.into()),
        }
    }
}

impl wire::Encode for Message {
    fn encode(&self, buf: &mut impl BufMut) {
+
        let buf = &mut buf.limit(wire::Size::MAX as usize);
+

        self.type_id().encode(buf);

        match self {
@@ -334,7 +336,7 @@ impl wire::Decode for Message {
                let zeroes = ZeroBytes::decode(buf)?;
                Ok(Self::Pong { zeroes })
            }
-
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
+
            Err(other) => Err(wire::Invalid::MessageType { actual: other }.into()),
        }
    }
}
@@ -396,7 +398,7 @@ impl wire::Decode for Address {

                HostName::Tor(onion)
            }
-
            Err(other) => return Err(wire::Error::UnknownAddressType(other)),
+
            Err(other) => return Err(wire::Invalid::AddressType { actual: other }.into()),
        };
        let port = u16::decode(buf)?;

@@ -430,7 +432,7 @@ mod tests {
    use radicle::storage::refs::RefsAt;

    use crate::deserializer::Deserializer;
-
    use crate::wire::{self, Encode};
+
    use crate::wire::{self, Decode, Encode};
    use radicle::test::arbitrary;

    #[test]
@@ -444,7 +446,7 @@ mod tests {
        });
        let ann = ann.signed(&signer);
        let msg = Message::Announcement(ann);
-
        let data = wire::serialize(&msg);
+
        let data = msg.encode_vec();

        assert!(data.len() < wire::Size::MAX as usize);
    }
@@ -459,7 +461,7 @@ mod tests {
        });
        let ann = ann.signed(&signer);
        let msg = Message::Announcement(ann);
-
        let data = wire::serialize(&msg);
+
        let data = msg.encode_vec();

        assert!(data.len() < wire::Size::MAX as usize);
    }
@@ -480,44 +482,48 @@ mod tests {
        });
        let ann = ann.signed(&signer);
        let msg = Message::Announcement(ann);
-
        let data = wire::serialize(&msg);
+
        let data = msg.encode_vec();

        assert!(data.len() < wire::Size::MAX as usize);
    }

    #[test]
    fn test_pingpong_encode_max_size() {
-
        wire::serialize(&Message::Ping(Ping {
+
        Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
-
        }));
+
        })
+
        .encode_vec();

-
        wire::serialize(&Message::Pong {
+
        (Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
-
        });
+
        })
+
        .encode_vec();
    }

    #[test]
    #[should_panic(expected = "advance out of bounds")]
    fn test_ping_encode_size_overflow() {
-
        wire::serialize(&Message::Ping(Ping {
+
        Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
-
        }));
+
        })
+
        .encode_vec();
    }

    #[test]
    #[should_panic(expected = "advance out of bounds")]
    fn test_pong_encode_size_overflow() {
-
        wire::serialize(&Message::Pong {
+
        Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
-
        });
+
        }
+
        .encode_vec();
    }

    #[quickcheck]
    fn prop_message_encode_decode(message: Message) {
-
        let encoded = &wire::serialize(&message);
-
        let decoded = wire::deserialize::<Message>(encoded).unwrap();
+
        let encoded = message.encode_vec();
+
        let decoded = Message::decode_exact(&encoded).unwrap();

        assert_eq!(message, decoded);
    }
@@ -550,7 +556,7 @@ mod tests {
            let zeroes = ZeroBytes::new(zeroes);

            assert_eq!(
-
                wire::deserialize::<ZeroBytes>(&wire::serialize(&zeroes)).unwrap(),
+
                ZeroBytes::decode_exact(&zeroes.encode_vec()).unwrap(),
                zeroes
            );
        }
@@ -562,9 +568,6 @@ mod tests {

    #[quickcheck]
    fn prop_addr(addr: Address) {
-
        assert_eq!(
-
            wire::deserialize::<Address>(&wire::serialize(&addr)).unwrap(),
-
            addr
-
        );
+
        assert_eq!(Address::decode_exact(&addr.encode_vec()).unwrap(), addr);
    }
}
modified crates/radicle-protocol/src/wire/varint.rs
@@ -213,8 +213,8 @@ mod test {

    #[quickcheck]
    fn prop_encode_decode(input: VarInt) {
-
        let encoded = wire::serialize(&input);
-
        let decoded: VarInt = wire::deserialize(&encoded).unwrap();
+
        let encoded = input.encode_vec();
+
        let decoded: VarInt = VarInt::decode_exact(&encoded).unwrap();

        assert_eq!(decoded, input);
    }
@@ -222,30 +222,24 @@ mod test {
    #[test]
    #[should_panic]
    fn test_encode_overflow() {
-
        wire::serialize(&VarInt(u64::MAX));
+
        VarInt(u64::MAX).encode_vec();
    }

    #[test]
    fn test_encoding() {
-
        assert_eq!(wire::serialize(&VarInt(0)), vec![0x0]);
-
        assert_eq!(wire::serialize(&VarInt(1)), vec![0x01]);
-
        assert_eq!(wire::serialize(&VarInt(10)), vec![0x0a]);
-
        assert_eq!(wire::serialize(&VarInt(37)), vec![0x25]);
+
        assert_eq!(VarInt(0).encode_vec(), vec![0x0]);
+
        assert_eq!(VarInt(1).encode_vec(), vec![0x01]);
+
        assert_eq!(VarInt(10).encode_vec(), vec![0x0a]);
+
        assert_eq!(VarInt(37).encode_vec(), vec![0x25]);
+
        assert_eq!(VarInt::decode_exact(&[0x40, 0x25]).unwrap(), VarInt(37));
+
        assert_eq!(VarInt(15293).encode_vec(), vec![0x7b, 0xbd]);
+
        assert_eq!(VarInt(494878333).encode_vec(), vec![0x9d, 0x7f, 0x3e, 0x7d],);
        assert_eq!(
-
            wire::deserialize::<VarInt>(&[0x40, 0x25]).unwrap(),
-
            VarInt(37)
-
        );
-
        assert_eq!(wire::serialize(&VarInt(15293)), vec![0x7b, 0xbd]);
-
        assert_eq!(
-
            wire::serialize(&VarInt(494878333)),
-
            vec![0x9d, 0x7f, 0x3e, 0x7d],
-
        );
-
        assert_eq!(
-
            wire::serialize(&VarInt(151288809941952652)),
+
            VarInt(151288809941952652).encode_vec(),
            vec![0xc2, 0x19, 0x7c, 0x5e, 0xff, 0x14, 0xe8, 0x8c]
        );
        assert_eq!(
-
            wire::serialize(&VarInt(10000000000)),
+
            VarInt(10000000000).encode_vec(),
            vec![0xc0, 0x00, 0x00, 0x02, 0x54, 0x0b, 0xe4, 0x00],
        );
    }