use std::collections::HashMap;
use radicle_core::{NodeId, RepoId};
use crate::fetcher::{
RefsToFetch,
state::{
Config, FetcherState, QueuedFetch,
command::{self},
event,
},
};
/// Service layer that wraps [`FetcherState`] and manages subscriber coalescing.
///
/// When multiple callers request the same fetch (same repository, same node
/// and same refs), their subscribers are collected under one key and handed
/// back together when that fetch completes, so the caller can notify them all.
///
/// # Type Parameter
/// - `S`: The subscriber type (e.g., `chan::Sender<FetchResult>`).
#[derive(Debug)]
pub struct FetcherService<S> {
/// The wrapped fetch state; every command on the service is forwarded to it.
state: FetcherState,
/// Subscribers waiting on a fetch, grouped by the `FetchKey` they were
/// registered under.
subscribers: HashMap<FetchKey, Vec<S>>,
}
impl<S> FetcherService<S> {
    /// Create a [`FetcherService`] from the given [`Config`].
    ///
    /// The wrapped [`FetcherState`] is built from `config`, and the service
    /// starts out with no registered subscribers.
    pub fn new(config: Config) -> Self {
        let state = FetcherState::new(config);
        let subscribers = HashMap::new();
        Self { state, subscribers }
    }

    /// Borrow the underlying [`FetcherState`].
    pub fn state(&self) -> &FetcherState {
        let Self { state, .. } = self;
        state
    }
}
/// Key for pending subscribers.
///
/// Subscribers are grouped per key, so two requests only share a waiter list
/// when all three fields compare equal.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FetchKey {
/// The repository being fetched.
rid: RepoId,
/// The node the repository is being fetched from.
node: NodeId,
/// The refs requested by the fetch.
refs: RefsToFetch,
}
impl FetchKey {
fn new(rid: RepoId, node: NodeId, refs: RefsToFetch) -> Self {
Self { rid, node, refs }
}
}
/// The result of calling [`FetcherService::fetch`].
#[must_use]
#[derive(Debug)]
pub struct FetchInitiated<S> {
/// The underlying result from calling [`FetcherState::fetch`].
pub event: event::Fetch,
/// Subscriber returned if fetch was rejected (queue at capacity).
///
/// `None` when the subscriber was registered to wait for the fetch, or when
/// no subscriber was supplied in the first place.
pub rejected: Option<S>,
}
/// The result of calling [`FetcherService::fetched`].
#[must_use]
#[derive(Debug)]
pub struct FetchCompleted<S> {
/// The underlying result from calling [`FetcherState::fetched`].
pub event: event::Fetched,
/// All the subscribers that were interested in this given fetch.
///
/// Empty when the fetch was not found, or when no subscriber was waiting
/// on it.
pub subscribers: Vec<S>,
}
/// The result of calling [`FetcherService::cancel`].
#[must_use]
#[derive(Debug)]
pub struct FetchesCancelled<S> {
/// The underlying result from calling [`FetcherState::cancel`].
pub event: event::Cancel,
/// Orphaned subscribers paired with their [`RepoId`].
///
/// These are the subscribers that were waiting on a fetch from the
/// cancelled node; each is paired with the repository of the fetch it was
/// waiting on.
pub orphaned: Vec<(RepoId, S)>,
}
impl<S> FetcherService<S> {
    /// Initiate a fetch, optionally registering a subscriber.
    ///
    /// Subscribers are coalesced per `(rid, node, refs)`: a subscriber joins
    /// the waiters already registered for the same repository, node and refs,
    /// and all of them are handed back together by
    /// [`FetcherService::fetched`]. Requests that differ in any of the three
    /// are kept in separate waiter lists.
    ///
    /// If the fetch could not be initiated, and also could not be queued
    /// ([`event::Fetch::QueueAtCapacity`]), the subscriber is not registered
    /// and is returned in [`FetchInitiated::rejected`] so that the caller can
    /// notify it of the rejection. For every other outcome the subscriber, if
    /// any, is registered and `rejected` is `None`.
    ///
    /// See [`FetcherState::fetch`].
    pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
        // The key has to be built before `cmd` is consumed by the state.
        let key = FetchKey::new(cmd.rid, cmd.from, cmd.refs.clone());
        let event = self.state.fetch(cmd);
        let rejected = match &event {
            event::Fetch::QueueAtCapacity { .. } => subscriber,
            _ => {
                if let Some(waiter) = subscriber {
                    self.subscribers.entry(key).or_default().push(waiter);
                }
                None
            }
        };
        FetchInitiated { event, rejected }
    }

    /// Mark a fetch as completed and retrieve waiting subscribers.
    ///
    /// When the state reports [`event::Fetched::Completed`], the subscribers
    /// registered for `(cmd.rid, cmd.from, refs)` are removed from the service
    /// and returned, where `refs` are the refs reported by the event. When it
    /// reports [`event::Fetched::NotFound`], no subscribers are returned and
    /// nothing is removed from the subscriber map.
    ///
    /// See [`FetcherState::fetched`].
    pub fn fetched(&mut self, cmd: command::Fetched) -> FetchCompleted<S> {
        // Copy out the key parts up front, so that the key can be built after
        // `cmd` has been handed to the state.
        let (rid, from) = (cmd.rid, cmd.from);
        let event = self.state.fetched(cmd);
        // Only borrow the event here: it is moved into the result afterwards,
        // so it never needs to be cloned.
        let subscribers = match &event {
            // TODO(finto): drop subscribers with this partial key?
            event::Fetched::NotFound { .. } => Vec::new(),
            event::Fetched::Completed { refs, .. } => {
                let key = FetchKey::new(rid, from, refs.clone());
                self.subscribers.remove(&key).unwrap_or_default()
            }
        };
        FetchCompleted { event, subscribers }
    }

    /// Cancel all fetches for a disconnected peer, returning any orphaned
    /// subscribers.
    ///
    /// Every waiter list whose key names `cmd.from` as its node is removed
    /// from the service; its subscribers are returned in
    /// [`FetchesCancelled::orphaned`], each paired with the [`RepoId`] of the
    /// fetch it was waiting on.
    ///
    /// See [`FetcherState::cancel`].
    pub fn cancel(&mut self, cmd: command::Cancel) -> FetchesCancelled<S> {
        let from = cmd.from;
        let event = self.state.cancel(cmd);
        let mut orphaned = Vec::new();
        self.subscribers.retain(|key, waiting| {
            let keep = key.node != from;
            if !keep {
                // The entry is about to be dropped, so move its subscribers out.
                orphaned.extend(waiting.drain(..).map(|waiter| (key.rid, waiter)));
            }
            keep
        });
        FetchesCancelled { event, orphaned }
    }

    /// Dequeue the next fetch for a node.
    ///
    /// This only forwards to the underlying state; registered subscribers are
    /// left untouched.
    ///
    /// See [`FetcherState::dequeue`].
    pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
        self.state.dequeue(from)
    }
}
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use radicle::storage::refs::RefsAt;
    use radicle::test::arbitrary;

    use super::*;
    use crate::fetcher::{FetchConfig, MaxQueueSize};

    /// Two fetches of the same repository from the same node that ask for
    /// different refs must not share subscribers: completing the first one
    /// only hands back the subscriber registered for its refs.
    #[test]
    fn test_fetch_coalescing_different_refs() {
        let max_concurrency = NonZeroUsize::new(1).unwrap();
        let max_capacity = MaxQueueSize::new(NonZeroUsize::new(10).unwrap());
        let service_config = Config::new()
            .with_max_concurrency(max_concurrency)
            .with_max_capacity(max_capacity);
        let mut service = FetcherService::<usize>::new(service_config);

        let node: NodeId = arbitrary::r#gen(1);
        let repo: RepoId = arbitrary::r#gen(1);
        let refs_specific: Vec<RefsAt> = arbitrary::vec(2);
        let refs_all = vec![];
        let fetch_config = FetchConfig::default();

        // Subscriber 1 asks for specific refs.
        let first = service.fetch(
            command::Fetch {
                from: node,
                rid: repo,
                refs: refs_specific.clone().into(),
                config: fetch_config,
            },
            Some(1),
        );
        assert!(matches!(first.event, event::Fetch::Started { .. }));

        // Subscriber 2 asks for all refs of the same repository and node.
        let second = service.fetch(
            command::Fetch {
                from: node,
                rid: repo,
                refs: refs_all.clone().into(),
                config: fetch_config,
            },
            Some(2),
        );
        // Should be queued because the refs differ.
        assert!(matches!(second.event, event::Fetch::Queued { .. }));

        // Complete the fetch of the specific refs.
        let completed = service.fetched(command::Fetched {
            from: node,
            rid: repo,
        });
        let event::Fetched::Completed { refs, .. } = &completed.event else {
            panic!("Expected Completed event");
        };
        assert_eq!(refs, &refs_specific.into());

        // Only Subscriber 1 should be notified.
        assert_eq!(completed.subscribers, vec![1]);

        // Subscriber 2 should still be waiting, on its own, under the key of
        // the all-refs fetch.
        let pending_key = FetchKey::new(repo, node, refs_all.into());
        let remaining = service.subscribers.get(&pending_key).map(Vec::as_slice);
        assert_eq!(remaining, Some(&[2usize][..]));
    }
}