Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Move wire code
Alexis Sellier committed 3 years ago
commit 5b86131257d3b5104bf55c45c235df7b4cae2daa
parent b72c3411f0c40ebc975d74bdfacd302e66a1d8c1
12 files changed +1041 -1014
modified node/src/client.rs
@@ -8,9 +8,9 @@ use crate::clock::RefClock;
use crate::collections::HashMap;
use crate::crypto::Signer;
use crate::service;
-
use crate::service::wire::Wire;
use crate::storage::git::Storage;
use crate::transport::Transport;
+
use crate::wire::Wire;

pub mod handle;

modified node/src/decoder.rs
@@ -2,7 +2,7 @@ use std::io;
use std::marker::PhantomData;

use crate::service::message::Envelope;
-
use crate::service::wire;
+
use crate::wire;

/// Message stream decoder.
///
modified node/src/lib.rs
@@ -4,6 +4,7 @@ pub use nakamoto_net::{Io, Link, LocalDuration, LocalTime};
pub mod client;
pub mod control;
pub mod crypto;
+
pub mod storage;

mod address_book;
mod address_manager;
@@ -17,7 +18,18 @@ mod logger;
mod rad;
mod serde_ext;
mod service;
-
mod storage;
#[cfg(test)]
mod test;
mod transport;
+
mod wire;
+

+
pub mod prelude {
+
    pub use crate::crypto::{PublicKey, Signature, Signer};
+
    pub use crate::decoder::Decoder;
+
    pub use crate::hash::Digest;
+
    pub use crate::identity::{Did, Id};
+
    pub use crate::service::filter::Filter;
+
    pub use crate::service::{NodeId, Timestamp};
+
    pub use crate::storage::refs::Refs;
+
    pub use crate::storage::WriteStorage;
+
}
modified node/src/service.rs
@@ -3,7 +3,6 @@ pub mod config;
pub mod filter;
pub mod message;
pub mod peer;
-
pub mod wire;

use std::ops::{Deref, DerefMut};
use std::{collections::VecDeque, fmt, net, net::IpAddr};
modified node/src/service/filter.rs
@@ -1,10 +1,8 @@
-
use std::io;
use std::ops::{Deref, DerefMut};

-
use bloomy::BloomFilter;
+
pub use bloomy::BloomFilter;

use crate::identity::Id;
-
use crate::service::wire;

/// Size in bytes of subscription bloom filter.
pub const FILTER_SIZE: usize = 1024 * 16;
@@ -51,34 +49,8 @@ impl DerefMut for Filter {
    }
}

-
#[cfg(test)]
impl From<BloomFilter<Id>> for Filter {
    fn from(bloom: BloomFilter<Id>) -> Self {
        Self(bloom)
    }
}
-

-
impl wire::Encode for Filter {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.0.as_bytes().encode(writer)?;
-

-
        Ok(n)
-
    }
-
}
-

-
impl wire::Decode for Filter {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let size: wire::Size = wire::Decode::decode(reader)?;
-
        if size as usize != FILTER_SIZE {
-
            return Err(wire::Error::InvalidFilterSize(size as usize));
-
        }
-
        let bytes: [u8; FILTER_SIZE] = wire::Decode::decode(reader)?;
-
        let bf = BloomFilter::from(Vec::from(bytes));
-

-
        debug_assert_eq!(bf.hashes(), FILTER_HASHES);
-

-
        Ok(Self(bf))
-
    }
-
}
modified node/src/service/message.rs
@@ -1,14 +1,12 @@
use std::{fmt, io, net};

-
use byteorder::{NetworkEndian, ReadBytesExt};
-

use crate::crypto;
use crate::git;
use crate::identity::Id;
use crate::service::filter::Filter;
-
use crate::service::wire;
use crate::service::{NodeId, Timestamp, PROTOCOL_VERSION};
use crate::storage::refs::Refs;
+
use crate::wire;

/// Message envelope. All messages sent over the network are wrapped in this type.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -26,68 +24,6 @@ pub type NodeFeatures = [u8; 32];
// TODO: We should check the length and charset when deserializing.
pub struct Hostname(String);

-
/// Message type.
-
#[repr(u16)]
-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-
pub enum MessageType {
-
    Initialize = 0,
-
    NodeAnnouncement = 2,
-
    InventoryAnnouncement = 4,
-
    RefsAnnouncement = 6,
-
    Subscribe = 8,
-
}
-

-
impl From<MessageType> for u16 {
-
    fn from(other: MessageType) -> Self {
-
        other as u16
-
    }
-
}
-

-
impl TryFrom<u16> for MessageType {
-
    type Error = u16;
-

-
    fn try_from(other: u16) -> Result<Self, Self::Error> {
-
        match other {
-
            0 => Ok(MessageType::Initialize),
-
            2 => Ok(MessageType::NodeAnnouncement),
-
            4 => Ok(MessageType::InventoryAnnouncement),
-
            6 => Ok(MessageType::RefsAnnouncement),
-
            8 => Ok(MessageType::Subscribe),
-
            _ => Err(other),
-
        }
-
    }
-
}
-

-
/// Address type.
-
#[repr(u8)]
-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-
pub enum AddressType {
-
    Ipv4 = 1,
-
    Ipv6 = 2,
-
    Hostname = 3,
-
    Onion = 4,
-
}
-

-
impl From<AddressType> for u8 {
-
    fn from(other: AddressType) -> Self {
-
        other as u8
-
    }
-
}
-

-
impl TryFrom<u8> for AddressType {
-
    type Error = u8;
-

-
    fn try_from(other: u8) -> Result<Self, Self::Error> {
-
        match other {
-
            1 => Ok(AddressType::Ipv4),
-
            2 => Ok(AddressType::Ipv6),
-
            3 => Ok(AddressType::Hostname),
-
            4 => Ok(AddressType::Onion),
-
            _ => Err(other),
-
        }
-
    }
-
}
-

/// Peer public protocol address.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Address {
@@ -123,78 +59,6 @@ impl From<net::SocketAddr> for Address {
    }
}

-
impl wire::Encode for Envelope {
-
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
-
        let mut n = 0;
-

-
        n += self.magic.encode(writer)?;
-
        n += self.msg.encode(writer)?;
-

-
        Ok(n)
-
    }
-
}
-

-
impl wire::Decode for Envelope {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let magic = u32::decode(reader)?;
-
        let msg = Message::decode(reader)?;
-

-
        Ok(Self { magic, msg })
-
    }
-
}
-

-
impl wire::Encode for Address {
-
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
-
        let mut n = 0;
-

-
        match self {
-
            Self::Ipv4 { ip, port } => {
-
                n += u8::from(AddressType::Ipv4).encode(writer)?;
-
                n += ip.octets().encode(writer)?;
-
                n += port.encode(writer)?;
-
            }
-
            Self::Ipv6 { ip, port } => {
-
                n += u8::from(AddressType::Ipv6).encode(writer)?;
-
                n += ip.octets().encode(writer)?;
-
                n += port.encode(writer)?;
-
            }
-
            Self::Hostname { .. } => todo!(),
-
            Self::Onion { .. } => todo!(),
-
        }
-
        Ok(n)
-
    }
-
}
-

-
impl wire::Decode for Address {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let addrtype = reader.read_u8()?;
-

-
        match AddressType::try_from(addrtype) {
-
            Ok(AddressType::Ipv4) => {
-
                let octets: [u8; 4] = wire::Decode::decode(reader)?;
-
                let ip = net::Ipv4Addr::from(octets);
-
                let port = u16::decode(reader)?;
-

-
                Ok(Self::Ipv4 { ip, port })
-
            }
-
            Ok(AddressType::Ipv6) => {
-
                let octets: [u8; 16] = wire::Decode::decode(reader)?;
-
                let ip = net::Ipv6Addr::from(octets);
-
                let port = u16::decode(reader)?;
-

-
                Ok(Self::Ipv6 { ip, port })
-
            }
-
            Ok(AddressType::Hostname) => {
-
                todo!();
-
            }
-
            Ok(AddressType::Onion) => {
-
                todo!();
-
            }
-
            Err(other) => Err(wire::Error::UnknownAddressType(other)),
-
        }
-
    }
