Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: Reimplement encoding on top of `bytes`
Lorenz Leutgeb committed 9 months ago
commit 3c5668edd22ae4b9a085220d6be552f944ccb038
parent 1c20f64a26641ecb7777567b9661599d92757e08
14 files changed +446 -448
modified Cargo.lock
@@ -350,6 +350,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"

[[package]]
+
name = "bytes"
+
version = "1.10.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
+

+
[[package]]
name = "bytesize"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2643,7 +2649,7 @@ dependencies = [
 "amplify",
 "anyhow",
 "bloomy",
-
 "byteorder",
+
 "bytes",
 "chrono",
 "colored",
 "crossbeam-channel",
@@ -2680,7 +2686,7 @@ name = "radicle-protocol"
version = "0.1.0"
dependencies = [
 "bloomy",
-
 "byteorder",
+
 "bytes",
 "crossbeam-channel",
 "cyphernet",
 "fastrand",
modified Cargo.toml
@@ -22,7 +22,7 @@ rust-version = "1.81.0"
amplify = { version = "4.0.0", default-features = false }
anyhow = "1"
bstr = "1.3"
-
byteorder = "1.4"
+
bytes = "1"
chrono = { version = "0.4.26", default-features = false }
colored = "2.1.0"
crossbeam-channel = "0.5.6"
modified crates/radicle-node/Cargo.toml
@@ -18,7 +18,7 @@ test = ["radicle/test", "radicle-crypto/test", "radicle-crypto/cyphernet", "radi
amplify = { workspace = true }
anyhow = { workspace = true }
bloomy = "1.2"
-
byteorder = { workspace = true }
+
bytes = { workspace = true }
chrono = { workspace = true, features = ["clock"] }
colored = { workspace = true }
crossbeam-channel = { workspace = true }
modified crates/radicle-node/src/tests.rs
@@ -79,8 +79,8 @@ fn test_inventory_decode() {
    let timestamp: Timestamp = LocalTime::now().into();

    let mut buf = Vec::new();
-
    inventory.as_slice().encode(&mut buf).unwrap();
-
    timestamp.encode(&mut buf).unwrap();
+
    inventory.as_slice().encode(&mut buf);
+
    timestamp.encode(&mut buf);

    let m = InventoryAnnouncement::decode(&mut buf.as_slice()).expect("message decodes");
    assert_eq!(inventory.as_slice(), m.inventory.as_slice());
modified crates/radicle-node/src/wire.rs
@@ -1001,9 +1001,7 @@ where
                    metrics.sent_gossip_messages += msgs.len();

                    for msg in msgs {
-
                        Frame::gossip(link, msg)
-
                            .encode(&mut data)
-
                            .expect("in-memory writes never fail");
+
                        Frame::gossip(link, msg).encode(&mut data);
                    }
                    metrics.sent_bytes += data.len();

@@ -1262,18 +1260,16 @@ mod test {
        let pong = Message::Pong {
            zeroes: ZeroBytes::new(42),
        };
-
        frame::PROTOCOL_VERSION_STRING.encode(&mut stream).unwrap();
-
        frame::StreamId::gossip(Link::Outbound)
-
            .encode(&mut stream)
-
            .unwrap();
+
        frame::PROTOCOL_VERSION_STRING.encode(&mut stream);
+
        frame::StreamId::gossip(Link::Outbound).encode(&mut stream);

        // Serialize gossip message with some extension fields.
        let mut gossip = wire::serialize(&pong);
-
        String::from("extra").encode(&mut gossip).unwrap();
-
        48u8.encode(&mut gossip).unwrap();
+
        String::from("extra").encode(&mut gossip);
+
        48u8.encode(&mut gossip);

        // Encode gossip message using the varint-prefix format into the stream.
-
        varint::payload::encode(&gossip, &mut stream).unwrap();
+
        varint::payload::encode(&gossip, &mut stream);

        let mut de = deserializer::Deserializer::<1024, Frame>::new(1024);
        de.input(&stream).unwrap();
@@ -1298,16 +1294,14 @@ mod test {
        }

        impl wire::Encode for MessageWithExt {
-
            fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
                let mut n = self.msg.encode(writer)?;
-
                n += self.ext.encode(writer)?;
-

-
                Ok(n)
+
            fn encode(&self, writer: &mut impl bytes::BufMut) {
+
                self.msg.encode(writer);
+
                self.ext.encode(writer);
            }
        }

        impl wire::Decode for MessageWithExt {
-
            fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
            fn decode(reader: &mut impl bytes::Buf) -> Result<Self, wire::Error> {
                let msg = Message::decode(reader)?;
                let ext = String::decode(reader).unwrap_or_default();

@@ -1337,12 +1331,9 @@ mod test {
                ext: String::from("extra"),
            },
        )
-
        .encode(&mut stream)
-
        .unwrap();
+
        .encode(&mut stream);
        // Pong message that comes after, without extension.
-
        frame::Frame::gossip(Link::Outbound, pong.clone())
-
            .encode(&mut stream)
-
            .unwrap();
+
        frame::Frame::gossip(Link::Outbound, pong.clone()).encode(&mut stream);

        // First test deserializing using the message with extension type.
        {
modified crates/radicle-protocol/Cargo.toml
@@ -13,7 +13,7 @@ test = ["radicle/test", "radicle-crypto/test", "radicle-crypto/cyphernet", "qche

[dependencies]
bloomy = "1.2"
-
byteorder = { workspace = true }
+
bytes = { workspace = true }
crossbeam-channel = { workspace = true }
cyphernet = { workspace = true, features = ["tor"] }
fastrand = { workspace = true }
modified crates/radicle-protocol/src/bounded.rs
@@ -28,7 +28,7 @@ impl<T, const N: usize> BoundedVec<T, N> {
    /// # Examples
    ///
    /// ```
-
    /// use radicle_node::bounded;
+
    /// use radicle_protocol::bounded;
    ///
    /// let mut iter = (0..4).into_iter();
    /// let bounded: bounded::BoundedVec<i32,3> = bounded::BoundedVec::collect_from(&mut iter);
@@ -48,7 +48,7 @@ impl<T, const N: usize> BoundedVec<T, N> {
    /// # Examples
    ///
    /// ```
-
    /// use radicle_node::bounded;
+
    /// use radicle_protocol::bounded;
    ///
    /// let mut vec = vec![1, 2, 3];
    /// let bounded = bounded::BoundedVec::<_, 2>::truncate(vec);
@@ -64,7 +64,7 @@ impl<T, const N: usize> BoundedVec<T, N> {
    /// # Examples
    ///
    /// ```
-
    /// use radicle_node::bounded;
+
    /// use radicle_protocol::bounded;
    ///
    /// let vec = bounded::BoundedVec::<i32, 11>::with_capacity(10).unwrap();
    ///
@@ -94,7 +94,7 @@ impl<T, const N: usize> BoundedVec<T, N> {
    /// # Examples
    ///
    /// ```
-
    /// use radicle_node::bounded;
+
    /// use radicle_protocol::bounded;
    ///
    /// type Inventory = bounded::BoundedVec<(), 10>;
    /// assert_eq!(Inventory::max(), 10);
@@ -120,7 +120,7 @@ impl<T, const N: usize> BoundedVec<T, N> {
    /// # Examples
    ///
    /// ```
-
    /// use radicle_node::bounded;
+
    /// use radicle_protocol::bounded;
    ///
    /// let mut vec: bounded::BoundedVec<_,3> = vec![1, 2].try_into().unwrap();
    /// vec.push(3).expect("within limit");
@@ -147,7 +147,7 @@ impl<T, const N: usize> BoundedVec<T, N> {
    /// # Examples
    ///
    /// ```
-
    /// use radicle_node::bounded;
+
    /// use radicle_protocol::bounded;
    ///
    /// let mut bounded: bounded::BoundedVec<_,3> = vec![1, 2, 3].try_into().unwrap();
    /// let mut vec = bounded.unbound();
@@ -240,6 +240,51 @@ impl<T: std::fmt::Debug, const N: usize> std::fmt::Debug for BoundedVec<T, N> {
    }
}

+
unsafe impl<const N: usize> bytes::BufMut for BoundedVec<u8, N> {
+
    fn remaining_mut(&self) -> usize {
+
        N - self.v.len()
+
    }
+

+
    unsafe fn advance_mut(&mut self, cnt: usize) {
+
        let len = {
+
            let len = self.v.len();
+
            let remaining = N - len;
+

+
            if remaining >= cnt {
+
                len + cnt
+
            } else {
+
                panic!("advance out of bounds: have {remaining} remaining, but advancing by {cnt}",);
+
            }
+
        };
+

+
        debug_assert!(len <= N);
+

+
        // Addition will not overflow since the sum is at most the capacity.
+
        self.v.set_len(len);
+
    }
+

+
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
+
        let len = self.v.len();
+

+
        // If the vector is full, we double its capacity using `reserve`, but not beyond the limit.
+
        if self.v.capacity() == len {
+
            self.v.reserve(std::cmp::min(len, N - len));
+
        }
+

+
        let cap = self.v.capacity();
+

+
        debug_assert!(cap <= N);
+
        debug_assert!(len <= cap);
+

+
        let ptr = self.v.as_mut_ptr();
+

+
        // SAFETY: Since `ptr` is valid for `cap` bytes, `ptr.add(len)` must be
+
        // valid for `cap - len` bytes. The subtraction will not underflow since
+
        // `len <= cap`.
+
        unsafe { bytes::buf::UninitSlice::from_raw_parts_mut(ptr.add(len), cap - len) }
+
    }
+
}
+

#[cfg(any(test, feature = "test"))]
impl<T, const N: usize> qcheck::Arbitrary for BoundedVec<T, N>
where
modified crates/radicle-protocol/src/deserializer.rs
@@ -58,7 +58,7 @@ impl<const B: usize, D: wire::Decode> Deserializer<B, D> {

                Ok(Some(msg))
            }
-
            Err(err) if err.is_eof() => Ok(None),
+
            Err(wire::Error::UnexpectedEnd { .. }) => Ok(None),
            Err(err) => Err(err),
        }
    }
@@ -79,6 +79,20 @@ impl<const B: usize, D: wire::Decode> Deserializer<B, D> {
    }
}

+
unsafe impl<const B: usize, D: wire::Decode> bytes::BufMut for Deserializer<B, D> {
+
    fn remaining_mut(&self) -> usize {
+
        self.unparsed.remaining_mut()
+
    }
+

+
    unsafe fn advance_mut(&mut self, cnt: usize) {
+
        self.unparsed.advance_mut(cnt);
+
    }
+

+
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
+
        self.unparsed.chunk_mut()
+
    }
+
}
+

impl<const B: usize, D: wire::Decode> io::Write for Deserializer<B, D> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.input(buf).map_err(|_| io::ErrorKind::OutOfMemory)?;
modified crates/radicle-protocol/src/service/message.rs
@@ -1,6 +1,8 @@
-
use std::{fmt, io, mem};
+
use std::{fmt, mem};

+
use bytes::{Buf, BufMut};
use nonempty::NonEmpty;
+

use radicle::crypto;
use radicle::git;
use radicle::identity::RepoId;
@@ -117,32 +119,28 @@ impl NodeAnnouncement {
}

impl wire::Encode for NodeAnnouncement {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.version.encode(writer)?;
-
        n += self.features.encode(writer)?;
-
        n += self.timestamp.encode(writer)?;
-
        n += self.alias.encode(writer)?;
-
        n += self.addresses.encode(writer)?;
-
        n += self.nonce.encode(writer)?;
-
        n += self.agent.encode(writer)?;
-

-
        Ok(n)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.version.encode(buf);
+
        self.features.encode(buf);
+
        self.timestamp.encode(buf);
+
        self.alias.encode(buf);
+
        self.addresses.encode(buf);
+
        self.nonce.encode(buf);
+
        self.agent.encode(buf);
    }
}

impl wire::Decode for NodeAnnouncement {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let version = u8::decode(reader)?;
-
        let features = node::Features::decode(reader)?;
-
        let timestamp = Timestamp::decode(reader)?;
-
        let alias = wire::Decode::decode(reader)?;
-
        let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(reader)?;
-
        let nonce = u64::decode(reader)?;
-
        let agent = match UserAgent::decode(reader) {
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let version = u8::decode(buf)?;
+
        let features = node::Features::decode(buf)?;
+
        let timestamp = Timestamp::decode(buf)?;
+
        let alias = wire::Decode::decode(buf)?;
+
        let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(buf)?;
+
        let nonce = u64::decode(buf)?;
+
        let agent = match UserAgent::decode(buf) {
            Ok(ua) => ua,
-
            Err(e) if e.is_eof() => UserAgent::default(),
+
            Err(wire::Error::UnexpectedEnd { .. }) => UserAgent::default(),
            Err(e) => return Err(e),
        };

@@ -708,8 +706,8 @@ mod tests {
        .signed(&Device::mock())
        .into();

-
        let mut buf: Vec<u8> = Vec::new();
-
        assert!(msg.encode(&mut buf).is_ok());
+
        let mut buf = Vec::new();
+
        msg.encode(&mut buf);

        let decoded = wire::deserialize(buf.as_slice());
        assert!(decoded.is_ok());
@@ -728,10 +726,7 @@ mod tests {
            &Device::mock(),
        );
        let mut buf: Vec<u8> = Vec::new();
-
        assert!(
-
            msg.encode(&mut buf).is_ok(),
-
            "INVENTORY_LIMIT is a valid limit for encoding",
-
        );
+
        msg.encode(&mut buf);

        let decoded = wire::deserialize(buf.as_slice());
        assert!(
modified crates/radicle-protocol/src/wire.rs
@@ -7,12 +7,13 @@ pub use message::{AddressType, MessageType};

use std::collections::BTreeMap;
use std::convert::TryFrom;
+
use std::mem;
use std::ops::Deref;
use std::str::FromStr;
use std::string::FromUtf8Error;
-
use std::{io, mem};

-
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
+
use bytes::{Buf, BufMut};
+

use cyphernet::addr::tor;

use radicle::crypto::{PublicKey, Signature, Unverified};
@@ -41,8 +42,6 @@ pub type Size = u16;

#[derive(thiserror::Error, Debug)]
pub enum Error {
-
    #[error("i/o: {0}")]
-
    Io(#[from] io::Error),
    #[error("UTF-8 error: {0}")]
    FromUtf8(#[from] FromUtf8Error),
    #[error("invalid size: expected {expected}, got {actual}")]
@@ -75,24 +74,32 @@ pub enum Error {
    UnknownInfoType(u16),
    #[error("unexpected bytes")]
    UnexpectedBytes,
-
}
-

-
impl Error {
-
    /// Whether we've reached the end of file. This will be true when we fail to decode
-
    /// a message because there's not enough data in the stream.
-
    pub fn is_eof(&self) -> bool {
-
        matches!(self, Self::Io(err) if err.kind() == io::ErrorKind::UnexpectedEof)
+
    #[error("unexpected end of buffer, requested {requested} more bytes but only {available} are available")]
+
    UnexpectedEnd { available: usize, requested: usize },
+
}
+

+
impl From<bytes::TryGetError> for Error {
+
    fn from(
+
        bytes::TryGetError {
+
            available,
+
            requested,
+
        }: bytes::TryGetError,
+
    ) -> Self {
+
        Self::UnexpectedEnd {
+
            available,
+
            requested,
+
        }
    }
}

/// Things that can be encoded as binary.
pub trait Encode {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error>;
+
    fn encode(&self, buffer: &mut impl BufMut);
}

/// Things that can be decoded from binary.
pub trait Decode: Sized {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error>;
+
    fn decode(buffer: &mut impl Buf) -> Result<Self, Error>;
}

/// Encode an object into a byte vector.
@@ -100,79 +107,62 @@ pub trait Decode: Sized {
/// # Panics
///
/// If the encoded object exceeds [`Size::MAX`].
-
pub fn serialize<T: Encode + ?Sized>(data: &T) -> Vec<u8> {
-
    let mut buffer = Vec::new();
-
    // SAFETY: We expect this to panic if the user passes
-
    // in data that exceeds the maximum allowed size.
-
    #[allow(clippy::unwrap_used)]
-
    let len = data.encode(&mut buffer).unwrap();
-

-
    debug_assert_eq!(len, buffer.len());
-

-
    buffer
+
pub fn serialize<E: Encode + ?Sized>(data: &E) -> Vec<u8> {
+
    let mut buffer = Vec::new().limit(Size::MAX as usize);
+
    data.encode(&mut buffer);
+
    buffer.into_inner()
}

-
/// Decode an object from a vector.
-
pub fn deserialize<T: Decode>(data: &[u8]) -> Result<T, Error> {
-
    let mut cursor = io::Cursor::new(data);
-
    let obj = T::decode(&mut cursor)?;
+
/// Decode an object from a slice.
+
pub fn deserialize<T: Decode>(mut data: &[u8]) -> Result<T, Error> {
+
    let result = T::decode(&mut data)?;

-
    if cursor.position() as usize != cursor.get_ref().len() {
-
        return Err(Error::UnexpectedBytes);
+
    if data.is_empty() {
+
        Ok(result)
+
    } else {
+
        Err(Error::UnexpectedBytes)
    }
-
    Ok(obj)
}

impl Encode for u8 {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_u8(*self)?;
-

-
        Ok(mem::size_of::<Self>())
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        buf.put_u8(*self);
    }
}

impl Encode for u16 {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_u16::<NetworkEndian>(*self)?;
-

-
        Ok(mem::size_of::<Self>())
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        buf.put_u16(*self);
    }
}

impl Encode for u32 {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_u32::<NetworkEndian>(*self)?;
-

-
        Ok(mem::size_of::<Self>())
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        buf.put_u32(*self);
    }
}

impl Encode for u64 {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_u64::<NetworkEndian>(*self)?;
-

-
        Ok(mem::size_of::<Self>())
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        buf.put_u64(*self);
    }
}

impl Encode for PublicKey {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.deref().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.deref().encode(buf)
    }
}

impl<const T: usize> Encode for &[u8; T] {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_all(&**self)?;
-
        Ok(mem::size_of::<Self>())
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        buf.put_slice(&**self);
    }
}

impl<const T: usize> Encode for [u8; T] {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_all(self)?;
-

-
        Ok(mem::size_of::<Self>())
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        buf.put_slice(self);
    }
}

@@ -180,13 +170,12 @@ impl<T> Encode for &[T]
where
    T: Encode,
{
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = (self.len() as Size).encode(writer)?;
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        (self.len() as Size).encode(buf);

        for item in self.iter() {
-
            n += item.encode(writer)?;
+
            item.encode(buf);
        }
-
        Ok(n)
    }
}

@@ -194,75 +183,72 @@ impl<T, const N: usize> Encode for BoundedVec<T, N>
where
    T: Encode,
{
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.as_slice().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.as_slice().encode(buf)
    }
}

impl Encode for &str {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
    fn encode(&self, buf: &mut impl BufMut) {
        assert!(self.len() <= u8::MAX as usize);

-
        let n = (self.len() as u8).encode(writer)?;
+
        (self.len() as u8).encode(buf);
        let bytes = self.as_bytes();

        // Nb. Don't use the [`Encode`] instance here for &[u8], because we are prefixing the
        // length ourselves.
-
        writer.write_all(bytes)?;
-

-
        Ok(n + bytes.len())
+
        buf.put_slice(bytes);
    }
}

impl Encode for String {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.as_str().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.as_str().encode(buf)
    }
}

impl Encode for git::Url {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.to_string().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.to_string().encode(buf)
    }
}

impl Encode for RepoId {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.deref().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.deref().encode(buf)
    }
}

impl Encode for Refs {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
    fn encode(&self, buf: &mut impl BufMut) {
        let len: Size = self
            .len()
            .try_into()
-
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
-
        let mut n = len.encode(writer)?;
+
            .expect("`Refs::len()` must be less than or equal to `Size::MAX`");
+
        len.encode(buf);

        for (name, oid) in self.iter() {
-
            n += name.as_str().encode(writer)?;
-
            n += oid.encode(writer)?;
+
            name.as_str().encode(buf);
+
            oid.encode(buf);
        }
-
        Ok(n)
    }
}

impl Encode for cyphernet::addr::tor::OnionAddrV3 {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.into_raw_bytes().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.into_raw_bytes().encode(buf)
    }
}

impl Encode for UserAgent {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.as_ref().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.as_ref().encode(buf)
    }
}

impl Encode for Alias {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.as_ref().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.as_ref().encode(buf)
    }
}

@@ -271,51 +257,50 @@ where
    A: Encode,
    B: Encode,
{
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = self.0.encode(writer)?;
-
        n += self.1.encode(writer)?;
-
        Ok(n)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.0.encode(buf);
+
        self.1.encode(buf);
    }
}

impl Encode for git::RefString {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.as_str().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.as_str().encode(buf)
    }
}

impl Encode for Signature {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.deref().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.deref().encode(buf)
    }
}

impl Encode for git::Oid {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
    fn encode(&self, buf: &mut impl BufMut) {
        // Nb. We use length-encoding here to support future SHA-2 object ids.
-
        self.as_bytes().encode(writer)
+
        self.as_bytes().encode(buf)
    }
}

////////////////////////////////////////////////////////////////////////////////

impl Decode for PublicKey {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let buf: [u8; 32] = Decode::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let buf: [u8; 32] = Decode::decode(buf)?;

        Ok(PublicKey::from(buf))
    }
}

