Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: separate subscribers by RefsAt
✗ CI failure Fintan Halpenny committed 2 months ago
commit 30436db7eb6d9c3c4267bf5dc46e66b7e3e324bc
parent 537eaba8d16cbc34dab2a04e212a704b3bb68f7c
1 failed (1 total) View logs
1 file changed +99 -7
modified crates/radicle-protocol/src/fetcher/service.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;

+
use radicle::storage::refs::RefsAt;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event, Config, FetcherState, QueuedFetch};
@@ -33,15 +34,16 @@ impl<S> FetcherService<S> {
}

/// Key for pending subscribers.
-
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FetchKey {
    rid: RepoId,
    node: NodeId,
+
    refs_at: Vec<RefsAt>,
}

impl FetchKey {
-
    fn new(rid: RepoId, node: NodeId) -> Self {
-
        Self { rid, node }
+
    fn new(rid: RepoId, node: NodeId, refs_at: Vec<RefsAt>) -> Self {
+
        Self { rid, node, refs_at }
    }
}

@@ -86,7 +88,7 @@ impl<S> FetcherService<S> {
    ///
    /// See [`FetcherState::fetch`].
    pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
-
        let key = FetchKey::new(cmd.rid, cmd.from);
+
        let key = FetchKey::new(cmd.rid, cmd.from, cmd.refs_at.clone());
        let event = self.state.fetch(cmd);

        let rejected = match &event {
@@ -106,10 +108,22 @@ impl<S> FetcherService<S> {
    ///
    /// See [`FetcherState::fetched`].
    pub fn fetched(&mut self, cmd: command::Fetched) -> FetchCompleted<S> {
-
        let key = FetchKey::new(cmd.rid, cmd.from);
        let event = self.state.fetched(cmd);
-
        let subscribers = self.subscribers.remove(&key).unwrap_or_default();
-
        FetchCompleted { event, subscribers }
+
        match event {
+
            // TODO(finto): drop subscribers with this partial key?
+
            e @ event::Fetched::NotFound { .. } => FetchCompleted {
+
                event: e,
+
                subscribers: vec![],
+
            },
+
            ref e @ event::Fetched::Completed { ref refs_at, .. } => {
+
                let key = FetchKey::new(cmd.rid, cmd.from, refs_at.clone());
+
                let subscribers = self.subscribers.remove(&key).unwrap_or_default();
+
                FetchCompleted {
+
                    event: e.clone(),
+
                    subscribers,
+
                }
+
            }
+
        }
    }

    /// Cancel all fetches for a disconnected peer, returning any orphaned
@@ -140,3 +154,81 @@ impl<S> FetcherService<S> {
        self.state.dequeue(from)
    }
}
+

+
#[cfg(test)]
+
mod tests {
+
    use radicle::test::arbitrary;
+
    use std::num::NonZeroUsize;
+
    use std::time::Duration;
+

+
    use super::*;
+

+
    use crate::fetcher::MaxQueueSize;
+

+
    #[test]
+
    fn test_fetch_coalescing_different_refs() {
+
        let config = Config::new()
+
            .with_max_concurrency(NonZeroUsize::new(1).unwrap())
+
            .with_max_capacity(MaxQueueSize::new(NonZeroUsize::new(10).unwrap()));
+
        let mut service = FetcherService::<usize>::new(config);
+
        let node = arbitrary::gen(1);
+
        let repo = arbitrary::gen(1);
+
        let refs_specific: Vec<RefsAt> = arbitrary::vec(2);
+
        let refs_all = vec![];
+
        let timeout = Duration::from_secs(30);
+

+
        // fetch specific refs (Subscriber 1)
+
        let initiated1 = service.fetch(
+
            command::Fetch {
+
                from: node,
+
                rid: repo,
+
                refs_at: refs_specific.clone(),
+
                timeout,
+
            },
+
            Some(1),
+
        );
+

+
        assert!(matches!(initiated1.event, event::Fetch::Started { .. }));
+

+
        // fetch all refs (Subscriber 2)
+
        let initiated2 = service.fetch(
+
            command::Fetch {
+
                from: node,
+
                rid: repo,
+
                refs_at: refs_all.clone(),
+
                timeout,
+
            },
+
            Some(2),
+
        );
+

+
        // should be queued because refs differ
+

+
        assert!(matches!(initiated2.event, event::Fetch::Queued { .. }));
+

+
        // complete the specific refs fetch
+

+
        let completed = service.fetched(command::Fetched {
+
            from: node,
+
            rid: repo,
+
        });
+

+
        match completed.event {
+
            event::Fetched::Completed { ref refs_at, .. } => {
+
                assert_eq!(refs_at, &refs_specific);
+
            }
+
            _ => panic!("Expected Completed event"),
+
        }
+

+
        // only Subscriber 1 should be notified
+
        assert_eq!(completed.subscribers, vec![1]);
+

+
        // subscriber 2 should still be waiting
+
        assert!(service
+
            .subscribers
+
            .contains_key(&FetchKey::new(repo, node, refs_all.clone())));
+

+
        let remaining = &service.subscribers[&FetchKey::new(repo, node, refs_all)];
+
        assert_eq!(remaining.len(), 1);
+
        assert_eq!(remaining[0], 2);
+
    }
+
}