Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Rate-limit updates
Merged did:key:z6MksFqX...wzpT opened 2 years ago

See commits.

5 files changed +98 -61 9cd08a01 abf89438
modified radicle-cli/examples/rad-config.md
@@ -38,12 +38,12 @@ $ rad config
      "maxOpenFiles": 4096,
      "rate": {
        "inbound": {
-
          "fillRate": 0.2,
-
          "capacity": 32
+
          "fillRate": 2.0,
+
          "capacity": 128
        },
        "outbound": {
-
          "fillRate": 1.0,
-
          "capacity": 64
+
          "fillRate": 5.0,
+
          "capacity": 256
        }
      },
      "connection": {
modified radicle-httpd/src/api/v1/profile.rs
@@ -101,12 +101,12 @@ mod routes {
                    "maxOpenFiles": 4096,
                    "rate": {
                      "inbound": {
-
                        "fillRate": 0.2,
-
                        "capacity": 32
+
                        "fillRate": 2.0,
+
                        "capacity": 128
                      },
                      "outbound": {
-
                        "fillRate": 1.0,
-
                        "capacity": 64
+
                        "fillRate": 5.0,
+
                        "capacity": 256
                      }
                    },
                    "connection": {
modified radicle-node/src/service.rs
@@ -438,6 +438,7 @@ where
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
+
        let limiter = RateLimiter::new(config.peers());

        Self {
            config,
@@ -449,7 +450,7 @@ where
            clock,
            db,
            outbox: Outbox::default(),
-
            limiter: RateLimiter::default(),
+
            limiter,
            sessions,
            fetching: HashMap::new(),
            queue: VecDeque::new(),
@@ -1155,10 +1156,12 @@ where
        }
        let host: HostName = addr.into();

-
        if self
-
            .limiter
-
            .limit(host.clone(), &self.config.limits.rate.inbound, self.clock)
-
        {
+
        if self.limiter.limit(
+
            host.clone(),
+
            None,
+
            &self.config.limits.rate.inbound,
+
            self.clock,
+
        ) {
            trace!(target: "service", "Rate limiting inbound connection from {host}..");
            return false;
        }
@@ -1633,7 +1636,7 @@ where
        };
        if self
            .limiter
-
            .limit(peer.addr.clone().into(), limit, self.clock)
+
            .limit(peer.addr.clone().into(), Some(remote), limit, self.clock)
        {
            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
            return Ok(());
modified radicle-node/src/service/limiter.rs
@@ -1,7 +1,7 @@
-
use std::collections::HashMap;
+
use std::collections::{HashMap, HashSet};

use localtime::LocalTime;
-
use radicle::node::{address, config, HostName};
+
use radicle::node::{address, config, HostName, NodeId};

/// Peer rate limiter.
///
@@ -12,15 +12,35 @@ use radicle::node::{address, config, HostName};
#[derive(Debug, Default)]
pub struct RateLimiter {
    buckets: HashMap<HostName, TokenBucket>,
+
    bypass: HashSet<NodeId>,
}

impl RateLimiter {
+
    /// Create a new rate limiter with a bypass list. Nodes in the bypass list are not limited.
+
    pub fn new(bypass: impl IntoIterator<Item = NodeId>) -> Self {
+
        Self {
+
            buckets: HashMap::default(),
+
            bypass: bypass.into_iter().collect(),
+
        }
+
    }
+

    /// Call this when the address has performed some rate-limited action.
    /// Returns whether the action is rate-limited or not.
    ///
    /// Supplying a different amount of tokens per address is useful if for eg. a peer
    /// is outbound vs. inbound.
-
    pub fn limit<T: AsTokens>(&mut self, addr: HostName, tokens: &T, now: LocalTime) -> bool {
+
    pub fn limit<T: AsTokens>(
+
        &mut self,
+
        addr: HostName,
+
        nid: Option<&NodeId>,
+
        tokens: &T,
+
        now: LocalTime,
+
    ) -> bool {
+
        if let Some(nid) = nid {
+
            if self.bypass.contains(nid) {
+
                return false;
+
            }
+
        }
        if let HostName::Ip(ip) = addr {
            // Don't limit LAN addresses.
            if !address::is_routable(&ip) {
@@ -99,6 +119,8 @@ impl TokenBucket {
#[cfg(test)]
#[allow(clippy::bool_assert_comparison, clippy::redundant_clone)]
mod test {
+
    use radicle::test::arbitrary;
+

    use super::*;

    impl AsTokens for (usize, f64) {
@@ -116,64 +138,72 @@ mod test {
        let mut r = RateLimiter::default();
        let t = (3, 0.2); // Three tokens burst. One token every 5 seconds.
        let a = HostName::Dns(String::from("seed.radicle.xyz"));
-

-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(0)), false); // Burst capacity
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(1)), false); // Burst capacity
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(2)), false); // Burst capacity
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(3)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(4)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(5)), false); // Refilled (1)
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(6)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(7)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(8)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(9)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(10)), false); // Refilled (1)
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(11)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(12)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(13)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(14)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(15)), false); // Refilled (1)
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(16)), true); // Limited
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(60)), false); // Refilled (3)
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(60)), false); // Burst capacity
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(60)), false); // Burst capacity
-
        assert_eq!(r.limit(a.clone(), &t, LocalTime::from_secs(60)), true); // Limited
