Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: rate limiter for channel reads
Fintan Halpenny committed 1 year ago
commit a90aabb1fe2c5d6d969f6c3236f45f93bc6bf3c4
parent f13afe491d169004159a033c4ad7548a7ba76271
10 files changed +237 -20
modified Cargo.lock
@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
-version = 3
+version = 4

[[package]]
name = "adler2"
@@ -309,6 +309,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"

[[package]]
+name = "bytesize"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3c8f83209414aacf0eeae3cf730b18d6981697fba62f200fcfb92b9f082acba"
+dependencies = [
+ "serde",
+]
+
+[[package]]
name = "cbc"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2173,6 +2182,7 @@ version = "0.14.0"
dependencies = [
 "amplify",
 "base64 0.21.7",
+ "bytesize",
 "chrono",
 "colored",
 "crossbeam-channel",
modified radicle-cli/examples/rad-config.md
@@ -47,7 +47,8 @@ $ rad config
      "connection": {
        "inbound": 128,
        "outbound": 16
-      }
+      },
+      "fetchPackReceive": "500.0 MiB"
    },
    "workers": 8,
    "seedingPolicy": {
modified radicle-node/src/service.rs
@@ -1098,7 +1098,13 @@ where
            refs_at: refs_at.clone(),
            subscribers: vec![],
        });
-        self.outbox.fetch(session, rid, refs_at, timeout);
+        self.outbox.fetch(
+            session,
+            rid,
+            refs_at,
+            timeout,
+            self.config.limits.fetch_pack_receive,
+        );

        Ok(fetching)
    }
modified radicle-node/src/service/io.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::time;

use log::*;
+use radicle::node::config::FetchPackSizeLimit;
use radicle::storage::refs::RefsAt;

use crate::prelude::*;
@@ -30,6 +31,8 @@ pub enum Io {
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
        timeout: time::Duration,
+        /// Limit the number of bytes fetched.
+        reader_limit: FetchPackSizeLimit,
    },
    /// Ask for a wakeup in a specified amount of time.
    Wakeup(LocalDuration),
@@ -124,6 +127,7 @@ impl Outbox {
        rid: RepoId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
+        reader_limit: FetchPackSizeLimit,
    ) {
        peer.fetching(rid);

@@ -143,6 +147,7 @@ impl Outbox {
            refs_at,
            remote: peer.id,
            timeout,
+            reader_limit,
        });
    }

modified radicle-node/src/tests/e2e.rs
@@ -1493,3 +1493,44 @@ fn test_multiple_offline_inits() {
        assert!(projects.contains(&repo.rid), "Bob is missing {}", repo.rid);
    }
}
+
+#[test]
+fn test_channel_reader_limit() {
+    logger::init(log::Level::Debug);
+
+    let tmp = tempfile::tempdir().unwrap();
+    let mut alice = Node::init(tmp.path(), config::relay("alice"));
+    let limits = radicle::node::config::Limits {
+        fetch_pack_receive: radicle::node::config::FetchPackSizeLimit::bytes(1000),
+        ..radicle::node::config::Limits::default()
+    };
+    let bob = Node::init(
+        tmp.path(),
+        Config {
+            limits,
+            ..config::relay("bob")
+        },
+    );
+    let acme = alice.project("acme", "");
+
+    let mut alice = alice.spawn();
+    let mut bob = bob.spawn();
+
+    alice.connect(&bob);
+    converge([&alice, &bob]);
+
+    let updated = bob.handle.seed(acme, Scope::All).unwrap();
+    assert!(updated);
+
+    let result = bob.handle.fetch(acme, alice.id, DEFAULT_TIMEOUT).unwrap();
+    assert!(!result.is_success());
+
+    let FetchResult::Failed { reason } = result else {
+        panic!("fetch result must be failed")
+    };
+    assert!(
+        reason.contains("Failed to consume the pack sent by the remote"),
+        "actual: {}",
+        reason
+    );
+}
modified radicle-node/src/wire/protocol.rs
@@ -35,7 +35,7 @@ use crate::wire::frame;
use crate::wire::frame::{Frame, FrameData, StreamId};
use crate::wire::Encode;
use crate::worker;
-use crate::worker::{ChannelEvent, FetchRequest, FetchResult, Task, TaskResult};
+use crate::worker::{ChannelEvent, ChannelsConfig, FetchRequest, FetchResult, Task, TaskResult};
use crate::Link;