-
}
-

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// Subscribe to events matching this filter.
@@ -276,26 +140,6 @@ impl RefsAnnouncement {
    }
}

-
impl wire::Encode for RefsAnnouncement {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.id.encode(writer)?;
-
        n += self.refs.encode(writer)?;
-

-
        Ok(n)
-
    }
-
}
-

-
impl wire::Decode for RefsAnnouncement {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let id = Id::decode(reader)?;
-
        let refs = Refs::decode(reader)?;
-

-
        Ok(Self { id, refs })
-
    }
-
}
-

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InventoryAnnouncement {
    pub inventory: Vec<Id>,
@@ -310,29 +154,6 @@ impl InventoryAnnouncement {
    }
}

-
impl wire::Encode for InventoryAnnouncement {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = 0;
-

-
        n += self.inventory.as_slice().encode(writer)?;
-
        n += self.timestamp.encode(writer)?;
-

-
        Ok(n)
-
    }
-
}
-

-
impl wire::Decode for InventoryAnnouncement {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let inventory = Vec::<Id>::decode(reader)?;
-
        let timestamp = Timestamp::decode(reader)?;
-

-
        Ok(Self {
-
            inventory,
-
            timestamp,
-
        })
-
    }
-
}
-

/// Message payload.
/// These are the messages peers send to each other.
#[derive(Clone, PartialEq, Eq)]
@@ -425,17 +246,6 @@ impl Message {
            until,
        })
    }
-

-
    pub fn type_id(&self) -> u16 {
-
        match self {
-
            Self::Initialize { .. } => MessageType::Initialize,
-
            Self::Subscribe { .. } => MessageType::Subscribe,
-
            Self::NodeAnnouncement { .. } => MessageType::NodeAnnouncement,
-
            Self::InventoryAnnouncement { .. } => MessageType::InventoryAnnouncement,
-
            Self::RefsAnnouncement { .. } => MessageType::RefsAnnouncement,
-
        }
-
        .into()
-
    }
}

impl fmt::Debug for Message {
@@ -472,187 +282,15 @@ impl fmt::Debug for Message {
    }
}

-
impl wire::Encode for Message {
-
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
-
        let mut n = self.type_id().encode(writer)?;
-

-
        match self {
-
            Self::Initialize {
-
                id,
-
                timestamp,
-
                version,
-
                addrs,
-
                git,
-
            } => {
-
                n += id.encode(writer)?;
-
                n += timestamp.encode(writer)?;
-
                n += version.encode(writer)?;
-
                n += addrs.as_slice().encode(writer)?;
-
                n += git.encode(writer)?;
-
            }
-
            Self::Subscribe(Subscribe {
-
                filter,
-
                since,
-
                until,
-
            }) => {
-
                n += filter.encode(writer)?;
-
                n += since.encode(writer)?;
-
                n += until.encode(writer)?;
-
            }
-
            Self::RefsAnnouncement {
-
                node,
-
                message,
-
                signature,
-
            } => {
-
                n += node.encode(writer)?;
-
                n += message.encode(writer)?;
-
                n += signature.encode(writer)?;
-
            }
-
            Self::InventoryAnnouncement {
-
                node,
-
                message,
-
                signature,
-
            } => {
-
                n += node.encode(writer)?;
-
                n += message.encode(writer)?;
-
                n += signature.encode(writer)?;
-
            }
-
            Self::NodeAnnouncement {
-
                node,
-
                message,
-
                signature,
-
            } => {
-
                n += node.encode(writer)?;
-
                n += message.encode(writer)?;
-
                n += signature.encode(writer)?;
-
            }
-
        }
-
        Ok(n)
-
    }
-
}
-

-
impl wire::Decode for Message {
-
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let type_id = reader.read_u16::<NetworkEndian>()?;
-

-
        match MessageType::try_from(type_id) {
-
            Ok(MessageType::Initialize) => {
-
                let id = NodeId::decode(reader)?;
-
                let timestamp = Timestamp::decode(reader)?;
-
                let version = u32::decode(reader)?;
-
                let addrs = Vec::<Address>::decode(reader)?;
-
                let git = git::Url::decode(reader)?;
-

-
                Ok(Self::Initialize {
-
                    id,
-
                    timestamp,
-
                    version,
-
                    addrs,
-
                    git,
-
                })
-
            }
-
            Ok(MessageType::Subscribe) => {
-
                let filter = Filter::decode(reader)?;
-
                let since = Timestamp::decode(reader)?;
-
                let until = Timestamp::decode(reader)?;
-

-
                Ok(Self::Subscribe(Subscribe {
-
                    filter,
-
                    since,
-
                    until,
-
                }))
-
            }
-
            Ok(MessageType::NodeAnnouncement) => {
-
                let node = NodeId::decode(reader)?;
-
                let message = NodeAnnouncement::decode(reader)?;
-
                let signature = crypto::Signature::decode(reader)?;
-

-
                Ok(Self::NodeAnnouncement {
-
                    node,
-
                    message,
-
                    signature,
-
                })
-
            }
-
            Ok(MessageType::InventoryAnnouncement) => {
-
                let node = NodeId::decode(reader)?;
-
                let message = InventoryAnnouncement::decode(reader)?;
-
                let signature = crypto::Signature::decode(reader)?;
-

-
                Ok(Self::InventoryAnnouncement {
-
                    node,
-
                    message,
-
                    signature,
-
                })
-
            }
-
            Ok(MessageType::RefsAnnouncement) => {
-
                let node = NodeId::decode(reader)?;
-
                let message = RefsAnnouncement::decode(reader)?;
-
                let signature = crypto::Signature::decode(reader)?;
-

-
                Ok(Self::RefsAnnouncement {
-
                    node,
-
                    message,
-
                    signature,
-
                })
-
            }
-
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
-
        }
-
    }
-
}
-

#[cfg(test)]
mod tests {
    use super::*;
    use quickcheck_macros::quickcheck;

    use crate::crypto::Signer;
-
    use crate::decoder::Decoder;
-
    use crate::service::wire::{self, Encode};
    use crate::test::crypto::MockSigner;

