rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
node: rate limiter for channel reads
Merged · fintohaps · opened 1 year ago

This introduces a rate limit for the `ChannelReader` to mitigate DDoS attacks and attempts to upload repositories that are larger than a node is willing to permit.

The limiter caps the total number of bytes the node is willing to accept in a single fetch exchange, defaulting to 500 MiB. This means an initial fetch of an oversized repository will be rejected, while still leaving ample room for the new packfile data sent in subsequent fetch exchanges.
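The mechanism is small enough to sketch in full. Below is a simplified, self-contained version of the `ReadLimiter` this patch adds in `radicle-node/src/worker/channels.rs`; for clarity it uses a plain byte count where the real code uses the `FetchPackSizeLimit` newtype:

```rust
use std::io;

/// Sketch of the limiter: tally bytes as they are consumed and fail
/// the read once the budget is exhausted, which aborts the fetch.
struct ReadLimiter {
    limit: usize,      // byte budget for the exchange
    total_read: usize, // running total of bytes consumed
}

impl ReadLimiter {
    fn read(&mut self, bytes: usize) -> io::Result<()> {
        self.total_read = self.total_read.saturating_add(bytes);
        if self.total_read >= self.limit {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "sender has exceeded number of allowed bytes, aborting read",
            ))
        } else {
            Ok(())
        }
    }
}
```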

The limit can be configured in the node's config file, under the `limits` section as `fetchPackReceive`.
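For example, to raise the limit to 1 GiB (the `fetchPackReceive` field name is taken from the `rad-config.md` change below; the nesting of `limits` under `node` assumes the standard config layout, and the value is any human-readable size that `bytesize` can parse):

```json
{
  "node": {
    "limits": {
      "fetchPackReceive": "1.0 GiB"
    }
  }
}
```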

10 files changed +237 -20 f13afe49 a90aabb1
modified Cargo.lock
@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
-version = 3
+version = 4

[[package]]
name = "adler2"
@@ -309,6 +309,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"

[[package]]
+name = "bytesize"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3c8f83209414aacf0eeae3cf730b18d6981697fba62f200fcfb92b9f082acba"
+dependencies = [
+ "serde",
+]
+
+[[package]]
name = "cbc"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2173,6 +2182,7 @@ version = "0.14.0"
dependencies = [
 "amplify",
 "base64 0.21.7",
+ "bytesize",
 "chrono",
 "colored",
 "crossbeam-channel",
modified radicle-cli/examples/rad-config.md
@@ -47,7 +47,8 @@ $ rad config
      "connection": {
        "inbound": 128,
        "outbound": 16
-      }
+      },
+      "fetchPackReceive": "500.0 MiB"
    },
    "workers": 8,
    "seedingPolicy": {
modified radicle-node/src/service.rs
@@ -1098,7 +1098,13 @@ where
            refs_at: refs_at.clone(),
            subscribers: vec![],
        });
-        self.outbox.fetch(session, rid, refs_at, timeout);
+        self.outbox.fetch(
+            session,
+            rid,
+            refs_at,
+            timeout,
+            self.config.limits.fetch_pack_receive,
+        );

        Ok(fetching)
    }
modified radicle-node/src/service/io.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::time;

use log::*;
+use radicle::node::config::FetchPackSizeLimit;
use radicle::storage::refs::RefsAt;

use crate::prelude::*;
@@ -30,6 +31,8 @@ pub enum Io {
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
        timeout: time::Duration,
+        /// Limit the number of bytes fetched.
+        reader_limit: FetchPackSizeLimit,
    },
    /// Ask for a wakeup in a specified amount of time.
    Wakeup(LocalDuration),
@@ -124,6 +127,7 @@ impl Outbox {
        rid: RepoId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
+        reader_limit: FetchPackSizeLimit,
    ) {
        peer.fetching(rid);

@@ -143,6 +147,7 @@ impl Outbox {
            refs_at,
            remote: peer.id,
            timeout,
+            reader_limit,
        });
    }

modified radicle-node/src/tests/e2e.rs
@@ -1493,3 +1493,44 @@ fn test_multiple_offline_inits() {
        assert!(projects.contains(&repo.rid), "Bob is missing {}", repo.rid);
    }
}
+
+#[test]
+fn test_channel_reader_limit() {
+    logger::init(log::Level::Debug);
+
+    let tmp = tempfile::tempdir().unwrap();
+    let mut alice = Node::init(tmp.path(), config::relay("alice"));
+    let limits = radicle::node::config::Limits {
+        fetch_pack_receive: radicle::node::config::FetchPackSizeLimit::bytes(1000),
+        ..radicle::node::config::Limits::default()
+    };
+    let bob = Node::init(
+        tmp.path(),
+        Config {
+            limits,
+            ..config::relay("bob")
+        },
+    );
+    let acme = alice.project("acme", "");
+
+    let mut alice = alice.spawn();
+    let mut bob = bob.spawn();
+
+    alice.connect(&bob);
+    converge([&alice, &bob]);
+
+    let updated = bob.handle.seed(acme, Scope::All).unwrap();
+    assert!(updated);
+
+    let result = bob.handle.fetch(acme, alice.id, DEFAULT_TIMEOUT).unwrap();
+    assert!(!result.is_success());
+
+    let FetchResult::Failed { reason } = result else {
+        panic!("fetch result must be failed")
+    };
+    assert!(
+        reason.contains("Failed to consume the pack sent by the remote"),
+        "actual: {}",
+        reason
+    );
+}
modified radicle-node/src/wire/protocol.rs
@@ -35,7 +35,7 @@ use crate::wire::frame;
use crate::wire::frame::{Frame, FrameData, StreamId};
use crate::wire::Encode;
use crate::worker;
-use crate::worker::{ChannelEvent, FetchRequest, FetchResult, Task, TaskResult};
+use crate::worker::{ChannelEvent, ChannelsConfig, FetchRequest, FetchResult, Task, TaskResult};
use crate::Link;

/// NoiseXK handshake pattern.
@@ -128,22 +128,22 @@ impl Streams {
    }

    /// Open a new stream.
-    fn open(&mut self, timeout: time::Duration) -> (StreamId, worker::Channels) {
+    fn open(&mut self, config: ChannelsConfig) -> (StreamId, worker::Channels) {
        self.seq += 1;

        let id = StreamId::git(self.link)
            .nth(self.seq)
            .expect("Streams::open: too many streams");
        let channels = self
-            .register(id, timeout)
+            .register(id, config)
            .expect("Streams::open: stream was already open");

        (id, channels)
    }

    /// Register an open stream.
-    fn register(&mut self, stream: StreamId, timeout: time::Duration) -> Option<worker::Channels> {
-        let (wire, worker) = worker::Channels::pair(timeout)
+    fn register(&mut self, stream: StreamId, config: ChannelsConfig) -> Option<worker::Channels> {
+        let (wire, worker) = worker::Channels::pair(config)
            .expect("Streams::register: fatal: unable to create channels");

        match self.streams.entry(stream) {
@@ -762,8 +762,12 @@ where
                                log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");
                                metrics.streams_opened += 1;
                                metrics.received_fetch_requests += 1;
-
-                                let Some(channels) = streams.register(stream, FETCH_TIMEOUT) else {
+                                let reader_limit = self.service.config().limits.fetch_pack_receive;
+                                let Some(channels) = streams.register(
+                                    stream,
+                                    ChannelsConfig::new(FETCH_TIMEOUT)
+                                        .with_reader_limit(reader_limit),
+                                ) else {
                                    log::warn!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
                                    continue;
                                };
@@ -1062,6 +1066,7 @@ where
                    rid,
                    remote,
                    timeout,
+                    reader_limit,
                    refs_at,
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");
@@ -1076,7 +1081,8 @@ where
                        log::error!(target: "wire", "Peer {remote} is not connected: dropping fetch");
                        continue;
                    };
-                    let (stream, channels) = streams.open(timeout);
+                    let (stream, channels) =
+                        streams.open(ChannelsConfig::new(timeout).with_reader_limit(reader_limit));

                    log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");

modified radicle-node/src/worker.rs
@@ -23,7 +23,7 @@ use crate::service::policy;
use crate::service::policy::SeedingPolicy;
use crate::wire::StreamId;

-pub use channels::{ChannelEvent, Channels};
+pub use channels::{ChannelEvent, Channels, ChannelsConfig};

/// Worker pool configuration.
pub struct Config {
modified radicle-node/src/worker/channels.rs
@@ -4,6 +4,7 @@ use std::ops::Deref;
use std::{fmt, io, time};

use crossbeam_channel as chan;
+use radicle::node::config::FetchPackSizeLimit;
use radicle::node::NodeId;

use crate::runtime::Handle;
@@ -14,6 +15,32 @@ use crate::wire::StreamId;
/// upload-pack's stdout, the data chunks are of a maximum size of 8192 bytes.
pub const MAX_WORKER_CHANNEL_SIZE: usize = 64;

+#[derive(Clone, Copy, Debug)]
+pub struct ChannelsConfig {
+    timeout: time::Duration,
+    reader_limit: FetchPackSizeLimit,
+}
+
+impl ChannelsConfig {
+    pub fn new(timeout: time::Duration) -> Self {
+        Self {
+            timeout,
+            reader_limit: FetchPackSizeLimit::default(),
+        }
+    }
+
+    pub fn with_timeout(self, timeout: time::Duration) -> Self {
+        Self { timeout, ..self }
+    }
+
+    pub fn with_reader_limit(self, reader_limit: FetchPackSizeLimit) -> Self {
+        Self {
+            reader_limit,
+            ..self
+        }
+    }
+}
+
/// A reader and writer pair that can be used in the fetch protocol.
///
/// It implements [`radicle::fetch::transport::ConnectionStream`] to
@@ -92,20 +119,23 @@ impl<T: AsRef<[u8]>> Channels<T> {
    pub fn new(
        sender: chan::Sender<ChannelEvent<T>>,
        receiver: chan::Receiver<ChannelEvent<T>>,
-        timeout: time::Duration,
+        config: ChannelsConfig,
    ) -> Self {
-        let sender = ChannelWriter { sender, timeout };
-        let receiver = ChannelReader::new(receiver, timeout);
+        let sender = ChannelWriter {
+            sender,
+            timeout: config.timeout,
+        };
+        let receiver = ChannelReader::new(receiver, config.timeout, config.reader_limit);

        Self { sender, receiver }
    }

-    pub fn pair(timeout: time::Duration) -> io::Result<(Channels<T>, Channels<T>)> {
+    pub fn pair(config: ChannelsConfig) -> io::Result<(Channels<T>, Channels<T>)> {
        let (l_send, r_recv) = chan::bounded::<ChannelEvent<T>>(MAX_WORKER_CHANNEL_SIZE);
        let (r_send, l_recv) = chan::bounded::<ChannelEvent<T>>(MAX_WORKER_CHANNEL_SIZE);

-        let l = Channels::new(l_send, l_recv, timeout);
-        let r = Channels::new(r_send, r_recv, timeout);
+        let l = Channels::new(l_send, l_recv, config);
+        let r = Channels::new(r_send, r_recv, config);

        Ok((l, r))
    }
@@ -123,12 +153,41 @@ impl<T: AsRef<[u8]>> Channels<T> {
    }
}

+#[derive(Clone, Copy, Debug)]
+pub struct ReadLimiter {
+    limit: FetchPackSizeLimit,
+    total_read: usize,
+}
+
+impl ReadLimiter {
+    pub fn new(limit: FetchPackSizeLimit) -> Self {
+        Self {
+            limit,
+            total_read: 0,
+        }
+    }
+
+    pub fn read(&mut self, bytes: usize) -> io::Result<()> {
+        self.total_read = self.total_read.saturating_add(bytes);
+        log::trace!(target: "worker", "limit {}, total bytes read: {}", self.limit, self.total_read);
+        if self.limit.exceeded_by(self.total_read) {
+            Err(io::Error::new(
+                io::ErrorKind::Other,
+                "sender has exceeded number of allowed bytes, aborting read",
+            ))
+        } else {
+            Ok(())
+        }
+    }
+}
+
/// Wraps a [`chan::Receiver`] and provides it with [`io::Read`].
#[derive(Clone)]
pub struct ChannelReader<T = Vec<u8>> {
    buffer: io::Cursor<Vec<u8>>,
    receiver: chan::Receiver<ChannelEvent<T>>,
    timeout: time::Duration,
+    limiter: ReadLimiter,
}

impl<T> Deref for ChannelReader<T> {
@@ -140,11 +199,16 @@ impl<T> Deref for ChannelReader<T> {
}

impl<T: AsRef<[u8]>> ChannelReader<T> {
-    pub fn new(receiver: chan::Receiver<ChannelEvent<T>>, timeout: time::Duration) -> Self {
+    pub fn new(
+        receiver: chan::Receiver<ChannelEvent<T>>,
+        timeout: time::Duration,
+        limit: FetchPackSizeLimit,
+    ) -> Self {
        Self {
            buffer: io::Cursor::new(Vec::new()),
            receiver,
            timeout,
+            limiter: ReadLimiter::new(limit),
        }
    }
}
@@ -152,6 +216,7 @@ impl<T: AsRef<[u8]>> ChannelReader<T> {
impl Read for ChannelReader<Vec<u8>> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let read = self.buffer.read(buf)?;
+        self.limiter.read(read)?;
        if read > 0 {
            return Ok(read);
        }
modified radicle/Cargo.toml
@@ -16,6 +16,7 @@ logger = ["colored", "chrono"]
[dependencies]
amplify = { version = "4.0.0", default-features = false, features = ["std"] }
base64 = { version = "0.21.3" }
+bytesize = { version = "2", features = ["serde"] }
crossbeam-channel = { version = "0.5.6" }
cyphernet = { version = "0.5.0", features = ["tor", "dns", "p2p-ed25519"] }
fastrand = { version = "2.0.0" }
modified radicle/src/node/config.rs
@@ -1,6 +1,7 @@
use std::collections::HashSet;
-use std::net;
use std::ops::Deref;
+use std::str::FromStr;
+use std::{fmt, net};

use cyphernet::addr::PeerAddr;
use localtime::LocalDuration;
@@ -114,6 +115,9 @@ pub struct Limits {
    /// Connection limits.
    #[serde(default)]
    pub connection: ConnectionLimits,
+    /// Channel limits.
+    #[serde(default)]
+    pub fetch_pack_receive: FetchPackSizeLimit,
}

impl Default for Limits {
@@ -126,10 +130,88 @@ impl Default for Limits {
            max_open_files: 4096,
            rate: RateLimits::default(),
            connection: ConnectionLimits::default(),
+            fetch_pack_receive: FetchPackSizeLimit::default(),
        }
    }
}

+/// Limiter for byte streams.
+///
+/// Default: 500MiB
+#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+#[serde(into = "String", try_from = "String")]
+pub struct FetchPackSizeLimit {
+    limit: bytesize::ByteSize,
+}
+
+impl From<bytesize::ByteSize> for FetchPackSizeLimit {
+    fn from(limit: bytesize::ByteSize) -> Self {
+        Self { limit }
+    }
+}
+
+impl From<FetchPackSizeLimit> for String {
+    fn from(limit: FetchPackSizeLimit) -> Self {
+        limit.to_string()
+    }
+}
+
+impl TryFrom<String> for FetchPackSizeLimit {
+    type Error = String;
+
+    fn try_from(s: String) -> Result<Self, Self::Error> {
+        s.parse()
+    }
+}
+
+impl FromStr for FetchPackSizeLimit {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(FetchPackSizeLimit { limit: s.parse()? })
+    }
+}
+
+impl fmt::Display for FetchPackSizeLimit {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.limit)
+    }
+}
+
+impl FetchPackSizeLimit {
+    /// New `FetchPackSizeLimit` in bytes.
+    pub fn bytes(size: u64) -> Self {
+        bytesize::ByteSize::b(size).into()
+    }
+
+    /// New `FetchPackSizeLimit` in kibibytes.
+    pub fn kibibytes(size: u64) -> Self {
+        bytesize::ByteSize::kib(size).into()
+    }
+
+    /// New `FetchPackSizeLimit` in mebibytes.
+    pub fn mebibytes(size: u64) -> Self {
+        bytesize::ByteSize::mib(size).into()
+    }
+
+    /// New `FetchPackSizeLimit` in gibibytes.
+    pub fn gibibytes(size: u64) -> Self {
+        bytesize::ByteSize::gib(size).into()
+    }
+
+    /// Check if this limit is exceeded by the number of `bytes` provided.
+    pub fn exceeded_by(&self, bytes: usize) -> bool {
+        bytes >= self.limit.as_u64() as usize
+    }
+}
+
+impl Default for FetchPackSizeLimit {
+    fn default() -> Self {
+        Self::mebibytes(500)
+    }
+}
+
/// Connection limits.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
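For reference, a quick sketch of how the new `FetchPackSizeLimit` API behaves, based on the constructors and `exceeded_by` added above. Note that the comparison is inclusive (`bytes >= limit`), so reading exactly the limit counts as exceeding it:

```rust
use radicle::node::config::FetchPackSizeLimit;

fn main() {
    // The 1000-byte budget used in the e2e test above.
    let limit = FetchPackSizeLimit::bytes(1000);
    assert!(limit.exceeded_by(1000)); // inclusive comparison
    assert!(!limit.exceeded_by(999));

    // Limits round-trip through human-readable strings via `bytesize`,
    // matching the "500.0 MiB" default shown in the config example.
    let parsed: FetchPackSizeLimit = "500.0 MiB".parse().unwrap();
    assert!(!parsed.exceeded_by(1024));
}
```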