Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Define naive message vector limits
Slack Coder committed 3 years ago
commit fe60305455ad490ed3f1978e5e4db8fb4b78e8c8
parent 3a2584b5443a9d6b34e133ac378b8521f77f62fa
11 files changed +272 -40
added radicle-node/src/bounded.rs
@@ -0,0 +1,86 @@
+
use std::ops;
+

+
#[derive(thiserror::Error, Debug)]
+
pub enum Error {
+
    #[error("invalid size: expected {expected}, got {actual}")]
+
    InvalidSize { expected: usize, actual: usize },
+
}
+

+
#[derive(Debug, Default, Clone, PartialEq, Eq)]
+
pub struct BoundedVec<T, const N: usize> {
+
    v: Vec<T>,
+
}
+

+
impl<T, const N: usize> BoundedVec<T, N> {
+
    pub fn new() -> Self {
+
        BoundedVec { v: Vec::new() }
+
    }
+

+
    pub fn truncate(mut v: Vec<T>) -> Self {
+
        v.truncate(N);
+
        BoundedVec { v }
+
    }
+

+
    pub fn max() -> usize {
+
        N
+
    }
+

+
    pub fn push(&mut self, item: T) -> Result<(), Error> {
+
        if self.len() >= N {
+
            return Err(Error::InvalidSize {
+
                expected: N,
+
                actual: N + 1,
+
            });
+
        }
+
        self.v.push(item);
+
        Ok(())
+
    }
+

+
    pub fn unbound(self) -> Vec<T> {
+
        self.v
+
    }
+

+
    pub fn with_capacity(capacity: usize) -> Result<Self, Error> {
+
        if capacity > N {
+
            return Err(Error::InvalidSize {
+
                expected: N,
+
                actual: capacity,
+
            });
+
        }
+
        Ok(Self {
+
            v: Vec::with_capacity(capacity),
+
        })
+
    }
+
}
+

+
impl<T, const N: usize> ops::Deref for BoundedVec<T, N> {
+
    type Target = Vec<T>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.v
+
    }
+
}
+

+
impl<T, const N: usize> From<Option<T>> for BoundedVec<T, N> {
+
    fn from(value: Option<T>) -> Self {
+
        let v = match value {
+
            None => vec![],
+
            Some(v) => vec![v],
+
        };
+
        BoundedVec { v }
+
    }
+
}
+

+
impl<T, const N: usize> TryFrom<Vec<T>> for BoundedVec<T, N> {
+
    type Error = Error;
+

+
    fn try_from(value: Vec<T>) -> Result<Self, Self::Error> {
+
        if value.len() > N {
+
            return Err(Error::InvalidSize {
+
                expected: N,
+
                actual: value.len(),
+
            });
+
        }
+
        Ok(BoundedVec { v: value })
+
    }
+
}
modified radicle-node/src/lib.rs
@@ -1,4 +1,5 @@
pub mod address;
+
pub mod bounded;
pub mod client;
pub mod clock;
pub mod control;
@@ -16,6 +17,7 @@ pub use nakamoto_net::{Io, Link, LocalDuration, LocalTime};
pub use radicle::{collections, crypto, git, identity, node, profile, rad, storage};

pub mod prelude {
+
    pub use crate::bounded::BoundedVec;
    pub use crate::clock::Timestamp;
    pub use crate::crypto::hash::Digest;
    pub use crate::crypto::{PublicKey, Signature, Signer};
modified radicle-node/src/main.rs
@@ -21,7 +21,7 @@ struct Options {
}

impl Options {
-
    fn from_env() -> Result<Self, lexopt::Error> {
+
    fn from_env() -> Result<Self, anyhow::Error> {
        use lexopt::prelude::*;

        let mut parser = lexopt::Parser::from_env();
@@ -55,9 +55,17 @@ impl Options {
                    println!("usage: radicle-node [--connect <addr>]..");
                    process::exit(0);
                }
-
                _ => return Err(arg.unexpected()),
+
                _ => anyhow::bail!(arg.unexpected()),
            }
        }
+

+
        if external_addresses.len() > service::ADDRESS_LIMIT {
+
            anyhow::bail!(
+
                "external address limit ({}) exceeded",
+
                service::ADDRESS_LIMIT,
+
            )
+
        }
+

