Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: A few potential fixes for memory usage
Merged did:key:z6MksFqX...wzpT opened 1 year ago
  • node: Make sure we bound peer message inboxes
  • node: Re-use inventory message timestamps
15 files changed +376 -182 e8f2f88b 6ab3bfcb
modified radicle-node/src/bounded.rs
@@ -1,4 +1,7 @@
-
use std::{collections::BTreeSet, ops};
+
use std::{
+
    collections::BTreeSet,
+
    ops::{self, RangeBounds},
+
};

#[derive(thiserror::Error, Debug)]
pub enum Error {
@@ -155,6 +158,26 @@ impl<T, const N: usize> BoundedVec<T, N> {
    pub fn unbound(self) -> Vec<T> {
        self.v
    }
+

+
    /// Calls [`Vec::Drain`].
+
    pub fn drain<R: RangeBounds<usize>>(&mut self, range: R) -> std::vec::Drain<T> {
+
        self.v.drain(range)
+
    }
+
}
+

+
impl<T: Clone, const N: usize> BoundedVec<T, N> {
+
    /// Like [`Vec::extend_from_slice`] but returns an error if out of bounds.
+
    pub fn extend_from_slice(&mut self, slice: &[T]) -> Result<(), Error> {
+
        if self.len() + slice.len() > N {
+
            return Err(Error::InvalidSize {
+
                expected: N,
+
                actual: self.len() + slice.len(),
+
            });
+
        }
+
        self.v.extend_from_slice(slice);
+

+
        Ok(())
+
    }
}

impl<T, const N: usize> ops::Deref for BoundedVec<T, N> {
modified radicle-node/src/deserializer.rs
@@ -1,6 +1,8 @@
use std::io;
use std::marker::PhantomData;

+
use crate::bounded;
+
use crate::prelude::BoundedVec;
use crate::service::message::Message;
use crate::wire;

@@ -8,43 +10,46 @@ use crate::wire;
///
/// Used to for example turn a byte stream into network messages.
#[derive(Debug)]
-
pub struct Deserializer<D = Message> {
-
    unparsed: Vec<u8>,
+
pub struct Deserializer<const B: usize, D = Message> {
+
    unparsed: BoundedVec<u8, B>,
    item: PhantomData<D>,
}

-
impl<D: wire::Decode> Default for Deserializer<D> {
+
impl<const B: usize, D: wire::Decode> Default for Deserializer<B, D> {
    fn default() -> Self {
        Self::new(wire::Size::MAX as usize + 1)
    }
}

-
impl<D> From<Vec<u8>> for Deserializer<D> {
-
    fn from(unparsed: Vec<u8>) -> Self {
-
        Self {
+
impl<const B: usize, D> TryFrom<Vec<u8>> for Deserializer<B, D> {
+
    type Error = bounded::Error;
+

+
    fn try_from(unparsed: Vec<u8>) -> Result<Self, Self::Error> {
+
        BoundedVec::try_from(unparsed).map(|unparsed| Self {
            unparsed,
            item: PhantomData,
-
        }
+
        })
    }
}

-
impl<D: wire::Decode> Deserializer<D> {
+
impl<const B: usize, D: wire::Decode> Deserializer<B, D> {
    /// Create a new stream decoder.
    pub fn new(capacity: usize) -> Self {
        Self {
-
            unparsed: Vec::with_capacity(capacity),
+
            unparsed: BoundedVec::with_capacity(capacity)
+
                .expect("Deserializer::new: capacity exceeds maximum"),
            item: PhantomData,
        }
    }

    /// Input bytes into the decoder.
-
    pub fn input(&mut self, bytes: &[u8]) {
-
        self.unparsed.extend_from_slice(bytes);
+
    pub fn input(&mut self, bytes: &[u8]) -> Result<(), bounded::Error> {
+
        self.unparsed.extend_from_slice(bytes)
    }

    /// Decode and return the next message. Returns [`None`] if nothing was decoded.
    pub fn deserialize_next(&mut self) -> Result<Option<D>, wire::Error> {
-
        let mut reader = io::Cursor::new(self.unparsed.as_mut_slice());
+
        let mut reader = io::Cursor::new(self.unparsed.as_slice());

        match D::decode(&mut reader) {
            Ok(msg) => {
@@ -67,11 +72,16 @@ impl<D: wire::Decode> Deserializer<D> {
    pub fn is_empty(&self) -> bool {
        self.unparsed.is_empty()
    }
+

+
    /// Return the size of the unparsed data.
+
    pub fn len(&self) -> usize {
+
        self.unparsed.len()
+
    }
}

-
impl<D: wire::Decode> io::Write for Deserializer<D> {
+
impl<const B: usize, D: wire::Decode> io::Write for Deserializer<B, D> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-
        self.input(buf);
+
        self.input(buf).map_err(|_| io::ErrorKind::OutOfMemory)?;

        Ok(buf.len())
    }
@@ -81,7 +91,7 @@ impl<D: wire::Decode> io::Write for Deserializer<D> {
    }
}

-
impl<D: wire::Decode> Iterator for Deserializer<D> {
+
impl<const B: usize, D: wire::Decode> Iterator for Deserializer<B, D> {
    type Item = Result<D, wire::Error>;

    fn next(&mut self) -> Option<Self::Item> {
@@ -101,17 +111,17 @@ mod test {

    #[test]
    fn test_decode_next() {
-
        let mut decoder = Deserializer::<String>::new(8);
+
        let mut decoder = Deserializer::<1024, String>::new(8);

-
        decoder.input(&[3, b'b']);
+
        decoder.input(&[3, b'b']).unwrap();
        assert_matches!(decoder.deserialize_next(), Ok(None));
        assert_eq!(decoder.unparsed.len(), 2);

-
        decoder.input(&[b'y']);
+
        decoder.input(&[b'y']).unwrap();
        assert_matches!(decoder.deserialize_next(), Ok(None));
        assert_eq!(decoder.unparsed.len(), 3);

-
        decoder.input(&[b'e']);
+
        decoder.input(&[b'e']).unwrap();
        assert_matches!(decoder.deserialize_next(), Ok(Some(s)) if s.as_str() == "bye");
        assert_eq!(decoder.unparsed.len(), 0);
        assert!(decoder.is_empty());
@@ -119,9 +129,9 @@ mod test {

    #[test]
    fn test_unparsed() {
-
        let mut decoder = Deserializer::<String>::new(8);
+
        let mut decoder = Deserializer::<1024, String>::new(8);

-
        decoder.input(&[3, b'b', b'y']);
+
        decoder.input(&[3, b'b', b'y']).unwrap();
        assert_eq!(decoder.unparsed().collect::<Vec<_>>(), vec![3, b'b', b'y']);
        assert!(decoder.is_empty());
    }
@@ -130,7 +140,7 @@ mod test {
    fn prop_decode_next(chunk_size: usize) {
        let mut bytes = vec![];
        let mut msgs = vec![];
-
        let mut decoder = Deserializer::<String>::new(8);
+
        let mut decoder = Deserializer::<1024, String>::new(8);

        let chunk_size = 1 + chunk_size % MSG_HELLO.len() + MSG_BYE.len();

@@ -138,7 +148,7 @@ mod test {
        bytes.extend_from_slice(MSG_BYE);

        for chunk in bytes.as_slice().chunks(chunk_size) {
-
            decoder.input(chunk);
+
            decoder.input(chunk).unwrap();

            while let Some(msg) = decoder.deserialize_next().unwrap() {
                msgs.push(msg);
modified radicle-node/src/runtime.rs
@@ -21,9 +21,7 @@ use radicle::node::address::Store as _;
use radicle::node::notifications;
use radicle::node::Handle as _;
use radicle::profile::Home;
-
use radicle::storage;
-
use radicle::Storage;
-
use radicle::{cob, git};
+
use radicle::{cob, git, storage, Storage};

use crate::control;
use crate::crypto::Signer;
@@ -152,6 +150,7 @@ impl Runtime {
        let network = config.network;
        let rng = fastrand::Rng::new();
        let clock = LocalTime::now();
+
        let timestamp = clock.into();
        let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
        let scope = config.scope;
        let policy = config.policy;
@@ -200,7 +199,7 @@ impl Runtime {
            );
            ann
        } else {
-
            service::gossip::node(&config, clock.into())
+
            service::gossip::node(&config, timestamp)
                .solve(Default::default())
                .expect("Runtime::init: unable to solve proof-of-work puzzle")
        };
@@ -226,7 +225,6 @@ impl Runtime {
        let emitter: Emitter<Event> = Default::default();
        let mut service = service::Service::new(
            config.clone(),
-
            clock,
            stores,
            storage.clone(),
            policies,
modified radicle-node/src/service.rs
@@ -32,7 +32,7 @@ use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{ConnectOptions, Penalty, Severity};
use radicle::storage::refs::SIGREFS_BRANCH;
-
use radicle::storage::{Inventory, RepositoryError};
+
use radicle::storage::RepositoryError;

use crate::crypto;
use crate::crypto::{Signer, Verified};
@@ -380,6 +380,8 @@ pub struct Service<D, S, G> {
    outbox: Outbox,
    /// Cached local node announcement.
    node: NodeAnnouncement,
+
    /// Cached local inventory announcement.
+
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Ongoing fetches.
@@ -431,7 +433,6 @@ where
{
    pub fn new(
        config: Config,
-
        clock: LocalTime,
        db: Stores<D>,
        storage: S,
        policies: policy::Config<Write>,
@@ -442,6 +443,9 @@ where
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
+
        let last_timestamp = node.timestamp;
+
        let clock = LocalTime::default(); // Updated on initialize.
+
        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.

        Self {
            config,
@@ -449,6 +453,7 @@ where
            policies,
            signer,
            rng,
+
            inventory,
            node,
            clock,
            db,
@@ -461,9 +466,9 @@ where
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
-
            last_timestamp: Timestamp::MIN,
+
            last_timestamp,
            last_announce: LocalTime::default(),
-
            started_at: None,
+
            started_at: None, // Updated on initialize.
            emitter,
            listening: vec![],
        }
@@ -571,11 +576,17 @@ where
        })
    }

+
    /// Initialize service with current time. Call this once.
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());
+
        assert_ne!(time, LocalTime::default());

        let nid = self.node_id();
+
        let inventory = self.storage.inventory()?;
+

        self.started_at = Some(time);
+
        self.clock = time;
+
        self.inventory = gossip::inventory(self.timestamp(), inventory.clone());

        // Populate refs database. This is only useful as part of the upgrade process for nodes
        // that have been online since before the refs database was created.
@@ -609,15 +620,16 @@ where
        // Ensure that our inventory is recorded in our routing table, and we are seeding
        // all of it. It can happen that inventory is not properly seeded if for eg. the
        // user creates a new repository while the node is stopped.
-
        let rids = self.storage.inventory()?;
-
        self.db.routing_mut().insert(&rids, nid, time.into())?;
+
        self.db
+
            .routing_mut()
+
            .insert(inventory.iter(), nid, time.into())?;

        let announced = self
            .db
            .seeds()
            .seeded_by(&nid)?
            .collect::<Result<HashMap<_, _>, _>>()?;
-
        for rid in rids {
+
        for rid in inventory {
            let repo = self.storage.repository(rid)?;

            // If we're not seeding this repo, just skip it.
@@ -707,6 +719,7 @@ where

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
+
            self.idle_connections();
            self.maintain_connections();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
@@ -723,11 +736,7 @@ where
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

-
            if let Err(err) = self
-
                .storage
-
                .inventory()
-
                .and_then(|i| self.announce_inventory(i))
-
            {
+
            if let Err(err) = self.announce_inventory() {
                error!(target: "service", "Error announcing inventory: {err}");
            }
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
@@ -852,11 +861,7 @@ where
                }
            }
            Command::AnnounceInventory => {
-
                if let Err(err) = self
-
                    .storage
-
                    .inventory()
-
                    .and_then(|i| self.announce_inventory(i))
-
                {
+
                if let Err(err) = self.announce_inventory() {
                    error!(target: "service", "Error announcing inventory: {err}");
                }
            }
@@ -1157,6 +1162,14 @@ where
        if self.sessions.inbound().count() >= self.config.limits.connection.inbound {
            return false;
        }
+
        match self.db.addresses().is_banned(&addr) {
+
            Ok(banned) => {
+
                if banned {
+
                    return false;
+
                }
+
            }
+
            Err(e) => error!(target: "service", "Error querying ban status for {addr}: {e}"),
+
        }
        let host: HostName = addr.into();

        if self.limiter.limit(
@@ -1198,14 +1211,6 @@ where
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
-

-
                if let Err(e) =
-
                    self.db
-
                        .addresses_mut()
-
                        .connected(&remote, &peer.addr, self.clock.into())
-
                {
-
                    error!(target: "service", "Error updating address book with connection: {e}");
-
                }
            }
        } else {
            match self.sessions.entry(remote) {
@@ -1452,7 +1457,7 @@ where
                                    self.fetch(*id, *announcer, FETCH_TIMEOUT, None);
                                }
                                Err(e) => {
-
                                    error!(target: "service", "Error checking local inventory: {e}");
+
                                    error!(target: "service", "Error checking local inventory for {id}: {e}");
                                }
                            }
                        }
@@ -1788,18 +1793,8 @@ where

    /// Set of initial messages to send to a peer.
    fn initial(&mut self, _link: Link) -> Vec<Message> {
-
        let timestamp = self.timestamp();
        let now = self.clock();
        let filter = self.filter();
-
        let inventory = match self.storage.inventory() {
-
            Ok(i) => i,
-
            Err(e) => {
-
                // Other than crashing the node completely, there's nothing we can do
-
                // here besides returning an empty inventory and logging an error.
-
                error!(target: "service", "Error getting local inventory for initial messages: {e}");
-
                Default::default()
-
            }
-
        };

        // TODO: Only subscribe to outbound connections, otherwise we will consume too
        // much bandwidth.
@@ -1823,7 +1818,7 @@ where

        vec![
            Message::node(self.node.clone(), &self.signer),
-
            Message::inventory(gossip::inventory(timestamp, inventory), &self.signer),
+
            Message::inventory(self.inventory.clone(), &self.signer),
            Message::subscribe(filter, since, Timestamp::MAX),
        ]
    }
@@ -1840,7 +1835,9 @@ where
    /// Update our routing table with our local node's inventory.
    fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
        let inventory = self.storage.inventory()?;
-
        let result = self.sync_routing(inventory, self.node_id(), self.clock.into())?;
+
        let result = self.sync_routing(inventory.clone(), self.node_id(), self.clock.into())?;
+
        // Update cached inventory message.
+
        self.inventory = gossip::inventory(self.timestamp(), inventory);

        Ok(result)
    }
@@ -2001,11 +1998,7 @@ where
            Ok(synced) => {
                // Only announce if our inventory changed.
                if synced.added.len() + synced.removed.len() > 0 {
-
                    if let Err(e) = self
-
                        .storage
-
                        .inventory()
-
                        .and_then(|i| self.announce_inventory(i))
-
                    {
+
                    if let Err(e) = self.announce_inventory() {
                        error!(target: "service", "Failed to announce inventory: {e}");
                    }
                }
@@ -2134,16 +2127,15 @@ where
    ////////////////////////////////////////////////////////////////////////////

    /// Announce our inventory to all connected peers.
-
    fn announce_inventory(&mut self, inventory: Inventory) -> Result<(), storage::Error> {
-
        let time = self.timestamp();
-
        let msg = AnnouncementMessage::from(gossip::inventory(time, inventory));
+
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
+
        let msg = AnnouncementMessage::from(self.inventory.clone());

        self.outbox.announce(
            msg.signed(&self.signer),
            self.sessions.connected().map(|(_, p)| p),
            self.db.gossip_mut(),
        );
-
        self.last_announce = time.to_local_time();
+
        self.last_announce = self.clock;

        Ok(())
    }
@@ -2201,7 +2193,7 @@ where
                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
                let mut peers = entries
                    .filter(|entry| !entry.address.banned)
-
                    .filter(|entry| !entry.penalty.is_threshold_reached())
+
                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
                    .filter(|entry| !self.sessions.contains_key(&entry.node))
                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
                    .filter(|entry| &entry.node != self.nid())
@@ -2264,6 +2256,25 @@ where
        Ok(())
    }

+
    /// Run idle task for all connections.
+
    fn idle_connections(&mut self) {
+
        for (_, sess) in self.sessions.iter_mut() {
+
            sess.idle(self.clock);
+

+
            if sess.is_stable() {
+
                // Mark as connected once connection is stable.
+
                if let Err(e) =
+
                    self.db
+
                        .addresses_mut()
+
                        .connected(&sess.id, &sess.addr, self.clock.into())
+
                {
+
                    error!(target: "service", "Error updating address book with connection: {e}");
+
                }
+
            }
+
        }
+
    }
+

+
    /// Try to maintain a target number of connections.
    fn maintain_connections(&mut self) {
        let PeerConfig::Dynamic { target } = self.config.peers else {
            return;
modified radicle-node/src/service/gossip/store.rs
@@ -79,6 +79,11 @@ impl Store for Database {
    }

    fn announced(&mut self, nid: &NodeId, ann: &Announcement) -> Result<bool, Error> {
+
        assert_ne!(
+
            ann.timestamp(),
+
            Timestamp::MIN,
+
            "Timestamp of {ann:?} must not be zero"
+
        );
        let mut stmt = self.db.prepare(
            "INSERT INTO `announcements` (node, repo, type, message, signature, timestamp)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
@@ -291,3 +296,40 @@ impl TryFrom<&sql::Value> for GossipType {
        }
    }
}
+

+
#[cfg(test)]
+
#[allow(clippy::unwrap_used)]
+
mod test {
+
    use super::*;
+
    use crate::prelude::{BoundedVec, RepoId};
+
    use crate::test::arbitrary;
+
    use localtime::LocalTime;
+
    use radicle_crypto::test::signer::MockSigner;
+

+
    #[test]
+
    fn test_announced() {
+
        let mut db = Database::memory().unwrap();
+
        let nid = arbitrary::gen::<NodeId>(1);
+
        let rid = arbitrary::gen::<RepoId>(1);
+
        let timestamp = LocalTime::now().into();
+
        let signer = MockSigner::default();
+
        let refs = AnnouncementMessage::Refs(RefsAnnouncement {
+
            rid,
+
            refs: BoundedVec::new(),
+
            timestamp,
+
        })
+
        .signed(&signer);
+
        let inv = AnnouncementMessage::Inventory(InventoryAnnouncement {
+
            inventory: BoundedVec::new(),
+
            timestamp,
+
        })
+
        .signed(&signer);
+

+
        // Only the first announcement of each type is recognized as new.
+
        assert!(db.announced(&nid, &refs).unwrap());
+
        assert!(!db.announced(&nid, &refs).unwrap());
+

+
        assert!(db.announced(&nid, &inv).unwrap());
+
        assert!(!db.announced(&nid, &inv).unwrap());
+
    }
+
}
modified radicle-node/src/service/session.rs
@@ -5,11 +5,14 @@ use crate::node::config::Limits;
use crate::node::Severity;
use crate::service::message;
use crate::service::message::Message;
-
use crate::service::{Address, LocalTime, NodeId, Outbox, RepoId, Rng};
+
use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
use crate::{Link, Timestamp};

pub use crate::node::{PingState, State};

+
/// Time after which a connection is considered stable.
+
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
+

#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
    /// The remote peer sent an invalid announcement timestamp,
@@ -61,7 +64,7 @@ pub struct Session {

    /// Connection attempts. For persistent peers, Tracks
    /// how many times we've attempted to connect. We reset this to zero
-
    /// upon successful connection.
+
    /// upon successful connection, once the connection is stable.
    attempts: usize,
    /// Source of entropy.
    rng: Rng,
@@ -120,6 +123,7 @@ impl Session {
                ping: PingState::default(),
                fetching: HashSet::default(),
                latencies: VecDeque::default(),
+
                stable: false,
            },
            link: Link::Inbound,
            subscribe: None,
@@ -135,6 +139,10 @@ impl Session {
        matches!(self.state, State::Attempted { .. })
    }

+
    pub fn is_stable(&self) -> bool {
+
        matches!(self.state, State::Connected { stable: true, .. })
+
    }
+

    pub fn is_connected(&self) -> bool {
        self.state.is_connected()
    }
@@ -167,6 +175,22 @@ impl Session {
        self.attempts
    }

+
    /// Run 'idle' task for session.
+
    pub fn idle(&mut self, now: LocalTime) {
+
        if let State::Connected {
+
            since,
+
            ref mut stable,
+
            ..
+
        } = self.state
+
        {
+
            if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
+
                *stable = true;
+
                // Reset number of attempts for stable connections.
+
                self.attempts = 0;
+
            }
+
        }
+
    }
+

    /// Mark this session as fetching the given RID.
    ///
    /// # Panics
@@ -204,7 +228,6 @@ impl Session {
    }

    pub fn to_connected(&mut self, since: LocalTime) {
-
        self.attempts = 0;
        self.last_active = since;

        let State::Attempted = &self.state else {
@@ -215,6 +238,7 @@ impl Session {
            ping: PingState::default(),
            fetching: HashSet::default(),
            latencies: VecDeque::default(),
+
            stable: false,
        };
    }

modified radicle-node/src/test/peer.rs
@@ -44,6 +44,7 @@ pub struct Peer<S, G> {
    pub service: Service<S, G>,
    pub id: NodeId,
    pub ip: net::IpAddr,
+
    pub local_time: LocalTime,
    pub rng: fastrand::Rng,
    pub local_addr: net::SocketAddr,
    pub tempdir: tempfile::TempDir,
@@ -56,9 +57,7 @@ where
    S: WriteStorage + 'static,
    G: Signer + 'static,
{
-
    fn init(&mut self) {
-
        self.initialize();
-
    }
+
    fn init(&mut self) {}

    fn addr(&self) -> Address {
        self.address()
@@ -85,7 +84,7 @@ impl<S, G> DerefMut for Peer<S, G> {

impl Peer<MockStorage, MockSigner> {
    pub fn new(name: &'static str, ip: impl Into<net::IpAddr>) -> Self {
-
        Self::with_storage(name, ip, MockStorage::empty())
+
        Self::with_storage(name, ip, MockStorage::empty()).initialized()
    }
}

@@ -94,7 +93,7 @@ where
    S: WriteStorage + 'static,
{
    pub fn with_storage(name: &'static str, ip: impl Into<net::IpAddr>, storage: S) -> Self {
-
        Self::config(name, ip, storage, Config::default())
+
        Self::config(name, ip, storage, Config::default()).initialized()
    }
}

@@ -167,18 +166,18 @@ where
        let id = *config.signer.public_key();
        let ip = ip.into();
        let local_addr = net::SocketAddr::new(ip, config.rng.u16(..));
+
        let inventory = storage.inventory().unwrap();

        // Make sure the peer address is advertized.
        config.config.external_addresses.push(local_addr.into());
-

-
        for rid in storage.inventory().unwrap() {
-
            policies.seed(&rid, Scope::Followed).unwrap();
+
        for rid in &inventory {
+
            policies.seed(rid, Scope::Followed).unwrap();
        }
-
        let announcement = service::gossip::node(&config.config, config.local_time.into());
+
        let announcement =
+
            service::gossip::node(&config.config, Timestamp::from(config.local_time) + 1);
        let emitter: Emitter<Event> = Default::default();
        let service = Service::new(
            config.config,
-
            config.local_time,
            config.db,
            storage,
            policies,
@@ -194,25 +193,29 @@ where
            id,
            ip,
            local_addr,
+
            local_time: config.local_time,
            rng: config.rng,
            initialized: false,
            tempdir: config.tmp,
        }
    }

-
    pub fn initialize(&mut self) -> bool {
-
        if !self.initialized {
-
            info!(
-
                target: "test",
-
                "{}: Initializing: id = {}, address = {}",
-
                self.name, self.id, self.ip
-
            );
+
    pub fn initialize(&mut self) -> &mut Self {
+
        info!(
+
            target: "test",
+
            "{}: Initializing: id = {}, address = {}",
+
            self.name, self.id, self.ip
+
        );
+
        assert_ne!(self.local_time, LocalTime::default());
+

+
        self.initialized = true;
+
        self.service.initialize(self.local_time).unwrap();
+
        self
+
    }

-
            self.initialized = true;
-
            self.service.initialize(LocalTime::now()).unwrap();
-
            return true;
-
        }
-
        false
+
    pub fn initialized(mut self) -> Self {
+
        self.initialize();
+
        self
    }

    pub fn restart(&mut self) {
@@ -222,7 +225,7 @@ where
            "{}: Restarting: id = {}, address = {}",
            self.name, self.id, self.ip
        );
-
        self.service.initialize(LocalTime::now()).unwrap();
+
        self.service.initialize(*self.service.clock()).unwrap();
    }

    pub fn address(&self) -> Address {
@@ -340,7 +343,6 @@ where
    pub fn connect_from(&mut self, peer: &Self) {
        let remote_id = simulator::Peer::<S, G>::id(peer);

-
        self.initialize();
        self.service
            .connected(remote_id, peer.address(), Link::Inbound);
        self.service
@@ -366,7 +368,6 @@ where
        let remote_id = simulator::Peer::<T, H>::id(peer);
        let remote_addr = simulator::Peer::<T, H>::addr(peer);

-
        self.initialize();
        self.service.command(Command::Connect(
            remote_id,
            remote_addr.clone(),
modified radicle-node/src/tests.rs
@@ -132,7 +132,7 @@ fn test_redundant_connect() {

    // Only one connection attempt is made.
    assert_matches!(
-
        alice.outbox().collect::<Vec<_>>().as_slice(),
+
        alice.outbox().filter(|o| matches!(o, Io::Connect { .. })).collect::<Vec<_>>().as_slice(),
        [Io::Connect(id, addr)]
        if *id == bob.id() && *addr == bob.addr()
    );
@@ -233,9 +233,8 @@ fn test_persistent_peer_connect() {
            },
            ..peer::Config::default()
        },
-
    );
-

-
    alice.initialize();
+
    )
+
    .initialized();

    let outbox = alice.outbox().collect::<Vec<_>>();
    outbox
@@ -251,15 +250,14 @@ fn test_persistent_peer_connect() {
#[test]
fn test_inventory_sync() {
    let tmp = tempfile::tempdir().unwrap();
-
    let mut alice = Peer::config(
+
    let mut alice = Peer::with_storage(
        "alice",
        [7, 7, 7, 7],
        Storage::open(tmp.path().join("alice"), fixtures::user()).unwrap(),
-
        peer::Config::default(),
    );
    let bob_signer = MockSigner::default();
    let bob_storage = fixtures::storage(tmp.path().join("bob"), &bob_signer).unwrap();
-
    let bob = Peer::config("bob", [8, 8, 8, 8], bob_storage, peer::Config::default());
+
    let bob = Peer::with_storage("bob", [8, 8, 8, 8], bob_storage);
    let now = LocalTime::now().into();
    let projs = bob.storage().inventory().unwrap();

@@ -349,7 +347,8 @@ fn test_inventory_pruning() {
                },
                ..peer::Config::default()
            },
-
        );
+
        )
+
        .initialized();

        let bob = Peer::config(
            "bob",
@@ -359,7 +358,8 @@ fn test_inventory_pruning() {
                local_time: alice.local_time(),
                ..peer::Config::default()
            },
-
        );
+
        )
+
        .initialized();

        // Tell Alice about the amazing projects available
        alice.connect_to(&bob);
@@ -597,14 +597,14 @@ fn test_announcement_relay() {
        Some(Message::Announcement(_))
    );

-
    alice.receive(bob.id(), bob.inventory_announcement());
+
    alice.receive(bob.id(), dbg!(bob.inventory_announcement()));
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "Another inventory with the same timestamp is ignored"
    );

    bob.elapse(LocalDuration::from_mins(1));
-
    alice.receive(bob.id(), bob.inventory_announcement());
+
    alice.receive(bob.id(), dbg!(bob.inventory_announcement()));
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
@@ -647,17 +647,15 @@ fn test_announcement_relay() {
#[test]
fn test_refs_announcement_relay() {
    let tmp = tempfile::tempdir().unwrap();
-
    let mut alice = Peer::config(
+
    let mut alice = Peer::with_storage(
        "alice",
        [7, 7, 7, 7],
        Storage::open(tmp.path().join("alice"), fixtures::user()).unwrap(),
-
        peer::Config::default(),
    );
-
    let eve = Peer::config(
+
    let eve = Peer::with_storage(
        "eve",
        [8, 8, 8, 8],
        Storage::open(tmp.path().join("eve"), fixtures::user()).unwrap(),
-
        peer::Config::default(),
    );

    let bob = {
@@ -675,6 +673,7 @@ fn test_refs_announcement_relay() {
                ..peer::Config::default()
            },
        )
+
        .initialized()
    };
    let bob_inv = bob
        .storage()
@@ -724,11 +723,10 @@ fn test_refs_announcement_fetch_trusted_no_inventory() {
    logger::init(log::Level::Debug);

    let tmp = tempfile::tempdir().unwrap();
-
    let mut alice = Peer::config(
+
    let mut alice = Peer::with_storage(
        "alice",
        [7, 7, 7, 7],
        Storage::open(tmp.path().join("alice"), fixtures::user()).unwrap(),
-
        peer::Config::default(),
    );
    let bob = {
        let mut rng = fastrand::Rng::new();
@@ -745,6 +743,7 @@ fn test_refs_announcement_fetch_trusted_no_inventory() {
                ..peer::Config::default()
            },
        )
+
        .initialized()
    };
    let bob_inv = bob.storage().inventory().unwrap();
    let rid = bob_inv.first().unwrap();
@@ -861,8 +860,9 @@ fn test_refs_announcement_offline() {
    bob.seed(rid, policy::Scope::All).unwrap();

    // Make sure alice's service wasn't initialized before.
-
    assert!(alice.initialize());
+
    assert_eq!(*alice.clock(), LocalTime::default());

+
    alice.initialize();
    alice.connect_to(&bob);
    alice.receive(bob.id, Message::Subscribe(Subscribe::all()));

@@ -1057,7 +1057,8 @@ fn test_persistent_peer_reconnect_attempt() {
            },
            ..peer::Config::default()
        },
-
    );
+
    )
+
    .initialized();

    let mut sim = Simulation::new(
        LocalTime::now(),
@@ -1100,12 +1101,7 @@ fn test_persistent_peer_reconnect_attempt() {
fn test_persistent_peer_reconnect_success() {
    use std::collections::HashSet;

-
    let bob = Peer::config(
-
        "bob",
-
        [9, 9, 9, 9],
-
        MockStorage::empty(),
-
        peer::Config::default(),
-
    );
+
    let bob = Peer::with_storage("bob", [9, 9, 9, 9], MockStorage::empty());
    let mut alice = Peer::config(
        "alice",
        [7, 7, 7, 7],
@@ -1117,7 +1113,8 @@ fn test_persistent_peer_reconnect_success() {
            },
            ..peer::Config::default()
        },
-
    );
+
    )
+
    .initialized();
    alice.connect_to(&bob);

    // A transient error such as this will cause Alice to attempt a reconnection.
@@ -1568,20 +1565,15 @@ fn test_init_and_seed() {
    )
    .unwrap();
    let (repo, _) = fixtures::repository(tempdir.path().join("working"));
-
    let mut alice = Peer::config(
-
        "alice",
-
        [7, 7, 7, 7],
-
        storage_alice,
-
        peer::Config::default(),
-
    );
+
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage_alice);

    let storage_bob =
        Storage::open(tempdir.path().join("bob").join("storage"), fixtures::user()).unwrap();
-
    let mut bob = Peer::config("bob", [8, 8, 8, 8], storage_bob, peer::Config::default());
+
    let mut bob = Peer::with_storage("bob", [8, 8, 8, 8], storage_bob);

    let storage_eve =
        Storage::open(tempdir.path().join("eve").join("storage"), fixtures::user()).unwrap();
-
    let mut eve = Peer::config("eve", [9, 9, 9, 9], storage_eve, peer::Config::default());
+
    let mut eve = Peer::with_storage("eve", [9, 9, 9, 9], storage_eve);

    remote::mock::register(&alice.node_id(), alice.storage().path());
    remote::mock::register(&eve.node_id(), eve.storage().path());
@@ -1676,24 +1668,9 @@ fn test_init_and_seed() {
fn prop_inventory_exchange_dense() {
    fn property(alice_inv: MockStorage, bob_inv: MockStorage, eve_inv: MockStorage) {
        let rng = fastrand::Rng::new();
-
        let alice = Peer::config(
-
            "alice",
-
            [7, 7, 7, 7],
-
            alice_inv.clone(),
-
            peer::Config::default(),
-
        );
-
        let mut bob = Peer::config(
-
            "bob",
-
            [8, 8, 8, 8],
-
            bob_inv.clone(),
-
            peer::Config::default(),
-
        );
-
        let mut eve = Peer::config(
-
            "eve",
-
            [9, 9, 9, 9],
-
            eve_inv.clone(),
-
            peer::Config::default(),
-
        );
+
        let alice = Peer::with_storage("alice", [7, 7, 7, 7], alice_inv.clone());
+
        let mut bob = Peer::with_storage("bob", [8, 8, 8, 8], bob_inv.clone());
+
        let mut eve = Peer::with_storage("eve", [9, 9, 9, 9], eve_inv.clone());
        let mut routing = RandomMap::with_hasher(rng.clone().into());

        for (inv, peer) in &[
modified radicle-node/src/wire/message.rs
@@ -564,7 +564,7 @@ mod tests {
    #[test]
    fn prop_message_decoder() {
        fn property(items: Vec<Message>) {
-
            let mut decoder = Deserializer::<Message>::new(8);
+
            let mut decoder = Deserializer::<1048576, Message>::new(8);

            for item in &items {
                item.encode(&mut decoder).unwrap();
modified radicle-node/src/wire/protocol.rs
@@ -53,6 +53,9 @@ pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs
/// Default time to wait when dialing a connection, before the remote is considered unreachable.
pub const DEFAULT_DIAL_TIMEOUT: time::Duration = time::Duration::from_secs(6);

+
/// Maximum size of a peer inbox, in bytes.
+
pub const MAX_INBOX_SIZE: usize = 1024 * 1024 * 2;
+

/// Control message used internally between workers, users, and the service.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
@@ -198,7 +201,7 @@ enum Peer {
        addr: NetAddr<HostName>,
        link: Link,
        nid: NodeId,
-
        inbox: Deserializer<Frame>,
+
        inbox: Deserializer<MAX_INBOX_SIZE, Frame>,
        streams: Streams,
    },
    /// The peer was scheduled for disconnection. Once the transport is handed over
@@ -699,7 +702,13 @@ where
                    ..
                }) = self.peers.get_mut(&id)
                {
-
                    inbox.input(&data);
+
                    if inbox.input(&data).is_err() {
+
                        log::error!(target: "wire", "Maximum inbox size ({MAX_INBOX_SIZE}) reached for peer {nid}");
+
                        log::error!(target: "wire", "Unable to process messages fast enough for peer {nid}; disconnecting..");
+
                        self.disconnect(id, DisconnectReason::Session(session::Error::Misbehavior));
+

+
                        return;
+
                    }

                    loop {
                        match inbox.deserialize_next() {
@@ -781,7 +790,7 @@ where
                                log::error!(target: "wire", "Invalid gossip message from {nid}: {e}");

                                if !inbox.is_empty() {
-
                                    log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.unparsed().count());
+
                                    log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.len());
                                }
                                self.disconnect(
                                    id,
@@ -1167,8 +1176,8 @@ mod test {
        // Encode gossip message using the varint-prefix format into the stream.
        varint::payload::encode(&gossip, &mut stream).unwrap();

-
        let mut de = deserializer::Deserializer::<Frame>::new(1024);
-
        de.input(&stream);
+
        let mut de = deserializer::Deserializer::<1024, Frame>::new(1024);
+
        de.input(&stream).unwrap();

        // The "pong" message decodes successfully, even though there is trailing data.
        assert_eq!(
modified radicle/src/node.rs
@@ -53,7 +53,9 @@ pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(30);
/// Maximum length in bytes of a node alias.
pub const MAX_ALIAS_LENGTH: usize = 32;
/// Penalty threshold at which point we avoid connecting to this node.
-
pub const PENALTY_THRESHOLD: u8 = 32;
+
pub const PENALTY_CONNECT_THRESHOLD: u8 = 32;
+
/// Penalty threshold at which point we ban this node.
+
pub const PENALTY_BAN_THRESHOLD: u8 = 64;
/// Filename of node database under the node directory.
pub const NODE_DB_FILE: &str = "node.db";
/// Filename of policies database under the node directory.
@@ -105,6 +107,9 @@ pub enum State {
        /// Measured latencies for this peer.
        #[serde(skip)]
        latencies: VecDeque<LocalDuration>,
+
        /// Whether the connection is stable.
+
        #[serde(skip)]
+
        stable: bool,
    },
    /// When a peer is disconnected.
    #[serde(rename_all = "camelCase")]
@@ -159,8 +164,12 @@ pub struct Penalty(u8);
impl Penalty {
    /// If the penalty threshold is reached, at which point we should just avoid
    /// connecting to this node.
-
    pub fn is_threshold_reached(&self) -> bool {
-
        self.0 >= PENALTY_THRESHOLD
+
    pub fn is_connect_threshold_reached(&self) -> bool {
+
        self.0 >= PENALTY_CONNECT_THRESHOLD
+
    }
+

+
    pub fn is_ban_threshold_reached(&self) -> bool {
+
        self.0 >= PENALTY_BAN_THRESHOLD
    }
}

@@ -1260,6 +1269,7 @@ mod test {
                ping: Default::default(),
                fetching: Default::default(),
                latencies: VecDeque::default(),
+
                stable: false,
            }))
            .unwrap(),
        )
modified radicle/src/node/address.rs
@@ -132,6 +132,8 @@ pub struct Node {
    pub timestamp: Timestamp,
    /// Node connection penalty.
    pub penalty: Penalty,
+
    /// Whether the node is banned.
+
    pub banned: bool,
}

/// A known address.
modified radicle/src/node/address/store.rs
@@ -63,6 +63,9 @@ pub trait Store {
    fn is_empty(&self) -> Result<bool, Error> {
        self.len().map(|l| l == 0)
    }
+
    /// Check if an address is banned. Also returns `true` if the node this address belongs
+
    /// to is banned.
+
    fn is_banned(&self, addr: &Address) -> Result<bool, Error>;
    /// Get the address entries in the store.
    fn entries(&self) -> Result<Box<dyn Iterator<Item = AddressEntry>>, Error>;
    /// Mark a node as attempted at a certain time.
@@ -80,10 +83,9 @@ pub trait Store {

impl Store for Database {
    fn get(&self, node: &NodeId) -> Result<Option<Node>, Error> {
-
        let mut stmt = self
-
            .db
-
            .prepare("SELECT features, alias, pow, penalty, timestamp FROM nodes WHERE id = ?")?;
-

+
        let mut stmt = self.db.prepare(
+
            "SELECT features, alias, pow, penalty, banned, timestamp FROM nodes WHERE id = ?",
+
        )?;
        stmt.bind((1, node))?;

        if let Some(Ok(row)) = stmt.into_iter().next() {
@@ -93,6 +95,7 @@ impl Store for Database {
            let pow = row.read::<i64, _>("pow") as u32;
            let penalty = row.read::<i64, _>("penalty").min(u8::MAX as i64);
            let penalty = Penalty(penalty as u8);
+
            let banned = row.read::<i64, _>("banned").is_positive();
            let addrs = self.addresses_of(node)?;

            Ok(Some(Node {
@@ -102,12 +105,34 @@ impl Store for Database {
                timestamp,
                penalty,
                addrs,
+
                banned,
            }))
        } else {
            Ok(None)
        }
    }

+
    fn is_banned(&self, addr: &Address) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "SELECT a.banned, n.banned
+
             FROM addresses AS a
+
             JOIN nodes AS n ON a.node = n.id
+
             WHERE value = ?1 AND type =?2",
+
        )?;
+
        stmt.bind((1, addr))?;
+
        stmt.bind((2, AddressType::from(addr)))?;
+

+
        if let Some(row) = stmt.into_iter().next() {
+
            let row = row?;
+
            let addr_banned = row.read::<i64, _>(0).is_positive();
+
            let node_banned = row.read::<i64, _>(1).is_positive();
+

+
            Ok(node_banned || addr_banned)
+
        } else {
+
            Ok(false)
+
        }
+
    }
+

    fn addresses_of(&self, node: &NodeId) -> Result<Vec<KnownAddress>, Error> {
        let mut addrs = Vec::new();
        let mut stmt = self.db.prepare(
@@ -306,17 +331,29 @@ impl Store for Database {
        _addr: &Address,
        severity: Severity,
    ) -> Result<(), Error> {
-
        let mut stmt = self.db.prepare(
-
            "UPDATE `nodes`
-
             SET penalty = penalty + ?2
-
             WHERE id = ?1",
-
        )?;
+
        transaction(&self.db, |db| {
+
            let mut stmt = self.db.prepare(
+
                "UPDATE `nodes`
+
                 SET penalty = penalty + ?2
+
                 WHERE id = ?1",
+
            )?;
+
            stmt.bind((1, nid))?;
+
            stmt.bind((2, severity as i64))?;
+
            stmt.next()?;

-
        stmt.bind((1, nid))?;
-
        stmt.bind((2, severity as i64))?;
-
        stmt.next()?;
+
            // If the ban threshold is reached, we ban the node and its addresses.
+
            let node = self.get(nid)?.ok_or(Error::NoRows)?;
+
            if node.penalty.is_ban_threshold_reached() {
+
                let mut stmt = db.prepare("UPDATE `nodes` SET banned = 1 WHERE id = ?1")?;
+
                stmt.bind((1, nid))?;
+
                stmt.next()?;

-
        Ok(())
+
                let mut stmt = db.prepare("UPDATE `addresses` SET banned = 1 WHERE node = ?1")?;
+
                stmt.bind((1, nid))?;
+
                stmt.next()?;
+
            }
+
            Ok(())
+
        })
    }
}

@@ -684,4 +721,46 @@ mod test {
        let node = cache.get(&alice).unwrap().unwrap();
        assert_eq!(node.penalty, Penalty(4));
    }
+

+
    #[test]
+
    fn test_disconnected_ban() {
+
        let alice = arbitrary::gen::<NodeId>(1);
+
        let ka1 = arbitrary::gen::<KnownAddress>(1);
+
        let ka2 = arbitrary::gen::<KnownAddress>(1);
+
        let mut db = Database::memory().unwrap();
+
        let features = node::Features::SEED;
+
        let timestamp = Timestamp::from(LocalTime::now());
+

+
        db.insert(
+
            &alice,
+
            features,
+
            Alias::new("alice"),
+
            16,
+
            timestamp,
+
            [ka1.clone(), ka2.clone()],
+
        )
+
        .unwrap();
+
        let node = db.get(&alice).unwrap().unwrap();
+
        assert_eq!(node.penalty, Penalty::default());
+

+
        for _ in 0..7 {
+
            db.disconnected(&alice, &ka1.addr, Severity::High).unwrap();
+
            let node = db.get(&alice).unwrap().unwrap();
+

+
            assert!(!node.penalty.is_ban_threshold_reached());
+
            assert!(!node.banned);
+
        }
+

+
        db.disconnected(&alice, &ka1.addr, Severity::High).unwrap();
+
        let node = db.get(&alice).unwrap().unwrap();
+

+
        assert!(node.penalty.is_ban_threshold_reached());
+
        assert!(node.banned);
+

+
        for addr in node.addrs {
+
            assert!(addr.banned);
+
        }
+
        assert!(db.is_banned(&ka1.addr).unwrap());
+
        assert!(db.is_banned(&ka2.addr).unwrap()); // Banned because node is banned.
+
    }
}
modified radicle/src/node/timestamp.rs
@@ -8,7 +8,9 @@ use localtime::LocalTime;
use sqlite as sql;

/// Milliseconds since epoch.
-
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+
#[derive(
+
    Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
+
)]
#[serde(transparent)]
pub struct Timestamp(u64);

modified radicle/src/test/arbitrary.rs
@@ -18,8 +18,8 @@ use crate::identity::{
    project::Project,
    Did,
};
-
use crate::node::address::AddressType;
-
use crate::node::{Address, Alias, Timestamp};
+
use crate::node::address::{AddressType, Source};
+
use crate::node::{Address, Alias, KnownAddress, Timestamp};
use crate::storage;
use crate::storage::refs::{Refs, RefsAt, SignedRefs};
use crate::test::storage::{MockRepository, MockStorage};
@@ -303,6 +303,12 @@ impl Arbitrary for Address {
    }
}

+
impl Arbitrary for KnownAddress {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        KnownAddress::new(Address::arbitrary(g), Source::Peer)
+
    }
+
}
+

impl Arbitrary for Alias {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        let s = g