Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: New fetch queueing system
cloudhead committed 2 years ago
commit 06946a2bf4358a91129392d9b385297e3fe81fe1
parent cdcc7cfaf96b83f6d771d02d5971ec1949934d1a
4 files changed +262 -125
modified radicle-node/src/service.rs
@@ -10,7 +10,7 @@ pub mod message;
pub mod session;

use std::collections::hash_map::Entry;
-
use std::collections::{HashMap, HashSet};
+
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, time};
@@ -195,6 +195,29 @@ pub enum CommandError {
    Tracking(#[from] tracking::Error),
}

+
/// Error returned by [`Service::try_fetch`].
+
#[derive(thiserror::Error, Debug)]
+
enum TryFetchError {
+
    #[error("session {0} does not exist; cannot initiate fetch")]
+
    SessionNotFound(NodeId),
+
    #[error("session {0} is not connected; cannot initiate fetch")]
+
    SessionNotConnected(NodeId),
+
    #[error("session {0} fetch capacity reached; cannot initiate fetch")]
+
    SessionCapacityReached(NodeId),
+
    #[error(transparent)]
+
    Namespaces(#[from] NamespacesError),
+
}
+

+
/// Fetch state for an ongoing fetch.
+
#[derive(Debug)]
+
struct FetchState {
+
    /// Node we're fetching from.
+
    from: NodeId,
+
    /// Channels waiting for fetch results.
+
    subscribers: Vec<chan::Sender<FetchResult>>,
+
}
+

+
/// The node service.
#[derive(Debug)]
pub struct Service<R, A, S, G> {
    /// Service configuration.
@@ -221,8 +244,10 @@ pub struct Service<R, A, S, G> {
    node: NodeAnnouncement,
    /// Source of entropy.
    rng: Rng,
-
    /// Fetch requests initiated by user, which are waiting for results.
-
    fetch_reqs: HashMap<(Id, NodeId), chan::Sender<FetchResult>>,
+
    /// Ongoing fetches.
+
    fetching: HashMap<Id, FetchState>,
+
    /// Fetch queue.
+
    queue: VecDeque<(Id, NodeId)>,
    /// Request/connection rate limitter.
    limiter: RateLimiter,
    /// Current tracked repository bloom filter.
@@ -292,7 +317,8 @@ where
            outbox: Outbox::default(),
            limiter: RateLimiter::default(),
            sessions,
-
            fetch_reqs: HashMap::new(),
+
            fetching: HashMap::new(),
+
            queue: VecDeque::new(),
            filter: Filter::empty(),
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
@@ -550,9 +576,7 @@ where
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
-
                // TODO: Establish connections to unconnected seeds, and retry.
-
                self.fetch_reqs.insert((rid, seed), resp);
-
                self.fetch(rid, &seed, timeout);
+
                self.fetch(rid, &seed, timeout, Some(resp));
            }
            Command::TrackRepo(rid, scope, resp) => {
                // Update our tracking policy.
@@ -623,57 +647,94 @@ where
        refs: Vec<RefsAt>,
        timeout: time::Duration,
    ) {
-
        self._fetch(rid, from, refs, timeout)
+
        self._fetch(rid, from, refs, timeout, None)
    }

    /// Initiate an outgoing fetch for some repository.
-
    fn fetch(&mut self, rid: Id, from: &NodeId, timeout: time::Duration) {
-
        self._fetch(rid, from, vec![], timeout)
+
    fn fetch(
+
        &mut self,
+
        rid: Id,
+
        from: &NodeId,
+
        timeout: time::Duration,
+
        channel: Option<chan::Sender<FetchResult>>,
+
    ) {
+
        self._fetch(rid, from, vec![], timeout, channel)
    }

-
    fn _fetch(&mut self, rid: Id, from: &NodeId, refs_at: Vec<RefsAt>, timeout: time::Duration) {
-
        let Some(session) = self.sessions.get_mut(from) else {
-
            error!(target: "service", "Session {from} does not exist; cannot initiate fetch");
-
            return;
+
    fn _fetch(
+
        &mut self,
+
        rid: Id,
+
        from: &NodeId,
+
        refs_at: Vec<RefsAt>,
+
        timeout: time::Duration,
+
        channel: Option<chan::Sender<FetchResult>>,
+
    ) {
+
        match self.try_fetch(rid, from, refs_at, timeout) {
+
            Ok(FetchState { subscribers, .. }) => {
+
                if let Some(c) = channel {
+
                    if !subscribers.iter().any(|s| s.same_channel(&c)) {
+
                        subscribers.push(c);
+
                    }
+
                }
+
            }
+
            Err(e) => {
+
                if let Some(c) = channel {
+
                    c.send(FetchResult::Failed {
+
                        reason: e.to_string(),
+
                    })
+
                    .ok();
+
                }
+
            }
+
        }
+
    }
+

+
    fn try_fetch(
+
        &mut self,
+
        rid: Id,
+
        from: &NodeId,
+
        refs_at: Vec<RefsAt>,
+
        timeout: time::Duration,
+
    ) -> Result<&mut FetchState, TryFetchError> {
+
        let from = *from;
+
        let Some(session) = self.sessions.get_mut(&from) else {
+
            return Err(TryFetchError::SessionNotFound(from));
        };
+
        let fetching = self.fetching.entry(rid);
+

+
        if let Entry::Occupied(fetching) = fetching {
+
            if fetching.get().from == from {
+
                debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");
+
                debug_assert!(session.is_fetching(&rid));
+
            } else {
+
                debug!(target: "service", "Queueing fetch for {rid} with {from}..");
+
                self.queue.push_back((rid, from));
+
            }
+
            return Ok(fetching.into_mut());
+
        }
        if !session.is_connected() {
            // This can happen if a session disconnects in the time between asking for seeds to
            // fetch from, and initiating the fetch from one of those seeds.
-
            error!(target: "service", "Session {from} is not connected; cannot initiate fetch");
-
            return;
+
            return Err(TryFetchError::SessionNotConnected(session.id));
        }
-
        let seed = session.id;
+
        if session.is_at_capacity() {
+
            debug!(target: "service", "Fetch capacity reached for {}, queueing {rid}..", session.id);
+
            self.queue.push_back((rid, session.id));

-
        match session.fetch(rid) {
-
            session::FetchResult::Queued => {
-
                debug!(target: "service", "Fetch queued for {rid} with {seed}..");
-
            }
-
            session::FetchResult::Ready => {
-
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");
-
                match self.tracking.namespaces_for(&self.storage, &rid) {
-
                    Ok(namespaces) => {
-
                        self.outbox
-
                            .fetch(session, rid, namespaces, refs_at, timeout);
-
                    }
-
                    Err(err) => {
-
                        error!(target: "service", "Error getting namespaces for {rid}: {err}");
-

-
                        if let Some(resp) = self.fetch_reqs.remove(&(rid, seed)) {
-
                            resp.send(FetchResult::Failed {
-
                                reason: err.to_string(),
-
                            })
-
                            .ok();
-
                        }
-
                    }
-
                };
-
            }
-
            session::FetchResult::AlreadyFetching => {
-
                debug!(target: "service", "Ignoring redundant attempt to fetch {rid} from {from}");
-
            }
-
            session::FetchResult::NotConnected => {
-
                error!(target: "service", "Unable to fetch {rid} from peer {seed}: peer is not connected");
-
            }
+
            return Err(TryFetchError::SessionCapacityReached(session.id));
        }
+

+
        let fetching = fetching.or_insert(FetchState {
+
            from,
+
            subscribers: vec![],
+
        });
+
        let namespaces = self.tracking.namespaces_for(&self.storage, &rid)?;
+

+
        self.outbox
+
            .fetch(session, rid, namespaces, refs_at, timeout);
+

+
        debug!(target: "service", "Fetch initiated for {rid} with {}..", session.id);
+

+
        Ok(fetching)
    }

    pub fn fetched(
@@ -716,26 +777,39 @@ where
            }
        };

-
        if let Some(results) = self.fetch_reqs.remove(&(rid, remote)) {
+
        let Some(fetching) = self.fetching.remove(&rid) else {
+
            warn!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
+
            return;
+
        };
+
        debug_assert_eq!(fetching.from, remote);
+

+
        if let Some(s) = self.sessions.get_mut(&remote) {
+
            // Mark this RID as fetched for this session.
+
            s.fetched(rid);
+
        }
+

+
        for sub in &fetching.subscribers {
            debug!(target: "service", "Found existing fetch request, sending result..");

-
            if results.send(result).is_err() {
+
            if sub.send(result.clone()).is_err() {
                error!(target: "service", "Error sending fetch result for {rid}..");
            } else {
                debug!(target: "service", "Sent fetch result for {rid}..");
            }
-
        } else {
-
            debug!(target: "service", "No fetch requests found for {rid}..");
+
        }
+

+
        if fetching.subscribers.is_empty() {
+
            trace!(target: "service", "No fetch requests found for {rid}..");

            // We only announce refs here when the fetch wasn't user-requested. This is
            // because the user might want to announce his fork, once he has created one,
            // or may choose to not announce anything.
-
            match result {
+
            match &result {
                FetchResult::Success {
                    updated,
                    namespaces,
                } if !updated.is_empty() => {
-
                    if let Err(e) = self.announce_refs(rid, namespaces) {
+
                    if let Err(e) = self.announce_refs(rid, namespaces.iter().cloned()) {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
@@ -751,12 +825,18 @@ where
        // network, if necessary.
        self.sync_and_announce();

-
        if let Some(s) = self.sessions.get_mut(&remote) {
-
            if let Some(dequeued) = s.fetched(rid) {
-
                debug!(target: "service", "Dequeued fetch {dequeued} from session {remote}..");
+
        // We can now try to dequeue another fetch.
+
        self.dequeue_fetch();
+
    }

-
                self.fetch(dequeued, &remote, FETCH_TIMEOUT);
-
            }
+
    /// Fetches are queued for two reasons:
+
    /// 1. The RID was already being fetched.
+
    /// 2. The session was already at fetch capacity.
+
    pub fn dequeue_fetch(&mut self) {
+
        if let Some((rid, nid)) = self.queue.pop_front() {
+
            debug!(target: "service", "Dequeued fetch for {rid} from session {nid}..");
+

+
            self.fetch(rid, &nid, FETCH_TIMEOUT, None);
        }
    }

@@ -846,16 +926,19 @@ where
        };
        let link = session.link;

-
        // If the peer disconnected while we were fetching, return a failure to any
-
        // potential fetcher.
-
        for rid in session.fetching() {
-
            if let Some(resp) = self.fetch_reqs.remove(&(rid, remote)) {
+
        self.fetching.retain(|_, fetching| {
+
            if fetching.from != remote {
+
                return true;
+
            }
+
            // Remove and fail any pending fetches from this remote node.
+
            for resp in &fetching.subscribers {
                resp.send(FetchResult::Failed {
                    reason: format!("disconnected: {reason}"),
                })
                .ok();
            }
-
        }
+
            false
+
        });

        // Attempt to re-connect to persistent peers.
        if self.config.peer(&remote).is_some() {
@@ -885,6 +968,7 @@ where
                self.maintain_connections();
            }
        }
+
        self.dequeue_fetch();
    }

    pub fn received_message(&mut self, remote: NodeId, message: Message) {
@@ -1012,7 +1096,7 @@ where
                                }
                                Ok(false) => {
                                    debug!(target: "service", "Missing tracked inventory {id}; initiating fetch..");
-
                                    self.fetch(*id, announcer, FETCH_TIMEOUT);
+
                                    self.fetch(*id, announcer, FETCH_TIMEOUT, None);
                                }
                                Err(e) => {
                                    error!(target: "service", "Error checking local inventory: {e}");
@@ -1648,7 +1732,7 @@ where
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
-
                            self.fetch(rid, &seed.nid, FETCH_TIMEOUT);
+
                            self.fetch(rid, &seed.nid, FETCH_TIMEOUT, None);
                        }
                    } else {
                        // TODO: We should make sure that this fetch is retried later, either
modified radicle-node/src/service/io.rs
@@ -91,6 +91,8 @@ impl Outbox {
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) {
+
        remote.fetching(rid);
+

        let refs_at = (!refs_at.is_empty()).then_some(refs_at);
        self.io.push_back(Io::Fetch {
            rid,
modified radicle-node/src/service/session.rs
@@ -1,4 +1,4 @@
-
use std::collections::{HashSet, VecDeque};
+
use std::collections::HashSet;
use std::fmt;

use crate::node::config::Limits;
@@ -9,19 +9,6 @@ use crate::Link;

pub use crate::node::{PingState, State};

-
/// Return value of [`Session::fetch`].
-
#[derive(Debug)]
-
pub enum FetchResult {
-
    /// Maximum concurrent fetches reached.
-
    Queued,
-
    /// We are already fetching the given repo from this peer.
-
    AlreadyFetching,
-
    /// Ok, ready to fetch.
-
    Ready,
-
    /// This peer is not ready to fetch.
-
    NotConnected,
-
}
-

#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The remote peer sent an invalid announcement timestamp,
@@ -70,8 +57,6 @@ pub struct Session {
    pub subscribe: Option<message::Subscribe>,
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,
-
    /// Fetch queue.
-
    pub queue: VecDeque<Id>,

    /// Connection attempts. For persistent peers, Tracks
    /// how many times we've attempted to connect. We reset this to zero
@@ -112,7 +97,6 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
-
            queue: VecDeque::default(),
            attempts: 1,
            rng,
            limits,
@@ -139,7 +123,6 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
-
            queue: VecDeque::default(),
            attempts: 0,
            rng,
            limits,
@@ -162,38 +145,51 @@ impl Session {
        matches!(self.state, State::Initial)
    }

+
    pub fn is_at_capacity(&self) -> bool {
+
        if let State::Connected { fetching, .. } = &self.state {
+
            if fetching.len() >= self.limits.fetch_concurrency {
+
                return true;
+
            }
+
        }
+
        false
+
    }
+

+
    pub fn is_fetching(&self, rid: &Id) -> bool {
+
        if let State::Connected { fetching, .. } = &self.state {
+
            return fetching.contains(rid);
+
        }
+
        false
+
    }
+

    pub fn attempts(&self) -> usize {
        self.attempts
    }

-
    pub fn fetch(&mut self, rid: Id) -> FetchResult {
+
    /// Mark this session as fetching the given RID.
+
    ///
+
    /// # Panics
+
    ///
+
    /// If it is already fetching that RID, or the session is disconnected.
+
    pub fn fetching(&mut self, rid: Id) {
        if let State::Connected { fetching, .. } = &mut self.state {
-
            if fetching.contains(&rid) || self.queue.contains(&rid) {
-
                return FetchResult::AlreadyFetching;
-
            }
-
            if fetching.len() >= self.limits.fetch_concurrency {
-
                self.queue.push_back(rid);
-
                return FetchResult::Queued;
-
            }
-
            fetching.insert(rid);
-

-
            FetchResult::Ready
+
            assert!(
+
                fetching.insert(rid),
+
                "Session must not already be fetching {rid}"
+
            );
        } else {
-
            FetchResult::NotConnected
+
            panic!(
+
                "Attempting to fetch {rid} from disconnected session {}",
+
                self.id
+
            );
        }
    }

-
    pub fn fetched(&mut self, rid: Id) -> Option<Id> {
+
    pub fn fetched(&mut self, rid: Id) {
        if let State::Connected { fetching, .. } = &mut self.state {
            if !fetching.remove(&rid) {
-
                log::error!(target: "service", "Fetched unknown repository {rid}");
-
            }
-
            // Dequeue the next fetch, if any.
-
            if let Some(rid) = self.queue.pop_front() {
-
                return Some(rid);
+
                log::warn!(target: "service", "Fetched unknown repository {rid}");
            }
        }
-
        None
    }

    pub fn to_attempted(&mut self) {
@@ -234,14 +230,6 @@ impl Session {
        self.state = State::Initial;
    }

-
    pub fn fetching(&self) -> HashSet<Id> {
-
        if let State::Connected { fetching, .. } = &self.state {
-
            fetching.clone()
-
        } else {
-
            HashSet::default()
-
        }
-
    }
-

    pub fn ping(&mut self, reactor: &mut Outbox) -> Result<(), Error> {
        if let State::Connected { ping, .. } = &mut self.state {
            let msg = message::Ping::new(&mut self.rng);
modified radicle-node/src/tests.rs
@@ -41,6 +41,7 @@ use crate::test::storage as mock_storage;
use crate::test::storage::MockStorage;
use crate::wire::Decode;
use crate::wire::Encode;
+
use crate::worker;
use crate::worker::fetch;
use crate::LocalTime;
use crate::{git, identity, rad, runtime, service, test};
@@ -1175,16 +1176,14 @@ fn test_track_repo_subscribe() {
}

#[test]
-
fn test_fetch_missing_inventory() {
+
fn test_fetch_missing_inventory_on_gossip() {
    let rid = arbitrary::gen::<Id>(1);
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
-
    let eve = Peer::new("eve", [9, 9, 9, 9]);
-
    let (send, recv) = chan::bounded::<bool>(1);
    let now = LocalTime::now();

+
    alice.track_repo(&rid, node::tracking::Scope::All).unwrap();
    alice.connect_to(&bob);
-
    alice.connect_to(&eve);
    alice.receive(
        bob.id(),
        Message::inventory(
@@ -1195,33 +1194,48 @@ fn test_fetch_missing_inventory() {
            bob.signer(),
        ),
    );
+
    alice
+
        .outbox()
+
        .find(|m| matches!(m, Io::Fetch { rid: other, .. } if other == &rid))
+
        .unwrap();
+
}
+

+
#[test]
+
fn test_fetch_missing_inventory_on_schedule() {
+
    let rid = arbitrary::gen::<Id>(1);
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
+
    let bob = Peer::new("bob", [8, 8, 8, 8]);
+
    let now = LocalTime::now();
+

+
    alice.track_repo(&rid, node::tracking::Scope::All).unwrap();
+
    alice.connect_to(&bob);
    alice.receive(
-
        eve.id(),
+
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: vec![rid].try_into().unwrap(),
                timestamp: now.as_millis(),
            },
-
            eve.signer(),
+
            bob.signer(),
        ),
    );
-
    alice.command(Command::TrackRepo(rid, node::tracking::Scope::All, send));
+
    alice.fetched(
+
        rid,
+
        bob.id,
+
        Err(worker::FetchError::Io(
+
            io::ErrorKind::ConnectionReset.into(),
+
        )),
+
    );
    alice.outbox().for_each(drop);
-

-
    assert!(recv.recv().unwrap());
-

    alice.elapse(service::SYNC_INTERVAL);
    alice
        .outbox()
-
        .find(|m| matches!(m, Io::Fetch { .. }))
-
        .unwrap();
-
    alice
-
        .outbox()
-
        .find(|m| matches!(m, Io::Fetch { .. }))
+
        .find(|m| matches!(m, Io::Fetch { rid: other, .. } if other == &rid))
        .unwrap();
}
+

#[test]
-
fn test_queued_fetch() {
+
fn test_queued_fetch_max_capacity() {
    let storage = arbitrary::nonempty_storage(3);
    let mut repo_keys = storage.inventory.keys();
    let rid1 = *repo_keys.next().unwrap();
@@ -1268,6 +1282,55 @@ fn test_queued_fetch() {
}

#[test]
+
fn test_queued_fetch_same_rid() {
+
    let storage = arbitrary::nonempty_storage(3);
+
    let mut repo_keys = storage.inventory.keys();
+
    let rid1 = *repo_keys.next().unwrap();
+
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
+
    let bob = Peer::new("bob", [8, 8, 8, 8]);
+
    let eve = Peer::new("eve", [9, 9, 9, 9]);
+
    let carol = Peer::new("carol", [10, 10, 10, 10]);
+

+
    logger::init(log::Level::Debug);
+

+
    alice.connect_to(&bob);
+
    alice.connect_to(&eve);
+
    alice.connect_to(&carol);
+

+
    // Send the first fetch.
+
    let (send, _recv1) = chan::bounded::<node::FetchResult>(1);
+
    alice.command(Command::Fetch(rid1, bob.id, DEFAULT_TIMEOUT, send));
+

+
    // Send the 2nd fetch that will be queued.
+
    let (send2, _recv2) = chan::bounded::<node::FetchResult>(1);
+
    alice.command(Command::Fetch(rid1, eve.id, DEFAULT_TIMEOUT, send2));
+

+
    // Send the 3rd fetch that will be queued.
+
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
+
    alice.command(Command::Fetch(rid1, carol.id, DEFAULT_TIMEOUT, send3));
+

+
    // The first fetch is initiated.
+
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == bob.id);
+
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
+
    assert_matches!(alice.outbox().next(), None);
+

+
    // Have enough time pass that Alice sends a "ping" to Bob.
+
    alice.elapse(KEEP_ALIVE_DELTA);
+

+
    // Finish the 1st fetch.
+
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::default()));
+
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
+
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == eve.id);
+
    // ... but not the third.
+
    assert_matches!(alice.fetches().next(), None);
+

+
    // Finish the 2nd fetch.
+
    alice.fetched(rid1, eve.id, Ok(fetch::FetchResult::default()));
+
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
+
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == carol.id);
+
}
+

+
#[test]
fn test_refs_synced_event() {
    let temp = tempfile::tempdir().unwrap();
    let storage = Storage::open(temp.path(), fixtures::user()).unwrap();