        Ok(Self {
            connect,
            external_addresses,
modified radicle-node/src/service.rs
@@ -29,6 +29,7 @@ use crate::crypto::{Signer, Verified};
use crate::git;
use crate::identity::{Doc, Id};
use crate::node;
+
use crate::prelude::*;
use crate::service::config::ProjectTracking;
use crate::service::message::{Address, Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
@@ -67,6 +68,11 @@ pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;

+
/// Maximum external address limit imposed by message size limits.
+
pub use message::ADDRESS_LIMIT;
+
/// Maximum inventory limit imposed by message size limits.
+
pub use message::INVENTORY_LIMIT;
+

/// A service event.
#[derive(Debug, Clone)]
pub enum Event {
@@ -639,9 +645,11 @@ where
                    return Ok(false);
                }

-
                if let Err(err) =
-
                    self.process_inventory(&message.inventory, *announcer, &message.timestamp)
-
                {
+
                if let Err(err) = self.process_inventory(
+
                    message.inventory.as_slice(),
+
                    *announcer,
+
                    &message.timestamp,
+
                ) {
                    error!("Error processing inventory from {}: {}", announcer, err);

                    if let Error::Fetch(storage::FetchError::Verify(err)) = err {
@@ -797,7 +805,7 @@ where
                peer.state = session::State::Negotiated {
                    id,
                    since: self.clock.local_time(),
-
                    addrs,
+
                    addrs: addrs.unbound(),
                    ping: Default::default(),
                };
            }
@@ -875,7 +883,7 @@ where
    /// Process a peer inventory announcement by updating our routing table.
    fn process_inventory(
        &mut self,
-
        inventory: &Vec<Id>,
+
        inventory: &[Id],
        from: NodeId,
        timestamp: &Timestamp,
    ) -> Result<(), Error> {
@@ -1269,7 +1277,14 @@ mod gossip {
        };

        let mut msgs = vec![
-
            Message::init(*signer.public_key(), config.external_addresses.clone()),
+
            Message::init(
+
                *signer.public_key(),
+
                config
+
                    .external_addresses
+
                    .clone()
+
                    .try_into()
+
                    .expect("external addresses are within the limit"),
+
            ),
            Message::inventory(gossip::inventory(timestamp, inventory), signer),
            Message::subscribe(config.filter(), timestamp, Timestamp::MAX),
        ];
@@ -1283,7 +1298,11 @@ mod gossip {
    pub fn node(timestamp: Timestamp, config: &Config) -> Option<NodeAnnouncement> {
        let features = node::Features::SEED;
        let alias = config.alias();
-
        let addresses = config.external_addresses.clone();
+
        let addresses: BoundedVec<_, ADDRESS_LIMIT> = config
+
            .external_addresses
+
            .clone()
+
            .try_into()
+
            .expect("external addresses are within the limit");

        if addresses.is_empty() {
            return None;
@@ -1302,8 +1321,17 @@ mod gossip {
    }

    pub fn inventory(timestamp: Timestamp, inventory: Vec<Id>) -> InventoryAnnouncement {
+
        type Inventory = BoundedVec<Id, INVENTORY_LIMIT>;
+

+
        if inventory.len() > Inventory::max() {
+
            log::error!(
+
                "inventory announcement limit ({}) exceeded, other nodes will see only some of your projects",
+
                inventory.len()
+
            );
+
        }
+

        InventoryAnnouncement {
-
            inventory,
+
            inventory: BoundedVec::truncate(inventory),
            timestamp,
        }
    }
modified radicle-node/src/service/config.rs
@@ -85,7 +85,7 @@ impl Default for Config {
    fn default() -> Self {
        Self {
            connect: Vec::default(),
-
            external_addresses: vec![],
+
            external_addresses: Vec::default(),
            network: Network::default(),
            project_tracking: ProjectTracking::default(),
            remote_tracking: RemoteTracking::default(),
modified radicle-node/src/service/message.rs
@@ -6,11 +6,17 @@ use thiserror::Error;
use crate::crypto;
use crate::identity::Id;
use crate::node;
+
use crate::prelude::BoundedVec;
use crate::service::filter::Filter;
use crate::service::{NodeId, Timestamp, PROTOCOL_VERSION};
use crate::storage::refs::Refs;
use crate::wire;

+
/// Maximum number of addresses which can be announced to other nodes.
+
pub const ADDRESS_LIMIT: usize = 16;
+
/// Maximum number of inventory which can be announced to other nodes.
+
pub const INVENTORY_LIMIT: usize = 2973;
+

#[derive(Debug, Clone, PartialEq, Eq)]
// TODO: We should check the length and charset when deserializing.
pub struct Hostname(String);
@@ -132,7 +138,7 @@ pub struct NodeAnnouncement {
    /// Non-unique alias. Must be valid UTF-8.
    pub alias: [u8; 32],
    /// Announced addresses.
-
    pub addresses: Vec<Address>,
+
    pub addresses: BoundedVec<Address, ADDRESS_LIMIT>,
    /// Nonce used for announcement proof-of-work.
    pub nonce: u64,
}
@@ -188,7 +194,7 @@ impl wire::Encode for NodeAnnouncement {
        n += self.features.encode(writer)?;
        n += self.timestamp.encode(writer)?;
        n += self.alias.encode(writer)?;
-
        n += self.addresses.as_slice().encode(writer)?;
+
        n += self.addresses.encode(writer)?;
        n += self.nonce.encode(writer)?;

        Ok(n)
@@ -200,7 +206,7 @@ impl wire::Decode for NodeAnnouncement {
        let features = node::Features::decode(reader)?;
        let timestamp = Timestamp::decode(reader)?;
        let alias = wire::Decode::decode(reader)?;
-
        let addresses = Vec::<Address>::decode(reader)?;
+
        let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(reader)?;
        let nonce = u64::decode(reader)?;

        Ok(Self {
@@ -229,7 +235,7 @@ pub struct RefsAnnouncement {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InventoryAnnouncement {
    /// Node inventory.
-
    pub inventory: Vec<Id>,
+
    pub inventory: BoundedVec<Id, INVENTORY_LIMIT>,
    /// Time of announcement.
    pub timestamp: Timestamp,
}
@@ -362,7 +368,7 @@ pub enum Message {
        // TODO: This is currently untrusted.
        id: NodeId,
        version: u32,
-
        addrs: Vec<Address>,
+
        addrs: BoundedVec<Address, ADDRESS_LIMIT>,
    },

    /// Subscribe to gossip messages matching the filter and time range.
@@ -386,7 +392,7 @@ pub enum Message {
}

impl Message {
-
    pub fn init(id: NodeId, addrs: Vec<Address>) -> Self {
+
    pub fn init(id: NodeId, addrs: BoundedVec<Address, ADDRESS_LIMIT>) -> Self {
        Self::Initialize {
            id,
            version: PROTOCOL_VERSION,
@@ -497,9 +503,41 @@ impl ZeroBytes {
#[cfg(test)]
mod tests {
    use super::*;
-
    use quickcheck_macros::quickcheck;
+
    use crate::prelude::*;
+
    use crate::wire::Encode;

    use crate::crypto::test::signer::MockSigner;
+
    use crate::test::arbitrary;
+
    use quickcheck_macros::quickcheck;
+

+
    #[test]
+
    fn test_inventory_limit() {
+
        let msg = Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: arbitrary::vec(INVENTORY_LIMIT)
+
                    .try_into()
+
                    .expect("size within bounds limit"),
+
                timestamp: LocalTime::now().as_secs(),
+
            },
+
            &MockSigner::default(),
+
        );
+
        let mut buf: Vec<u8> = Vec::new();
+
        assert!(
+
            msg.encode(&mut buf).is_ok(),
+
            "INVENTORY_LIMIT is a valid limit for encoding",
+
        );
+

+
        let decoded = wire::deserialize(buf.as_slice());
+
        assert!(
+
            decoded.is_ok(),
+
            "INVENTORY_LIMIT is a valid limit for decoding"
+
        );
+
        assert_eq!(
+
            msg,
+
            decoded.unwrap(),
+
            "encoding and decoding should be safe for message at INVENTORY_LIMIT",
+
        );
+
    }

    #[quickcheck]
    fn prop_refs_announcement_signing(id: Id, refs: Refs) {
@@ -521,7 +559,7 @@ mod tests {
            features: node::Features::SEED,
            timestamp: 42491841,
            alias: [0; 32],
-
            addresses: vec![],
+
            addresses: BoundedVec::new(),
            nonce: 0,
        };

modified radicle-node/src/test/arbitrary.rs
@@ -4,7 +4,7 @@ use bloomy::BloomFilter;
use quickcheck::Arbitrary;

use crate::crypto;
-
use crate::prelude::{Id, NodeId, Refs, Timestamp};
+
use crate::prelude::{BoundedVec, Id, NodeId, Refs, Timestamp};
use crate::service::filter::{Filter, FILTER_SIZE_L, FILTER_SIZE_M, FILTER_SIZE_S};
use crate::service::message::{
    Address, Announcement, InventoryAnnouncement, Message, NodeAnnouncement, Ping,
@@ -45,7 +45,7 @@ impl Arbitrary for Message {
            MessageType::InventoryAnnouncement => Announcement {
                node: NodeId::arbitrary(g),
                message: InventoryAnnouncement {
-
                    inventory: Vec::<Id>::arbitrary(g),
+
                    inventory: BoundedVec::arbitrary(g),
                    timestamp: Timestamp::arbitrary(g),
                }
                .into(),
@@ -123,3 +123,14 @@ impl Arbitrary for ZeroBytes {
        ZeroBytes::new(u16::arbitrary(g))
    }
}
+

+
impl<T, const N: usize> Arbitrary for BoundedVec<T, N>
+
where
+
    T: Arbitrary + Eq,
+
{
+
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
+
        let mut v: Vec<T> = Arbitrary::arbitrary(g);
+
        v.truncate(N);
+
        v.try_into().expect("size within bounds")
+
    }
+
}
modified radicle-node/src/test/peer.rs
@@ -185,7 +185,8 @@ where
                features: node::Features::SEED,
                timestamp: self.timestamp(),
                alias,
-
                addresses: vec![net::SocketAddr::from((self.ip, service::DEFAULT_PORT)).into()],
+
                addresses: Some(net::SocketAddr::from((self.ip, service::DEFAULT_PORT)).into())
+
                    .into(),
                nonce: 0,
            }
            .solve(),
@@ -214,7 +215,7 @@ where
        self.service.connected(remote, Link::Inbound);
        self.receive(
            &remote,
-
            Message::init(peer.node_id(), vec![Address::from(remote)]),
+
            Message::init(peer.node_id(), Some(Address::from(remote)).into()),
        );

        let mut msgs = self.messages(&remote);
@@ -257,7 +258,14 @@ where

        self.receive(
            &remote,
-
            Message::init(peer.node_id(), peer.config().listen.clone()),
+
            Message::init(
+
                peer.node_id(),
+
                peer.config()
+
                    .listen
+
                    .clone()
+
                    .try_into()
+
                    .expect("within bound limits"),
+
            ),
        );
    }

modified radicle-node/src/tests.rs
@@ -8,6 +8,7 @@ use crate::address;
use crate::collections::{HashMap, HashSet};
use crate::crypto::test::signer::MockSigner;
use crate::identity::Id;
+
use crate::prelude::*;
use crate::prelude::{LocalDuration, Timestamp};
use crate::service::config::*;
use crate::service::filter::Filter;
@@ -27,6 +28,8 @@ use crate::test::peer::Peer;
use crate::test::simulator;
use crate::test::simulator::{Peer as _, Simulation};
use crate::test::storage::MockStorage;
+
use crate::wire::Decode;
+
use crate::wire::Encode;
use crate::LocalTime;
use crate::{client, git, identity, rad, service, test};

@@ -39,6 +42,20 @@ use crate::{client, git, identity, rad, service, test};
// You may then run the test with eg. `cargo test -- --nocapture` to always show output.

#[test]
+
fn test_inventory_decode() {
+
    let inventory: Vec<Id> = arbitrary::gen(300);
+
    let timestamp = LocalTime::now().as_secs();
+

+
    let mut buf = Vec::new();
+
    inventory.as_slice().encode(&mut buf).unwrap();
+
    timestamp.encode(&mut buf).unwrap();
+

+
    let m = InventoryAnnouncement::decode(&mut buf.as_slice()).expect("message decodes");
+
    assert_eq!(inventory.as_slice(), m.inventory.as_slice());
+
    assert_eq!(timestamp, m.timestamp);
+
}
+

+
#[test]
fn test_ping_response() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8], MockStorage::empty());
    let bob = Peer::new("bob", [9, 9, 9, 9], MockStorage::empty());
@@ -215,7 +232,7 @@ fn test_inventory_sync() {
        &bob.addr(),
        Message::inventory(
            InventoryAnnouncement {
-
                inventory: projs.clone(),
+
                inventory: projs.clone().try_into().unwrap(),
                timestamp: now,
            },
            bob.signer(),
@@ -303,7 +320,7 @@ fn test_inventory_pruning() {
                &bob.addr(),
                Message::inventory(
                    InventoryAnnouncement {
-
                        inventory: test::arbitrary::vec::<Id>(num_projs),
+
                        inventory: test::arbitrary::vec::<Id>(num_projs).try_into().unwrap(),
                        timestamp: bob.clock().timestamp(),
                    },
                    &MockSigner::default(),
@@ -369,7 +386,7 @@ fn test_inventory_relay_bad_timestamp() {
        &bob.addr(),
        Message::inventory(
            InventoryAnnouncement {
-
                inventory: vec![],
+
                inventory: BoundedVec::new(),
                timestamp,
            },
            bob.signer(),
@@ -598,7 +615,7 @@ fn test_inventory_relay() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
    let eve = Peer::new("eve", [9, 9, 9, 9], MockStorage::empty());
-
    let inv = vec![];
+
    let inv = BoundedVec::new();
    let now = LocalTime::now().as_secs();

    // Inventory from Bob relayed to Eve.
modified radicle-node/src/wire.rs
@@ -20,6 +20,7 @@ use crate::git;
use crate::git::fmt;
use crate::identity::Id;
use crate::node;
+
use crate::prelude::*;
use crate::service;
use crate::service::reactor::Io;
use crate::service::{filter, routing, session};
@@ -161,6 +162,15 @@ where
    }
}

+
impl<T, const N: usize> Encode for BoundedVec<T, N>
+
where
+
    T: Encode,
+
{
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        self.as_slice().encode(writer)
+
    }
+
}
+

impl Encode for &str {
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
        assert!(self.len() <= u8::MAX as usize);
@@ -318,19 +328,22 @@ impl<const N: usize> Decode for [u8; N] {
    }
}

-
impl<T> Decode for Vec<T>
+
impl<T, const N: usize> Decode for BoundedVec<T, N>
where
    T: Decode,
{
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
-
        let len: Size = Size::decode(reader)?;
-
        let mut vec = Vec::with_capacity(len as usize);
+
        let len: usize = Size::decode(reader)? as usize;
+
        let mut items = Self::with_capacity(len).map_err(|_| Error::InvalidSize {
+
            expected: Self::max(),
+
            actual: len,
+
        })?;

-
        for _ in 0..len {
+
        for _ in 0..items.capacity() {
            let item = T::decode(reader)?;
-
            vec.push(item);
+
            items.push(item).ok();
        }
-
        Ok(vec)
+
        Ok(items)
    }
}

@@ -652,9 +665,9 @@ mod tests {
    }

    #[quickcheck]
-
    fn prop_vec(input: Vec<String>) {
+
    fn prop_vec(input: BoundedVec<String, 16>) {
        assert_eq!(
-
            deserialize::<Vec<String>>(&serialize(&input.as_slice())).unwrap(),
+
            deserialize::<BoundedVec<String, 16>>(&serialize(&input.as_slice())).unwrap(),
            input
        );
    }
@@ -731,4 +744,24 @@ mod tests {
            Error::InvalidFilterSize(_)
        );
    }
+

+
    #[test]
+
    fn test_bounded_vec_limit() {
+
        let v: BoundedVec<u8, 2> = vec![1, 2].try_into().unwrap();
+
        let buf = serialize(&v);
+

+
        assert_matches!(
+
            deserialize::<BoundedVec<u8, 1>>(&buf),
+
            Err(Error::InvalidSize {
+
                expected: 1,
+
                actual: 2
+
            }),
+
            "fail when vector is too small for buffer",
+
        );
+

+
        assert!(
+
            deserialize::<BoundedVec<u8, 2>>(&buf).is_ok(),
+
            "successfully decode vector of same size",
+
        );
+
    }
}
modified radicle-node/src/wire/message.rs
@@ -3,6 +3,7 @@ use std::{io, mem, net};
use byteorder::{NetworkEndian, ReadBytesExt};

use crate::prelude::*;
+
use crate::service;
use crate::service::message::*;
use crate::wire;

@@ -144,7 +145,7 @@ impl wire::Encode for InventoryAnnouncement {
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
        let mut n = 0;

-
        n += self.inventory.as_slice().encode(writer)?;
+
        n += self.inventory.encode(writer)?;
        n += self.timestamp.encode(writer)?;

        Ok(n)
@@ -153,7 +154,7 @@ impl wire::Encode for InventoryAnnouncement {

impl wire::Decode for InventoryAnnouncement {
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let inventory = Vec::<Id>::decode(reader)?;
+
        let inventory = BoundedVec::decode(reader)?;
        let timestamp = Timestamp::decode(reader)?;

        Ok(Self {
@@ -171,7 +172,7 @@ impl wire::Encode for Message {
            Self::Initialize { id, version, addrs } => {
                n += id.encode(writer)?;
                n += version.encode(writer)?;
-
                n += addrs.as_slice().encode(writer)?;
+
                n += addrs.encode(writer)?;
            }
            Self::Subscribe(Subscribe {
                filter,
@@ -218,7 +219,7 @@ impl wire::Decode for Message {
            Ok(MessageType::Initialize) => {
                let id = NodeId::decode(reader)?;
                let version = u32::decode(reader)?;
-
                let addrs = Vec::<Address>::decode(reader)?;
+
                let addrs = BoundedVec::<Address, { service::ADDRESS_LIMIT }>::decode(reader)?;

                Ok(Self::Initialize { id, version, addrs })
            }