impl Decode for Refs {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let len = Size::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let len = Size::decode(buf)?;
        let mut refs = BTreeMap::new();

        for _ in 0..len {
-
            let name = String::decode(reader)?;
+
            let name = String::decode(buf)?;
            let name = git::RefString::try_from(name).map_err(Error::from)?;
-
            let oid = git::Oid::decode(reader)?;
+
            let oid = git::Oid::decode(buf)?;

            refs.insert(name, oid);
        }
@@ -324,22 +309,21 @@ impl Decode for Refs {
}

impl Decode for git::RefString {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let ref_str = String::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let ref_str = String::decode(buf)?;
        git::RefString::try_from(ref_str).map_err(Error::from)
    }
}

impl Decode for UserAgent {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        String::decode(reader)
-
            .and_then(|s| UserAgent::from_str(&s).map_err(Error::InvalidUserAgent))
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        String::decode(buf).and_then(|s| UserAgent::from_str(&s).map_err(Error::InvalidUserAgent))
    }
}

impl Decode for Alias {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        String::decode(reader).and_then(|s| Alias::from_str(&s).map_err(Error::from))
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        String::decode(buf).and_then(|s| Alias::from_str(&s).map_err(Error::from))
    }
}

@@ -348,16 +332,16 @@ where
    A: Decode,
    B: Decode,
{
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let a = A::decode(reader)?;
-
        let b = B::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let a = A::decode(buf)?;
+
        let b = B::decode(buf)?;
        Ok((a, b))
    }
}