    #[quickcheck]
-
    fn prop_message_encode_decode(message: Message) {
-
        assert_eq!(
-
            wire::deserialize::<Message>(&wire::serialize(&message)).unwrap(),
-
            message
-
        );
-
    }
-

-
    #[quickcheck]
-
    fn prop_envelope_encode_decode(envelope: Envelope) {
-
        assert_eq!(
-
            wire::deserialize::<Envelope>(&wire::serialize(&envelope)).unwrap(),
-
            envelope
-
        );
-
    }
-

-
    #[test]
-
    fn prop_envelope_decoder() {
-
        fn property(items: Vec<Envelope>) {
-
            let mut decoder = Decoder::<Envelope>::new(8);
-

-
            for item in &items {
-
                item.encode(&mut decoder).unwrap();
-
            }
-
            for item in items {
-
                assert_eq!(decoder.next().unwrap().unwrap(), item);
-
            }
-
        }
-

-
        quickcheck::QuickCheck::new()
-
            .gen(quickcheck::Gen::new(16))
-
            .quickcheck(property as fn(items: Vec<Envelope>));
-
    }
-

-
    #[quickcheck]
-
    fn prop_addr(addr: Address) {
-
        assert_eq!(
-
            wire::deserialize::<Address>(&wire::serialize(&addr)).unwrap(),
-
            addr
-
        );
-
    }
-

-
    #[quickcheck]
    fn prop_refs_announcement_signing(id: Id, refs: Refs) {
        let signer = MockSigner::new(&mut fastrand::Rng::new());
        let message = RefsAnnouncement { id, refs };
deleted node/src/service/wire.rs
@@ -1,614 +0,0 @@
-
use std::collections::{BTreeMap, HashMap};
-
use std::convert::TryFrom;
-
use std::net::IpAddr;
-
use std::ops::{Deref, DerefMut};
-
use std::string::FromUtf8Error;
-
use std::{io, mem};
-

-
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
-
use nakamoto_net as nakamoto;
-
use nakamoto_net::Link;
-

-
use crate::address_book;
-
use crate::crypto::{PublicKey, Signature, Signer};
-
use crate::decoder::Decoder;
-
use crate::git;
-
use crate::git::fmt;
-
use crate::hash::Digest;
-
use crate::identity::Id;
-
use crate::service;
-
use crate::storage::refs::Refs;
-
use crate::storage::WriteStorage;
-

-
/// The default type we use to represent sizes.
-
/// Four bytes is more than enough for anything sent over the wire.
-
/// Note that in certain cases, we may use only one or two byte types.
-
pub type Size = u32;
-

-
#[derive(thiserror::Error, Debug)]
-
pub enum Error {
-
    #[error("i/o: {0}")]
-
    Io(#[from] io::Error),
-
    #[error("UTF-8 error: {0}")]
-
    FromUtf8(#[from] FromUtf8Error),
-
    #[error("invalid size: expected {expected}, got {actual}")]
-
    InvalidSize { expected: usize, actual: usize },
-
    #[error("invalid filter size: {0}")]
-
    InvalidFilterSize(usize),
-
    #[error(transparent)]
-
    InvalidRefName(#[from] fmt::Error),
-
    #[error("invalid git url `{url}`: {error}")]
-
    InvalidGitUrl {
-
        url: String,
-
        error: git::url::parse::Error,
-
    },
-
    #[error("unknown address type `{0}`")]
-
    UnknownAddressType(u8),
-
    #[error("unknown message type `{0}`")]
-
    UnknownMessageType(u16),
-
}
-

-
impl Error {
-
    /// Whether we've reached the end of file. This will be true when we fail to decode
-
    /// a message because there's not enough data in the stream.
-
    pub fn is_eof(&self) -> bool {
-
        matches!(self, Self::Io(err) if err.kind() == io::ErrorKind::UnexpectedEof)
-
    }
-
}
-

-
/// Things that can be encoded as binary.
-
pub trait Encode {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error>;
-
}
-

-
/// Things that can be decoded from binary.
-
pub trait Decode: Sized {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error>;
-
}
-

-
/// Encode an object into a vector.
-
pub fn serialize<T: Encode + ?Sized>(data: &T) -> Vec<u8> {
-
    let mut buffer = Vec::new();
-
    let len = data
-
        .encode(&mut buffer)
-
        .expect("in-memory writes don't error");
-

-
    debug_assert_eq!(len, buffer.len());
-

-
    buffer
-
}
-

-
/// Decode an object from a vector.
-
pub fn deserialize<T: Decode>(data: &[u8]) -> Result<T, Error> {
-
    let mut cursor = io::Cursor::new(data);
-

-
    T::decode(&mut cursor)
-
}
-

-
impl Encode for u8 {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_u8(*self)?;
-

-
        Ok(mem::size_of::<Self>())
-
    }
-
}
-

-
impl Encode for u16 {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_u16::<NetworkEndian>(*self)?;
-

-
        Ok(mem::size_of::<Self>())
-
    }
-
}
-

-
impl Encode for u32 {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_u32::<NetworkEndian>(*self)?;
-

-
        Ok(mem::size_of::<Self>())
-
    }
-
}
-

-
impl Encode for u64 {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_u64::<NetworkEndian>(*self)?;
-

-
        Ok(mem::size_of::<Self>())
-
    }
-
}
-

-
impl Encode for usize {
-
    /// We encode this type to a [`u32`], since there's no need to send larger messages
-
    /// over the network.
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        assert!(
-
            *self <= u32::MAX as usize,
-
            "Cannot encode sizes larger than {}",
-
            u32::MAX
-
        );
-
        writer.write_u32::<NetworkEndian>(*self as u32)?;
-

-
        Ok(mem::size_of::<u32>())
-
    }
-
}
-

-
impl Encode for PublicKey {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.as_bytes().encode(writer)
-
    }
-
}
-

-
impl<const T: usize> Encode for &[u8; T] {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_all(*self)?;
-

-
        Ok(mem::size_of::<Self>())
-
    }
-
}
-

-
impl<const T: usize> Encode for [u8; T] {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        writer.write_all(self)?;
-

-
        Ok(mem::size_of::<Self>())
-
    }
-
}
-

-
impl<T> Encode for &[T]
-
where
-
    T: Encode,
-
{
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = (self.len() as Size).encode(writer)?;
-

-
        for item in self.iter() {
-
            n += item.encode(writer)?;
-
        }
-
        Ok(n)
-
    }
-
}
-

-
impl Encode for &str {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        assert!(self.len() <= u8::MAX as usize);
-

-
        let n = (self.len() as u8).encode(writer)?;
-
        let bytes = self.as_bytes();
-

-
        // Nb. Don't use the [`Encode`] instance here for &[u8], because we are prefixing the
-
        // length ourselves.
-
        writer.write_all(bytes)?;
-

-
        Ok(n + bytes.len())
-
    }
-
}
-

-
impl Encode for String {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.as_str().encode(writer)
-
    }
-
}
-

-
impl Encode for git::Url {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.to_string().encode(writer)
-
    }
-
}
-

-
impl Encode for Digest {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.as_ref().encode(writer)
-
    }
-
}
-

-
impl Encode for Id {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.deref().encode(writer)
-
    }
-
}
-

-
impl Encode for Refs {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        let mut n = self.len().encode(writer)?;
-

-
        for (name, oid) in self.iter() {
-
            n += name.as_str().encode(writer)?;
-
            n += oid.encode(writer)?;
-
        }
-
        Ok(n)
-
    }
-
}
-

-
impl Encode for Signature {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        self.to_bytes().encode(writer)
-
    }
-
}
-

-
impl Encode for git::Oid {
-
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
-
        // Nb. We use length-encoding here to support future SHA-2 object ids.
-
        self.as_bytes().encode(writer)
-
    }
-
}
-

-
////////////////////////////////////////////////////////////////////////////////
-

-
impl Decode for PublicKey {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let buf: [u8; 32] = Decode::decode(reader)?;
-

-
        PublicKey::try_from(buf)
-
            .map_err(|e| Error::Io(io::Error::new(io::ErrorKind::InvalidInput, e.to_string())))
-
    }
-
}
-

-
impl Decode for Refs {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let len = usize::decode(reader)?;
-
        let mut refs = BTreeMap::new();
-

-
        for _ in 0..len {
-
            let name = String::decode(reader)?;
-
            let name = git::RefString::try_from(name).map_err(Error::from)?;
-
            let oid = git::Oid::decode(reader)?;
-

-
            refs.insert(name, oid);
-
        }
-
        Ok(refs.into())
-
    }
-
}
-

-
impl Decode for git::Oid {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let len = usize::decode(reader)?;
-
        #[allow(non_upper_case_globals)]
-
        const expected: usize = mem::size_of::<git2::Oid>();
-

-
        if len != expected {
-
            return Err(Error::InvalidSize {
-
                expected,
-
                actual: len,
-
            });
-
        }
-

-
        let buf: [u8; expected] = Decode::decode(reader)?;
-
        let oid = git2::Oid::from_bytes(&buf).expect("the buffer is exactly the right size");
-
        let oid = git::Oid::from(oid);
-

-
        Ok(oid)
-
    }
-
}
-

-
impl Decode for Signature {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let bytes: [u8; 64] = Decode::decode(reader)?;
-

-
        Ok(Signature::from(bytes))
-
    }
-
}
-

-
impl Decode for u8 {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        reader.read_u8().map_err(Error::from)
-
    }
-
}
-

-
impl Decode for u16 {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        reader.read_u16::<NetworkEndian>().map_err(Error::from)
-
    }
-
}
-

-
impl Decode for u32 {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        reader.read_u32::<NetworkEndian>().map_err(Error::from)
-
    }
-
}
-

-
impl Decode for u64 {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        reader.read_u64::<NetworkEndian>().map_err(Error::from)
-
    }
-
}
-

