Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: include refsat for fetcher service subscribers
Adrian Duke committed 2 months ago
commit 282fa57c66cfcf6a545145d0a9e4ed8d0b5e168d
parent 537eaba8d16cbc34dab2a04e212a704b3bb68f7c
1 file changed +40 -4
modified crates/radicle-protocol/src/fetcher/service.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;

+
use radicle::storage::refs::RefsAt;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event, Config, FetcherState, QueuedFetch};
@@ -14,7 +15,7 @@ use crate::fetcher::state::{command, event, Config, FetcherState, QueuedFetch};
#[derive(Debug)]
pub struct FetcherService<S> {
    state: FetcherState,
-
    subscribers: HashMap<FetchKey, Vec<S>>,
+
    subscribers: HashMap<FetchKey, Vec<(Vec<RefsAt>, S)>>,
}

impl<S> FetcherService<S> {
@@ -87,13 +88,14 @@ impl<S> FetcherService<S> {
    /// See [`FetcherState::fetch`].
    pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
        let key = FetchKey::new(cmd.rid, cmd.from);
+
        let refs_at = cmd.refs_at.clone();
        let event = self.state.fetch(cmd);

        let rejected = match &event {
            event::Fetch::QueueAtCapacity { .. } => subscriber,
            _ => {
                if let Some(r) = subscriber {
-
                    self.subscribers.entry(key).or_default().push(r);
+
                    self.subscribers.entry(key).or_default().push((refs_at, r));
                }
                None
            }
@@ -108,7 +110,41 @@ impl<S> FetcherService<S> {
    pub fn fetched(&mut self, cmd: command::Fetched) -> FetchCompleted<S> {
        let key = FetchKey::new(cmd.rid, cmd.from);
        let event = self.state.fetched(cmd);
-
        let subscribers = self.subscribers.remove(&key).unwrap_or_default();
+

+
        let subscribers = if let event::Fetched::Completed {
+
            refs_at: ref fetched_refs,
+
            ..
+
        } = event
+
        {
+
            if let Some(subs) = self.subscribers.get_mut(&key) {
+
                // satisfied: subscribers whos 'wanted refs' match 'fetched_refs'
+
                // remaining: all other subscribers
+
                let (satisfied, remaining): (Vec<_>, Vec<_>) = subs
+
                    .drain(..)
+
                    .partition(|(wanted, _)| wanted == fetched_refs);
+

+
                // if everyone is satisfied, remove the entry; otherwise, put the remaining
+
                // ones back
+
                if remaining.is_empty() {
+
                    self.subscribers.remove(&key);
+
                } else {
+
                    *subs = remaining;
+
                }
+
                satisfied.into_iter().map(|(_, s)| s).collect()
+
            } else {
+
                Vec::new()
+
            }
+
        } else {
+
            // if the fetch failed or wasn't found, we notify everyone waiting on this key
+
            // because the connection/process failed entirely.
+
            self.subscribers
+
                .remove(&key)
+
                .unwrap_or_default()
+
                .into_iter()
+
                .map(|(_, s)| s)
+
                .collect()
+
        };
+

        FetchCompleted { event, subscribers }
    }

@@ -123,7 +159,7 @@ impl<S> FetcherService<S> {
        let mut orphaned = Vec::new();
        self.subscribers.retain(|key, subscribers| {
            if key.node == from {
-
                orphaned.extend(subscribers.drain(..).map(|r| (key.rid, r)));
+
                orphaned.extend(subscribers.drain(..).map(|(_, r)| (key.rid, r)));
                false
            } else {
                true