impl Decode for git::Oid {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let len = Size::decode(reader)? as usize;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let len = Size::decode(buf)? as usize;
        #[allow(non_upper_case_globals)]
        const expected: usize = mem::size_of::<git::raw::Oid>();

@@ -368,7 +352,7 @@ impl Decode for git::Oid {
            });
        }

-
        let buf: [u8; expected] = Decode::decode(reader)?;
+
        let buf: [u8; expected] = Decode::decode(buf)?;
        let oid = git::raw::Oid::from_bytes(&buf).expect("the buffer is exactly the right size");
        let oid = git::Oid::from(oid);

@@ -377,41 +361,41 @@ impl Decode for git::Oid {
}

impl Decode for Signature {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let bytes: [u8; 64] = Decode::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let bytes: [u8; 64] = Decode::decode(buf)?;

        Ok(Signature::from(bytes))
    }
}

impl Decode for u8 {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        reader.read_u8().map_err(Error::from)
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        Ok(buf.try_get_u8()?)
    }
}

impl Decode for u16 {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        reader.read_u16::<NetworkEndian>().map_err(Error::from)
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        Ok(buf.try_get_u16()?)
    }
}

impl Decode for u32 {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        reader.read_u32::<NetworkEndian>().map_err(Error::from)
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        Ok(buf.try_get_u32()?)
    }
}