-
impl Decode for usize {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let size: usize = u32::decode(reader)?
-
            .try_into()
-
            .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
-

-
        Ok(size)
-
    }
-
}
-

-
impl<const N: usize> Decode for [u8; N] {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let mut ary = [0; N];
-
        reader.read_exact(&mut ary)?;
-

-
        Ok(ary)
-
    }
-
}
-

-
impl<T> Decode for Vec<T>
-
where
-
    T: Decode,
-
{
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let len: Size = Size::decode(reader)?;
-
        let mut vec = Vec::with_capacity(len as usize);
-

-
        for _ in 0..len {
-
            let item = T::decode(reader)?;
-
            vec.push(item);
-
        }
-
        Ok(vec)
-
    }
-
}
-

-
impl Decode for String {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let len = u8::decode(reader)?;
-
        let mut bytes = vec![0; len as usize];
-

-
        reader.read_exact(&mut bytes)?;
-

-
        let string = String::from_utf8(bytes)?;
-

-
        Ok(string)
-
    }
-
}
-

-
impl Decode for git::Url {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let url = String::decode(reader)?;
-
        let url = Self::from_bytes(url.as_bytes())
-
            .map_err(|error| Error::InvalidGitUrl { url, error })?;
-

-
        Ok(url)
-
    }
-
}
-

-
impl Decode for Id {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let digest: Digest = Decode::decode(reader)?;
-

-
        Ok(Self::from(digest))
-
    }
-
}
-

-
impl Decode for Digest {
-
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let bytes: [u8; 32] = Decode::decode(reader)?;
-

-
        Ok(Self::from(bytes))
-
    }
-
}
-

-
#[derive(Debug)]
-
pub struct Wire<S, T, G> {
-
    inboxes: HashMap<IpAddr, Decoder>,
-
    inner: service::Service<S, T, G>,
-
}
-

-
impl<S, T, G> Wire<S, T, G> {
-
    pub fn new(inner: service::Service<S, T, G>) -> Self {
-
        Self {
-
            inboxes: HashMap::new(),
-
            inner,
-
        }
-
    }
-
}
-

-
impl<'r, S, T, G> Wire<S, T, G>
-
where
-
    S: address_book::Store,
-
    T: WriteStorage<'r> + 'static,
-
    G: Signer,
-
{
-
    pub fn connected(
-
        &mut self,
-
        addr: std::net::SocketAddr,
-
        local_addr: &std::net::SocketAddr,
-
        link: Link,
-
    ) {
-
        self.inboxes.insert(addr.ip(), Decoder::new(256));
-
        self.inner.connected(addr, local_addr, link)
-
    }
-

-
    pub fn disconnected(
-
        &mut self,
-
        addr: &std::net::SocketAddr,
-
        reason: nakamoto::DisconnectReason<service::DisconnectReason>,
-
    ) {
-
        self.inboxes.remove(&addr.ip());
-
        self.inner.disconnected(addr, reason)
-
    }
-

-
    pub fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
-
        let peer_ip = addr.ip();
-

-
        if let Some(inbox) = self.inboxes.get_mut(&peer_ip) {
-
            inbox.input(bytes);
-

-
            loop {
-
                match inbox.decode_next() {
-
                    Ok(Some(msg)) => self.inner.received_message(addr, msg),
-
                    Ok(None) => break,
-

-
                    Err(err) => {
-
                        // TODO: Disconnect peer.
-
                        log::error!("Invalid message received from {}: {}", peer_ip, err);
-

-
                        return;
-
                    }
-
                }
-
            }
-
        } else {
-
            log::debug!("Received message from unknown peer {}", peer_ip);
-
        }
-
    }
-
}
-

-
impl<S, T, G> Iterator for Wire<S, T, G> {
-
    type Item = nakamoto::Io<service::Event, service::DisconnectReason>;
-

-
    fn next(&mut self) -> Option<Self::Item> {
-
        match self.inner.next() {
-
            Some(service::Io::Write(addr, msgs)) => {
-
                let mut buf = Vec::new();
-
                for msg in msgs {
-
                    log::debug!("Write {:?} to {}", &msg, addr.ip());
-

-
                    msg.encode(&mut buf)
-
                        .expect("writing to an in-memory buffer doesn't fail");
-
                }
-
                Some(nakamoto::Io::Write(addr, buf))
-
            }
-
            Some(service::Io::Event(e)) => Some(nakamoto::Io::Event(e)),
-
            Some(service::Io::Connect(a)) => Some(nakamoto::Io::Connect(a)),
-
            Some(service::Io::Disconnect(a, r)) => Some(nakamoto::Io::Disconnect(a, r)),
-
            Some(service::Io::Wakeup(d)) => Some(nakamoto::Io::Wakeup(d)),
-

-
            None => None,
-
        }
-
    }
-
}
-

-
impl<S, T, G> Deref for Wire<S, T, G> {
-
    type Target = service::Service<S, T, G>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.inner
-
    }
-
}
-

-
impl<S, T, G> DerefMut for Wire<S, T, G> {
-
    fn deref_mut(&mut self) -> &mut Self::Target {
-
        &mut self.inner
-
    }
-
}
-

-
#[cfg(test)]
-
mod tests {
-
    use super::*;
-
    use quickcheck_macros::quickcheck;
-

-
    use crate::crypto::Unverified;
-
    use crate::storage::refs::SignedRefs;
-
    use crate::test::arbitrary;
-

-
    #[quickcheck]
-
    fn prop_u8(input: u8) {
-
        assert_eq!(deserialize::<u8>(&serialize(&input)).unwrap(), input);
-
    }
-

-
    #[quickcheck]
-
    fn prop_u16(input: u16) {
-
        assert_eq!(deserialize::<u16>(&serialize(&input)).unwrap(), input);
-
    }
-

-
    #[quickcheck]
-
    fn prop_u32(input: u32) {
-
        assert_eq!(deserialize::<u32>(&serialize(&input)).unwrap(), input);
-
    }
-

-
    #[quickcheck]
-
    fn prop_u64(input: u64) {
-
        assert_eq!(deserialize::<u64>(&serialize(&input)).unwrap(), input);
-
    }
-

-
    #[quickcheck]
-
    fn prop_usize(input: usize) -> quickcheck::TestResult {
-
        if input > u32::MAX as usize {
-
            return quickcheck::TestResult::discard();
-
        }
-
        assert_eq!(deserialize::<usize>(&serialize(&input)).unwrap(), input);
-

-
        quickcheck::TestResult::passed()
-
    }
-

-
    #[quickcheck]
-
    fn prop_string(input: String) -> quickcheck::TestResult {
-
        if input.len() > u8::MAX as usize {
-
            return quickcheck::TestResult::discard();
-
        }
-
        assert_eq!(deserialize::<String>(&serialize(&input)).unwrap(), input);
-

-
        quickcheck::TestResult::passed()
-
    }
-

-
    #[quickcheck]
-
    fn prop_vec(input: Vec<String>) {
-
        assert_eq!(
-
            deserialize::<Vec<String>>(&serialize(&input.as_slice())).unwrap(),
-
            input
-
        );
-
    }
-

-
    #[quickcheck]
-
    fn prop_pubkey(input: PublicKey) {
-
        assert_eq!(deserialize::<PublicKey>(&serialize(&input)).unwrap(), input);
-
    }
-

-
    #[quickcheck]
-
    fn prop_id(input: Id) {
-
        assert_eq!(deserialize::<Id>(&serialize(&input)).unwrap(), input);
-
    }
-

-
    #[quickcheck]
-
    fn prop_digest(input: Digest) {
-
        assert_eq!(deserialize::<Digest>(&serialize(&input)).unwrap(), input);
-
    }
-

-
    #[quickcheck]
-
    fn prop_refs(input: Refs) {
-
        assert_eq!(deserialize::<Refs>(&serialize(&input)).unwrap(), input);
-
    }
-

-
    #[quickcheck]
-
    fn prop_signature(input: arbitrary::ByteArray<64>) {
-
        let signature = Signature::from(input.into_inner());
-

-
        assert_eq!(
-
            deserialize::<Signature>(&serialize(&signature)).unwrap(),
-
            signature
-
        );
-
    }
-

-
    #[quickcheck]
-
    fn prop_oid(input: arbitrary::ByteArray<20>) {
-
        let oid = git::Oid::try_from(input.into_inner().as_slice()).unwrap();
-

-
        assert_eq!(deserialize::<git::Oid>(&serialize(&oid)).unwrap(), oid);
-
    }
-

-
    #[quickcheck]
-
    fn prop_signed_refs(input: SignedRefs<Unverified>) {
-
        assert_eq!(
-
            deserialize::<SignedRefs<Unverified>>(&serialize(&input)).unwrap(),
-
            input
-
        );
-
    }
-

-
    #[test]
-
    fn test_string() {
-
        assert_eq!(
-
            serialize(&String::from("hello")),
-
            vec![5, b'h', b'e', b'l', b'l', b'o']
-
        );
-
    }
-

-
    #[test]
-
    fn test_git_url() {
-
        let url = git::Url {
-
            scheme: git::url::Scheme::Https,
-
            path: "/git".to_owned().into(),
-
            host: Some("seed.radicle.xyz".to_owned()),
-
            port: Some(8888),
-
            ..git::Url::default()
-
        };
-
        assert_eq!(deserialize::<git::Url>(&serialize(&url)).unwrap(), url);
-
    }
-
}
modified node/src/storage/refs.rs
@@ -15,9 +15,9 @@ use crate::crypto;
use crate::crypto::{PublicKey, Signature, Signer, Unverified, Verified};
use crate::git;
use crate::git::Oid;
-
use crate::service::wire;
use crate::storage;
use crate::storage::{ReadRepository, RemoteId, WriteRepository};
+
use crate::wire;

