Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
Test and fix the Announcer
Merged fintohaps opened 11 months ago

This adds testing and fixes to the Announcer. See the commits for more details.

2 files changed +413 -9 c57d43f2 924b9328
modified radicle-cli/src/node.rs
@@ -246,7 +246,9 @@ where
            term::format::secondary(progress.preferred()),
            term::format::secondary(n_preferred_seeds),
            term::format::secondary(progress.synced()),
-
            term::format::secondary(min_replicas),
+
            // N.b. the number of replicas could exceed the target if we're
+
            // waiting for preferred seeds
+
            term::format::secondary(min_replicas.max(progress.synced())),
        ));
    }) {
        Ok(result) => {
modified radicle/src/node/sync/announce.rs
@@ -8,6 +8,7 @@ use crate::node::NodeId;

use super::{PrivateNetwork, ReplicationFactor};

+
#[derive(Debug)]
pub struct Announcer {
    local_node: NodeId,
    target: Target,
@@ -51,6 +52,12 @@ impl Announcer {
            .into());
        }

+
        // N.b extend the unsynced set with any preferred seeds that are not yet
+
        // synced
+
        config
+
            .unsynced
+
            .extend(config.preferred_seeds.difference(&config.synced).copied());
+

        let replicas = config.replicas.min(config.unsynced.len());
        let announcer = Self {
            local_node: config.local_node,
@@ -147,7 +154,7 @@ impl Announcer {

    /// Get the [`Progress`] of the [`Announcer`].
    pub fn progress(&self) -> Progress {
-
        let (synced, preferred) = self.success_counts();
+
        let SuccessCounts { preferred, synced } = self.success_counts();
        let unsynced = self.to_sync.len().saturating_sub(synced);
        Progress {
            preferred,
@@ -168,7 +175,7 @@ impl Announcer {
    }

    fn is_target_reached(&self) -> Option<SuccessfulOutcome> {
-
        let (preferred, synced) = self.success_counts();
+
        let SuccessCounts { preferred, synced } = self.success_counts();
        let reached_preferred = self.target.preferred_seeds.is_empty()
            || preferred >= self.target.preferred_seeds.len();

@@ -182,19 +189,41 @@ impl Announcer {
        }
    }

-
    fn success_counts(&self) -> (usize, usize) {
+
    fn success_counts(&self) -> SuccessCounts {
        self.synced
            .keys()
-
            .fold((0, 0), |(mut preferred, mut succeeded), nid| {
-
                succeeded += 1;
+
            .fold(SuccessCounts::default(), |counts, nid| {
                if self.target.preferred_seeds.contains(nid) {
-
                    preferred += 1;
+
                    counts.preferred().synced()
+
                } else {
+
                    counts.synced()
                }
-
                (preferred, succeeded)
            })
    }
}

+
#[derive(Default)]
+
struct SuccessCounts {
+
    preferred: usize,
+
    synced: usize,
+
}
+

+
impl SuccessCounts {
+
    fn synced(self) -> Self {
+
        Self {
+
            synced: self.synced + 1,
+
            ..self
+
        }
+
    }
+

+
    fn preferred(self) -> Self {
+
        Self {
+
            preferred: self.preferred + 1,
+
            ..self
+
        }
+
    }
+
}
+

/// Configuration of the [`Announcer`].
pub struct AnnouncerConfig {
    local_node: NodeId,
@@ -255,6 +284,7 @@ impl AnnouncerConfig {
}

/// Result of running an [`Announcer`] process.
+
#[derive(Debug)]
pub enum AnnouncerResult {
    /// The target of the [`Announcer`] was successfully met.
    Success(Success),
@@ -304,6 +334,7 @@ impl From<NoNodes> for AnnouncerResult {
    }
}

+
#[derive(Debug)]
pub struct NoNodes {
    synced: BTreeMap<NodeId, SyncStatus>,
}
@@ -315,6 +346,7 @@ impl NoNodes {
    }
}

+
#[derive(Debug)]
pub struct TimedOut {
    synced: BTreeMap<NodeId, SyncStatus>,
    timed_out: BTreeSet<NodeId>,
@@ -332,6 +364,7 @@ impl TimedOut {
    }
}

+
#[derive(Debug)]
pub struct Success {
    outcome: SuccessfulOutcome,
    synced: BTreeMap<NodeId, SyncStatus>,
@@ -350,6 +383,7 @@ impl Success {
}

/// Error in constructing the [`Announcer`].
+
#[derive(Debug)]
pub enum AnnouncerError {
    /// Both sets of already synchronized and un-synchronized nodes were empty
    /// of nodes were empty.
@@ -366,6 +400,7 @@ impl From<AlreadySynced> for AnnouncerError {
    }
}

+
#[derive(Debug)]
pub struct AlreadySynced {
    preferred: usize,
    synced: usize,
@@ -384,7 +419,7 @@ impl AlreadySynced {
}

/// The status of the synchronized node.
-
#[derive(Clone, Copy, Debug)]
+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncStatus {
    /// The node was already synchronized before starting the [`Announcer`]
    /// process.
@@ -462,3 +497,370 @@ pub enum SuccessfulOutcome {
    MinReplicationFactor { preferred: usize, synced: usize },
    MaxReplicationFactor { preferred: usize, synced: usize },
}
+

+
#[allow(clippy::unwrap_used)]
+
#[cfg(test)]
+
mod test {
+
    use crate::test::arbitrary;
+

+
    use super::*;
+

+
    #[test]
+
    fn announcer_reached_min_replication_target() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(10..=10);
+
        let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
+
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(3),
+
            preferred_seeds.clone(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+
        let mut announcer = Announcer::new(config).unwrap();
+
        let to_sync = announcer.to_sync();
+
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
+

+
        let mut synced_result = BTreeMap::new();
+
        let mut success = None;
+
        let mut successes = 0;
+

+
        for node in preferred_seeds.iter() {
+
            let t = time::Duration::from_secs(1);
+
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
+
            successes += 1;
+
            match announcer.synced_with(*node, t) {
+
                ControlFlow::Continue(progress) => {
+
                    assert_eq!(progress.synced(), successes)
+
                }
+
                ControlFlow::Break(stop) => {
+
                    success = Some(stop);
+
                    break;
+
                }
+
            }
+
        }
+

+
        for node in unsynced.iter() {
+
            assert_ne!(*node, local);
+
            let t = time::Duration::from_secs(1);
+
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
+
            successes += 1;
+
            match announcer.synced_with(*node, t) {
+
                ControlFlow::Continue(progress) => {
+
                    assert_eq!(progress.synced(), successes)
+
                }
+
                ControlFlow::Break(stop) => {
+
                    success = Some(stop);
+
                    break;
+
                }
+
            }
+
        }
+
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
+
        assert_eq!(
+
            success.as_ref().unwrap().outcome(),
+
            SuccessfulOutcome::MinReplicationFactor {
+
                preferred: 2,
+
                synced: 3,
+
            }
+
        )
+
    }
+

+
    #[test]
+
    fn announcer_reached_max_replication_target() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(10..=10);
+
        let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
+
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::range(3, 6),
+
            preferred_seeds.clone(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+
        let mut announcer = Announcer::new(config).unwrap();
+
        let to_sync = announcer.to_sync();
+
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
+

+
        let mut synced_result = BTreeMap::new();
+
        let mut success = None;
+
        let mut successes = 0;
+

+
        for node in preferred_seeds.iter() {
+
            let t = time::Duration::from_secs(1);
+
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
+
            successes += 1;
+
            match announcer.synced_with(*node, t) {
+
                ControlFlow::Continue(progress) => {
+
                    assert_eq!(progress.synced(), successes)
+
                }
+
                ControlFlow::Break(stop) => {
+
                    success = Some(stop);
+
                    break;
+
                }
+
            }
+
        }
+

+
        for node in unsynced.iter() {
+
            assert_ne!(*node, local);
+
            let t = time::Duration::from_secs(1);
+
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
+
            successes += 1;
+
            match announcer.synced_with(*node, t) {
+
                ControlFlow::Continue(progress) => {
+
                    assert_eq!(progress.synced(), successes)
+
                }
+
                ControlFlow::Break(stop) => {
+
                    success = Some(stop);
+
                    break;
+
                }
+
            }
+
        }
+
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
+
        assert_eq!(
+
            success.as_ref().unwrap().outcome(),
+
            SuccessfulOutcome::MaxReplicationFactor {
+
                preferred: 2,
+
                synced: 6,
+
            }
+
        )
+
    }
+

+
    #[test]
+
    fn announcer_must_reach_preferred_seeds() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(10..=10);
+
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::range(3, 6),
+
            preferred_seeds.clone(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+
        let mut announcer = Announcer::new(config).unwrap();
+
        let to_sync = announcer.to_sync();
+
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
+

+
        let mut synced_result = BTreeMap::new();
+
        let mut success = None;
+
        let mut successes = 0;
+

+
        for node in unsynced.iter() {
+
            assert_ne!(*node, local);
+
            let t = time::Duration::from_secs(1);
+
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
+
            successes += 1;
+
            match announcer.synced_with(*node, t) {
+
                ControlFlow::Continue(progress) => {
+
                    assert_eq!(progress.synced(), successes)
+
                }
+
                ControlFlow::Break(stop) => {
+
                    success = Some(stop);
+
                    break;
+
                }
+
            }
+
        }
+
        for node in preferred_seeds.iter() {
+
            let t = time::Duration::from_secs(1);
+
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
+
            successes += 1;
+
            match announcer.synced_with(*node, t) {
+
                ControlFlow::Continue(progress) => {
+
                    assert_eq!(progress.synced(), successes)
+
                }
+
                ControlFlow::Break(stop) => {
+
                    success = Some(stop);
+
                    break;
+
                }
+
            }
+
        }
+

+
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
+
        assert_eq!(
+
            success.as_ref().unwrap().outcome(),
+
            SuccessfulOutcome::MaxReplicationFactor {
+
                preferred: 2,
+
                synced: 10,
+
            }
+
        )
+
    }
+

+
    #[test]
+
    fn announcer_will_minimise_replication_factor() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(10..=10);
+
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(11),
+
            preferred_seeds.clone(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+
        let mut announcer = Announcer::new(config).unwrap();
+
        let to_sync = announcer.to_sync();
+
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
+

+
        let mut synced_result = BTreeMap::new();
+
        let mut success = None;
+
        let mut successes = 0;
+

+
        // Simulate not being able to reach all nodes
+
        for node in to_sync.iter() {
+
            assert_ne!(*node, local);
+
            let t = time::Duration::from_secs(1);
+
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
+
            successes += 1;
+
            match announcer.synced_with(*node, t) {
+
                ControlFlow::Continue(progress) => {
+
                    assert_eq!(progress.synced(), successes)
+
                }
+
                ControlFlow::Break(stop) => {
+
                    success = Some(stop);
+
                    break;
+
                }
+
            }
+
        }
+

+
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
+
        assert_eq!(
+
            success.as_ref().unwrap().outcome(),
+
            SuccessfulOutcome::MinReplicationFactor {
+
                preferred: 2,
+
                synced: 10,
+
            }
+
        )
+
    }
+

+
    #[test]
+
    fn announcer_timed_out() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(10..=10);
+
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(11),
+
            preferred_seeds.clone(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+
        let mut announcer = Announcer::new(config).unwrap();
+
        let to_sync = announcer.to_sync();
+
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());
+

+
        let mut synced_result = BTreeMap::new();
+
        let mut announcer_result = None;
+
        let mut successes = 0;
+

+
        // Simulate not being able to reach all nodes
+
        for node in to_sync.iter() {
+
            assert_ne!(*node, local);
+
            if successes > 5 {
+
                announcer_result = Some(announcer.timed_out());
+
                break;
+
            }
+
            let t = time::Duration::from_secs(1);
+
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
+
            successes += 1;
+
            match announcer.synced_with(*node, t) {
+
                ControlFlow::Continue(progress) => {
+
                    assert_eq!(progress.synced(), successes)
+
                }
+
                ControlFlow::Break(stop) => {
+
                    announcer_result = Some(stop.into());
+
                    break;
+
                }
+
            }
+
        }
+

+
        match announcer_result {
+
            Some(AnnouncerResult::TimedOut(timeout)) => {
+
                assert_eq!(timeout.synced, synced_result);
+
                assert_eq!(
+
                    timeout.timed_out,
+
                    to_sync
+
                        .difference(&synced_result.keys().copied().collect())
+
                        .copied()
+
                        .collect()
+
                );
+
            }
+
            unexpected => panic!("Expected AnnouncerResult::TimedOut, found: {unexpected:#?}"),
+
        }
+
    }
+

+
    #[test]
+
    fn cannot_construct_announcer() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(10..=10);
+
        let synced = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
+
        let unsynced = seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
+
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let replicas = ReplicationFactor::default();
+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::default(),
+
            BTreeSet::new(),
+
            BTreeSet::new(),
+
            BTreeSet::new(),
+
        );
+
        assert!(matches!(
+
            Announcer::new(config),
+
            Err(AnnouncerError::NoSeeds)
+
        ));
+

+
        // No nodes to sync
+
        let config = AnnouncerConfig::public(
+
            local,
+
            replicas,
+
            preferred_seeds.clone(),
+
            synced.clone(),
+
            BTreeSet::new(),
+
        );
+
        assert!(matches!(
+
            Announcer::new(config),
+
            Err(AnnouncerError::AlreadySynced { .. })
+
        ));
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(0),
+
            BTreeSet::new(),
+
            synced.clone(),
+
            unsynced.clone(),
+
        );
+
        assert!(matches!(
+
            Announcer::new(config),
+
            Err(AnnouncerError::Target(_))
+
        ));
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::MustReach(2),
+
            preferred_seeds.clone(),
+
            synced.clone(),
+
            unsynced.clone(),
+
        );
+
        // Min replication factor
+
        assert!(matches!(
+
            Announcer::new(config),
+
            Err(AnnouncerError::AlreadySynced { .. })
+
        ));
+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::range(2, 3),
+
            preferred_seeds,
+
            synced,
+
            unsynced,
+
        );
+
        // Max replication factor
+
        assert!(matches!(
+
            Announcer::new(config),
+
            Err(AnnouncerError::AlreadySynced { .. })
+
        ));
+
    }
+
}