impl Decode for u64 {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        reader.read_u64::<NetworkEndian>().map_err(Error::from)
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        Ok(buf.try_get_u64()?)
    }
}

impl<const N: usize> Decode for [u8; N] {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let mut ary = [0; N];
-
        reader.read_exact(&mut ary)?;
+
        buf.try_copy_to_slice(&mut ary).map_err(Error::from)?;

        Ok(ary)
    }
@@ -421,15 +405,15 @@ impl<T, const N: usize> Decode for BoundedVec<T, N>
where
    T: Decode,
{
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let len: usize = Size::decode(reader)? as usize;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let len: usize = Size::decode(buf)? as usize;
        let mut items = Self::with_capacity(len).map_err(|_| Error::InvalidSize {
            expected: Self::max(),
            actual: len,
        })?;

        for _ in 0..items.capacity() {
-
            let item = T::decode(reader)?;
+
            let item = T::decode(buf)?;
            items.push(item).ok();
        }
        Ok(items)
@@ -437,11 +421,11 @@ where
}

impl Decode for String {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let len = u8::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let len = u8::decode(buf)?;
        let mut bytes = vec![0; len as usize];

-
        reader.read_exact(&mut bytes)?;
+
        buf.try_copy_to_slice(&mut bytes)?;

        let string = String::from_utf8(bytes)?;

@@ -450,32 +434,29 @@ impl Decode for String {
}

impl Decode for RepoId {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let oid: git::Oid = Decode::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let oid: git::Oid = Decode::decode(buf)?;

        Ok(Self::from(oid))
    }
}

impl Encode for filter::Filter {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.deref().as_bytes().encode(writer)?;
-

-
        Ok(n)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.deref().as_bytes().encode(buf);
    }
}

impl Decode for filter::Filter {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let size: usize = Size::decode(reader)? as usize;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let size: usize = Size::decode(buf)? as usize;
        if !filter::FILTER_SIZES.contains(&size) {
            return Err(Error::InvalidFilterSize(size));
        }

        let mut bytes = vec![0; size];
-
        reader.read_exact(&mut bytes[..])?;
+

+
        buf.try_copy_to_slice(&mut bytes)?;