+
        let n = arbitrary::gen::<NodeId>(1);
+
        let n = Some(&n);
+

+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(0)), false); // Burst capacity
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(1)), false); // Burst capacity
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(2)), false); // Burst capacity
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(3)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(4)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(5)), false); // Refilled (1)
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(6)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(7)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(8)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(9)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(10)), false); // Refilled (1)
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(11)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(12)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(13)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(14)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(15)), false); // Refilled (1)
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(16)), true); // Limited
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(60)), false); // Refilled (3)
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(60)), false); // Burst capacity
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(60)), false); // Burst capacity
+
        assert_eq!(r.limit(a.clone(), n, &t, LocalTime::from_secs(60)), true); // Limited
    }

    #[test]
+
    #[rustfmt::skip]
    fn test_limitter_multi() {
        let t = (1, 1.0); // One token per second. One token burst.
+
        let n = arbitrary::gen::<NodeId>(1);
+
        let n = Some(&n);
        let mut r = RateLimiter::default();
        let addr1 = HostName::Dns(String::from("seed.radicle.xyz"));
        let addr2 = HostName::Dns(String::from("seed.radicle.net"));

-
        assert_eq!(r.limit(addr1.clone(), &t, LocalTime::from_secs(0)), false);
-
        assert_eq!(r.limit(addr1.clone(), &t, LocalTime::from_secs(0)), true);
-
        assert_eq!(r.limit(addr2.clone(), &t, LocalTime::from_secs(0)), false);
-
        assert_eq!(r.limit(addr2.clone(), &t, LocalTime::from_secs(0)), true);
-
        assert_eq!(r.limit(addr1.clone(), &t, LocalTime::from_secs(1)), false); // Refilled (1)
-
        assert_eq!(r.limit(addr1.clone(), &t, LocalTime::from_secs(1)), true);
-
        assert_eq!(r.limit(addr2.clone(), &t, LocalTime::from_secs(1)), false);
-
        assert_eq!(r.limit(addr2.clone(), &t, LocalTime::from_secs(1)), true);
+
        assert_eq!(r.limit(addr1.clone(), n, &t, LocalTime::from_secs(0)), false);
+
        assert_eq!(r.limit(addr1.clone(), n, &t, LocalTime::from_secs(0)), true);
+
        assert_eq!(r.limit(addr2.clone(), n, &t, LocalTime::from_secs(0)), false);
+
        assert_eq!(r.limit(addr2.clone(), n, &t, LocalTime::from_secs(0)), true);
+
        assert_eq!(r.limit(addr1.clone(), n, &t, LocalTime::from_secs(1)), false);
+
        assert_eq!(r.limit(addr1.clone(), n, &t, LocalTime::from_secs(1)), true);
+
        assert_eq!(r.limit(addr2.clone(), n, &t, LocalTime::from_secs(1)), false);
+
        assert_eq!(r.limit(addr2.clone(), n, &t, LocalTime::from_secs(1)), true);
    }

    #[test]