pub static SIGNATURE_REF: Lazy<git::RefString> = Lazy::new(|| git::refname!("radicle/signature"));
pub const REFS_BLOB_PATH: &str = "refs";
modified node/src/test/arbitrary.rs
@@ -16,13 +16,14 @@ use crate::hash;
use crate::identity::{Delegate, Did, Doc, Id, Project};
use crate::service::filter::{Filter, FILTER_SIZE};
use crate::service::message::{
-
    Address, Envelope, InventoryAnnouncement, Message, MessageType, NodeAnnouncement,
-
    RefsAnnouncement, Subscribe,
+
    Address, Envelope, InventoryAnnouncement, Message, NodeAnnouncement, RefsAnnouncement,
+
    Subscribe,
};
use crate::service::{NodeId, Timestamp};
use crate::storage;
use crate::storage::refs::{Refs, SignedRefs};
use crate::test::storage::MockStorage;
+
use crate::wire::message::MessageType;

use super::crypto::MockSigner;

modified node/src/transport.rs
@@ -8,9 +8,9 @@ use nakamoto_net::{Io, Link};
use crate::address_book;
use crate::collections::HashMap;
use crate::crypto;
-
use crate::service::wire::Wire;
use crate::service::{Command, DisconnectReason, Event, Service};
use crate::storage::WriteStorage;
+
use crate::wire::Wire;

#[derive(Debug)]
struct Peer {
added node/src/wire.rs
@@ -0,0 +1,642 @@
+
pub mod message;
+

+
use std::collections::{BTreeMap, HashMap};
+
use std::convert::TryFrom;
+
use std::net::IpAddr;
+
use std::ops::{Deref, DerefMut};
+
use std::string::FromUtf8Error;
+
use std::{io, mem};
+

+
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
+
use nakamoto_net as nakamoto;
+
use nakamoto_net::Link;
+

+
use crate::address_book;
+
use crate::crypto::{PublicKey, Signature, Signer};
+
use crate::decoder::Decoder;
+
use crate::git;
+
use crate::git::fmt;
+
use crate::hash::Digest;
+
use crate::identity::Id;
+
use crate::service;
+
use crate::service::filter;
+
use crate::storage::refs::Refs;
+
use crate::storage::WriteStorage;
+

+
/// The default type we use to represent sizes.
+
/// Four bytes is more than enough for anything sent over the wire.
+
/// Note that in certain cases, we may use only one or two byte types.
+
pub type Size = u32;
+

+
#[derive(thiserror::Error, Debug)]
+
pub enum Error {
+
    #[error("i/o: {0}")]
+
    Io(#[from] io::Error),
+
    #[error("UTF-8 error: {0}")]
+
    FromUtf8(#[from] FromUtf8Error),
+
    #[error("invalid size: expected {expected}, got {actual}")]
+
    InvalidSize { expected: usize, actual: usize },
+
    #[error("invalid filter size: {0}")]
+
    InvalidFilterSize(usize),
+
    #[error(transparent)]
+
    InvalidRefName(#[from] fmt::Error),
+
    #[error("invalid git url `{url}`: {error}")]
+
    InvalidGitUrl {
+
        url: String,
+
        error: git::url::parse::Error,
+
    },
+
    #[error("unknown address type `{0}`")]
+
    UnknownAddressType(u8),
+
    #[error("unknown message type `{0}`")]
+
    UnknownMessageType(u16),
+
}
+

+
impl Error {
+
    /// Whether we've reached the end of file. This will be true when we fail to decode
+
    /// a message because there's not enough data in the stream.
+
    pub fn is_eof(&self) -> bool {
+
        matches!(self, Self::Io(err) if err.kind() == io::ErrorKind::UnexpectedEof)
+
    }
+
}
+

+
/// Things that can be encoded as binary.
+
pub trait Encode {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error>;
+
}
+

+
/// Things that can be decoded from binary.
+
pub trait Decode: Sized {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error>;
+
}
+

+
/// Encode an object into a vector.
+
pub fn serialize<T: Encode + ?Sized>(data: &T) -> Vec<u8> {
+
    let mut buffer = Vec::new();
+
    let len = data
+
        .encode(&mut buffer)
+
        .expect("in-memory writes don't error");
+

+
    debug_assert_eq!(len, buffer.len());
+

+
    buffer
+
}
+

+
/// Decode an object from a vector.
+
pub fn deserialize<T: Decode>(data: &[u8]) -> Result<T, Error> {
+
    let mut cursor = io::Cursor::new(data);
+

+
    T::decode(&mut cursor)
+
}
+

+
impl Encode for u8 {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        writer.write_u8(*self)?;
+

+
        Ok(mem::size_of::<Self>())
+
    }
+
}
+

+
impl Encode for u16 {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        writer.write_u16::<NetworkEndian>(*self)?;
+

+
        Ok(mem::size_of::<Self>())
+
    }
+
}
+

+
impl Encode for u32 {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        writer.write_u32::<NetworkEndian>(*self)?;
+

+
        Ok(mem::size_of::<Self>())
+
    }
+
}
+

+
impl Encode for u64 {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        writer.write_u64::<NetworkEndian>(*self)?;
+

+
        Ok(mem::size_of::<Self>())
+
    }
+
}
+

+
impl Encode for usize {
+
    /// We encode this type to a [`u32`], since there's no need to send larger messages
+
    /// over the network.
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        assert!(
+
            *self <= u32::MAX as usize,
+
            "Cannot encode sizes larger than {}",
+
            u32::MAX
+
        );
+
        writer.write_u32::<NetworkEndian>(*self as u32)?;
+

+
        Ok(mem::size_of::<u32>())
+
    }
+
}
+

+
impl Encode for PublicKey {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        self.as_bytes().encode(writer)
+
    }
+
}
+

+
impl<const T: usize> Encode for &[u8; T] {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        writer.write_all(*self)?;
+

+
        Ok(mem::size_of::<Self>())
+
    }
+
}
+

+
impl<const T: usize> Encode for [u8; T] {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        writer.write_all(self)?;
+

+
        Ok(mem::size_of::<Self>())
+
    }
+
}
+

+
impl<T> Encode for &[T]
+
where
+
    T: Encode,
+
{
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = (self.len() as Size).encode(writer)?;
+

+
        for item in self.iter() {
+
            n += item.encode(writer)?;
+
        }
+
        Ok(n)
+
    }
+
}
+

