Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Properly handle dequeued fetch results
Fintan Halpenny committed 2 years ago
commit 8eac221f4c07752530498fb4410af068a9c07b43
parent a6ba1b010320d20677fd5ccc14c2a4f7cd6b0ca4
2 files changed +158 -36
modified radicle-node/src/service.rs
@@ -226,13 +226,15 @@ pub enum CommandError {

/// Error returned by [`Service::try_fetch`].
#[derive(thiserror::Error, Debug)]
-
enum TryFetchError {
-
    #[error("session {0} does not exist; cannot initiate fetch")]
-
    SessionNotFound(NodeId),
-
    #[error("session {0} is not connected; cannot initiate fetch")]
-
    SessionNotConnected(NodeId),
-
    #[error("session {0} fetch capacity reached; cannot initiate fetch")]
-
    SessionCapacityReached(NodeId),
+
enum TryFetchError<'a> {
+
    #[error("ongoing fetch for repository exists")]
+
    AlreadyFetching(&'a mut FetchState),
+
    #[error("session does not exist; cannot initiate fetch")]
+
    SessionNotFound,
+
    #[error("session is not connected; cannot initiate fetch")]
+
    SessionNotConnected,
+
    #[error("session fetch capacity reached; cannot initiate fetch")]
+
    SessionCapacityReached,
    #[error(transparent)]
    Namespaces(#[from] NamespacesError),
}
@@ -246,6 +248,26 @@ struct FetchState {
    subscribers: Vec<chan::Sender<FetchResult>>,
}

+
impl FetchState {
+
    /// Add a subscriber to this fetch.
+
    fn subscribe(&mut self, c: chan::Sender<FetchResult>) {
+
        if !self.subscribers.iter().any(|s| s.same_channel(&c)) {
+
            self.subscribers.push(c);
+
        }
+
    }
+
}
+

+
/// Fetch waiting to be processed, in the fetch queue.
+
#[derive(Debug)]
+
struct QueuedFetch {
+
    /// Repo being fetched.
+
    rid: Id,
+
    /// Peer being fetched from.
+
    from: NodeId,
+
    /// Result channel.
+
    channel: Option<chan::Sender<FetchResult>>,
+
}
+

/// Holds all node stores.
#[derive(Debug)]
pub struct Stores<D>(D);
@@ -327,7 +349,7 @@ pub struct Service<D, S, G> {
    /// Ongoing fetches.
    fetching: HashMap<Id, FetchState>,
    /// Fetch queue.
-
    queue: VecDeque<(Id, NodeId)>,
+
    queue: VecDeque<QueuedFetch>,
    /// Request/connection rate limiter.
    limiter: RateLimiter,
    /// Current seeded repositories bloom filter.
@@ -681,7 +703,7 @@ where
                }
            },
            Command::Fetch(rid, seed, timeout, resp) => {
-
                self.fetch(rid, &seed, timeout, Some(resp));
+
                self.fetch(rid, seed, timeout, Some(resp));
            }
            Command::Seed(rid, scope, resp) => {
                // Update our seeding policy.
@@ -755,7 +777,7 @@ where
    fn fetch_refs_at(
        &mut self,
        rid: Id,
-
        from: &NodeId,
+
        from: NodeId,
        refs: NonEmpty<RefsAt>,
        timeout: time::Duration,
    ) {
@@ -766,7 +788,7 @@ where
    fn fetch(
        &mut self,
        rid: Id,
-
        from: &NodeId,
+
        from: NodeId,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) {
@@ -776,19 +798,36 @@ where
    fn _fetch(
        &mut self,
        rid: Id,
-
        from: &NodeId,
+
        from: NodeId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) {
-
        match self.try_fetch(rid, from, refs_at, timeout) {
-
            Ok(FetchState { subscribers, .. }) => {
+
        match self.try_fetch(rid, &from, refs_at, timeout) {
+
            Ok(fetching) => {
                if let Some(c) = channel {
-
                    if !subscribers.iter().any(|s| s.same_channel(&c)) {
-
                        subscribers.push(c);
+
                    fetching.subscribe(c);
+
                }
+
            }
+
            Err(TryFetchError::AlreadyFetching(fetching)) => {
+
                // If we're already fetching from the requested peer, there's nothing to do, we
+
                // simply add the supplied channel to the list of subscribers so that it is notified
+
                // on completion. Otherwise, we queue a fetch with the requested peer.
+
                if fetching.from == from {
+
                    debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");
+

+
                    if let Some(c) = channel {
+
                        fetching.subscribe(c);
                    }
+
                } else {
+
                    debug!(target: "service", "Queueing fetch for {rid} with {from}..");
+
                    self.queue.push_back(QueuedFetch { rid, from, channel });
                }
            }
+
            Err(TryFetchError::SessionCapacityReached) => {
+
                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
+
                self.queue.push_back(QueuedFetch { rid, from, channel });
+
            }
            Err(e) => {
                if let Some(c) = channel {
                    c.send(FetchResult::Failed {
@@ -809,30 +848,26 @@ where
    ) -> Result<&mut FetchState, TryFetchError> {
        let from = *from;
        let Some(session) = self.sessions.get_mut(&from) else {
-
            return Err(TryFetchError::SessionNotFound(from));
+
            return Err(TryFetchError::SessionNotFound);
        };
        let fetching = self.fetching.entry(rid);

        if let Entry::Occupied(fetching) = fetching {
-
            if fetching.get().from == from {
-
                debug!(target: "service", "Ignoring redundant fetch of {rid} from {from}");
-
                debug_assert!(session.is_fetching(&rid));
-
            } else {
-
                debug!(target: "service", "Queueing fetch for {rid} with {from}..");
-
                self.queue.push_back((rid, from));
-
            }
-
            return Ok(fetching.into_mut());
+
            // We're already fetching this repo from some peer.
+
            return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
        }
+
        // Sanity check: We shouldn't be fetching from this session, since we return above if we're
+
        // fetching from any session.
+
        debug_assert!(!session.is_fetching(&rid));
+

        if !session.is_connected() {
            // This can happen if a session disconnects in the time between asking for seeds to
            // fetch from, and initiating the fetch from one of those seeds.
-
            return Err(TryFetchError::SessionNotConnected(session.id));
+
            return Err(TryFetchError::SessionNotConnected);
        }
        if session.is_at_capacity() {
-
            debug!(target: "service", "Fetch capacity reached for {}, queueing {rid}..", session.id);
-
            self.queue.push_back((rid, session.id));
-

-
            return Err(TryFetchError::SessionCapacityReached(session.id));
+
            // The session with this peer has already reached its fetch capacity.
+
            return Err(TryFetchError::SessionCapacityReached);
        }

        let fetching = fetching.or_insert(FetchState {
@@ -945,10 +980,10 @@ where
    /// 1. The RID was already being fetched.
    /// 2. The session was already at fetch capacity.
    pub fn dequeue_fetch(&mut self) {
-
        if let Some((rid, nid)) = self.queue.pop_front() {
-
            debug!(target: "service", "Dequeued fetch for {rid} from session {nid}..");
+
        if let Some(QueuedFetch { rid, from, channel }) = self.queue.pop_front() {
+
            debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");

-
            self.fetch(rid, &nid, FETCH_TIMEOUT, None);
+
            self.fetch(rid, from, FETCH_TIMEOUT, channel);
        }
    }

@@ -1226,7 +1261,7 @@ where
                                }
                                Ok(false) => {
                                    debug!(target: "service", "Missing seeded inventory {id}; initiating fetch..");
-
                                    self.fetch(*id, announcer, FETCH_TIMEOUT, None);
+
                                    self.fetch(*id, *announcer, FETCH_TIMEOUT, None);
                                }
                                Err(e) => {
                                    error!(target: "service", "Error checking local inventory: {e}");
@@ -1346,7 +1381,7 @@ where
                        // Finally, if there's anything to fetch, we fetch it from the
                        // remote.
                        if let Some(fresh) = NonEmpty::from_vec(fresh) {
-
                            self.fetch_refs_at(message.rid, &remote.id, fresh, FETCH_TIMEOUT);
+
                            self.fetch_refs_at(message.rid, remote.id, fresh, FETCH_TIMEOUT);
                        }
                    } else {
                        trace!(
@@ -1973,7 +2008,7 @@ where
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
-
                            self.fetch(rid, &seed.nid, FETCH_TIMEOUT, None);
+
                            self.fetch(rid, seed.nid, FETCH_TIMEOUT, None);
                        }
                    } else {
                        // TODO: We should make sure that this fetch is retried later, either
modified radicle-node/src/tests/e2e.rs
@@ -1151,3 +1151,90 @@ fn missing_default_branch() {
        FetchResult::Failed { .. }
    );
}
+

+
#[test]
+
fn test_background_foreground_fetch() {
+
    logger::init(log::Level::Debug);
+

+
    let tmp = tempfile::tempdir().unwrap();
+

+
    let mut alice = Node::init(tmp.path(), Config::test(Alias::new("alice")));
+
    let bob = Node::init(tmp.path(), Config::test(Alias::new("bob")));
+
    let eve = Node::init(tmp.path(), Config::test(Alias::new("eve")));
+

+
    let rid = alice.project("acme", "");
+

+
    let mut alice = alice.spawn();
+
    let alice_events = alice.handle.events();
+
    let mut bob = bob.spawn();
+
    let mut eve = eve.spawn();
+

+
    bob.handle.seed(rid, Scope::Followed).unwrap();
+
    eve.handle.seed(rid, Scope::Followed).unwrap();
+
    alice.connect(&bob);
+
    alice.connect(&eve);
+
    converge([&alice, &bob, &eve]);
+

+
    bob.handle.fetch(rid, alice.id, DEFAULT_TIMEOUT).unwrap();
+
    assert!(bob.storage.contains(&rid).unwrap());
+
    rad::fork(rid, &bob.signer, &bob.storage).unwrap();
+

+
    eve.handle.fetch(rid, alice.id, DEFAULT_TIMEOUT).unwrap();
+
    assert!(eve.storage.contains(&rid).unwrap());
+
    rad::fork(rid, &eve.signer, &eve.storage).unwrap();
+

+
    // Alice fetches Eve's fork and we make note of the sigrefs
+
    alice
+
        .handle
+
        .follow(eve.id, Some(Alias::new("eve")))
+
        .unwrap();
+
    alice.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap();
+
    let repo = alice.storage.repository(rid).unwrap();
+
    assert!(repo.remote(&eve.id).is_ok());
+
    let repo = alice.storage.repository(rid).unwrap();
+
    let eve_remote = repo.remote(&eve.id).unwrap();
+
    let old_refs = eve_remote.refs;
+

+
    // Eve creates an issue, updating their refs, and we make note of
+
    // the new refs
+
    eve.issue(
+
        rid,
+
        "Outdated Sigrefs",
+
        "Outdated sigrefs are harshing my vibes",
+
    );
+
    let repo = eve.storage.repository(rid).unwrap();
+
    let eves_refs = repo.remote(&eve.id).unwrap().refs;
+

+
    // Alice follows Bob and they make a new change and announce it,
+
    // this initiates a background fetch for Alice from Bob
+
    alice
+
        .handle
+
        .follow(bob.id, Some(Alias::new("bob")))
+
        .unwrap();
+
    bob.issue(
+
        rid,
+
        "Concurrent fetches",
+
        "Concurrent fetches are harshing my vibes",
+
    );
+
    bob.handle.announce_refs(rid).unwrap();
+
    alice_events
+
        .wait(
+
            |e| matches!(e, service::Event::RefsAnnounced { .. }).then_some(()),
+
            DEFAULT_TIMEOUT,
+
        )
+
        .unwrap();
+

+
    // Alice initiates a fetch from Eve and we ensure that we get the
+
    // updated refs from Eve, and the fetch from Bob should not
+
    // interfere
+
    log::debug!(target: "test", "Alice fetches from Eve..");
+
    assert_matches!(
+
        alice.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap(),
+
        FetchResult::Success { .. }
+
    );
+
    let repo = alice.storage.repository(rid).unwrap();
+
    let eve_remote = repo.remote(&eve.id).unwrap();
+
    let eves_refs_expected = eve_remote.refs;
+
    assert_ne!(eves_refs_expected, old_refs);
+
    assert_eq!(eves_refs_expected, eves_refs);
+
}