+
    #[rustfmt::skip]
    fn test_limitter_different_rates() {
        let t1 = (1, 1.0); // One token per second. One token burst.
        let t2 = (2, 2.0); // Two tokens per second. Two token burst.
+
        let n = arbitrary::gen::<NodeId>(1);
+
        let n = Some(&n);
        let mut r = RateLimiter::default();
        let addr1 = HostName::Dns(String::from("seed.radicle.xyz"));
        let addr2 = HostName::Dns(String::from("seed.radicle.net"));

-
        assert_eq!(r.limit(addr1.clone(), &t1, LocalTime::from_secs(0)), false);
-
        assert_eq!(r.limit(addr1.clone(), &t1, LocalTime::from_secs(0)), true);
-
        assert_eq!(r.limit(addr2.clone(), &t2, LocalTime::from_secs(0)), false);
-
        assert_eq!(r.limit(addr2.clone(), &t2, LocalTime::from_secs(0)), false);
-
        assert_eq!(r.limit(addr2.clone(), &t2, LocalTime::from_secs(0)), true);
-
        assert_eq!(r.limit(addr1.clone(), &t1, LocalTime::from_secs(1)), false); // Refilled (1)
-
        assert_eq!(r.limit(addr1.clone(), &t1, LocalTime::from_secs(1)), true);
-
        assert_eq!(r.limit(addr2.clone(), &t2, LocalTime::from_secs(1)), false); // Refilled (2)
-
        assert_eq!(r.limit(addr2.clone(), &t2, LocalTime::from_secs(1)), false);
-
        assert_eq!(r.limit(addr2.clone(), &t2, LocalTime::from_secs(1)), true);
+
        assert_eq!(r.limit(addr1.clone(), n, &t1, LocalTime::from_secs(0)), false);
+
        assert_eq!(r.limit(addr1.clone(), n, &t1, LocalTime::from_secs(0)), true);
+
        assert_eq!(r.limit(addr2.clone(), n, &t2, LocalTime::from_secs(0)), false);
+
        assert_eq!(r.limit(addr2.clone(), n, &t2, LocalTime::from_secs(0)), false);
+
        assert_eq!(r.limit(addr2.clone(), n, &t2, LocalTime::from_secs(0)), true);
+
        assert_eq!(r.limit(addr1.clone(), n, &t1, LocalTime::from_secs(1)), false); // Refilled (1)
+
        assert_eq!(r.limit(addr1.clone(), n, &t1, LocalTime::from_secs(1)), true);
+
        assert_eq!(r.limit(addr2.clone(), n, &t2, LocalTime::from_secs(1)), false); // Refilled (2)
+
        assert_eq!(r.limit(addr2.clone(), n, &t2, LocalTime::from_secs(1)), false);
+
        assert_eq!(r.limit(addr2.clone(), n, &t2, LocalTime::from_secs(1)), true);
    }
}
modified radicle/src/node/config.rs
@@ -168,12 +168,12 @@ impl Default for RateLimits {
    fn default() -> Self {
        Self {
            inbound: RateLimit {
-
                fill_rate: 0.2,
-
                capacity: 32,
+
                fill_rate: 2.0,
+
                capacity: 128,
            },
            outbound: RateLimit {
-
                fill_rate: 1.0,
-
                capacity: 64,
+
                fill_rate: 5.0,
+
                capacity: 256,
            },
        }
    }
@@ -326,6 +326,10 @@ impl Config {
            .map(|ca| &ca.addr)
    }

+
    pub fn peers(&self) -> impl Iterator<Item = NodeId> + '_ {
+
        self.connect.iter().cloned().map(|p| p.id)
+
    }
+

    pub fn is_persistent(&self, id: &NodeId) -> bool {
        self.peer(id).is_some()
    }