+
impl Encode for &str {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        assert!(self.len() <= u8::MAX as usize);
+

+
        let n = (self.len() as u8).encode(writer)?;
+
        let bytes = self.as_bytes();
+

+
        // Nb. Don't use the [`Encode`] instance here for &[u8], because we are prefixing the
+
        // length ourselves.
+
        writer.write_all(bytes)?;
+

+
        Ok(n + bytes.len())
+
    }
+
}
+

+
impl Encode for String {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        self.as_str().encode(writer)
+
    }
+
}
+

+
impl Encode for git::Url {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        self.to_string().encode(writer)
+
    }
+
}
+

+
impl Encode for Digest {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        self.as_ref().encode(writer)
+
    }
+
}
+

+
impl Encode for Id {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        self.deref().encode(writer)
+
    }
+
}
+

+
impl Encode for Refs {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = self.len().encode(writer)?;
+

+
        for (name, oid) in self.iter() {
+
            n += name.as_str().encode(writer)?;
+
            n += oid.encode(writer)?;
+
        }
+
        Ok(n)
+
    }
+
}
+

+
impl Encode for Signature {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        self.to_bytes().encode(writer)
+
    }
+
}
+

+
impl Encode for git::Oid {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        // Nb. We use length-encoding here to support future SHA-2 object ids.
+
        self.as_bytes().encode(writer)
+
    }
+
}
+

+
////////////////////////////////////////////////////////////////////////////////
+

+
impl Decode for PublicKey {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let buf: [u8; 32] = Decode::decode(reader)?;
+

+
        PublicKey::try_from(buf)
+
            .map_err(|e| Error::Io(io::Error::new(io::ErrorKind::InvalidInput, e.to_string())))
+
    }
+
}
+

+
impl Decode for Refs {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let len = usize::decode(reader)?;
+
        let mut refs = BTreeMap::new();
+

+
        for _ in 0..len {
+
            let name = String::decode(reader)?;
+
            let name = git::RefString::try_from(name).map_err(Error::from)?;
+
            let oid = git::Oid::decode(reader)?;
+

+
            refs.insert(name, oid);
+
        }
+
        Ok(refs.into())
+
    }
+
}
+

+
impl Decode for git::Oid {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let len = usize::decode(reader)?;
+
        #[allow(non_upper_case_globals)]
+
        const expected: usize = mem::size_of::<git2::Oid>();
+

+
        if len != expected {
+
            return Err(Error::InvalidSize {
+
                expected,
+
                actual: len,
+
            });
+
        }
+

+
        let buf: [u8; expected] = Decode::decode(reader)?;
+
        let oid = git2::Oid::from_bytes(&buf).expect("the buffer is exactly the right size");
+
        let oid = git::Oid::from(oid);
+

+
        Ok(oid)
+
    }
+
}
+

+
impl Decode for Signature {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let bytes: [u8; 64] = Decode::decode(reader)?;
+

+
        Ok(Signature::from(bytes))
+
    }
+
}
+

+
impl Decode for u8 {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        reader.read_u8().map_err(Error::from)
+
    }
+
}
+

+
impl Decode for u16 {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        reader.read_u16::<NetworkEndian>().map_err(Error::from)
+
    }
+
}
+

+
impl Decode for u32 {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        reader.read_u32::<NetworkEndian>().map_err(Error::from)
+
    }
+
}
+

+
impl Decode for u64 {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        reader.read_u64::<NetworkEndian>().map_err(Error::from)
+
    }
+
}
+

+
impl Decode for usize {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let size: usize = u32::decode(reader)?
+
            .try_into()
+
            .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
+

+
        Ok(size)
+
    }
+
}
+

+
impl<const N: usize> Decode for [u8; N] {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let mut ary = [0; N];
+
        reader.read_exact(&mut ary)?;
+

+
        Ok(ary)
+
    }
+
}
+

+
impl<T> Decode for Vec<T>
+
where
+
    T: Decode,
+
{
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let len: Size = Size::decode(reader)?;
+
        let mut vec = Vec::with_capacity(len as usize);
+

+
        for _ in 0..len {
+
            let item = T::decode(reader)?;
+
            vec.push(item);
+
        }
+
        Ok(vec)
+
    }
+
}
+

+
impl Decode for String {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let len = u8::decode(reader)?;
+
        let mut bytes = vec![0; len as usize];
+

+
        reader.read_exact(&mut bytes)?;
+

+
        let string = String::from_utf8(bytes)?;
+

+
        Ok(string)
+
    }
+
}
+

+
impl Decode for git::Url {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let url = String::decode(reader)?;
+
        let url = Self::from_bytes(url.as_bytes())
+
            .map_err(|error| Error::InvalidGitUrl { url, error })?;
+

+
        Ok(url)
+
    }
+
}
+

+
impl Decode for Id {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let digest: Digest = Decode::decode(reader)?;
+

+
        Ok(Self::from(digest))
+
    }
+
}
+

+
impl Decode for Digest {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let bytes: [u8; 32] = Decode::decode(reader)?;
+

+
        Ok(Self::from(bytes))
+
    }
+
}
+

+
impl Encode for filter::Filter {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+

+
        n += self.deref().as_bytes().encode(writer)?;
+

+
        Ok(n)
+
    }
+
}
+

+
impl Decode for filter::Filter {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let size: Size = Decode::decode(reader)?;
+
        if size as usize != filter::FILTER_SIZE {
+
            return Err(Error::InvalidFilterSize(size as usize));
+
        }
+
        let bytes: [u8; filter::FILTER_SIZE] = Decode::decode(reader)?;
+
        let bf = filter::BloomFilter::from(Vec::from(bytes));
+

+
        debug_assert_eq!(bf.hashes(), filter::FILTER_HASHES);
+

+
        Ok(Self::from(bf))
+
    }
+
}
+

+
#[derive(Debug)]
+
pub struct Wire<S, T, G> {
+
    inboxes: HashMap<IpAddr, Decoder>,
+
    inner: service::Service<S, T, G>,
+
}
+

+
impl<S, T, G> Wire<S, T, G> {
+
    pub fn new(inner: service::Service<S, T, G>) -> Self {
+
        Self {
+
            inboxes: HashMap::new(),
+
            inner,
+
        }
+
    }
+
}
+

+
impl<'r, S, T, G> Wire<S, T, G>
+
where
+
    S: address_book::Store,
+
    T: WriteStorage<'r> + 'static,
+
    G: Signer,
+
{
+
    pub fn connected(
+
        &mut self,
+
        addr: std::net::SocketAddr,
+
        local_addr: &std::net::SocketAddr,
+
        link: Link,
+
    ) {
+
        self.inboxes.insert(addr.ip(), Decoder::new(256));
+
        self.inner.connected(addr, local_addr, link)
+
    }
+

+
    pub fn disconnected(
+
        &mut self,
+
        addr: &std::net::SocketAddr,
+
        reason: nakamoto::DisconnectReason<service::DisconnectReason>,
+
    ) {
+
        self.inboxes.remove(&addr.ip());
+
        self.inner.disconnected(addr, reason)
+
    }
+

+
    pub fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
+
        let peer_ip = addr.ip();
+

+
        if let Some(inbox) = self.inboxes.get_mut(&peer_ip) {
+
            inbox.input(bytes);
+

+
            loop {
+
                match inbox.decode_next() {
+
                    Ok(Some(msg)) => self.inner.received_message(addr, msg),
+
                    Ok(None) => break,
+

+
                    Err(err) => {
+
                        // TODO: Disconnect peer.
+
                        log::error!("Invalid message received from {}: {}", peer_ip, err);
+

+
                        return;
+
                    }
+
                }
+
            }
+
        } else {
+
            log::debug!("Received message from unknown peer {}", peer_ip);
+
        }
+
    }
+
}
+

