| |
MinReplicationFactor { preferred: usize, synced: usize },
|
| |
MaxReplicationFactor { preferred: usize, synced: usize },
|
| |
}
|
| + |
|
| + |
#[allow(clippy::unwrap_used)]
|
| + |
#[cfg(test)]
|
| + |
mod test {
|
| + |
use crate::test::arbitrary;
|
| + |
|
| + |
use super::*;
|
| + |
|
| + |
#[test]
|
| + |
fn announcer_reached_min_replication_target() {
|
| + |
let local = arbitrary::gen::<NodeId>(0);
|
| + |
let seeds = arbitrary::set::<NodeId>(10..=10);
|
| + |
let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
|
| + |
let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
ReplicationFactor::must_reach(3),
|
| + |
preferred_seeds.clone(),
|
| + |
BTreeSet::new(),
|
| + |
unsynced.clone(),
|
| + |
);
|
| + |
let mut announcer = Announcer::new(config).unwrap();
|
| + |
let to_sync = announcer.to_sync();
|
| + |
assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
|
| + |
|
| + |
let mut synced_result = BTreeMap::new();
|
| + |
let mut success = None;
|
| + |
let mut successes = 0;
|
| + |
|
| + |
for node in preferred_seeds.iter() {
|
| + |
let t = time::Duration::from_secs(1);
|
| + |
synced_result.insert(*node, SyncStatus::Synced { duration: t });
|
| + |
successes += 1;
|
| + |
match announcer.synced_with(*node, t) {
|
| + |
ControlFlow::Continue(progress) => {
|
| + |
assert_eq!(progress.synced(), successes)
|
| + |
}
|
| + |
ControlFlow::Break(stop) => {
|
| + |
success = Some(stop);
|
| + |
break;
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
|
| + |
for node in unsynced.iter() {
|
| + |
assert_ne!(*node, local);
|
| + |
let t = time::Duration::from_secs(1);
|
| + |
synced_result.insert(*node, SyncStatus::Synced { duration: t });
|
| + |
successes += 1;
|
| + |
match announcer.synced_with(*node, t) {
|
| + |
ControlFlow::Continue(progress) => {
|
| + |
assert_eq!(progress.synced(), successes)
|
| + |
}
|
| + |
ControlFlow::Break(stop) => {
|
| + |
success = Some(stop);
|
| + |
break;
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
|
| + |
assert_eq!(
|
| + |
success.as_ref().unwrap().outcome(),
|
| + |
SuccessfulOutcome::MinReplicationFactor {
|
| + |
preferred: 2,
|
| + |
synced: 3,
|
| + |
}
|
| + |
)
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn announcer_reached_max_replication_target() {
|
| + |
let local = arbitrary::gen::<NodeId>(0);
|
| + |
let seeds = arbitrary::set::<NodeId>(10..=10);
|
| + |
let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
|
| + |
let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
ReplicationFactor::range(3, 6),
|
| + |
preferred_seeds.clone(),
|
| + |
BTreeSet::new(),
|
| + |
unsynced.clone(),
|
| + |
);
|
| + |
let mut announcer = Announcer::new(config).unwrap();
|
| + |
let to_sync = announcer.to_sync();
|
| + |
assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
|
| + |
|
| + |
let mut synced_result = BTreeMap::new();
|
| + |
let mut success = None;
|
| + |
let mut successes = 0;
|
| + |
|
| + |
for node in preferred_seeds.iter() {
|
| + |
let t = time::Duration::from_secs(1);
|
| + |
synced_result.insert(*node, SyncStatus::Synced { duration: t });
|
| + |
successes += 1;
|
| + |
match announcer.synced_with(*node, t) {
|
| + |
ControlFlow::Continue(progress) => {
|
| + |
assert_eq!(progress.synced(), successes)
|
| + |
}
|
| + |
ControlFlow::Break(stop) => {
|
| + |
success = Some(stop);
|
| + |
break;
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
|
| + |
for node in unsynced.iter() {
|
| + |
assert_ne!(*node, local);
|
| + |
let t = time::Duration::from_secs(1);
|
| + |
synced_result.insert(*node, SyncStatus::Synced { duration: t });
|
| + |
successes += 1;
|
| + |
match announcer.synced_with(*node, t) {
|
| + |
ControlFlow::Continue(progress) => {
|
| + |
assert_eq!(progress.synced(), successes)
|
| + |
}
|
| + |
ControlFlow::Break(stop) => {
|
| + |
success = Some(stop);
|
| + |
break;
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
|
| + |
assert_eq!(
|
| + |
success.as_ref().unwrap().outcome(),
|
| + |
SuccessfulOutcome::MaxReplicationFactor {
|
| + |
preferred: 2,
|
| + |
synced: 6,
|
| + |
}
|
| + |
)
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn announcer_must_reach_preferred_seeds() {
|
| + |
let local = arbitrary::gen::<NodeId>(0);
|
| + |
let seeds = arbitrary::set::<NodeId>(10..=10);
|
| + |
let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
|
| + |
let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
ReplicationFactor::range(3, 6),
|
| + |
preferred_seeds.clone(),
|
| + |
BTreeSet::new(),
|
| + |
unsynced.clone(),
|
| + |
);
|
| + |
let mut announcer = Announcer::new(config).unwrap();
|
| + |
let to_sync = announcer.to_sync();
|
| + |
assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
|
| + |
|
| + |
let mut synced_result = BTreeMap::new();
|
| + |
let mut success = None;
|
| + |
let mut successes = 0;
|
| + |
|
| + |
for node in unsynced.iter() {
|
| + |
assert_ne!(*node, local);
|
| + |
let t = time::Duration::from_secs(1);
|
| + |
synced_result.insert(*node, SyncStatus::Synced { duration: t });
|
| + |
successes += 1;
|
| + |
match announcer.synced_with(*node, t) {
|
| + |
ControlFlow::Continue(progress) => {
|
| + |
assert_eq!(progress.synced(), successes)
|
| + |
}
|
| + |
ControlFlow::Break(stop) => {
|
| + |
success = Some(stop);
|
| + |
break;
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
for node in preferred_seeds.iter() {
|
| + |
let t = time::Duration::from_secs(1);
|
| + |
synced_result.insert(*node, SyncStatus::Synced { duration: t });
|
| + |
successes += 1;
|
| + |
match announcer.synced_with(*node, t) {
|
| + |
ControlFlow::Continue(progress) => {
|
| + |
assert_eq!(progress.synced(), successes)
|
| + |
}
|
| + |
ControlFlow::Break(stop) => {
|
| + |
success = Some(stop);
|
| + |
break;
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
|
| + |
assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
|
| + |
assert_eq!(
|
| + |
success.as_ref().unwrap().outcome(),
|
| + |
SuccessfulOutcome::MaxReplicationFactor {
|
| + |
preferred: 2,
|
| + |
synced: 10,
|
| + |
}
|
| + |
)
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn announcer_will_minimise_replication_factor() {
|
| + |
let local = arbitrary::gen::<NodeId>(0);
|
| + |
let seeds = arbitrary::set::<NodeId>(10..=10);
|
| + |
let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
|
| + |
let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
ReplicationFactor::must_reach(11),
|
| + |
preferred_seeds.clone(),
|
| + |
BTreeSet::new(),
|
| + |
unsynced.clone(),
|
| + |
);
|
| + |
let mut announcer = Announcer::new(config).unwrap();
|
| + |
let to_sync = announcer.to_sync();
|
| + |
assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
|
| + |
|
| + |
let mut synced_result = BTreeMap::new();
|
| + |
let mut success = None;
|
| + |
let mut successes = 0;
|
| + |
|
| + |
// Simulate not being able to reach all nodes
|
| + |
for node in to_sync.iter() {
|
| + |
assert_ne!(*node, local);
|
| + |
let t = time::Duration::from_secs(1);
|
| + |
synced_result.insert(*node, SyncStatus::Synced { duration: t });
|
| + |
successes += 1;
|
| + |
match announcer.synced_with(*node, t) {
|
| + |
ControlFlow::Continue(progress) => {
|
| + |
assert_eq!(progress.synced(), successes)
|
| + |
}
|
| + |
ControlFlow::Break(stop) => {
|
| + |
success = Some(stop);
|
| + |
break;
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
|
| + |
assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
|
| + |
assert_eq!(
|
| + |
success.as_ref().unwrap().outcome(),
|
| + |
SuccessfulOutcome::MinReplicationFactor {
|
| + |
preferred: 2,
|
| + |
synced: 10,
|
| + |
}
|
| + |
)
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn announcer_timed_out() {
|
| + |
let local = arbitrary::gen::<NodeId>(0);
|
| + |
let seeds = arbitrary::set::<NodeId>(10..=10);
|
| + |
let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
|
| + |
let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
ReplicationFactor::must_reach(11),
|
| + |
preferred_seeds.clone(),
|
| + |
BTreeSet::new(),
|
| + |
unsynced.clone(),
|
| + |
);
|
| + |
let mut announcer = Announcer::new(config).unwrap();
|
| + |
let to_sync = announcer.to_sync();
|
| + |
assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
|
| + |
|
| + |
let mut synced_result = BTreeMap::new();
|
| + |
let mut announcer_result = None;
|
| + |
let mut successes = 0;
|
| + |
|
| + |
// Simulate not being able to reach all nodes
|
| + |
for node in to_sync.iter() {
|
| + |
assert_ne!(*node, local);
|
| + |
if successes > 5 {
|
| + |
announcer_result = Some(announcer.timed_out());
|
| + |
break;
|
| + |
}
|
| + |
let t = time::Duration::from_secs(1);
|
| + |
synced_result.insert(*node, SyncStatus::Synced { duration: t });
|
| + |
successes += 1;
|
| + |
match announcer.synced_with(*node, t) {
|
| + |
ControlFlow::Continue(progress) => {
|
| + |
assert_eq!(progress.synced(), successes)
|
| + |
}
|
| + |
ControlFlow::Break(stop) => {
|
| + |
announcer_result = Some(stop.into());
|
| + |
break;
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
|
| + |
match announcer_result {
|
| + |
Some(AnnouncerResult::TimedOut(timeout)) => {
|
| + |
assert_eq!(timeout.synced, synced_result);
|
| + |
assert_eq!(
|
| + |
timeout.timed_out,
|
| + |
to_sync
|
| + |
.difference(&synced_result.keys().copied().collect())
|
| + |
.copied()
|
| + |
.collect()
|
| + |
);
|
| + |
}
|
| + |
unexpected => panic!("Expected AnnouncerResult::TimedOut, found: {unexpected:#?}"),
|
| + |
}
|
| + |
}
|
| + |
|
| + |
#[test]
|
| + |
fn cannot_construct_announcer() {
|
| + |
let local = arbitrary::gen::<NodeId>(0);
|
| + |
let seeds = arbitrary::set::<NodeId>(10..=10);
|
| + |
let synced = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
|
| + |
let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
|
| + |
let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
|
| + |
let replicas = ReplicationFactor::default();
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
ReplicationFactor::default(),
|
| + |
BTreeSet::new(),
|
| + |
BTreeSet::new(),
|
| + |
BTreeSet::new(),
|
| + |
);
|
| + |
assert!(matches!(
|
| + |
Announcer::new(config),
|
| + |
Err(AnnouncerError::NoSeeds)
|
| + |
));
|
| + |
|
| + |
// No nodes to sync
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
replicas,
|
| + |
preferred_seeds.clone(),
|
| + |
synced.clone(),
|
| + |
BTreeSet::new(),
|
| + |
);
|
| + |
assert!(matches!(
|
| + |
Announcer::new(config),
|
| + |
Err(AnnouncerError::AlreadySynced { .. })
|
| + |
));
|
| + |
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
ReplicationFactor::must_reach(0),
|
| + |
BTreeSet::new(),
|
| + |
synced.clone(),
|
| + |
unsynced.clone(),
|
| + |
);
|
| + |
assert!(matches!(
|
| + |
Announcer::new(config),
|
| + |
Err(AnnouncerError::Target(_))
|
| + |
));
|
| + |
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
ReplicationFactor::MustReach(2),
|
| + |
preferred_seeds.clone(),
|
| + |
synced.clone(),
|
| + |
unsynced.clone(),
|
| + |
);
|
| + |
// Min replication factor
|
| + |
assert!(matches!(
|
| + |
Announcer::new(config),
|
| + |
Err(AnnouncerError::AlreadySynced { .. })
|
| + |
));
|
| + |
let config = AnnouncerConfig::public(
|
| + |
local,
|
| + |
ReplicationFactor::range(2, 3),
|
| + |
preferred_seeds,
|
| + |
synced,
|
| + |
unsynced,
|
| + |
);
|
| + |
// Max replication factor
|
| + |
assert!(matches!(
|
| + |
Announcer::new(config),
|
| + |
Err(AnnouncerError::AlreadySynced { .. })
|
| + |
));
|
| + |
}
|
| + |
}
|