Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
protocol: fix FetcherService race where 2 subscribers want different refs
Merged ade opened 2 months ago

On macOS we are seeing failures in the e2e outdated sigrefs tests. After investigation, it was found that a race condition can occur when two fetches are created for the same repository and node; for example, one for ‘sigrefs’ and the other for ‘all’. Before the ‘sigrefs’ fetch completes, the FetcherService subscribes the ‘all’ listener to the ‘sigrefs’ result, causing the ‘all’ listener to panic because there are missing refs - and the ‘all’ fetch never happens.

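This patch introduces a regression test and a fix. Each entry in the subscribers HashMap now stores the subscriber's requested RefsAt alongside the subscriber itself. When a fetch completes, only the subscribers whose requested refs match the fetched refs are notified; the remaining subscribers stay registered and keep waiting for their own fetch.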

1 file changed +119 -4 993428df 613c4e83
modified crates/radicle-protocol/src/fetcher/service.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;

+
use radicle::storage::refs::RefsAt;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event, Config, FetcherState, QueuedFetch};
@@ -14,7 +15,7 @@ use crate::fetcher::state::{command, event, Config, FetcherState, QueuedFetch};
#[derive(Debug)]
pub struct FetcherService<S> {
    state: FetcherState,
-
    subscribers: HashMap<FetchKey, Vec<S>>,
+
    subscribers: HashMap<FetchKey, Vec<(Vec<RefsAt>, S)>>,
}

impl<S> FetcherService<S> {
@@ -87,13 +88,14 @@ impl<S> FetcherService<S> {
    /// See [`FetcherState::fetch`].
    pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
        let key = FetchKey::new(cmd.rid, cmd.from);
+
        let refs_at = cmd.refs_at.clone();
        let event = self.state.fetch(cmd);

        let rejected = match &event {
            event::Fetch::QueueAtCapacity { .. } => subscriber,
            _ => {
                if let Some(r) = subscriber {
-
                    self.subscribers.entry(key).or_default().push(r);
+
                    self.subscribers.entry(key).or_default().push((refs_at, r));
                }
                None
            }
@@ -108,7 +110,41 @@ impl<S> FetcherService<S> {
    pub fn fetched(&mut self, cmd: command::Fetched) -> FetchCompleted<S> {
        let key = FetchKey::new(cmd.rid, cmd.from);
        let event = self.state.fetched(cmd);
-
        let subscribers = self.subscribers.remove(&key).unwrap_or_default();
+

+
        let subscribers = if let event::Fetched::Completed {
+
            refs_at: ref fetched_refs,
+
            ..
+
        } = event
+
        {
+
            if let Some(subs) = self.subscribers.get_mut(&key) {
+
                // satisfied: subscribers whose 'wanted refs' match 'fetched_refs'
+
                // remaining: all other subscribers
+
                let (satisfied, remaining): (Vec<_>, Vec<_>) = subs
+
                    .drain(..)
+
                    .partition(|(wanted, _)| wanted == fetched_refs);
+

+
                // if everyone is satisfied, remove the entry; otherwise, put the remaining
+
                // ones back
+
                if remaining.is_empty() {
+
                    self.subscribers.remove(&key);
+
                } else {
+
                    *subs = remaining;
+
                }
+
                satisfied.into_iter().map(|(_, s)| s).collect()
+
            } else {
+
                Vec::new()
+
            }
+
        } else {
+
            // if the fetch failed or wasn't found, we notify everyone waiting on this key
+
            // because the connection/process failed entirely.
+
            self.subscribers
+
                .remove(&key)
+
                .unwrap_or_default()
+
                .into_iter()
+
                .map(|(_, s)| s)
+
                .collect()
+
        };
+

        FetchCompleted { event, subscribers }
    }

@@ -123,7 +159,7 @@ impl<S> FetcherService<S> {
        let mut orphaned = Vec::new();
        self.subscribers.retain(|key, subscribers| {
            if key.node == from {
-
                orphaned.extend(subscribers.drain(..).map(|r| (key.rid, r)));
+
                orphaned.extend(subscribers.drain(..).map(|(_, r)| (key.rid, r)));
                false
            } else {
                true
@@ -140,3 +176,82 @@ impl<S> FetcherService<S> {
        self.state.dequeue(from)
    }
}
+

+
#[cfg(test)]
+
mod tests {
+
    use super::*;
+
    use crate::fetcher::MaxQueueSize;
+
    use radicle::test::arbitrary;
+
    use std::num::NonZeroUsize;
+
    use std::time::Duration;
+

+
    fn config(max_concurrency: usize, max_queue_size: usize) -> Config {
+
        Config::new()
+
            .with_max_concurrency(NonZeroUsize::new(max_concurrency).unwrap())
+
            .with_max_capacity(MaxQueueSize::new(
+
                NonZeroUsize::new(max_queue_size).unwrap(),
+
            ))
+
    }
+

+
    fn gen_refs_at(count: usize) -> Vec<RefsAt> {
+
        (0..count).map(|_| arbitrary::gen(1)).collect()
+
    }
+

+
    #[test]
+
    fn test_fetch_coalescing_different_refs() {
+
        let mut service = FetcherService::<usize>::new(config(1, 10));
+
        let node = arbitrary::gen(1);
+
        let repo = arbitrary::gen(1);
+
        let refs_specific = gen_refs_at(1);
+
        let refs_all = vec![];
+
        let timeout = Duration::from_secs(30);
+

+
        // fetch specific refs (Subscriber 1)
+
        let initiated1 = service.fetch(
+
            command::Fetch {
+
                from: node,
+
                rid: repo,
+
                refs_at: refs_specific.clone(),
+
                timeout,
+
            },
+
            Some(1),
+
        );
+
        assert!(matches!(initiated1.event, event::Fetch::Started { .. }));
+

+
        // fetch all refs (Subscriber 2)
+
        let initiated2 = service.fetch(
+
            command::Fetch {
+
                from: node,
+
                rid: repo,
+
                refs_at: refs_all.clone(),
+
                timeout,
+
            },
+
            Some(2),
+
        );
+

+
        // should be queued because refs differ
+
        assert!(matches!(initiated2.event, event::Fetch::Queued { .. }));
+

+
        // complete the specific refs fetch
+
        let completed = service.fetched(command::Fetched {
+
            from: node,
+
            rid: repo,
+
        });
+

+
        match completed.event {
+
            event::Fetched::Completed { ref refs_at, .. } => {
+
                assert_eq!(refs_at, &refs_specific);
+
            }
+
            _ => panic!("Expected Completed event"),
+
        }
+

+
        // only Subscriber 1 should be notified
+
        assert_eq!(completed.subscribers, vec![1]);
+

+
        // subscriber 2 should still be waiting
+
        assert!(service.subscribers.contains_key(&FetchKey::new(repo, node)));
+
        let remaining = &service.subscribers[&FetchKey::new(repo, node)];
+
        assert_eq!(remaining.len(), 1);
+
        assert_eq!(remaining[0].1, 2);
+
    }
+
}