+
impl<S, T, G> Iterator for Wire<S, T, G> {
+
    type Item = nakamoto::Io<service::Event, service::DisconnectReason>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        match self.inner.next() {
+
            Some(service::Io::Write(addr, msgs)) => {
+
                let mut buf = Vec::new();
+
                for msg in msgs {
+
                    log::debug!("Write {:?} to {}", &msg, addr.ip());
+

+
                    msg.encode(&mut buf)
+
                        .expect("writing to an in-memory buffer doesn't fail");
+
                }
+
                Some(nakamoto::Io::Write(addr, buf))
+
            }
+
            Some(service::Io::Event(e)) => Some(nakamoto::Io::Event(e)),
+
            Some(service::Io::Connect(a)) => Some(nakamoto::Io::Connect(a)),
+
            Some(service::Io::Disconnect(a, r)) => Some(nakamoto::Io::Disconnect(a, r)),
+
            Some(service::Io::Wakeup(d)) => Some(nakamoto::Io::Wakeup(d)),
+

+
            None => None,
+
        }
+
    }
+
}
+

+
impl<S, T, G> Deref for Wire<S, T, G> {
+
    type Target = service::Service<S, T, G>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.inner
+
    }
+
}
+

+
impl<S, T, G> DerefMut for Wire<S, T, G> {
+
    fn deref_mut(&mut self) -> &mut Self::Target {
+
        &mut self.inner
+
    }
+
}
+

+
#[cfg(test)]
+
mod tests {
+
    use super::*;
+
    use quickcheck_macros::quickcheck;
+

+
    use crate::crypto::Unverified;
+
    use crate::storage::refs::SignedRefs;
+
    use crate::test::arbitrary;
+

+
    #[quickcheck]
+
    fn prop_u8(input: u8) {
+
        assert_eq!(deserialize::<u8>(&serialize(&input)).unwrap(), input);
+
    }
+

+
    #[quickcheck]
+
    fn prop_u16(input: u16) {
+
        assert_eq!(deserialize::<u16>(&serialize(&input)).unwrap(), input);
+
    }
+

+
    #[quickcheck]
+
    fn prop_u32(input: u32) {
+
        assert_eq!(deserialize::<u32>(&serialize(&input)).unwrap(), input);
+
    }
+

+
    #[quickcheck]
+
    fn prop_u64(input: u64) {
+
        assert_eq!(deserialize::<u64>(&serialize(&input)).unwrap(), input);
+
    }
+

+
    #[quickcheck]
+
    fn prop_usize(input: usize) -> quickcheck::TestResult {
+
        if input > u32::MAX as usize {
+
            return quickcheck::TestResult::discard();
+
        }
+
        assert_eq!(deserialize::<usize>(&serialize(&input)).unwrap(), input);
+

+
        quickcheck::TestResult::passed()
+
    }
+

+
    #[quickcheck]
+
    fn prop_string(input: String) -> quickcheck::TestResult {
+
        if input.len() > u8::MAX as usize {
+
            return quickcheck::TestResult::discard();
+
        }
+
        assert_eq!(deserialize::<String>(&serialize(&input)).unwrap(), input);
+

+
        quickcheck::TestResult::passed()
+
    }
+

+
    #[quickcheck]
+
    fn prop_vec(input: Vec<String>) {
+
        assert_eq!(
+
            deserialize::<Vec<String>>(&serialize(&input.as_slice())).unwrap(),
+
            input
+
        );
+
    }
+

+
    #[quickcheck]
+
    fn prop_pubkey(input: PublicKey) {
+
        assert_eq!(deserialize::<PublicKey>(&serialize(&input)).unwrap(), input);
+
    }
+

+
    #[quickcheck]
+
    fn prop_id(input: Id) {
+
        assert_eq!(deserialize::<Id>(&serialize(&input)).unwrap(), input);
+
    }
+

+
    #[quickcheck]
+
    fn prop_digest(input: Digest) {
+
        assert_eq!(deserialize::<Digest>(&serialize(&input)).unwrap(), input);
+
    }
+

+
    #[quickcheck]
+
    fn prop_refs(input: Refs) {
+
        assert_eq!(deserialize::<Refs>(&serialize(&input)).unwrap(), input);
+
    }
+

+
    #[quickcheck]
+
    fn prop_signature(input: arbitrary::ByteArray<64>) {
+
        let signature = Signature::from(input.into_inner());
+

+
        assert_eq!(
+
            deserialize::<Signature>(&serialize(&signature)).unwrap(),
+
            signature
+
        );
+
    }
+

+
    #[quickcheck]
+
    fn prop_oid(input: arbitrary::ByteArray<20>) {
+
        let oid = git::Oid::try_from(input.into_inner().as_slice()).unwrap();
+

+
        assert_eq!(deserialize::<git::Oid>(&serialize(&oid)).unwrap(), oid);
+
    }
+

+
    #[quickcheck]
+
    fn prop_signed_refs(input: SignedRefs<Unverified>) {
+
        assert_eq!(
+
            deserialize::<SignedRefs<Unverified>>(&serialize(&input)).unwrap(),
+
            input
+
        );
+
    }
+

+
    #[test]
+
    fn test_string() {
+
        assert_eq!(
+
            serialize(&String::from("hello")),
+
            vec![5, b'h', b'e', b'l', b'l', b'o']
+
        );
+
    }
+

+
    #[test]
+
    fn test_git_url() {
+
        let url = git::Url {
+
            scheme: git::url::Scheme::Https,
+
            path: "/git".to_owned().into(),
+
            host: Some("seed.radicle.xyz".to_owned()),
+
            port: Some(8888),
+
            ..git::Url::default()
+
        };
+
        assert_eq!(deserialize::<git::Url>(&serialize(&url)).unwrap(), url);
+
    }
+
}
added node/src/wire/message.rs
@@ -0,0 +1,377 @@
+
use std::{io, net};
+

+
use byteorder::{NetworkEndian, ReadBytesExt};
+

+
use crate::git;
+
use crate::prelude::*;
+
use crate::service::message::*;
+
use crate::wire;
+

+
/// Message type.
+
#[repr(u16)]
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
pub enum MessageType {
+
    Initialize = 0,
+
    NodeAnnouncement = 2,
+
    InventoryAnnouncement = 4,
+
    RefsAnnouncement = 6,
+
    Subscribe = 8,
+
}
+

+
impl From<MessageType> for u16 {
+
    fn from(other: MessageType) -> Self {
+
        other as u16
+
    }
+
}
+

+
impl TryFrom<u16> for MessageType {
+
    type Error = u16;
+

+
    fn try_from(other: u16) -> Result<Self, Self::Error> {
+
        match other {
+
            0 => Ok(MessageType::Initialize),
+
            2 => Ok(MessageType::NodeAnnouncement),
+
            4 => Ok(MessageType::InventoryAnnouncement),
+
            6 => Ok(MessageType::RefsAnnouncement),
+
            8 => Ok(MessageType::Subscribe),
+
            _ => Err(other),
+
        }
+
    }
+
}
+

+
impl Message {
+
    pub fn type_id(&self) -> u16 {
+
        match self {
+
            Self::Initialize { .. } => MessageType::Initialize,
+
            Self::Subscribe { .. } => MessageType::Subscribe,
+
            Self::NodeAnnouncement { .. } => MessageType::NodeAnnouncement,
+
            Self::InventoryAnnouncement { .. } => MessageType::InventoryAnnouncement,
+
            Self::RefsAnnouncement { .. } => MessageType::RefsAnnouncement,
+
        }
+
        .into()
+
    }
+
}
+

+
/// Address type.
+
#[repr(u8)]
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
pub enum AddressType {
+
    Ipv4 = 1,
+
    Ipv6 = 2,
+
    Hostname = 3,
+
    Onion = 4,
+
}
+

+
impl From<AddressType> for u8 {
+
    fn from(other: AddressType) -> Self {
+
        other as u8
+
    }
+
}
+

+
impl TryFrom<u8> for AddressType {
+
    type Error = u8;
+

+
    fn try_from(other: u8) -> Result<Self, Self::Error> {
+
        match other {
+
            1 => Ok(AddressType::Ipv4),
+
            2 => Ok(AddressType::Ipv6),
+
            3 => Ok(AddressType::Hostname),
+
            4 => Ok(AddressType::Onion),
+
            _ => Err(other),
+
        }
+
    }
+
}
+