/// NoiseXK handshake pattern.
@@ -128,22 +128,22 @@ impl Streams {
    }

    /// Open a new stream.
-    fn open(&mut self, timeout: time::Duration) -> (StreamId, worker::Channels) {
+    fn open(&mut self, config: ChannelsConfig) -> (StreamId, worker::Channels) {
        self.seq += 1;

        let id = StreamId::git(self.link)
            .nth(self.seq)
            .expect("Streams::open: too many streams");
        let channels = self
-            .register(id, timeout)
+            .register(id, config)
            .expect("Streams::open: stream was already open");

        (id, channels)
    }

    /// Register an open stream.
-    fn register(&mut self, stream: StreamId, timeout: time::Duration) -> Option<worker::Channels> {
-        let (wire, worker) = worker::Channels::pair(timeout)
+    fn register(&mut self, stream: StreamId, config: ChannelsConfig) -> Option<worker::Channels> {
+        let (wire, worker) = worker::Channels::pair(config)
            .expect("Streams::register: fatal: unable to create channels");

        match self.streams.entry(stream) {
@@ -762,8 +762,12 @@ where
                                log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");
                                metrics.streams_opened += 1;
                                metrics.received_fetch_requests += 1;
-
-                                let Some(channels) = streams.register(stream, FETCH_TIMEOUT) else {
+                                let reader_limit = self.service.config().limits.fetch_pack_receive;
+                                let Some(channels) = streams.register(
+                                    stream,
+                                    ChannelsConfig::new(FETCH_TIMEOUT)
+                                        .with_reader_limit(reader_limit),
+                                ) else {
                                    log::warn!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
                                    continue;
                                };
@@ -1062,6 +1066,7 @@ where
                    rid,
                    remote,
                    timeout,
+                    reader_limit,
                    refs_at,
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");
@@ -1076,7 +1081,8 @@ where
                        log::error!(target: "wire", "Peer {remote} is not connected: dropping fetch");
                        continue;
                    };
-                    let (stream, channels) = streams.open(timeout);
+                    let (stream, channels) =
+                        streams.open(ChannelsConfig::new(timeout).with_reader_limit(reader_limit));

                    log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");

modified radicle-node/src/worker.rs
@@ -23,7 +23,7 @@ use crate::service::policy;
use crate::service::policy::SeedingPolicy;
use crate::wire::StreamId;

-pub use channels::{ChannelEvent, Channels};
+pub use channels::{ChannelEvent, Channels, ChannelsConfig};

/// Worker pool configuration.
pub struct Config {
modified radicle-node/src/worker/channels.rs
@@ -4,6 +4,7 @@ use std::ops::Deref;
use std::{fmt, io, time};

use crossbeam_channel as chan;
+use radicle::node::config::FetchPackSizeLimit;
use radicle::node::NodeId;

use crate::runtime::Handle;
@@ -14,6 +15,32 @@ use crate::wire::StreamId;
/// upload-pack's stdout, the data chunks are of a maximum size of 8192 bytes.
pub const MAX_WORKER_CHANNEL_SIZE: usize = 64;

+#[derive(Clone, Copy, Debug)]
+pub struct ChannelsConfig {
+    timeout: time::Duration,
+    reader_limit: FetchPackSizeLimit,
+}
+
+impl ChannelsConfig {
+    pub fn new(timeout: time::Duration) -> Self {
+        Self {
+            timeout,
+            reader_limit: FetchPackSizeLimit::default(),
+        }
+    }
+
+    pub fn with_timeout(self, timeout: time::Duration) -> Self {
+        Self { timeout, ..self }
+    }
+
+    pub fn with_reader_limit(self, reader_limit: FetchPackSizeLimit) -> Self {
+        Self {
+            reader_limit,
+            ..self
+        }
+    }
+}
+
/// A reader and writer pair that can be used in the fetch protocol.
///
/// It implements [`radicle::fetch::transport::ConnectionStream`] to
@@ -92,20 +119,23 @@ impl<T: AsRef<[u8]>> Channels<T> {
    pub fn new(
        sender: chan::Sender<ChannelEvent<T>>,
        receiver: chan::Receiver<ChannelEvent<T>>,
-        timeout: time::Duration,
+        config: ChannelsConfig,
    ) -> Self {
-        let sender = ChannelWriter { sender, timeout };
-        let receiver = ChannelReader::new(receiver, timeout);
+        let sender = ChannelWriter {
+            sender,
+            timeout: config.timeout,
+        };
+        let receiver = ChannelReader::new(receiver, config.timeout, config.reader_limit);

        Self { sender, receiver }
    }

-    pub fn pair(timeout: time::Duration) -> io::Result<(Channels<T>, Channels<T>)> {
+    pub fn pair(config: ChannelsConfig) -> io::Result<(Channels<T>, Channels<T>)> {
        let (l_send, r_recv) = chan::bounded::<ChannelEvent<T>>(MAX_WORKER_CHANNEL_SIZE);
        let (r_send, l_recv) = chan::bounded::<ChannelEvent<T>>(MAX_WORKER_CHANNEL_SIZE);

-        let l = Channels::new(l_send, l_recv, timeout);
-        let r = Channels::new(r_send, r_recv, timeout);
+        let l = Channels::new(l_send, l_recv, config);
+        let r = Channels::new(r_send, r_recv, config);

        Ok((l, r))
    }
@@ -123,12 +153,41 @@ impl<T: AsRef<[u8]>> Channels<T> {
    }
}

+#[derive(Clone, Copy, Debug)]
+pub struct ReadLimiter {
+    limit: FetchPackSizeLimit,
+    total_read: usize,
+}
+
+impl ReadLimiter {
+    pub fn new(limit: FetchPackSizeLimit) -> Self {
+        Self {
+            limit,
+            total_read: 0,
+        }
+    }
+
+    pub fn read(&mut self, bytes: usize) -> io::Result<()> {
+        self.total_read = self.total_read.saturating_add(bytes);
+        log::trace!(target: "worker", "limit {}, total bytes read: {}", self.limit, self.total_read);
+        if self.limit.exceeded_by(self.total_read) {
+            Err(io::Error::new(
+                io::ErrorKind::Other,
+                "sender has exceeded number of allowed bytes, aborting read",
+            ))
+        } else {
+            Ok(())
+        }
+    }
+}
+
/// Wraps a [`chan::Receiver`] and provides it with [`io::Read`].
#[derive(Clone)]
pub struct ChannelReader<T = Vec<u8>> {
    buffer: io::Cursor<Vec<u8>>,
    receiver: chan::Receiver<ChannelEvent<T>>,
    timeout: time::Duration,
+    limiter: ReadLimiter,
}

impl<T> Deref for ChannelReader<T> {
@@ -140,11 +199,16 @@ impl<T> Deref for ChannelReader<T> {
}

impl<T: AsRef<[u8]>> ChannelReader<T> {
-    pub fn new(receiver: chan::Receiver<ChannelEvent<T>>, timeout: time::Duration) -> Self {
+    pub fn new(
+        receiver: chan::Receiver<ChannelEvent<T>>,
+        timeout: time::Duration,
+        limit: FetchPackSizeLimit,
+    ) -> Self {
        Self {
            buffer: io::Cursor::new(Vec::new()),
            receiver,
            timeout,
+            limiter: ReadLimiter::new(limit),
        }
    }
}
@@ -152,6 +216,7 @@ impl<T: AsRef<[u8]>> ChannelReader<T> {
impl Read for ChannelReader<Vec<u8>> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let read = self.buffer.read(buf)?;
+        self.limiter.read(read)?;
        if read > 0 {
            return Ok(read);
        }
modified radicle/Cargo.toml
@@ -16,6 +16,7 @@ logger = ["colored", "chrono"]
[dependencies]
amplify = { version = "4.0.0", default-features = false, features = ["std"] }
base64 = { version = "0.21.3" }
+bytesize = { version = "2", features = ["serde"] }
crossbeam-channel = { version = "0.5.6" }
cyphernet = { version = "0.5.0", features = ["tor", "dns", "p2p-ed25519"] }
fastrand = { version = "2.0.0" }
modified radicle/src/node/config.rs
@@ -1,6 +1,7 @@
use std::collections::HashSet;
-use std::net;
use std::ops::Deref;
+use std::str::FromStr;
+use std::{fmt, net};

use cyphernet::addr::PeerAddr;
use localtime::LocalDuration;
@@ -114,6 +115,9 @@ pub struct Limits {
    /// Connection limits.
    #[serde(default)]
    pub connection: ConnectionLimits,
+    /// Channel limits.
+    #[serde(default)]
+    pub fetch_pack_receive: FetchPackSizeLimit,
}

impl Default for Limits {
@@ -126,10 +130,88 @@ impl Default for Limits {
            max_open_files: 4096,
            rate: RateLimits::default(),
            connection: ConnectionLimits::default(),
+            fetch_pack_receive: FetchPackSizeLimit::default(),
        }
    }
}

+/// Limiter for byte streams.
+///
+/// Default: 500MiB
+#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+#[serde(into = "String", try_from = "String")]
+pub struct FetchPackSizeLimit {
+    limit: bytesize::ByteSize,
+}
+
+impl From<bytesize::ByteSize> for FetchPackSizeLimit {
+    fn from(limit: bytesize::ByteSize) -> Self {
+        Self { limit }
+    }
+}
+
+impl From<FetchPackSizeLimit> for String {
+    fn from(limit: FetchPackSizeLimit) -> Self {
+        limit.to_string()
+    }
+}
+
+impl TryFrom<String> for FetchPackSizeLimit {
+    type Error = String;
+
+    fn try_from(s: String) -> Result<Self, Self::Error> {
+        s.parse()
+    }
+}
+
+impl FromStr for FetchPackSizeLimit {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(FetchPackSizeLimit { limit: s.parse()? })
+    }
+}
+
+impl fmt::Display for FetchPackSizeLimit {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.limit)
+    }
+}
+
+impl FetchPackSizeLimit {
+    /// New `FetchPackSizeLimit` in bytes.
+    pub fn bytes(size: u64) -> Self {
+        bytesize::ByteSize::b(size).into()
+    }
+
+    /// New `FetchPackSizeLimit` in kibibytes.
+    pub fn kibibytes(size: u64) -> Self {
+        bytesize::ByteSize::kib(size).into()
+    }
+
+    /// New `FetchPackSizeLimit` in mebibytes.
+    pub fn mebibytes(size: u64) -> Self {
+        bytesize::ByteSize::mib(size).into()
+    }
+
+    /// New `FetchPackSizeLimit` in gibibytes.
+    pub fn gibibytes(size: u64) -> Self {
+        bytesize::ByteSize::gib(size).into()
+    }
+
+    /// Check if this limit is exceeded by the number of `bytes` provided.
+    pub fn exceeded_by(&self, bytes: usize) -> bool {
+        bytes >= self.limit.as_u64() as usize
+    }
+}
+
+impl Default for FetchPackSizeLimit {
+    fn default() -> Self {
+        Self::mebibytes(500)
+    }
+}
+
/// Connection limits.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]