Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Add setting for limiting fetch concurrency
Alexis Sellier committed 3 years ago
commit f4257eb9537b3a7077562d9907f3f2432fdf4ca8
parent ea4294c79f64a2a5739b1fca98d98427b91846a3
7 files changed +151 -32
modified radicle-node/src/main.rs
@@ -21,14 +21,14 @@ Usage

Options

-
    --connect          <peer>        Connect to the given peer address on start
-
    --external-address <address>     Publicly accessible address (default 0.0.0.0:8776)
-
    --git-daemon       <address>     Address to bind git-daemon to (default 0.0.0.0:9418)
-
    --tracking-policy  (track|block) Default tracking policy
-
    --tracking-scope   (trusted|all) Default scope for tracking policies
-
    --force                          Force start even if an existing control socket is found
-
    --help                           Print help
-
    --listen           <address>     Address to listen on
+
    --connect            <peer>         Connect to the given peer address on start
+
    --external-address   <address>      Publicly accessible address (default 0.0.0.0:8776)
+
    --git-daemon         <address>      Address to bind git-daemon to (default 0.0.0.0:9418)
+
    --tracking-policy    (track|block)  Default tracking policy
+
    --tracking-scope     (trusted|all)  Default scope for tracking policies
+
    --force                             Force start even if an existing control socket is found
+
    --help                              Print help
+
    --listen             <address>      Address to listen on

"#;

@@ -96,6 +96,9 @@ impl Options {
                Long("limit-routing-max-size") => {
                    limits.routing_max_size = parser.value()?.parse()?;
                }
+
                Long("limit-fetch-concurrency") => {
+
                    limits.fetch_concurrency = parser.value()?.parse()?;
+
                }
                Long("listen") => {
                    let addr = parser.value()?.parse()?;
                    listen.push(addr);
modified radicle-node/src/service.rs
@@ -567,6 +567,9 @@ where
        let seed = session.id;

        match session.fetch(rid) {
+
            session::FetchResult::Queued => {
+
                debug!(target: "service", "Fetch queued for {rid} with {seed}..");
+
            }
            session::FetchResult::Ready => {
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

@@ -654,11 +657,6 @@ where
                _ => log::debug!(target: "service", "Nothing to announce, no refs were updated.."),
            }
        }
-

-
        // Go back to "idle".
-
        if let Some(s) = self.sessions.get_mut(&remote) {
-
            s.fetched(rid);
-
        }
        // TODO: Since this fetch could be either a full clone
        // or simply a ref update, we need to either announce
        // new inventory, or new refs. Right now, we announce
@@ -666,11 +664,15 @@ where
        //
        // Announce the newly fetched repository to the
        // network, if necessary.
-
        //
-
        // Nb. This needs to be run after we've switched back
-
        // to the gossip protocol, otherwise the messages will
-
        // be queued.
        self.sync_and_announce();
+

+
        if let Some(s) = self.sessions.get_mut(&remote) {
+
            if let Some(dequeued) = s.fetched(rid) {
+
                log::debug!(target: "service", "Dequeued fetch {dequeued} from session {remote}..");
+

+
                self.fetch(dequeued, &remote);
+
            }
+
        }
    }

    pub fn accepted(&mut self, _addr: net::SocketAddr) {
@@ -713,6 +715,7 @@ where
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
+
                        self.config.limits.clone(),
                    ));
                    self.reactor.write_all(peer, msgs);
                }
@@ -1275,8 +1278,15 @@ where
        }
        let persistent = self.config.is_persistent(&nid);

-
        self.sessions
-
            .insert(nid, Session::outbound(nid, persistent, self.rng.clone()));
+
        self.sessions.insert(
+
            nid,
+
            Session::outbound(
+
                nid,
+
                persistent,
+
                self.rng.clone(),
+
                self.config.limits.clone(),
+
            ),
+
        );
        self.reactor.connect(nid, addr);

        true
modified radicle-node/src/service/config.rs
@@ -20,6 +20,8 @@ pub struct Limits {
    pub routing_max_size: usize,
    /// How long to keep a routing table entry before being pruned.
    pub routing_max_age: LocalDuration,
+
    /// Maximum number of concurrent fetches per connection.
+
    pub fetch_concurrency: usize,
}

impl Default for Limits {
@@ -27,6 +29,7 @@ impl Default for Limits {
        Self {
            routing_max_size: 1000,
            routing_max_age: LocalDuration::from_mins(7 * 24 * 60),
+
            fetch_concurrency: 1,
        }
    }
}
modified radicle-node/src/service/session.rs
@@ -1,6 +1,7 @@
-
use std::collections::HashSet;
+
use std::collections::{HashSet, VecDeque};
use std::fmt;