+
impl wire::Encode for RefsAnnouncement {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+

+
        n += self.id.encode(writer)?;
+
        n += self.refs.encode(writer)?;
+

+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for RefsAnnouncement {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let id = Id::decode(reader)?;
+
        let refs = Refs::decode(reader)?;
+

+
        Ok(Self { id, refs })
+
    }
+
}
+

+
impl wire::Encode for InventoryAnnouncement {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+

+
        n += self.inventory.as_slice().encode(writer)?;
+
        n += self.timestamp.encode(writer)?;
+

+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for InventoryAnnouncement {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let inventory = Vec::<Id>::decode(reader)?;
+
        let timestamp = Timestamp::decode(reader)?;
+

+
        Ok(Self {
+
            inventory,
+
            timestamp,
+
        })
+
    }
+
}
+

+
impl wire::Encode for Message {
+
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
+
        let mut n = self.type_id().encode(writer)?;
+

+
        match self {
+
            Self::Initialize {
+
                id,
+
                timestamp,
+
                version,
+
                addrs,
+
                git,
+
            } => {
+
                n += id.encode(writer)?;
+
                n += timestamp.encode(writer)?;
+
                n += version.encode(writer)?;
+
                n += addrs.as_slice().encode(writer)?;
+
                n += git.encode(writer)?;
+
            }
+
            Self::Subscribe(Subscribe {
+
                filter,
+
                since,
+
                until,
+
            }) => {
+
                n += filter.encode(writer)?;
+
                n += since.encode(writer)?;
+
                n += until.encode(writer)?;
+
            }
+
            Self::RefsAnnouncement {
+
                node,
+
                message,
+
                signature,
+
            } => {
+
                n += node.encode(writer)?;
+
                n += message.encode(writer)?;
+
                n += signature.encode(writer)?;
+
            }
+
            Self::InventoryAnnouncement {
+
                node,
+
                message,
+
                signature,
+
            } => {
+
                n += node.encode(writer)?;
+
                n += message.encode(writer)?;
+
                n += signature.encode(writer)?;
+
            }
+
            Self::NodeAnnouncement {
+
                node,
+
                message,
+
                signature,
+
            } => {
+
                n += node.encode(writer)?;
+
                n += message.encode(writer)?;
+
                n += signature.encode(writer)?;
+
            }
+
        }
+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for Message {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let type_id = reader.read_u16::<NetworkEndian>()?;
+

+
        match MessageType::try_from(type_id) {
+
            Ok(MessageType::Initialize) => {
+
                let id = NodeId::decode(reader)?;
+
                let timestamp = Timestamp::decode(reader)?;
+
                let version = u32::decode(reader)?;
+
                let addrs = Vec::<Address>::decode(reader)?;
+
                let git = git::Url::decode(reader)?;
+

+
                Ok(Self::Initialize {
+
                    id,
+
                    timestamp,
+
                    version,
+
                    addrs,
+
                    git,
+
                })
+
            }
+
            Ok(MessageType::Subscribe) => {
+
                let filter = Filter::decode(reader)?;
+
                let since = Timestamp::decode(reader)?;
+
                let until = Timestamp::decode(reader)?;
+

+
                Ok(Self::Subscribe(Subscribe {
+
                    filter,
+
                    since,
+
                    until,
+
                }))
+
            }
+
            Ok(MessageType::NodeAnnouncement) => {
+
                let node = NodeId::decode(reader)?;
+
                let message = NodeAnnouncement::decode(reader)?;
+
                let signature = Signature::decode(reader)?;
+

+
                Ok(Self::NodeAnnouncement {
+
                    node,
+
                    message,
+
                    signature,
+
                })
+
            }
+
            Ok(MessageType::InventoryAnnouncement) => {
+
                let node = NodeId::decode(reader)?;
+
                let message = InventoryAnnouncement::decode(reader)?;
+
                let signature = Signature::decode(reader)?;
+

+
                Ok(Self::InventoryAnnouncement {
+
                    node,
+
                    message,
+
                    signature,
+
                })
+
            }
+
            Ok(MessageType::RefsAnnouncement) => {
+
                let node = NodeId::decode(reader)?;
+
                let message = RefsAnnouncement::decode(reader)?;
+
                let signature = Signature::decode(reader)?;
+

+
                Ok(Self::RefsAnnouncement {
+
                    node,
+
                    message,
+
                    signature,
+
                })
+
            }
+
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
+
        }
+
    }
+
}
+

+
impl wire::Encode for Envelope {
+
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
+
        let mut n = 0;
+

+
        n += self.magic.encode(writer)?;
+
        n += self.msg.encode(writer)?;
+

+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for Envelope {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let magic = u32::decode(reader)?;
+
        let msg = Message::decode(reader)?;
+

+
        Ok(Self { magic, msg })
+
    }
+
}
+

+
impl wire::Encode for Address {
+
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
+
        let mut n = 0;
+

+
        match self {
+
            Self::Ipv4 { ip, port } => {
+
                n += u8::from(AddressType::Ipv4).encode(writer)?;
+
                n += ip.octets().encode(writer)?;
+
                n += port.encode(writer)?;
+
            }
+
            Self::Ipv6 { ip, port } => {
+
                n += u8::from(AddressType::Ipv6).encode(writer)?;
+
                n += ip.octets().encode(writer)?;
+
                n += port.encode(writer)?;
+
            }
+
            Self::Hostname { .. } => todo!(),
+
            Self::Onion { .. } => todo!(),
+
        }
+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for Address {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let addrtype = reader.read_u8()?;
+

+
        match AddressType::try_from(addrtype) {
+
            Ok(AddressType::Ipv4) => {
+
                let octets: [u8; 4] = wire::Decode::decode(reader)?;
+
                let ip = net::Ipv4Addr::from(octets);
+
                let port = u16::decode(reader)?;
+

+
                Ok(Self::Ipv4 { ip, port })
+
            }
+
            Ok(AddressType::Ipv6) => {
+
                let octets: [u8; 16] = wire::Decode::decode(reader)?;
+
                let ip = net::Ipv6Addr::from(octets);
+
                let port = u16::decode(reader)?;
+

+
                Ok(Self::Ipv6 { ip, port })
+
            }
+
            Ok(AddressType::Hostname) => {
+
                todo!();
+
            }
+
            Ok(AddressType::Onion) => {
+
                todo!();
+
            }
+
            Err(other) => Err(wire::Error::UnknownAddressType(other)),
+
        }
+
    }
+
}
+

+
#[cfg(test)]
+
mod tests {
+
    use super::*;
+
    use quickcheck_macros::quickcheck;
+

+
    use crate::decoder::Decoder;
+
    use crate::wire::{self, Encode};
+

+
    #[quickcheck]
+
    fn prop_message_encode_decode(message: Message) {
+
        assert_eq!(
+
            wire::deserialize::<Message>(&wire::serialize(&message)).unwrap(),
+
            message
+
        );
+
    }
+

+
    #[quickcheck]
+
    fn prop_envelope_encode_decode(envelope: Envelope) {
+
        assert_eq!(
+
            wire::deserialize::<Envelope>(&wire::serialize(&envelope)).unwrap(),
+
            envelope
+
        );
+
    }
+

+
    #[test]
+
    fn prop_envelope_decoder() {
+
        fn property(items: Vec<Envelope>) {
+
            let mut decoder = Decoder::<Envelope>::new(8);
+

+
            for item in &items {
+
                item.encode(&mut decoder).unwrap();
+
            }
+
            for item in items {
+
                assert_eq!(decoder.next().unwrap().unwrap(), item);
+
            }
+
        }
+

+
        quickcheck::QuickCheck::new()
+
            .gen(quickcheck::Gen::new(16))
+
            .quickcheck(property as fn(items: Vec<Envelope>));
+
    }
+

+
    #[quickcheck]
+
    fn prop_addr(addr: Address) {
+
        assert_eq!(
+
            wire::deserialize::<Address>(&wire::serialize(&addr)).unwrap(),
+
            addr
+
        );
+
    }
+
}