Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Have multiple fetch queues
cloudhead committed 1 year ago
commit 3c1c35f259f145d10c64abc6b006825c7bf8243b
parent 27eff8095dbc95ed18d7e88e97367e744de6a398
4 files changed +145 -100
modified radicle-node/src/runtime/handle.rs
@@ -334,11 +334,16 @@ impl radicle::node::Handle for Handle {
                        "subscribers": state.subscribers.len(),
                    })
                }).collect::<Vec<_>>(),
-
                "queue": state.queue().iter().map(|fetch| {
+
                "queue": state.sessions().values().map(|sess| {
                    json!({
-
                        "rid": fetch.rid,
-
                        "from": fetch.from,
-
                        "refsAt": fetch.refs_at,
+
                        "nid": sess.id,
+
                        "queue": sess.queue.iter().map(|fetch| {
+
                            json!({
+
                                "rid": fetch.rid,
+
                                "from": fetch.from,
+
                                "refsAt": fetch.refs_at,
+
                            })
+
                        }).collect::<Vec<_>>()
                    })
                }).collect::<Vec<_>>(),
                "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
modified radicle-node/src/service.rs
@@ -10,7 +10,7 @@ pub mod message;
pub mod session;

use std::collections::hash_map::Entry;
-
use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
+
use std::collections::{BTreeSet, HashMap, HashSet};
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
@@ -60,7 +60,7 @@ use crate::{crypto, PROTOCOL_VERSION};
pub use crate::node::events::{Event, Events};
pub use crate::node::{config::Network, Config, NodeId};
pub use crate::service::message::{Message, ZeroBytes};
-
pub use crate::service::session::Session;
+
pub use crate::service::session::{QueuedFetch, Session};

pub use radicle::node::policy::config as policy;

@@ -312,31 +312,6 @@ impl FetchState {
    }
}

-
/// Fetch waiting to be processed, in the fetch queue.
-
#[derive(Debug)]
-
pub struct QueuedFetch {
-
    /// Repo being fetched.
-
    pub rid: RepoId,
-
    /// Peer being fetched from.
-
    pub from: NodeId,
-
    /// Refs being fetched.
-
    pub refs_at: Vec<RefsAt>,
-
    /// The timeout given for the fetch request.
-
    timeout: time::Duration,
-
    /// Result channel.
-
    channel: Option<chan::Sender<FetchResult>>,
-
}
-

-
impl PartialEq for QueuedFetch {
-
    fn eq(&self, other: &Self) -> bool {
-
        self.rid == other.rid
-
            && self.from == other.from
-
            && self.refs_at == other.refs_at
-
            && self.channel.is_none()
-
            && other.channel.is_none()
-
    }
-
}
-

/// Holds all node stores.
#[derive(Debug)]
pub struct Stores<D>(D);
@@ -438,8 +413,6 @@ pub struct Service<D, S, G> {
    rng: Rng,
    /// Ongoing fetches.
    fetching: HashMap<RepoId, FetchState>,
-
    /// Fetch queue.
-
    queue: VecDeque<QueuedFetch>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
@@ -525,7 +498,6 @@ where
            limiter,
            sessions,
            fetching: HashMap::new(),
-
            queue: VecDeque::new(),
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
@@ -809,7 +781,7 @@ where
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
-
            self.dequeue_fetch();
+
            self.dequeue_fetches();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
@@ -1045,17 +1017,17 @@ where
                        timeout,
                        channel,
                    };
-
                    if self.queue.contains(&fetch) {
-
                        debug!(target: "service", "Fetch for {rid} with {from} is already queued..");
-
                    } else {
+

+
                    if self.queue_fetch(fetch) {
                        debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
-
                        self.queue.push_back(fetch);
+
                    } else {
+
                        debug!(target: "service", "Fetch for {rid} with {from} is already queued..");
                    }
                }
            }
            Err(TryFetchError::SessionCapacityReached) => {
                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
-
                self.queue.push_back(QueuedFetch {
+
                self.queue_fetch(QueuedFetch {
                    rid,
                    refs_at,
                    from,
@@ -1075,6 +1047,14 @@ where
        false
    }

+
    fn queue_fetch(&mut self, fetch: QueuedFetch) -> bool {
+
        let Some(s) = self.sessions.get_mut(&fetch.from) else {
+
            log::error!(target: "service", "Cannot queue fetch for unknown session {}", fetch.from);
+
            return false;
+
        };
+
        s.queue_fetch(fetch)
+
    }
+

    // TODO: Buffer/throttle fetches.
    fn try_fetch(
        &mut self,
@@ -1091,10 +1071,13 @@ where

        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");

-
        if let Entry::Occupied(fetching) = fetching {
-
            // We're already fetching this repo from some peer.
-
            return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
-
        }
+
        let fetching = match fetching {
+
            Entry::Vacant(fetching) => fetching,
+
            Entry::Occupied(fetching) => {
+
                // We're already fetching this repo from some peer.
+
                return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
+
            }
+
        };
        // Sanity check: We shouldn't be fetching from this session, since we return above if we're
        // fetching from any session.
        debug_assert!(!session.is_fetching(&rid));
@@ -1109,7 +1092,7 @@ where
            return Err(TryFetchError::SessionCapacityReached);
        }

-
        let fetching = fetching.or_insert(FetchState {
+
        let fetching = fetching.insert(FetchState {
            from,
            refs_at: refs_at.clone(),
            subscribers: vec![],
@@ -1214,56 +1197,57 @@ where
                }
            }
        }
-
        // We can now try to dequeue another fetch.
-
        self.dequeue_fetch();
+
        // We can now try to dequeue more fetches.
+
        self.dequeue_fetches();
    }

+
    /// Attempt to dequeue fetches from all peers.
+
    /// At most one fetch is dequeued per peer. If the fetch cannot be processed,
+
    /// it is put back on the queue for that peer.
+
    ///
    /// Fetches are queued for two reasons:
    /// 1. The RID was already being fetched.
    /// 2. The session was already at fetch capacity.
-
    pub fn dequeue_fetch(&mut self) {
-
        let mut tries = self.queue.len();
+
    pub fn dequeue_fetches(&mut self) {
+
        let sessions = self
+
            .sessions
+
            .shuffled()
+
            .map(|(k, _)| *k)
+
            .collect::<Vec<_>>();
+

+
        // Try to dequeue once per session.
+
        for nid in sessions {
+
            // SAFETY: All the keys we are iterating on exist.
+
            #[allow(clippy::unwrap_used)]
+
            let sess = self.sessions.get_mut(&nid).unwrap();
+
            if !sess.is_connected() || sess.is_at_capacity() {
+
                continue;
+
            }

-
        while let Some(QueuedFetch {
-
            rid,
-
            from,
-
            refs_at,
-
            timeout,
-
            channel,
-
        }) = self.queue.pop_front()
-
        {
-
            debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");
+
            if let Some(QueuedFetch {
+
                rid,
+
                from,
+
                refs_at,
+
                timeout,
+
                channel,
+
            }) = sess.dequeue_fetch()
+
            {
+
                debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");

-
            if let Some(refs) = NonEmpty::from_vec(refs_at) {
-
                let repo_entry = self
-
                    .policies
-
                    .seed_policy(&rid)
-
                    .expect("Service::dequeue_fetch: error accessing repo seeding configuration");
-
                let SeedingPolicy::Allow { scope } = repo_entry.policy else {
-
                    debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
-
                    continue;
-
                };
-
                // Keep dequeueing if there was nothing to fetch, otherwise break.
-
                if self.fetch_refs_at(rid, from, refs, scope, timeout, channel) {
-
                    break;
-
                }
-
            } else {
-
                // If no refs are specified, always do a full fetch.
-
                if self.fetch(rid, from, timeout, channel) {
-
                    break;
+
                if let Some(refs) = NonEmpty::from_vec(refs_at) {
+
                    let repo_entry = self.policies.seed_policy(&rid).expect(
+
                        "Service::dequeue_fetch: error accessing repo seeding configuration",
+
                    );
+
                    let SeedingPolicy::Allow { scope } = repo_entry.policy else {
+
                        debug!(target: "service", "Repository {rid} is no longer seeded, skipping..");
+
                        continue;
+
                    };
+
                    self.fetch_refs_at(rid, from, refs, scope, timeout, channel);
+
                } else {
+
                    // If no refs are specified, always do a full fetch.
+
                    self.fetch(rid, from, timeout, channel);
                }
            }
-
            // Nb. Just a precaution, `tries` should always be >= 1 here.
-
            tries = tries.saturating_sub(1);
-
            // To avoid looping forever, only try to dequeue a fixed number of fetches at a time.
-
            if tries == 0 {
-
                debug!(
-
                    target: "service",
-
                    "Giving up on dequeuing, {} item(s) still left in the queue..",
-
                    self.queue.len()
-
                );
-
                break;
-
            }
        }
    }

@@ -1460,7 +1444,7 @@ where
                self.maintain_connections();
            }
        }
-
        self.dequeue_fetch();
+
        self.dequeue_fetches();
    }

    pub fn received_message(&mut self, remote: NodeId, message: Message) {
@@ -2612,8 +2596,6 @@ pub trait ServiceState {
    fn sessions(&self) -> &Sessions;
    /// Get fetch state.
    fn fetching(&self) -> &HashMap<RepoId, FetchState>;
-
    /// Get fetch queue.
-
    fn queue(&self) -> &VecDeque<QueuedFetch>;
    /// Get outbox.
    fn outbox(&self) -> &Outbox;
    /// Get rate limiter.
@@ -2650,10 +2632,6 @@ where
        &self.fetching
    }

-
    fn queue(&self) -> &VecDeque<QueuedFetch> {
-
        &self.queue
-
    }
-

    fn outbox(&self) -> &Outbox {
        &self.outbox
    }
modified radicle-node/src/service/session.rs
@@ -1,11 +1,14 @@
use std::collections::{HashSet, VecDeque};
-
use std::fmt;
+
use std::{fmt, time};
+

+
use crossbeam_channel as chan;

use crate::node::config::Limits;
-
use crate::node::Severity;
+
use crate::node::{FetchResult, Severity};
use crate::service::message;
use crate::service::message::Message;
use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
+
use crate::storage::refs::RefsAt;
use crate::{Link, Timestamp};

pub use crate::node::{PingState, State};
@@ -43,6 +46,31 @@ impl Error {
    }
}

+
/// Fetch waiting to be processed, in the fetch queue.
+
#[derive(Debug, Clone)]
+
pub struct QueuedFetch {
+
    /// Repo being fetched.
+
    pub rid: RepoId,
+
    /// Peer being fetched from.
+
    pub from: NodeId,
+
    /// Refs being fetched.
+
    pub refs_at: Vec<RefsAt>,
+
    /// The timeout given for the fetch request.
+
    pub timeout: time::Duration,
+
    /// Result channel.
+
    pub channel: Option<chan::Sender<FetchResult>>,
+
}
+

+
impl PartialEq for QueuedFetch {
+
    fn eq(&self, other: &Self) -> bool {
+
        self.rid == other.rid
+
            && self.from == other.from
+
            && self.refs_at == other.refs_at
+
            && self.channel.is_none()
+
            && other.channel.is_none()
+
    }
+
}
+

/// A peer session. Each connected peer will have one session.
#[derive(Debug, Clone)]
pub struct Session {
@@ -61,6 +89,8 @@ pub struct Session {
    pub subscribe: Option<message::Subscribe>,
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,
+
    /// Fetch queue.
+
    pub queue: VecDeque<QueuedFetch>,

    /// Connection attempts. For persistent peers, tracks
    /// how many times we've attempted to connect. We reset this to zero
@@ -101,6 +131,7 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
+
            queue: VecDeque::new(),
            attempts: 1,
            rng,
            limits,
@@ -129,6 +160,7 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: time,
+
            queue: VecDeque::new(),
            attempts: 0,
            rng,
            limits,
@@ -171,6 +203,23 @@ impl Session {
        false
    }

+
    /// Queue a fetch. Returns `true` if it was added to the queue, and `false` if
+
    /// it was already present in the queue.
+
    pub fn queue_fetch(&mut self, fetch: QueuedFetch) -> bool {
+
        assert_eq!(fetch.from, self.id);
+

+
        if self.queue.contains(&fetch) {
+
            false
+
        } else {
+
            self.queue.push_back(fetch);
+
            true
+
        }
+
    }
+

+
    pub fn dequeue_fetch(&mut self) -> Option<QueuedFetch> {
+
        self.queue.pop_front()
+
    }
+

    pub fn attempts(&self) -> usize {
        self.attempts
    }
modified radicle-node/src/tests.rs
@@ -1526,8 +1526,16 @@ fn test_queued_fetch_from_command_same_rid() {
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
    alice.command(Command::Fetch(rid1, carol.id, DEFAULT_TIMEOUT, send3));

+
    // Peers Alice will fetch from.
+
    let mut peers = [bob.id, eve.id, carol.id]
+
        .into_iter()
+
        .collect::<BTreeSet<_>>();
+

    // The first fetch is initiated.
-
    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && nid == bob.id);
+
    let (rid, nid) = alice.fetches().next().unwrap();
+
    assert_eq!(rid, rid1);
+
    assert!(peers.remove(&nid));
+

    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
    assert_matches!(alice.outbox().next(), None);

@@ -1535,16 +1543,21 @@ fn test_queued_fetch_from_command_same_rid() {
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
-
    alice.fetched(rid1, bob.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
+
    alice.fetched(rid1, nid, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && nid == eve.id);
+
    let (rid, nid) = alice.fetches().next().unwrap();
+
    assert_eq!(rid, rid1);
+
    assert!(peers.remove(&nid));
+

    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
-
    alice.fetched(rid1, eve.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
+
    alice.fetched(rid1, nid, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && nid == carol.id);
+
    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && peers.remove(&nid));
+
    // All fetches were initiated.
+
    assert!(peers.is_empty());
}

#[test]