+
use crate::service::config::Limits;
use crate::service::message;
use crate::service::message::Message;
use crate::service::{Id, LocalTime, NodeId, Reactor, Rng};
@@ -64,6 +65,8 @@ impl fmt::Display for State {
/// Return value of [`Session::fetch`].
#[derive(Debug)]
pub enum FetchResult {
+
    /// Maximum concurrent fetches reached.
+
    Queued,
    /// We are already fetching the given repo from this peer.
    AlreadyFetching,
    /// Ok, ready to fetch.
@@ -118,14 +121,17 @@ pub struct Session {
    pub subscribe: Option<message::Subscribe>,
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,
+
    /// Fetch queue.
+
    pub queue: VecDeque<Id>,

    /// Connection attempts. For persistent peers, tracks
    /// how many times we've attempted to connect. We reset this to zero
    /// upon successful connection.
    attempts: usize,
-

    /// Source of entropy.
    rng: Rng,
+
    /// Protocol limits.
+
    limits: Limits,
}

impl fmt::Display for Session {
@@ -148,7 +154,7 @@ impl fmt::Display for Session {
}

impl Session {
-
    pub fn outbound(id: NodeId, persistent: bool, rng: Rng) -> Self {
+
    pub fn outbound(id: NodeId, persistent: bool, rng: Rng, limits: Limits) -> Self {
        Self {
            id,
            state: State::Initial,
@@ -156,12 +162,20 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
+
            queue: VecDeque::default(),
            attempts: 1,
            rng,
+
            limits,
        }
    }

-
    pub fn inbound(id: NodeId, persistent: bool, rng: Rng, time: LocalTime) -> Self {
+
    pub fn inbound(
+
        id: NodeId,
+
        persistent: bool,
+
        rng: Rng,
+
        time: LocalTime,
+
        limits: Limits,
+
    ) -> Self {
        Self {
            id,
            state: State::Connected {
@@ -173,8 +187,10 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
+
            queue: VecDeque::default(),
            attempts: 0,
            rng,
+
            limits,
        }
    }

@@ -200,22 +216,32 @@ impl Session {

    pub fn fetch(&mut self, rid: Id) -> FetchResult {
        if let State::Connected { fetching, .. } = &mut self.state {
-
            if fetching.insert(rid) {
-
                FetchResult::Ready
-
            } else {
-
                FetchResult::AlreadyFetching
+
            if fetching.contains(&rid) || self.queue.contains(&rid) {
+
                return FetchResult::AlreadyFetching;
+
            }
+
            if fetching.len() >= self.limits.fetch_concurrency {
+
                self.queue.push_back(rid);
+
                return FetchResult::Queued;
            }
+
            fetching.insert(rid);
+

+
            FetchResult::Ready
        } else {
            FetchResult::NotConnected
        }
    }

-
    pub fn fetched(&mut self, rid: Id) {
+
    pub fn fetched(&mut self, rid: Id) -> Option<Id> {
        if let State::Connected { fetching, .. } = &mut self.state {
            if !fetching.remove(&rid) {
                log::error!(target: "service", "Fetched unknown repository {rid}");
            }
+
            // Dequeue the next fetch, if any.
+
            if let Some(rid) = self.queue.pop_front() {
+
                return Some(rid);
+
            }
        }
+
        None
    }

    pub fn to_attempted(&mut self) {
modified radicle-node/src/test/peer.rs
@@ -23,7 +23,7 @@ use crate::service::tracking::{Policy, Scope};
use crate::service::*;
use crate::storage::git::transport::remote;
use crate::storage::Inventory;
-
use crate::storage::{RemoteId, WriteStorage};
+
use crate::storage::{Namespaces, RemoteId, WriteStorage};
use crate::test::arbitrary;
use crate::test::simulator;
use crate::test::storage::MockStorage;
@@ -351,4 +351,20 @@ where
    pub fn outbox(&mut self) -> impl Iterator<Item = Io> + '_ {
        iter::from_fn(|| self.service.reactor().next())
    }
+

+
    /// Get a draining iterator over the peer's I/O outbox, which only returns fetches.
+
    pub fn fetches(&mut self) -> impl Iterator<Item = (Id, NodeId, Namespaces)> + '_ {
+
        iter::from_fn(|| self.service.reactor().next()).filter_map(|io| {
+
            if let Io::Fetch {
+
                rid,
+
                remote,
+
                namespaces,
+
            } = io
+
            {
+
                Some((rid, remote, namespaces))
+
            } else {
+
                None
+
            }
+
        })
+
    }
}
modified radicle-node/src/tests.rs
@@ -22,6 +22,7 @@ use crate::service::ServiceState as _;
use crate::service::*;
use crate::storage::git::transport::{local, remote};
use crate::storage::git::Storage;
+
use crate::storage::Namespaces;
use crate::storage::ReadStorage;
use crate::test::arbitrary;
use crate::test::assert_matches;
@@ -269,6 +270,7 @@ fn test_inventory_pruning() {
            limits: Limits {
                routing_max_size: 0,
                routing_max_age: LocalDuration::from_secs(0),
+
                ..Limits::default()
            },
            peer_projects: vec![10; 5],
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
@@ -279,6 +281,7 @@ fn test_inventory_pruning() {
            limits: Limits {
                routing_max_size: 0,
                routing_max_age: LocalDuration::from_mins(7 * 24 * 60),
+
                ..Limits::default()
            },
            peer_projects: vec![10; 5],
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
@@ -289,6 +292,7 @@ fn test_inventory_pruning() {
            limits: Limits {
                routing_max_size: 50,
                routing_max_age: LocalDuration::from_mins(0),
+
                ..Limits::default()
            },
            peer_projects: vec![10; 5],
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
@@ -299,6 +303,7 @@ fn test_inventory_pruning() {
            limits: Limits {
                routing_max_size: 25,
                routing_max_age: LocalDuration::from_mins(7 * 24 * 60),
+
                ..Limits::default()
            },
            peer_projects: vec![10; 5],
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
@@ -1079,6 +1084,52 @@ fn test_fetch_missing_inventory() {
        .find(|m| matches!(m, Io::Fetch { .. }))
        .unwrap();
}
+
#[test]
+
fn test_queued_fetch() {
+
    let storage = arbitrary::nonempty_storage(3);
+
    let mut repo_keys = storage.inventory.keys();
+
    let rid1 = *repo_keys.next().unwrap();
+
    let rid2 = *repo_keys.next().unwrap();
+
    let rid3 = *repo_keys.next().unwrap();
+
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
+
    let bob = Peer::new("bob", [8, 8, 8, 8]);
+

+
    logger::init(log::Level::Debug);
+

+
    alice.connect_to(&bob);
+

+
    // Send the first fetch.
+
    let (send, _recv1) = chan::bounded::<node::FetchResult>(1);
+
    alice.command(Command::Fetch(rid1, bob.id, send));
+

+
    // Send the 2nd fetch that will be queued.
+
    let (send2, _recv2) = chan::bounded::<node::FetchResult>(1);
+
    alice.command(Command::Fetch(rid2, bob.id, send2));
+

+
    // Send the 3rd fetch that will be queued.
+
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
+
    alice.command(Command::Fetch(rid3, bob.id, send3));
+

+
    // The first fetch is initiated.
+
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid1);
+
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
+
    assert_matches!(alice.outbox().next(), None);
+

+
    // Have enough time pass that Alice sends a "ping" to Bob.
+
    alice.elapse(KEEP_ALIVE_DELTA);
+

+
    // Finish the 1st fetch.
+
    alice.fetched(rid1, Namespaces::All, bob.id, Ok(vec![]));
+
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
+
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid2);
+
    // ... but not the third.
+
    assert_matches!(alice.fetches().next(), None);
+

+
    // Finish the 2nd fetch.
+
    alice.fetched(rid2, Namespaces::All, bob.id, Ok(vec![]));
+
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
+
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid3);
+
}

#[test]
fn test_push_and_pull() {
modified radicle-node/src/tests/e2e.rs
@@ -7,6 +7,7 @@ use radicle::test::fixtures;
use radicle::{assert_matches, rad};

use crate::service;
+
use crate::service::config::Limits;
use crate::service::tracking::Scope;
use crate::storage::git::transport;
use crate::test::environment::{converge, Environment, Node};
@@ -468,8 +469,9 @@ fn test_concurrent_fetches() {
    let mut alice_repos = HashSet::new();
    let mut alice = Node::init(&env.tmp());
    let mut bob = Node::init(&env.tmp());
+
    let repos = scale.max(4);

-
    for i in 0..scale.max(3) {
+
    for i in 0..repos {
        // Create a repo for Alice.
        let tmp = tempfile::tempdir().unwrap();
        let (repo, _) = fixtures::repository(tmp.path());
@@ -487,8 +489,16 @@ fn test_concurrent_fetches() {
        bob_repos.insert(rid);
    }

-
    let mut alice = alice.spawn(service::Config::default());
-
    let mut bob = bob.spawn(service::Config::default());
+
    let config = service::Config {
+
        limits: Limits {
+
            // Have one fetch be queued.
+
            fetch_concurrency: repos - 1,
+
            ..Limits::default()
+
        },
+
        ..service::Config::default()
+
    };
+
    let mut alice = alice.spawn(config.clone());
+
    let mut bob = bob.spawn(config);

    let alice_events = alice.handle.events();
    let bob_events = bob.handle.events();