        let f = filter::BloomFilter::from(bytes);
        debug_assert_eq!(f.hashes(), filter::FILTER_HASHES);
@@ -485,63 +466,55 @@ impl Decode for filter::Filter {
}

impl<V> Encode for SignedRefs<V> {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.id.encode(writer)?;
-
        n += self.refs.encode(writer)?;
-
        n += self.signature.encode(writer)?;
-

-
        Ok(n)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.id.encode(buf);
+
        self.refs.encode(buf);
+
        self.signature.encode(buf);
    }
}

impl Decode for SignedRefs<Unverified> {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let id = NodeId::decode(reader)?;
-
        let refs = Refs::decode(reader)?;
-
        let signature = Signature::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let id = NodeId::decode(buf)?;
+
        let refs = Refs::decode(buf)?;
+
        let signature = Signature::decode(buf)?;

        Ok(Self::new(refs, id, signature))
    }
}

impl Encode for RefsAt {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.remote.encode(writer)?;
-
        n += self.at.encode(writer)?;
-

-
        Ok(n)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.remote.encode(buf);
+
        self.at.encode(buf);
    }
}

impl Decode for RefsAt {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let remote = NodeId::decode(reader)?;
-
        let at = git::Oid::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let remote = NodeId::decode(buf)?;
+
        let at = git::Oid::decode(buf)?;
        Ok(Self { remote, at })
    }
}

impl Encode for node::Features {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.deref().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.deref().encode(buf)
    }
}

impl Decode for node::Features {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let features = u64::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let features = u64::decode(buf)?;

        Ok(Self::from(features))
    }
}

impl Decode for tor::OnionAddrV3 {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let bytes: [u8; tor::ONION_V3_RAW_LEN] = Decode::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let bytes: [u8; tor::ONION_V3_RAW_LEN] = Decode::decode(buf)?;
        let addr = tor::OnionAddrV3::from_raw_bytes(bytes)?;

        Ok(addr)
@@ -549,14 +522,14 @@ impl Decode for tor::OnionAddrV3 {
}

impl Encode for Timestamp {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.deref().encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.deref().encode(buf)
    }
}

impl Decode for Timestamp {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let millis = u64::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let millis = u64::decode(buf)?;
        let ts = Timestamp::try_from(millis).map_err(Error::InvalidTimestamp)?;

        Ok(ts)
modified crates/radicle-protocol/src/wire/frame.rs
@@ -2,9 +2,11 @@
#![warn(clippy::missing_docs_in_private_items)]
use std::{fmt, io};

+
use bytes::{Buf, BufMut};
+
use radicle::node::Link;
+

use crate::service::Message;
use crate::{wire, wire::varint, wire::varint::VarInt, PROTOCOL_VERSION};
-
use radicle::node::Link;

/// Protocol version strings all start with the magic sequence `rad`, followed
/// by a version number.
@@ -29,17 +31,16 @@ impl Version {
}

impl wire::Encode for Version {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_all(&PROTOCOL_VERSION_STRING.0)?;
-

-
        Ok(PROTOCOL_VERSION_STRING.0.len())
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        buf.put_slice(&PROTOCOL_VERSION_STRING.0);
    }
}

impl wire::Decode for Version {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
        let mut version = [0u8; 4];
-
        reader.read_exact(&mut version[..])?;
+

+
        buf.try_copy_to_slice(&mut version[..])?;

        if version != PROTOCOL_VERSION_STRING.0 {
            return Err(wire::Error::InvalidProtocolVersion(version));
@@ -144,15 +145,15 @@ impl fmt::Display for StreamId {
}

impl wire::Decode for StreamId {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let id = VarInt::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let id = VarInt::decode(buf)?;
        Ok(Self(id))
    }
}

impl wire::Encode for StreamId {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.0.encode(writer)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.0.encode(buf)
    }
}

@@ -272,19 +273,19 @@ pub enum Control {
}

