rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
node: Have multiple fetch queues
Merged · did:key:z6MksFqX...wzpT · opened 1 year ago

In the current design it's possible for one peer to fill the fetch queue, delaying fetches from all other peers.

To improve fairness, we move to one fetch queue per peer, and dequeue from peers in random order so that no single peer is consistently favored.
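
As a rough illustration of the scheme (a minimal sketch, not the patch's actual code: `PeerQueues`, `Fetch`, and the `shuffle` parameter are simplified stand-ins for `Session`, `QueuedFetch`, and the node's `Rng`):

```rust
use std::collections::{HashMap, VecDeque};

/// Simplified stand-in for the patch's `QueuedFetch`.
#[derive(Debug)]
struct Fetch {
    rid: u32,
}

/// One bounded fetch queue per peer, instead of a single shared queue.
struct PeerQueues {
    queues: HashMap<u32, VecDeque<Fetch>>,
    max_queue_size: usize,
}

impl PeerQueues {
    /// Queue a fetch for a peer; reject it if that peer's queue is full.
    /// A busy peer can only fill its own queue, never delay other peers.
    fn queue_fetch(&mut self, peer: u32, fetch: Fetch) -> Result<(), Fetch> {
        let queue = self.queues.entry(peer).or_default();
        if queue.len() >= self.max_queue_size {
            return Err(fetch);
        }
        queue.push_back(fetch);
        Ok(())
    }

    /// Dequeue at most one fetch per peer, visiting peers in random order
    /// so that no peer is consistently served first.
    fn dequeue_fetches(&mut self, shuffle: impl FnOnce(&mut [u32])) -> Vec<Fetch> {
        let mut peers: Vec<u32> = self.queues.keys().copied().collect();
        shuffle(&mut peers);
        peers
            .into_iter()
            .filter_map(|p| self.queues.get_mut(&p)?.pop_front())
            .collect()
    }
}

fn main() {
    let mut pq = PeerQueues { queues: HashMap::new(), max_queue_size: 2 };
    pq.queue_fetch(1, Fetch { rid: 10 }).unwrap();
    pq.queue_fetch(2, Fetch { rid: 20 }).unwrap();
    // Identity "shuffle" for the example; the node uses its own RNG.
    let batch = pq.dequeue_fetches(|_: &mut [u32]| {});
    assert_eq!(batch.len(), 2);
}
```

In the patch itself, each `Session` owns a bounded queue (`MAX_FETCH_QUEUE_SIZE`), and `Service::dequeue_fetches` shuffles the session list on every pass, popping at most one fetch per connected, under-capacity session.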

4 files changed +184 -106 27eff809 4a497fa6
modified radicle-node/src/runtime/handle.rs
@@ -334,11 +334,16 @@ impl radicle::node::Handle for Handle {
                        "subscribers": state.subscribers.len(),
                    })
                }).collect::<Vec<_>>(),
-                "queue": state.queue().iter().map(|fetch| {
+                "queue": state.sessions().values().map(|sess| {
                    json!({
-                        "rid": fetch.rid,
-                        "from": fetch.from,
-                        "refsAt": fetch.refs_at,
+                        "nid": sess.id,
+                        "queue": sess.queue.iter().map(|fetch| {
+                            json!({
+                                "rid": fetch.rid,
+                                "from": fetch.from,
+                                "refsAt": fetch.refs_at,
+                            })
+                        }).collect::<Vec<_>>()
                    })
                }).collect::<Vec<_>>(),
                "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
modified radicle-node/src/service.rs
@@ -10,7 +10,7 @@ pub mod message;
pub mod session;

use std::collections::hash_map::Entry;
-use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
+use std::collections::{BTreeSet, HashMap, HashSet};
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
@@ -60,7 +60,7 @@ use crate::{crypto, PROTOCOL_VERSION};
pub use crate::node::events::{Event, Events};
pub use crate::node::{config::Network, Config, NodeId};
pub use crate::service::message::{Message, ZeroBytes};
-pub use crate::service::session::Session;
+pub use crate::service::session::{QueuedFetch, Session};

pub use radicle::node::policy::config as policy;

@@ -312,31 +312,6 @@ impl FetchState {
    }
}

-/// Fetch waiting to be processed, in the fetch queue.
-#[derive(Debug)]
-pub struct QueuedFetch {
-    /// Repo being fetched.
-    pub rid: RepoId,
-    /// Peer being fetched from.
-    pub from: NodeId,
-    /// Refs being fetched.
-    pub refs_at: Vec<RefsAt>,
-    /// The timeout given for the fetch request.
-    timeout: time::Duration,
-    /// Result channel.
-    channel: Option<chan::Sender<FetchResult>>,
-}
-
-impl PartialEq for QueuedFetch {
-    fn eq(&self, other: &Self) -> bool {
-        self.rid == other.rid
-            && self.from == other.from
-            && self.refs_at == other.refs_at
-            && self.channel.is_none()
-            && other.channel.is_none()
-    }
-}
-
/// Holds all node stores.
#[derive(Debug)]
pub struct Stores<D>(D);
@@ -438,8 +413,6 @@ pub struct Service<D, S, G> {
    rng: Rng,
    /// Ongoing fetches.
    fetching: HashMap<RepoId, FetchState>,
-    /// Fetch queue.
-    queue: VecDeque<QueuedFetch>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
@@ -525,7 +498,6 @@ where
            limiter,
            sessions,
            fetching: HashMap::new(),
-            queue: VecDeque::new(),
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
@@ -809,7 +781,7 @@ where
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
-            self.dequeue_fetch();
+            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
@@ -1045,17 +1017,14 @@ where
                        timeout,
                        channel,
                    };
-                    if self.queue.contains(&fetch) {
-                        debug!(target: "service", "Fetch for {rid} with {from} is already queued..");
-                    } else {
-                        debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
-                        self.queue.push_back(fetch);
-                    }
+                    debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
+
+                    self.queue_fetch(fetch);
                }
            }
            Err(TryFetchError::SessionCapacityReached) => {
                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
-                self.queue.push_back(QueuedFetch {
+                self.queue_fetch(QueuedFetch {
                    rid,
                    refs_at,
                    from,
@@ -1075,6 +1044,17 @@ where
        false
    }

+    fn queue_fetch(&mut self, fetch: QueuedFetch) {
+        let Some(s) = self.sessions.get_mut(&fetch.from) else {
+            log::error!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
+            return;
+        };
+        if let Err(e) = s.queue_fetch(fetch) {
+            let fetch = e.inner();
+            log::debug!(target: "service", "Unable to queue fetch for {} with {}: {e}", &fetch.rid, &fetch.from);
+        }
+    }
+
    // TODO: Buffer/throttle fetches.
    fn try_fetch(
        &mut self,
@@ -1091,10 +1071,13 @@ where

        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");

-        if let Entry::Occupied(fetching) = fetching {
-            // We're already fetching this repo from some peer.
-            return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
-        }
+        let fetching = match fetching {
+            Entry::Vacant(fetching) => fetching,
+            Entry::Occupied(fetching) => {
+                // We're already fetching this repo from some peer.
+                return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
+            }
+        };
        // Sanity check: We shouldn't be fetching from this session, since we return above if we're
        // fetching from any session.
        debug_assert!(!session.is_fetching(&rid));
@@ -1109,7 +1092,7 @@ where
            return Err(TryFetchError::SessionCapacityReached);
        }

-        let fetching = fetching.or_insert(FetchState {
+        let fetching = fetching.insert(FetchState {
            from,
            refs_at: refs_at.clone(),
            subscribers: vec![],
@@ -1214,56 +1197,57 @@ where
                }
            }
        }
-        // We can now try to dequeue another fetch.
-        self.dequeue_fetch();
+        // We can now try to dequeue more fetches.
+        self.dequeue_fetches();
    }

+    /// Attempt to dequeue fetches from all peers.
+    /// At most one fetch is dequeued per peer. If the fetch cannot be processed,
+    /// it is put back on the queue for that peer.
+    ///
    /// Fetches are queued for two reasons:
    /// 1. The RID was already being fetched.
    /// 2. The session was already at fetch capacity.
-    pub fn dequeue_fetch(&mut self) {
-        let mut tries = self.queue.len();
+    pub fn dequeue_fetches(&mut self) {
+        let sessions = self
+            .sessions
+            .shuffled()
+            .map(|(k, _)| *k)
+            .collect::<Vec<_>>();
+
+        // Try to dequeue once per session.
+        for nid in sessions {
+            // SAFETY: All the keys we are iterating on exist.
+            #[allow(clippy::unwrap_used)]
+            let sess = self.sessions.get_mut(&nid).unwrap();
+            if !sess.is_connected() || sess.is_at_capacity() {
+                continue;
+            }

-        while let Some(QueuedFetch {
-            rid,
-            from,
-            refs_at,
-            timeout,
-            channel,
-        }) = self.queue.pop_front()
-        {
-            debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");
+            if let Some(QueuedFetch {
+                rid,
+                from,
+                refs_at,
+                timeout,
+                channel,
+            }) = sess.dequeue_fetch()
+            {
+                debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");

-            if let Some(refs) = NonEmpty::from_vec(refs_at) {
-                let repo_entry = self
-                    .policies
-                    .seed_policy(&rid)
-                    .expect("Service::dequeue_fetch: error accessing repo seeding configuration");
-                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
-                    debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
-                    continue;
-                };
-                // Keep dequeueing if there was nothing to fetch, otherwise break.
-                if self.fetch_refs_at(rid, from, refs, scope, timeout, channel) {
-                    break;
-                }
-            } else {
-                // If no refs are specified, always do a full fetch.
-                if self.fetch(rid, from, timeout, channel) {
-                    break;
+                if let Some(refs) = NonEmpty::from_vec(refs_at) {
+                    let repo_entry = self.policies.seed_policy(&rid).expect(
+                        "Service::dequeue_fetch: error accessing repo seeding configuration",
+                    );
+                    let SeedingPolicy::Allow { scope } = repo_entry.policy else {
+                        debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
+                        continue;
+                    };
+                    self.fetch_refs_at(rid, from, refs, scope, timeout, channel);
+                } else {
+                    // If no refs are specified, always do a full fetch.
+                    self.fetch(rid, from, timeout, channel);
                }
            }
-            // Nb. Just a precaution, `tries` should always be >= 1 here.
-            tries = tries.saturating_sub(1);
-            // To avoid looping forever, only try to dequeue a fixed number of fetches at a time.
-            if tries == 0 {
-                debug!(
-                    target: "service",
-                    "Giving up on dequeuing, {} item(s) still left in the queue..",
-                    self.queue.len()
-                );
-                break;
-            }
        }
    }

@@ -1460,7 +1444,7 @@ where
                self.maintain_connections();
            }
        }
-        self.dequeue_fetch();
+        self.dequeue_fetches();
    }

    pub fn received_message(&mut self, remote: NodeId, message: Message) {
@@ -1586,10 +1570,12 @@ where
                    }
                }
                let mut missing = Vec::new();
+                let nid = *self.nid();

-                for id in message.inventory.as_slice() {
-                    // TODO: Move this out (good luck with the borrow checker).
-                    if let Some(sess) = self.sessions.get_mut(announcer) {
+                // Here we handle the special case where the inventory we received is that of
+                // a connected peer, as opposed to being relayed to us.
+                if let Some(sess) = self.sessions.get_mut(announcer) {
+                    for id in message.inventory.as_slice() {
                        // If we are connected to the announcer of this inventory, update the peer's
                        // subscription filter to include all inventory items. This way, we'll
                        // relay messages relating to the peer's inventory.
@@ -1604,7 +1590,7 @@ where
                        ) {
                            // Only if we do not have the repository locally do we fetch here.
                            // If we do have it, only fetch after receiving a ref announcement.
-                            match self.db.routing().entry(id, self.nid()) {
+                            match self.db.routing().entry(id, &nid) {
                                Ok(entry) => {
                                    if entry.is_none() {
                                        missing.push(*id);
@@ -1618,6 +1604,12 @@ where
                        }
                    }
                }
+                // Since we have limited fetch capacity, it may be that we can't fetch an entire
+                // inventory from a peer. Therefore we randomize the order of the RIDs to fetch
+                // different RIDs from different peers in case multiple peers announce the same
+                // RIDs.
+                self.rng.shuffle(&mut missing);
+
                for rid in missing {
                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
                    self.fetch(rid, *announcer, FETCH_TIMEOUT, None);
@@ -2612,8 +2604,6 @@ pub trait ServiceState {
    fn sessions(&self) -> &Sessions;
    /// Get fetch state.
    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
-    /// Get fetch queue.
-    fn queue(&self) -> &VecDeque<QueuedFetch>;
    /// Get outbox.
    fn outbox(&self) -> &Outbox;
    /// Get rate limiter.
@@ -2650,10 +2640,6 @@ where
        &self.fetching
    }

-    fn queue(&self) -> &VecDeque<QueuedFetch> {
-        &self.queue
-    }
-
    fn outbox(&self) -> &Outbox {
        &self.outbox
    }
modified radicle-node/src/service/session.rs
@@ -1,17 +1,22 @@
use std::collections::{HashSet, VecDeque};
-use std::fmt;
+use std::{fmt, time};
+
+use crossbeam_channel as chan;

use crate::node::config::Limits;
-use crate::node::Severity;
+use crate::node::{FetchResult, Severity};
use crate::service::message;
use crate::service::message::Message;
use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
+use crate::storage::refs::RefsAt;
use crate::{Link, Timestamp};

pub use crate::node::{PingState, State};

/// Time after which a connection is considered stable.
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
+/// Maximum items in the fetch queue.
+pub const MAX_FETCH_QUEUE_SIZE: usize = 128;

#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
@@ -43,6 +48,52 @@ impl Error {
    }
}

+/// Error when trying to queue a fetch.
+#[derive(thiserror::Error, Debug, Clone)]
+pub enum QueueError {
+    /// The item already exists in the queue.
+    #[error("item is already queued")]
+    Duplicate(QueuedFetch),
+    /// The queue is at capacity.
+    #[error("queue capacity reached")]
+    CapacityReached(QueuedFetch),
+}
+
+impl QueueError {
+    /// Get the inner [`QueuedFetch`].
+    pub fn inner(&self) -> &QueuedFetch {
+        match self {
+            Self::Duplicate(f) => f,
+            Self::CapacityReached(f) => f,
+        }
+    }
+}
+
+/// Fetch waiting to be processed, in the fetch queue.
+#[derive(Debug, Clone)]
+pub struct QueuedFetch {
+    /// Repo being fetched.
+    pub rid: RepoId,
+    /// Peer being fetched from.
+    pub from: NodeId,
+    /// Refs being fetched.
+    pub refs_at: Vec<RefsAt>,
+    /// The timeout given for the fetch request.
+    pub timeout: time::Duration,
+    /// Result channel.
+    pub channel: Option<chan::Sender<FetchResult>>,
+}
+
+impl PartialEq for QueuedFetch {
+    fn eq(&self, other: &Self) -> bool {
+        self.rid == other.rid
+            && self.from == other.from
+            && self.refs_at == other.refs_at
+            && self.channel.is_none()
+            && other.channel.is_none()
+    }
+}
+
/// A peer session. Each connected peer will have one session.
#[derive(Debug, Clone)]
pub struct Session {
@@ -61,6 +112,8 @@ pub struct Session {
    pub subscribe: Option<message::Subscribe>,
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,
+    /// Fetch queue.
+    pub queue: VecDeque<QueuedFetch>,

    /// Connection attempts. For persistent peers, Tracks
    /// how many times we've attempted to connect. We reset this to zero
@@ -101,6 +154,7 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
+            queue: VecDeque::with_capacity(MAX_FETCH_QUEUE_SIZE),
            attempts: 1,
            rng,
            limits,
@@ -129,6 +183,7 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: time,
+            queue: VecDeque::new(),
            attempts: 0,
            rng,
            limits,
@@ -171,6 +226,25 @@ impl Session {
        false
    }

+    /// Queue a fetch. Returns an error if the queue is at capacity or the
+    /// fetch is already present in the queue.
+    pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> Result<(), QueueError> {
+        assert_eq!(fetch.from, self.id);
+
+        if self.queue.len() >= MAX_FETCH_QUEUE_SIZE {
+            return Err(QueueError::CapacityReached(fetch));
+        } else if self.queue.contains(&fetch) {
+            return Err(QueueError::Duplicate(fetch));
+        }
+        self.queue.push_back(fetch);
+
+        Ok(())
+    }
+
+    pub fn dequeue_fetch(&mut self) -> Option<QueuedFetch> {
+        self.queue.pop_front()
+    }
+
    pub fn attempts(&self) -> usize {
        self.attempts
    }
modified radicle-node/src/tests.rs
@@ -1526,8 +1526,16 @@ fn test_queued_fetch_from_command_same_rid() {
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
    alice.command(Command::Fetch(rid1, carol.id, DEFAULT_TIMEOUT, send3));

+    // Peers Alice will fetch from.
+    let mut peers = [bob.id, eve.id, carol.id]
+        .into_iter()
+        .collect::<BTreeSet<_>>();
+
    // The first fetch is initiated.
-    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && nid == bob.id);
+    let (rid, nid) = alice.fetches().next().unwrap();
+    assert_eq!(rid, rid1);
+    assert!(peers.remove(&nid));
+
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
    assert_matches!(alice.outbox().next(), None);

@@ -1535,16 +1543,21 @@ fn test_queued_fetch_from_command_same_rid() {
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
-    alice.fetched(rid1, bob.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
+    alice.fetched(rid1, nid, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
-    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && nid == eve.id);
+    let (rid, nid) = alice.fetches().next().unwrap();
+    assert_eq!(rid, rid1);
+    assert!(peers.remove(&nid));
+
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
-    alice.fetched(rid1, eve.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
+    alice.fetched(rid1, nid, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
-    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && nid == carol.id);
+    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && peers.remove(&nid));
+    // All fetches were initiated.
+    assert!(peers.is_empty());
}

#[test]