Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: Refactor decoding
Lorenz Leutgeb committed 8 months ago
commit a568e7f4848729ca02d5d5373dfacba289658493
parent cbd2a7070c9a8c18ad20785f383eb1dd0273a1ee
6 files changed +192 -148
modified crates/radicle-protocol/src/deserializer.rs
@@ -48,7 +48,7 @@ impl<const B: usize, D: wire::Decode> Deserializer<B, D> {
    }

    /// Decode and return the next message. Returns [`None`] if nothing was decoded.
-
    pub fn deserialize_next(&mut self) -> Result<Option<D>, wire::Error> {
+
    pub fn deserialize_next(&mut self) -> Result<Option<D>, wire::Invalid> {
        let mut reader = io::Cursor::new(self.unparsed.as_slice());

        match D::decode(&mut reader) {
@@ -59,7 +59,7 @@ impl<const B: usize, D: wire::Decode> Deserializer<B, D> {
                Ok(Some(msg))
            }
            Err(wire::Error::UnexpectedEnd { .. }) => Ok(None),
-
            Err(err) => Err(err),
+
            Err(wire::Error::Invalid(err)) => Err(err),
        }
    }

@@ -106,7 +106,7 @@ impl<const B: usize, D: wire::Decode> io::Write for Deserializer<B, D> {
}

impl<const B: usize, D: wire::Decode> Iterator for Deserializer<B, D> {
-
    type Item = Result<D, wire::Error>;
+
    type Item = Result<D, wire::Invalid>;

    fn next(&mut self) -> Option<Self::Item> {
        self.deserialize_next().transpose()
modified crates/radicle-protocol/src/service/message.rs
@@ -677,12 +677,14 @@ mod tests {
    use std::str::FromStr;

    use fastrand;
+
    use localtime::LocalTime;
    use qcheck_macros::quickcheck;
    use radicle::git::raw;
+
    use radicle::test::arbitrary;
+

+
    use crate::wire::Decode as _;

    use super::*;
-
    use localtime::LocalTime;
-
    use radicle::test::arbitrary;

    #[test]
    fn test_ref_remote_limit() {
@@ -711,7 +713,7 @@ mod tests {
        let mut buf = Vec::new();
        msg.encode(&mut buf);

-
        let decoded = wire::deserialize(buf.as_slice());
+
        let decoded = Message::decode_exact(buf.as_slice());
        assert!(decoded.is_ok());
        assert_eq!(msg, decoded.unwrap());
    }
@@ -730,7 +732,7 @@ mod tests {
        let mut buf: Vec<u8> = Vec::new();
        msg.encode(&mut buf);

-
        let decoded = wire::deserialize(buf.as_slice());
+
        let decoded = Message::decode_exact(buf.as_slice());
        assert!(
            decoded.is_ok(),
            "INVENTORY_LIMIT is a valid limit for decoding"
modified crates/radicle-protocol/src/wire.rs
@@ -42,39 +42,50 @@ use crate::service::filter;
pub type Size = u16;

#[derive(thiserror::Error, Debug)]
-
pub enum Error {
+
pub enum Invalid {
+
    #[error("invalid Git object identifier size: expected {expected}, got {actual}")]
+
    Oid { expected: usize, actual: usize },
+
    #[error(transparent)]
+
    Bounded(#[from] crate::bounded::Error),
+
    #[error("invalid filter size: {actual}")]
+
    FilterSize { actual: usize },
    #[error("UTF-8 error: {0}")]
    FromUtf8(#[from] FromUtf8Error),
-
    #[error("invalid size: expected {expected}, got {actual}")]
-
    InvalidSize { expected: usize, actual: usize },
-
    #[error("invalid filter size: {0}")]
-
    InvalidFilterSize(usize),
-
    #[error("invalid channel type {0:x}")]
-
    InvalidStreamKind(u8),
    #[error(transparent)]
-
    InvalidRefName(#[from] fmt::Error),
+
    RefName(#[from] fmt::Error),
    #[error(transparent)]
-
    InvalidAlias(#[from] node::AliasError),
-
    #[error("invalid user agent string: {0:?}")]
-
    InvalidUserAgent(String),
-
    #[error("invalid control message with type `{0}`")]
-
    InvalidControlMessage(u8),
-
    #[error("invalid protocol version header `{0:x?}`")]
-
    InvalidProtocolVersion([u8; 4]),
+
    Alias(#[from] node::AliasError),
+
    #[error("invalid user agent string: {err}")]
+
    InvalidUserAgent { err: String },
    #[error("invalid onion address: {0}")]
-
    InvalidOnionAddr(#[from] tor::OnionAddrDecodeError),
-
    #[error("invalid timestamp: {0}")]
-
    InvalidTimestamp(u64),
-
    #[error("wrong protocol version `{0}`")]
-
    WrongProtocolVersion(u8),
-
    #[error("unknown address type `{0}`")]
-
    UnknownAddressType(u8),
-
    #[error("unknown message type `{0}`")]
-
    UnknownMessageType(u16),
-
    #[error("unknown info type `{0}`")]
-
    UnknownInfoType(u16),
-
    #[error("unexpected bytes")]
-
    UnexpectedBytes,
+
    OnionAddr(#[from] tor::OnionAddrDecodeError),
+
    #[error("invalid timestamp: {actual_millis} millis")]
+
    Timestamp { actual_millis: u64 },
+

+
    // Message types
+
    #[error("invalid control message type: {actual:x}")]
+
    ControlType { actual: u8 },
+
    #[error("invalid stream type: {actual:x}")]
+
    StreamType { actual: u8 },
+
    #[error("invalid address type: {actual:x}")]
+
    AddressType { actual: u8 },
+
    #[error("invalid message type: {actual:x}")]
+
    MessageType { actual: u16 },
+
    #[error("invalid info message type: {actual:x}")]
+
    InfoMessageType { actual: u16 },
+

+
    // Protocol version handling
+
    #[error("invalid protocol version string: {actual:x?}")]
+
    ProtocolVersion { actual: [u8; 4] },
+
    #[error("unsupported protocol version: {actual}")]
+
    ProtocolVersionUnsupported { actual: u8 },
+
}
+

+
#[derive(thiserror::Error, Debug)]
+
pub enum Error {
+
    #[error(transparent)]
+
    Invalid(#[from] Invalid),
+

    #[error("unexpected end of buffer, requested {requested} more bytes but only {available} are available")]
    UnexpectedEnd { available: usize, requested: usize },
}
@@ -110,16 +121,28 @@ pub trait Encode {
/// Things that can be decoded from binary.
pub trait Decode: Sized {
    fn decode(buffer: &mut impl Buf) -> Result<Self, Error>;
-
}
-

-
/// Decode an object from a slice.
-
pub fn deserialize<T: Decode>(mut data: &[u8]) -> Result<T, Error> {
-
    let result = T::decode(&mut data)?;

-
    if data.is_empty() {
-
        Ok(result)
-
    } else {
-
        Err(Error::UnexpectedBytes)
+
    /// A convenience wrapper around [`Decode::decode`] to decode
+
    /// from a slice exactly.
+
    ///
+
    /// # Panics
+
    ///
+
    ///  - If decoding failed because there were not enough bytes.
+
    ///  - If there are any bytes left after decoding.
+
    #[cfg(test)]
+
    fn decode_exact(mut data: &[u8]) -> Result<Self, Invalid> {
+
        match Self::decode(&mut data) {
+
            Ok(value) => {
+
                if !data.is_empty() {
+
                    panic!("{} bytes left in buffer", data.len());
+
                }
+
                Ok(value)
+
            }
+
            Err(err @ Error::UnexpectedEnd { .. }) => {
+
                panic!("{}", err);
+
            }
+
            Err(Error::Invalid(e)) => Err(e),
+
        }
    }
}

@@ -298,7 +321,7 @@ impl Decode for Refs {

        for _ in 0..len {
            let name = String::decode(buf)?;
-
            let name = git::RefString::try_from(name).map_err(Error::from)?;
+
            let name = git::RefString::try_from(name).map_err(Invalid::from)?;
            let oid = git::Oid::decode(buf)?;

            refs.insert(name, oid);
@@ -310,19 +333,21 @@ impl Decode for Refs {
impl Decode for git::RefString {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let ref_str = String::decode(buf)?;
-
        git::RefString::try_from(ref_str).map_err(Error::from)
+
        Ok(git::RefString::try_from(ref_str).map_err(Invalid::from)?)
    }
}

impl Decode for UserAgent {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
-
        String::decode(buf).and_then(|s| UserAgent::from_str(&s).map_err(Error::InvalidUserAgent))
+
        let user_agent = String::decode(buf)?;
+
        Ok(UserAgent::from_str(&user_agent).map_err(|err| Invalid::InvalidUserAgent { err })?)
    }
}

impl Decode for Alias {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
-
        String::decode(buf).and_then(|s| Alias::from_str(&s).map_err(Error::from))
+
        let alias = String::decode(buf)?;
+
        Ok(Alias::from_str(&alias).map_err(Invalid::from)?)
    }
}

@@ -340,18 +365,19 @@ where

impl Decode for git::Oid {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        const LEN_EXPECTED: usize = mem::size_of::<git::raw::Oid>();
+

        let len = Size::decode(buf)? as usize;
-
        #[allow(non_upper_case_globals)]
-
        const expected: usize = mem::size_of::<git::raw::Oid>();

-
        if len != expected {
-
            return Err(Error::InvalidSize {
-
                expected,
+
        if len != LEN_EXPECTED {
+
            return Err(Invalid::Oid {
+
                expected: LEN_EXPECTED,
                actual: len,
-
            });
+
            }
+
            .into());
        }

-
        let buf: [u8; expected] = Decode::decode(buf)?;
+
        let buf: [u8; LEN_EXPECTED] = Decode::decode(buf)?;
        let oid = git::raw::Oid::from_bytes(&buf).expect("the buffer is exactly the right size");
        let oid = git::Oid::from(oid);

@@ -406,10 +432,7 @@ where
{
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let len: usize = Size::decode(buf)? as usize;
-
        let mut items = Self::with_capacity(len).map_err(|_| Error::InvalidSize {
-
            expected: Self::max(),
-
            actual: len,
-
        })?;
+
        let mut items = Self::with_capacity(len).map_err(Invalid::from)?;

        for _ in 0..items.capacity() {
            let item = T::decode(buf)?;
@@ -426,7 +449,7 @@ impl Decode for String {

        buf.try_copy_to_slice(&mut bytes)?;

-
        let string = String::from_utf8(bytes)?;
+
        let string = String::from_utf8(bytes).map_err(Invalid::from)?;

        Ok(string)
    }
@@ -450,7 +473,7 @@ impl Decode for filter::Filter {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let size: usize = Size::decode(buf)? as usize;
        if !filter::FILTER_SIZES.contains(&size) {
-
            return Err(Error::InvalidFilterSize(size));
+
            return Err(Invalid::FilterSize { actual: size }.into());
        }

        let mut bytes = vec![0; size];
@@ -514,7 +537,7 @@ impl Decode for node::Features {
impl Decode for tor::OnionAddrV3 {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let bytes: [u8; tor::ONION_V3_RAW_LEN] = Decode::decode(buf)?;
-
        let addr = tor::OnionAddrV3::from_raw_bytes(bytes)?;
+
        let addr = tor::OnionAddrV3::from_raw_bytes(bytes).map_err(Invalid::from)?;

        Ok(addr)
    }
@@ -529,7 +552,9 @@ impl Encode for Timestamp {
impl Decode for Timestamp {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let millis = u64::decode(buf)?;
-
        let ts = Timestamp::try_from(millis).map_err(Error::InvalidTimestamp)?;
+
        let ts = Timestamp::try_from(millis).map_err(|value| Invalid::Timestamp {
+
            actual_millis: value,
+
        })?;

        Ok(ts)
    }
@@ -547,22 +572,22 @@ mod tests {

    #[quickcheck]
    fn prop_u8(input: u8) {
-
        assert_eq!(deserialize::<u8>(&input.encode_to_vec()).unwrap(), input);
+
        assert_eq!(u8::decode_exact(&input.encode_to_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_u16(input: u16) {
-
        assert_eq!(deserialize::<u16>(&input.encode_to_vec()).unwrap(), input);
+
        assert_eq!(u16::decode_exact(&input.encode_to_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_u32(input: u32) {
-
        assert_eq!(deserialize::<u32>(&input.encode_to_vec()).unwrap(), input);
+
        assert_eq!(u32::decode_exact(&input.encode_to_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_u64(input: u64) {
-
        assert_eq!(deserialize::<u64>(&input.encode_to_vec()).unwrap(), input);
+
        assert_eq!(u64::decode_exact(&input.encode_to_vec()).unwrap(), input);
    }

    #[quickcheck]
@@ -570,10 +595,7 @@ mod tests {
        if input.len() > u8::MAX as usize {
            return qcheck::TestResult::discard();
        }
-
        assert_eq!(
-
            deserialize::<String>(&input.encode_to_vec()).unwrap(),
-
            input
-
        );
+
        assert_eq!(String::decode_exact(&input.encode_to_vec()).unwrap(), input);

        qcheck::TestResult::passed()
    }
@@ -581,7 +603,7 @@ mod tests {
    #[quickcheck]
    fn prop_vec(input: BoundedVec<String, 16>) {
        assert_eq!(
-
            deserialize::<BoundedVec<String, 16>>(&input.encode_to_vec()).unwrap(),
+
            BoundedVec::<String, 16>::decode_exact(&input.encode_to_vec()).unwrap(),
            input
        );
    }
@@ -589,7 +611,7 @@ mod tests {
    #[quickcheck]
    fn prop_pubkey(input: PublicKey) {
        assert_eq!(
-
            deserialize::<PublicKey>(&input.encode_to_vec()).unwrap(),
+
            PublicKey::decode_exact(&input.encode_to_vec()).unwrap(),
            input
        );
    }
@@ -597,28 +619,25 @@ mod tests {
    #[quickcheck]
    fn prop_filter(input: filter::Filter) {
        assert_eq!(
-
            deserialize::<filter::Filter>(&input.encode_to_vec()).unwrap(),
+
            filter::Filter::decode_exact(&input.encode_to_vec()).unwrap(),
            input
        );
    }

    #[quickcheck]
    fn prop_id(input: RepoId) {
-
        assert_eq!(
-
            deserialize::<RepoId>(&input.encode_to_vec()).unwrap(),
-
            input
-
        );
+
        assert_eq!(RepoId::decode_exact(&input.encode_to_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_refs(input: Refs) {
-
        assert_eq!(deserialize::<Refs>(&input.encode_to_vec()).unwrap(), input);
+
        assert_eq!(Refs::decode_exact(&input.encode_to_vec()).unwrap(), input);
    }

    #[quickcheck]
    fn prop_tuple(input: (String, String)) {
        assert_eq!(
-
            deserialize::<(String, String)>(&input.encode_to_vec()).unwrap(),
+
            <(String, String)>::decode_exact(&input.encode_to_vec()).unwrap(),
            input
        );
    }
@@ -628,7 +647,7 @@ mod tests {
        let signature = Signature::from(input);

        assert_eq!(
-
            deserialize::<Signature>(&signature.encode_to_vec()).unwrap(),
+
            Signature::decode_exact(&signature.encode_to_vec()).unwrap(),
            signature
        );
    }
@@ -637,13 +656,13 @@ mod tests {
    fn prop_oid(input: [u8; 20]) {
        let oid = git::Oid::try_from(input.as_slice()).unwrap();

-
        assert_eq!(deserialize::<git::Oid>(&oid.encode_to_vec()).unwrap(), oid);
+
        assert_eq!(git::Oid::decode_exact(&oid.encode_to_vec()).unwrap(), oid);
    }

    #[quickcheck]
    fn prop_signed_refs(input: SignedRefs<Unverified>) {
        assert_eq!(
-
            deserialize::<SignedRefs<Unverified>>(&input.encode_to_vec()).unwrap(),
+
            SignedRefs::<Unverified>::decode_exact(&input.encode_to_vec()).unwrap(),
            input
        );
    }
@@ -671,8 +690,8 @@ mod tests {
        let bytes = f.encode_to_vec();

        assert_matches!(
-
            deserialize::<filter::Filter>(&bytes).unwrap_err(),
-
            Error::InvalidFilterSize(_)
+
            filter::Filter::decode_exact(&bytes).unwrap_err(),
+
            Invalid::FilterSize { .. }
        );
    }

@@ -682,16 +701,16 @@ mod tests {
        let buf = &v.encode_to_vec();

        assert_matches!(
-
            deserialize::<BoundedVec<u8, 1>>(buf),
-
            Err(Error::InvalidSize {
+
            BoundedVec::<u8, 1>::decode_exact(buf),
+
            Err(Invalid::Bounded(crate::bounded::Error::InvalidSize {
                expected: 1,
                actual: 2
-
            }),
+
            })),
            "fail when vector is too small for buffer",
        );

        assert!(
-
            deserialize::<BoundedVec<u8, 2>>(buf).is_ok(),
+
            BoundedVec::<u8, 2>::decode_exact(buf).is_ok(),
            "successfully decode vector of same size",
        );
    }
modified crates/radicle-protocol/src/wire/frame.rs
@@ -12,13 +12,6 @@ use crate::{wire, wire::varint, wire::varint::VarInt, PROTOCOL_VERSION};
/// by a version number.
pub const PROTOCOL_VERSION_STRING: Version = Version([b'r', b'a', b'd', PROTOCOL_VERSION]);

-
/// Control open byte.
-
const CONTROL_OPEN: u8 = 0;
-
/// Control close byte.
-
const CONTROL_CLOSE: u8 = 1;
-
/// Control EOF byte.
-
const CONTROL_EOF: u8 = 2;
-

/// Protocol version.
#[derive(Debug, PartialEq, Eq)]
pub struct Version([u8; 4]);
@@ -43,7 +36,7 @@ impl wire::Decode for Version {
        buf.try_copy_to_slice(&mut version[..])?;

        if version != PROTOCOL_VERSION_STRING.0 {
-
            return Err(wire::Error::InvalidProtocolVersion(version));
+
            return Err(wire::Invalid::ProtocolVersion { actual: version }.into());
        }
        Ok(Self(version))
    }
@@ -94,29 +87,29 @@ impl StreamId {
    }

    /// Get the kind of stream this is.
-
    pub fn kind(&self) -> Result<StreamKind, u8> {
+
    pub fn kind(&self) -> Result<StreamType, u8> {
        let id = *self.0;
        let kind = ((id >> 1) & 0b11) as u8;

-
        StreamKind::try_from(kind)
+
        StreamType::try_from(kind)
    }

    /// Create a control identifier.
    pub fn control(link: Link) -> Self {
        let link = if link.is_outbound() { 0 } else { 1 };
-
        Self(VarInt::from(((StreamKind::Control as u8) << 1) | link))
+
        Self(VarInt::from(((u8::from(StreamType::Control)) << 1) | link))
    }

    /// Create a gossip identifier.
    pub fn gossip(link: Link) -> Self {
        let link = if link.is_outbound() { 0 } else { 1 };
-
        Self(VarInt::from(((StreamKind::Gossip as u8) << 1) | link))
+
        Self(VarInt::from((u8::from(StreamType::Gossip) << 1) | link))
    }

    /// Create a git identifier.
    pub fn git(link: Link) -> Self {
        let link = if link.is_outbound() { 0 } else { 1 };
-
        Self(VarInt::from(((StreamKind::Git as u8) << 1) | link))
+
        Self(VarInt::from((u8::from(StreamType::Git) << 1) | link))
    }

    /// Get the nth identifier while preserving the stream type and initiator.
@@ -160,7 +153,7 @@ impl wire::Encode for StreamId {
/// Type of stream.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
-
pub enum StreamKind {
+
pub enum StreamType {
    /// Control stream, used to open and close streams.
    Control = 0b00,
    /// Gossip stream, used to exchange messages.
@@ -169,19 +162,25 @@ pub enum StreamKind {
    Git = 0b10,
}

-
impl TryFrom<u8> for StreamKind {
+
impl TryFrom<u8> for StreamType {
    type Error = u8;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
-
            0b00 => Ok(StreamKind::Control),
-
            0b01 => Ok(StreamKind::Gossip),
-
            0b10 => Ok(StreamKind::Git),
+
            0b00 => Ok(StreamType::Control),
+
            0b01 => Ok(StreamType::Gossip),
+
            0b10 => Ok(StreamType::Git),
            n => Err(n),
        }
    }
}

+
impl From<StreamType> for u8 {
+
    fn from(value: StreamType) -> Self {
+
        value as u8
+
    }
+
}
+

/// Protocol frame.
///
/// ```text
@@ -267,23 +266,50 @@ pub enum Control {
    },
}

+
/// Type of control message.
+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+
#[repr(u8)]
+
pub enum ControlType {
+
    /// Control open byte.
+
    Open = 0,
+
    /// Control close byte.
+
    Close = 1,
+
    /// Control EOF byte.
+
    Eof = 2,
+
}
+

+
impl TryFrom<u8> for ControlType {
+
    type Error = u8;
+

+
    fn try_from(value: u8) -> Result<Self, Self::Error> {
+
        match value {
+
            0b00 => Ok(ControlType::Open),
+
            0b01 => Ok(ControlType::Close),
+
            0b10 => Ok(ControlType::Eof),
+
            n => Err(n),
+
        }
+
    }
+
}
+

+
impl From<ControlType> for u8 {
+
    fn from(value: ControlType) -> Self {
+
        value as u8
+
    }
+
}
+

impl wire::Decode for Control {
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
-
        let command = u8::decode(buf)?;
-
        match command {
-
            CONTROL_OPEN => {
-
                let stream = StreamId::decode(buf)?;
-
                Ok(Control::Open { stream })
-
            }
-
            CONTROL_CLOSE => {
-
                let stream = StreamId::decode(buf)?;
-
                Ok(Control::Close { stream })
-
            }
-
            CONTROL_EOF => {
-
                let stream = StreamId::decode(buf)?;
-
                Ok(Control::Eof { stream })
-
            }
-
            other => Err(wire::Error::InvalidControlMessage(other)),
+
        match ControlType::try_from(u8::decode(buf)?) {
+
            Ok(ControlType::Open) => Ok(Control::Open {
+
                stream: StreamId::decode(buf)?,
+
            }),
+
            Ok(ControlType::Close) => Ok(Control::Close {
+
                stream: StreamId::decode(buf)?,
+
            }),
+
            Ok(ControlType::Eof) => Ok(Control::Eof {
+
                stream: StreamId::decode(buf)?,
+
            }),
+
            Err(other) => Err(wire::Invalid::ControlType { actual: other }.into()),
        }
    }
}
@@ -292,15 +318,15 @@ impl wire::Encode for Control {
    fn encode(&self, buf: &mut impl BufMut) {
        match self {
            Self::Open { stream: id } => {
-
                CONTROL_OPEN.encode(buf);
+
                u8::from(ControlType::Open).encode(buf);
                id.encode(buf);
            }
            Self::Eof { stream: id } => {
-
                CONTROL_EOF.encode(buf);
+
                u8::from(ControlType::Eof).encode(buf);
                id.encode(buf);
            }
            Self::Close { stream: id } => {
-
                CONTROL_CLOSE.encode(buf);
+
                u8::from(ControlType::Close).encode(buf);
                id.encode(buf);
            }
        }
@@ -311,12 +337,15 @@ impl<M: wire::Decode> wire::Decode for Frame<M> {
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
        let version = Version::decode(buf)?;
        if version.number() != PROTOCOL_VERSION {
-
            return Err(wire::Error::WrongProtocolVersion(version.number()));
+
            return Err(wire::Invalid::ProtocolVersionUnsupported {
+
                actual: version.number(),
+
            }
+
            .into());
        }
        let stream = StreamId::decode(buf)?;

        match stream.kind() {
-
            Ok(StreamKind::Control) => {
+
            Ok(StreamType::Control) => {
                let ctrl = Control::decode(buf)?;
                let frame = Frame {
                    version,
@@ -325,7 +354,7 @@ impl<M: wire::Decode> wire::Decode for Frame<M> {
                };
                Ok(frame)
            }
-
            Ok(StreamKind::Gossip) => {
+
            Ok(StreamType::Gossip) => {
                let data = varint::payload::decode(buf)?;
                let mut cursor = io::Cursor::new(data);
                let msg = M::decode(&mut cursor)?;
@@ -340,11 +369,11 @@ impl<M: wire::Decode> wire::Decode for Frame<M> {

                Ok(frame)
            }
-
            Ok(StreamKind::Git) => {
+
            Ok(StreamType::Git) => {
                let data = varint::payload::decode(buf)?;
                Ok(Frame::git(stream, data))
            }
-
            Err(n) => Err(wire::Error::InvalidStreamKind(n)),
+
            Err(n) => Err(wire::Invalid::StreamType { actual: n }.into()),
        }
    }
}
@@ -367,9 +396,9 @@ mod test {

    #[test]
    fn test_stream_id() {
-
        assert_eq!(StreamId(VarInt(0b000)).kind().unwrap(), StreamKind::Control);
-
        assert_eq!(StreamId(VarInt(0b010)).kind().unwrap(), StreamKind::Gossip);
-
        assert_eq!(StreamId(VarInt(0b100)).kind().unwrap(), StreamKind::Git);
+
        assert_eq!(StreamId(VarInt(0b000)).kind().unwrap(), StreamType::Control);
+
        assert_eq!(StreamId(VarInt(0b010)).kind().unwrap(), StreamType::Gossip);
+
        assert_eq!(StreamId(VarInt(0b100)).kind().unwrap(), StreamType::Git);
        assert_eq!(StreamId(VarInt(0b001)).link(), Link::Inbound);
        assert_eq!(StreamId(VarInt(0b000)).link(), Link::Outbound);
        assert_eq!(StreamId(VarInt(0b101)).link(), Link::Inbound);
modified crates/radicle-protocol/src/wire/message.rs
@@ -227,7 +227,7 @@ impl wire::Decode for Info {

                Ok(Self::RefsAlreadySynced { rid, at })
            }
-
            Err(other) => Err(wire::Error::UnknownInfoType(other)),
+
            Err(other) => Err(wire::Invalid::InfoMessageType { actual: other }.into()),
        }
    }
}
@@ -336,7 +336,7 @@ impl wire::Decode for Message {
                let zeroes = ZeroBytes::decode(buf)?;
                Ok(Self::Pong { zeroes })
            }
-
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
+
            Err(other) => Err(wire::Invalid::MessageType { actual: other }.into()),
        }
    }
}
@@ -398,7 +398,7 @@ impl wire::Decode for Address {

                HostName::Tor(onion)
            }
-
            Err(other) => return Err(wire::Error::UnknownAddressType(other)),
+
            Err(other) => return Err(wire::Invalid::AddressType { actual: other }.into()),
        };
        let port = u16::decode(buf)?;

@@ -432,7 +432,7 @@ mod tests {
    use radicle::storage::refs::RefsAt;

    use crate::deserializer::Deserializer;
-
    use crate::wire::{self, Encode};
+
    use crate::wire::{self, Decode, Encode};
    use radicle::test::arbitrary;

    #[test]
@@ -523,7 +523,7 @@ mod tests {
    #[quickcheck]
    fn prop_message_encode_decode(message: Message) {
        let encoded = message.encode_to_vec();
-
        let decoded = wire::deserialize::<Message>(&encoded).unwrap();
+
        let decoded = Message::decode_exact(&encoded).unwrap();

        assert_eq!(message, decoded);
    }
@@ -556,7 +556,7 @@ mod tests {
            let zeroes = ZeroBytes::new(zeroes);

            assert_eq!(
-
                wire::deserialize::<ZeroBytes>(&zeroes.encode_to_vec()).unwrap(),
+
                ZeroBytes::decode_exact(&zeroes.encode_to_vec()).unwrap(),
                zeroes
            );
        }
@@ -568,9 +568,6 @@ mod tests {

    #[quickcheck]
    fn prop_addr(addr: Address) {
-
        assert_eq!(
-
            wire::deserialize::<Address>(&addr.encode_to_vec()).unwrap(),
-
            addr
-
        );
+
        assert_eq!(Address::decode_exact(&addr.encode_to_vec()).unwrap(), addr);
    }
}
modified crates/radicle-protocol/src/wire/varint.rs
@@ -214,7 +214,7 @@ mod test {
    #[quickcheck]
    fn prop_encode_decode(input: VarInt) {
        let encoded = input.encode_to_vec();
-
        let decoded: VarInt = wire::deserialize(&encoded).unwrap();
+
        let decoded: VarInt = VarInt::decode_exact(&encoded).unwrap();

        assert_eq!(decoded, input);
    }
@@ -231,10 +231,7 @@ mod test {
        assert_eq!(VarInt(1).encode_to_vec(), vec![0x01]);
        assert_eq!(VarInt(10).encode_to_vec(), vec![0x0a]);
        assert_eq!(VarInt(37).encode_to_vec(), vec![0x25]);
-
        assert_eq!(
-
            wire::deserialize::<VarInt>(&[0x40, 0x25]).unwrap(),
-
            VarInt(37)
-
        );
+
        assert_eq!(VarInt::decode_exact(&[0x40, 0x25]).unwrap(), VarInt(37));
        assert_eq!(VarInt(15293).encode_to_vec(), vec![0x7b, 0xbd]);
        assert_eq!(
            VarInt(494878333).encode_to_vec(),