impl wire::Decode for Control {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let command = u8::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let command = u8::decode(buf)?;
        match command {
            CONTROL_OPEN => {
-
                let stream = StreamId::decode(reader)?;
+
                let stream = StreamId::decode(buf)?;
                Ok(Control::Open { stream })
            }
            CONTROL_CLOSE => {
-
                let stream = StreamId::decode(reader)?;
+
                let stream = StreamId::decode(buf)?;
                Ok(Control::Close { stream })
            }
            CONTROL_EOF => {
-
                let stream = StreamId::decode(reader)?;
+
                let stream = StreamId::decode(buf)?;
                Ok(Control::Eof { stream })
            }
            other => Err(wire::Error::InvalidControlMessage(other)),
@@ -293,38 +294,35 @@ impl wire::Decode for Control {
}

impl wire::Encode for Control {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

+
    fn encode(&self, buf: &mut impl BufMut) {
        match self {
            Self::Open { stream: id } => {
-
                n += CONTROL_OPEN.encode(writer)?;
-
                n += id.encode(writer)?;
+
                CONTROL_OPEN.encode(buf);
+
                id.encode(buf);
            }
            Self::Eof { stream: id } => {
-
                n += CONTROL_EOF.encode(writer)?;
-
                n += id.encode(writer)?;
+
                CONTROL_EOF.encode(buf);
+
                id.encode(buf);
            }
            Self::Close { stream: id } => {
-
                n += CONTROL_CLOSE.encode(writer)?;
-
                n += id.encode(writer)?;
+
                CONTROL_CLOSE.encode(buf);
+
                id.encode(buf);
            }
        }
-
        Ok(n)
    }
}

impl<M: wire::Decode> wire::Decode for Frame<M> {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let version = Version::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let version = Version::decode(buf)?;
        if version.number() != PROTOCOL_VERSION {
            return Err(wire::Error::WrongProtocolVersion(version.number()));
        }
-
        let stream = StreamId::decode(reader)?;
+
        let stream = StreamId::decode(buf)?;

        match stream.kind() {
            Ok(StreamKind::Control) => {
-
                let ctrl = Control::decode(reader)?;
+
                let ctrl = Control::decode(buf)?;
                let frame = Frame {
                    version,
                    stream,
@@ -333,7 +331,7 @@ impl<M: wire::Decode> wire::Decode for Frame<M> {
                Ok(frame)
            }
            Ok(StreamKind::Gossip) => {
-
                let data = varint::payload::decode(reader)?;
+
                let data = varint::payload::decode(buf)?;
                let mut cursor = io::Cursor::new(data);
                let msg = M::decode(&mut cursor)?;
                let frame = Frame {
@@ -348,7 +346,7 @@ impl<M: wire::Decode> wire::Decode for Frame<M> {
                Ok(frame)
            }
            Ok(StreamKind::Git { .. }) => {
-
                let data = varint::payload::decode(reader)?;
+
                let data = varint::payload::decode(buf)?;
                Ok(Frame::git(stream, data))
            }
            Err(n) => Err(wire::Error::InvalidStreamKind(n)),
@@ -357,18 +355,14 @@ impl<M: wire::Decode> wire::Decode for Frame<M> {
}

impl<M: wire::Encode> wire::Encode for Frame<M> {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.version.encode(writer)?;
-
        n += self.stream.encode(writer)?;
-
        n += match &self.data {
-
            FrameData::Control(ctrl) => ctrl.encode(writer)?,
-
            FrameData::Git(data) => varint::payload::encode(data, writer)?,
-
            FrameData::Gossip(msg) => varint::payload::encode(&wire::serialize(msg), writer)?,
-
        };
-

-
        Ok(n)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.version.encode(buf);
+
        self.stream.encode(buf);
+
        match &self.data {
+
            FrameData::Control(ctrl) => ctrl.encode(buf),
+
            FrameData::Git(data) => varint::payload::encode(data, buf),
+
            FrameData::Gossip(msg) => varint::payload::encode(&wire::serialize(msg), buf),
+
        }
    }
}

modified crates/radicle-protocol/src/wire/message.rs
@@ -1,6 +1,7 @@
-
use std::{io, mem, net};
+
use std::{mem, net};

-
use byteorder::{NetworkEndian, ReadBytesExt};
+
use bytes::Buf;
+
use bytes::BufMut;
use cyphernet::addr::{tor, Addr, HostName, NetAddr};
use radicle::crypto::Signature;
use radicle::git::Oid;
@@ -114,32 +115,28 @@ impl TryFrom<u8> for AddressType {
}

impl wire::Encode for AnnouncementMessage {
-
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
+
    fn encode(&self, buf: &mut impl BufMut) {
        match self {
-
            Self::Node(ann) => ann.encode(writer),
-
            Self::Inventory(ann) => ann.encode(writer),
-
            Self::Refs(ann) => ann.encode(writer),
+
            Self::Node(ann) => ann.encode(buf),
+
            Self::Inventory(ann) => ann.encode(buf),
+
            Self::Refs(ann) => ann.encode(buf),
        }
    }
}

impl wire::Encode for RefsAnnouncement {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.rid.encode(writer)?;
-
        n += self.refs.encode(writer)?;
-
        n += self.timestamp.encode(writer)?;
-

-
        Ok(n)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.rid.encode(buf);
+
        self.refs.encode(buf);
+
        self.timestamp.encode(buf);
    }
}

impl wire::Decode for RefsAnnouncement {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let rid = RepoId::decode(reader)?;
-
        let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(reader)?;
-
        let timestamp = Timestamp::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let rid = RepoId::decode(buf)?;
+
        let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(buf)?;
+
        let timestamp = Timestamp::decode(buf)?;

        Ok(Self {
            rid,
@@ -150,20 +147,16 @@ impl wire::Decode for RefsAnnouncement {
}

impl wire::Encode for InventoryAnnouncement {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.inventory.encode(writer)?;
-
        n += self.timestamp.encode(writer)?;
-

-
        Ok(n)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.inventory.encode(buf);
+
        self.timestamp.encode(buf);
    }
}

impl wire::Decode for InventoryAnnouncement {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let inventory = BoundedVec::decode(reader)?;
-
        let timestamp = Timestamp::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let inventory = BoundedVec::decode(buf)?;
+
        let timestamp = Timestamp::decode(buf)?;

        Ok(Self {
            inventory,
@@ -212,28 +205,25 @@ impl From<&Info> for InfoType {
}

impl wire::Encode for Info {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-
        n += u16::from(InfoType::from(self)).encode(writer)?;
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        u16::from(InfoType::from(self)).encode(buf);
        match self {
            Info::RefsAlreadySynced { rid, at } => {
-
                n += rid.encode(writer)?;
-
                n += at.encode(writer)?;
+
                rid.encode(buf);
+
                at.encode(buf);
            }
        }
-

-
        Ok(n)
    }
}

impl wire::Decode for Info {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let info_type = reader.read_u16::<NetworkEndian>()?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let info_type = buf.try_get_u16()?;

        match InfoType::try_from(info_type) {
            Ok(InfoType::RefsAlreadySynced) => {
-
                let rid = RepoId::decode(reader)?;
-
                let at = Oid::decode(reader)?;
+
                let rid = RepoId::decode(buf)?;
+
                let at = Oid::decode(buf)?;

                Ok(Self::RefsAlreadySynced { rid, at })
            }
@@ -243,8 +233,8 @@ impl wire::Decode for Info {
}

impl wire::Encode for Message {
-
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
-
        let mut n = self.type_id().encode(writer)?;
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.type_id().encode(buf);

        match self {
            Self::Subscribe(Subscribe {
@@ -252,50 +242,42 @@ impl wire::Encode for Message {
                since,
                until,
            }) => {
-
                n += filter.encode(writer)?;
-
                n += since.encode(writer)?;
-
                n += until.encode(writer)?;
+
                filter.encode(buf);
+
                since.encode(buf);
+
                until.encode(buf);
            }
            Self::Announcement(Announcement {
                node,
                message,
                signature,
            }) => {
-
                n += node.encode(writer)?;
-
                n += signature.encode(writer)?;
-
                n += message.encode(writer)?;
+
                node.encode(buf);
+
                signature.encode(buf);
+
                message.encode(buf);
            }
            Self::Info(info) => {
-
                n += info.encode(writer)?;
+
                info.encode(buf);
            }
            Self::Ping(Ping { ponglen, zeroes }) => {
-
                n += ponglen.encode(writer)?;
-
                n += zeroes.encode(writer)?;
+
                ponglen.encode(buf);
+
                zeroes.encode(buf);
            }
            Self::Pong { zeroes } => {
-
                n += zeroes.encode(writer)?;
+
                zeroes.encode(buf);
            }
        }
-

-
        if n > wire::Size::MAX as usize {
-
            return Err(io::Error::new(
-
                io::ErrorKind::InvalidData,
-
                "Message exceeds maximum size",
-
            ));
-
        }
-
        Ok(n)
    }
}

impl wire::Decode for Message {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let type_id = reader.read_u16::<NetworkEndian>()?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let type_id = buf.try_get_u16()?;

        match MessageType::try_from(type_id) {
            Ok(MessageType::Subscribe) => {
-
                let filter = Filter::decode(reader)?;
-
                let since = Timestamp::decode(reader)?;
-
                let until = Timestamp::decode(reader)?;
+
                let filter = Filter::decode(buf)?;
+
                let since = Timestamp::decode(buf)?;
+
                let until = Timestamp::decode(buf)?;

                Ok(Self::Subscribe(Subscribe {
                    filter,
@@ -304,9 +286,9 @@ impl wire::Decode for Message {
                }))
            }
            Ok(MessageType::NodeAnnouncement) => {
-
                let node = NodeId::decode(reader)?;
-
                let signature = Signature::decode(reader)?;
-
                let message = NodeAnnouncement::decode(reader)?.into();
+
                let node = NodeId::decode(buf)?;
+
                let signature = Signature::decode(buf)?;
+
                let message = NodeAnnouncement::decode(buf)?.into();

                Ok(Announcement {
                    node,
@@ -316,9 +298,9 @@ impl wire::Decode for Message {
                .into())
            }
            Ok(MessageType::InventoryAnnouncement) => {
-
                let node = NodeId::decode(reader)?;
-
                let signature = Signature::decode(reader)?;
-
                let message = InventoryAnnouncement::decode(reader)?.into();
+
                let node = NodeId::decode(buf)?;
+
                let signature = Signature::decode(buf)?;
+
                let message = InventoryAnnouncement::decode(buf)?.into();

                Ok(Announcement {
                    node,
@@ -328,9 +310,9 @@ impl wire::Decode for Message {
                .into())
            }
            Ok(MessageType::RefsAnnouncement) => {
-
                let node = NodeId::decode(reader)?;
-
                let signature = Signature::decode(reader)?;
-
                let message = RefsAnnouncement::decode(reader)?.into();
+
                let node = NodeId::decode(buf)?;
+
                let signature = Signature::decode(buf)?;
+
                let message = RefsAnnouncement::decode(buf)?.into();

                Ok(Announcement {
                    node,
@@ -340,16 +322,16 @@ impl wire::Decode for Message {
                .into())
            }
            Ok(MessageType::Info) => {
-
                let info = Info::decode(reader)?;
+
                let info = Info::decode(buf)?;
                Ok(Self::Info(info))
            }
            Ok(MessageType::Ping) => {
-
                let ponglen = u16::decode(reader)?;
-
                let zeroes = ZeroBytes::decode(reader)?;
+
                let ponglen = u16::decode(buf)?;
+
                let zeroes = ZeroBytes::decode(buf)?;
                Ok(Self::Ping(Ping { ponglen, zeroes }))
            }
            Ok(MessageType::Pong) => {
-
                let zeroes = ZeroBytes::decode(reader)?;
+
                let zeroes = ZeroBytes::decode(buf)?;
                Ok(Self::Pong { zeroes })
            }
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
@@ -358,85 +340,82 @@ impl wire::Decode for Message {
}

impl wire::Encode for Address {
-
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
-
        let mut n = 0;
-

+
    fn encode(&self, buf: &mut impl BufMut) {
        match self.host {
            HostName::Ip(net::IpAddr::V4(ip)) => {
-
                n += u8::from(AddressType::Ipv4).encode(writer)?;
-
                n += ip.octets().encode(writer)?;
+
                u8::from(AddressType::Ipv4).encode(buf);
+
                ip.octets().encode(buf);
            }
            HostName::Ip(net::IpAddr::V6(ip)) => {
-
                n += u8::from(AddressType::Ipv6).encode(writer)?;
-
                n += ip.octets().encode(writer)?;
+
                u8::from(AddressType::Ipv6).encode(buf);
+
                ip.octets().encode(buf);
            }
            HostName::Dns(ref dns) => {
-
                n += u8::from(AddressType::Dns).encode(writer)?;
-
                n += dns.encode(writer)?;
+
                u8::from(AddressType::Dns).encode(buf);
+
                dns.encode(buf);
            }
            HostName::Tor(addr) => {
-
                n += u8::from(AddressType::Onion).encode(writer)?;
-
                n += addr.encode(writer)?;
+
                u8::from(AddressType::Onion).encode(buf);
+
                addr.encode(buf);
            }
            _ => {
-
                return Err(io::ErrorKind::Unsupported.into());
+
                unimplemented!(
+
                    "Encoding not defined for addresses of the same type as the following: {:?}",
+
                    self.host
+
                );
            }
        }
-
        n += self.port().encode(writer)?;
-

-
        Ok(n)
+
        self.port().encode(buf);
    }
}

impl wire::Decode for Address {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let addrtype = reader.read_u8()?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let addrtype = buf.try_get_u8()?;
+

        let host = match AddressType::try_from(addrtype) {
            Ok(AddressType::Ipv4) => {
-
                let octets: [u8; 4] = wire::Decode::decode(reader)?;
+
                let octets: [u8; 4] = wire::Decode::decode(buf)?;
                let ip = net::Ipv4Addr::from(octets);

                HostName::Ip(net::IpAddr::V4(ip))
            }
            Ok(AddressType::Ipv6) => {
-
                let octets: [u8; 16] = wire::Decode::decode(reader)?;
+
                let octets: [u8; 16] = wire::Decode::decode(buf)?;
                let ip = net::Ipv6Addr::from(octets);

                HostName::Ip(net::IpAddr::V6(ip))
            }
            Ok(AddressType::Dns) => {
-
                let dns: String = wire::Decode::decode(reader)?;
+
                let dns: String = wire::Decode::decode(buf)?;

                HostName::Dns(dns)
            }
            Ok(AddressType::Onion) => {
-
                let onion: tor::OnionAddrV3 = wire::Decode::decode(reader)?;
+
                let onion: tor::OnionAddrV3 = wire::Decode::decode(buf)?;

                HostName::Tor(onion)
            }
            Err(other) => return Err(wire::Error::UnknownAddressType(other)),
        };
-
        let port = u16::decode(reader)?;
+
        let port = u16::decode(buf)?;

        Ok(Self::from(NetAddr { host, port }))
    }
}

impl wire::Encode for ZeroBytes {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = (self.len() as u16).encode(writer)?;
-
        for _ in 0..self.len() {
-
            n += 0u8.encode(writer)?;
-
        }
-
        Ok(n)
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        (self.len() as u16).encode(buf);
+
        buf.put_bytes(0u8, self.len());
    }
}

impl wire::Decode for ZeroBytes {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let zeroes = u16::decode(reader)?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let zeroes = u16::decode(buf)?;
        for _ in 0..zeroes {
-
            _ = u8::decode(reader)?;
+
            _ = u8::decode(buf)?;
        }
        Ok(ZeroBytes::new(zeroes))
    }
@@ -508,40 +487,31 @@ mod tests {

    #[test]
    fn test_pingpong_encode_max_size() {
-
        let mut buf = Vec::new();
-

-
        let ping = Message::Ping(Ping {
+
        wire::serialize(&Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
-
        });
-
        ping.encode(&mut buf)
-
            .expect("ping should be within max message size");
+
        }));

-
        let pong = Message::Pong {
+
        wire::serialize(&Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
-
        };
-
        pong.encode(&mut buf)
-
            .expect("pong should be within max message size");
+
        });
    }

    #[test]
-
    fn test_pingpong_encode_size_overflow() {
-
        let ping = Message::Ping(Ping {
+
    #[should_panic(expected = "advance out of bounds")]
+
    fn test_ping_encode_size_overflow() {
+
        wire::serialize(&Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
-
        });
-

-
        let mut buf = Vec::new();
-
        ping.encode(&mut buf)
-
            .expect_err("ping should exceed max message size");
+
        }));
+
    }

-
        let pong = Message::Pong {
+
    #[test]
+
    #[should_panic(expected = "advance out of bounds")]
+
    fn test_pong_encode_size_overflow() {
+
        wire::serialize(&Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
-
        };
-

-
        let mut buf = Vec::new();
-
        pong.encode(&mut buf)
-
            .expect_err("pong should exceed max message size");
+
        });
    }

    #[quickcheck]
@@ -558,7 +528,7 @@ mod tests {
            let mut decoder = Deserializer::<1048576, Message>::new(8);

            for item in &items {
-
                item.encode(&mut decoder).unwrap();
+
                item.encode(&mut decoder);
            }
            for item in items {
                assert_eq!(decoder.next().unwrap().unwrap(), item);
@@ -570,12 +540,24 @@ mod tests {
            .quickcheck(property as fn(items: Vec<Message>));
    }

-
    #[quickcheck]
-
    fn prop_zero_bytes_encode_decode(zeroes: ZeroBytes) {
-
        assert_eq!(
-
            wire::deserialize::<ZeroBytes>(&wire::serialize(&zeroes)).unwrap(),
-
            zeroes
-
        );
+
    #[test]
+
    fn prop_zero_bytes_encode_decode() {
+
        fn property(zeroes: wire::Size) {
+
            if zeroes > Ping::MAX_PING_ZEROES {
+
                return;
+
            }
+

+
            let zeroes = ZeroBytes::new(zeroes);
+

+
            assert_eq!(
+
                wire::deserialize::<ZeroBytes>(&wire::serialize(&zeroes)).unwrap(),
+
                zeroes
+
            );
+
        }
+

+
        qcheck::QuickCheck::new()
+
            .gen(qcheck::Gen::new(16))
+
            .quickcheck(property as fn(zeroes: wire::Size));
    }

    #[quickcheck]
modified crates/radicle-protocol/src/wire/varint.rs
@@ -3,9 +3,9 @@

// This implementation is largely based on the `quinn` crate.
// Copyright (c) 2018 The quinn developers.
-
use std::{fmt, io, ops};
+
use std::{fmt, ops};

-
use byteorder::ReadBytesExt;
+
use bytes::{Buf, BufMut};
use thiserror::Error;

use crate::wire;
@@ -44,6 +44,10 @@ impl VarInt {
            Err(BoundsExceeded)
        }
    }
+

+
    pub fn new_unchecked(x: u64) -> Self {
+
        Self(x)
+
    }
}

impl ops::Deref for VarInt {
@@ -98,27 +102,27 @@ impl fmt::Display for VarInt {
pub struct BoundsExceeded;

impl Decode for VarInt {
-
    fn decode<R: io::Read + ?Sized>(r: &mut R) -> Result<Self, wire::Error> {
-
        let mut buf = [0; 8];
-
        buf[0] = r.read_u8()?;
+
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
+
        let mut tmp = [0; 8];
+
        tmp[0] = buf.try_get_u8()?;

        // Integer length.
-
        let tag = buf[0] >> 6;
-
        buf[0] &= 0b0011_1111;
+
        let tag = tmp[0] >> 6;
+
        tmp[0] &= 0b0011_1111;

        let x = match tag {
-
            0b00 => u64::from(buf[0]),
+
            0b00 => u64::from(tmp[0]),
            0b01 => {
-
                r.read_exact(&mut buf[1..2])?;
-
                u64::from(u16::from_be_bytes([buf[0], buf[1]]))
+
                buf.try_copy_to_slice(&mut tmp[1..2])?;
+
                u64::from(u16::from_be_bytes([tmp[0], tmp[1]]))
            }
            0b10 => {
-
                r.read_exact(&mut buf[1..4])?;
-
                u64::from(u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]))
+
                buf.try_copy_to_slice(&mut tmp[1..4])?;
+
                u64::from(u32::from_be_bytes([tmp[0], tmp[1], tmp[2], tmp[3]]))
            }
            0b11 => {
-
                r.read_exact(&mut buf[1..8])?;
-
                u64::from_be_bytes(buf)
+
                buf.try_copy_to_slice(&mut tmp[1..8])?;
+
                u64::from_be_bytes(tmp)
            }
            // SAFETY: It should be obvious that we can't have any other bit pattern
            // than the above, since all other bits are zeroed.
@@ -129,7 +133,7 @@ impl Decode for VarInt {
}

impl Encode for VarInt {
-
    fn encode<W: io::Write + ?Sized>(&self, w: &mut W) -> io::Result<usize> {
+
    fn encode(&self, w: &mut impl BufMut) {
        let x: u64 = self.0;

        if x < 2u64.pow(6) {
@@ -151,25 +155,19 @@ pub mod payload {
    use super::*;

    /// Encode varint-prefixed data payload.
-
    pub fn encode<W: io::Write + ?Sized>(payload: &[u8], writer: &mut W) -> io::Result<usize> {
-
        let mut n = 0;
+
    pub fn encode(payload: &[u8], buf: &mut impl BufMut) {
        let len = payload.len();
-
        let varint =
-
            VarInt::new(len as u64).map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
-

-
        n += varint.encode(writer)?; // The length of the payload length.
-
        n += len; // The length of the data payload itself.
-

-
        writer.write_all(payload)?;
+
        let varint = VarInt::new_unchecked(len as u64);

-
        Ok(n)
+
        varint.encode(buf); // The length of the payload length.
+
        buf.put_slice(payload);
    }

    /// Decode varint-prefixed data payload.
-
    pub fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Vec<u8>, wire::Error> {
-
        let size = VarInt::decode(reader)?;
+
    pub fn decode(buf: &mut impl Buf) -> Result<Vec<u8>, wire::Error> {
+
        let size = VarInt::decode(buf)?;
        let mut data = vec![0; *size as usize];
-
        reader.read_exact(&mut data[..])?;
+
        buf.try_copy_to_slice(&mut data[..])?;

        Ok(data)
    }
modified crates/radicle-ssh/Cargo.toml
@@ -14,7 +14,7 @@ edition.workspace = true
rust-version.workspace = true

[dependencies]
-
byteorder = { workspace = true }
+
byteorder = "1.4"
log = { workspace = true }
thiserror = { workspace = true }
zeroize = { workspace = true }