Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
Announcer Improvements
Merged fintohaps opened 8 months ago

While changing the target logic for the Announcer, I also put Claude, our trusty QA engineer, to work to think of edge cases that were not tested.

I broke up the patch into a series of commits that test certain pieces of logic around the Announcer. Some were just adding coverage, while others surfaced edge cases that were not considered and were fixed in the commit.

2 files changed +794 -58 d1470948 efa7efac
modified crates/radicle-cli/src/commands/sync.rs
@@ -822,13 +822,18 @@ fn display_success<'a>(
}

fn print_announcer_result(result: &sync::AnnouncerResult, verbose: bool) {
+
    use sync::announce::SuccessfulOutcome::*;
    match result {
        sync::AnnouncerResult::Success(success) if verbose => {
            // N.b. Printing how many seeds were synced with is printed
            // elsewhere
            match success.outcome() {
-
                sync::announce::SuccessfulOutcome::MinReplicationFactor { preferred, synced }
-
                | sync::announce::SuccessfulOutcome::MaxReplicationFactor { preferred, synced } => {
+
                MinReplicationFactor { preferred, synced }
+
                | MaxReplicationFactor { preferred, synced }
+
                | PreferredNodes {
+
                    preferred,
+
                    total_nodes_synced: synced,
+
                } => {
                    if preferred == 0 {
                        term::success!("Synced {} seed(s)", term::format::positive(synced));
                    } else {
modified crates/radicle/src/node/sync/announce.rs
@@ -33,16 +33,33 @@ impl Announcer {
    ///   - [`AnnouncerError::Target`]: the target has no preferred seeds and no
    ///     replicas
    pub fn new(mut config: AnnouncerConfig) -> Result<Self, AnnouncerError> {
-
        // N.b. ensure that local node is none of the sets
+
        // N.b. ensure that local node is in none of the sets
        config.preferred_seeds.remove(&config.local_node);
        config.synced.remove(&config.local_node);
        config.unsynced.remove(&config.local_node);

-
        if config.synced.is_empty() && config.unsynced.is_empty() {
+
        // N.b extend the unsynced set with any preferred seeds that are not yet
+
        // synced
+
        let unsynced_preferred = config
+
            .preferred_seeds
+
            .difference(&config.synced)
+
            .copied()
+
            .collect::<BTreeSet<_>>();
+
        config.unsynced.extend(unsynced_preferred);
+

+
        // Ensure that the unsynced set does not contain any of the synced set –
+
        // we trust that the synced nodes are already synced with
+
        let to_sync = config
+
            .unsynced
+
            .difference(&config.synced)
+
            .copied()
+
            .collect::<BTreeSet<_>>();
+

+
        if config.synced.is_empty() && to_sync.is_empty() {
            return Err(AnnouncerError::NoSeeds);
        }

-
        if config.unsynced.is_empty() {
+
        if to_sync.is_empty() {
            let preferred = config.synced.intersection(&config.preferred_seeds).count();
            return Err(AlreadySynced {
                preferred,
@@ -51,13 +68,7 @@ impl Announcer {
            .into());
        }

-
        // N.b extend the unsynced set with any preferred seeds that are not yet
-
        // synced
-
        config
-
            .unsynced
-
            .extend(config.preferred_seeds.difference(&config.synced).copied());
-

-
        let replicas = config.replicas.min(config.unsynced.len());
+
        let replicas = config.replicas.min(to_sync.len());
        let announcer = Self {
            local_node: config.local_node,
            target: Target::new(config.preferred_seeds, replicas)
@@ -67,7 +78,7 @@ impl Announcer {
                .into_iter()
                .map(|nid| (nid, SyncStatus::AlreadySynced))
                .collect(),
-
            to_sync: config.unsynced,
+
            to_sync,
        };
        match announcer.is_target_reached() {
            None => Ok(announcer),
@@ -78,6 +89,14 @@ impl Announcer {
                SuccessfulOutcome::MaxReplicationFactor { preferred, synced } => {
                    Err(AlreadySynced { preferred, synced }.into())
                }
+
                SuccessfulOutcome::PreferredNodes {
+
                    preferred,
+
                    total_nodes_synced,
+
                } => Err(AlreadySynced {
+
                    preferred,
+
                    synced: total_nodes_synced,
+
                }
+
                .into()),
            },
        }
    }
@@ -127,6 +146,11 @@ impl Announcer {
    /// If there are no more nodes, then [`NoNodes`] is returned in the
    /// [`ControlFlow::Break`], otherwise the [`Announcer`] is returned as-is in
    /// the [`ControlFlow::Continue`].
+
    // TODO(finto): I'm not sure this is needed with the change to the target
+
    // logic. Since we can reach the replication factor OR the preferred seeds,
+
    // AND the replication factor is always capped to the maximum number of
+
    // seeds to sync with, I don't think we can ever reach a case where
+
    // `can_continue` hits the `Break`.
    pub fn can_continue(self) -> ControlFlow<NoNodes, Self> {
        if self.to_sync.is_empty() {
            ControlFlow::Break(NoNodes {
@@ -154,7 +178,7 @@ impl Announcer {
    /// Get the [`Progress`] of the [`Announcer`].
    pub fn progress(&self) -> Progress {
        let SuccessCounts { preferred, synced } = self.success_counts();
-
        let unsynced = self.to_sync.len().saturating_sub(synced);
+
        let unsynced = self.to_sync.len();
        Progress {
            preferred,
            synced,
@@ -174,17 +198,29 @@ impl Announcer {
    }

    fn is_target_reached(&self) -> Option<SuccessfulOutcome> {
-
        let SuccessCounts { preferred, synced } = self.success_counts();
-
        let reached_preferred = self.target.preferred_seeds.is_empty()
-
            || preferred >= self.target.preferred_seeds.len();
+
        // It should not be possible to construct a target that has no preferred
+
        // seeds and set the target to 0
+
        debug_assert!(self.target.has_preferred_seeds() || self.target.has_replication_factor());

-
        let replicas = self.target.replicas();
-
        let min = replicas.lower_bound();
-
        match replicas.upper_bound() {
-
            None => (reached_preferred && synced >= min)
-
                .then_some(SuccessfulOutcome::MinReplicationFactor { preferred, synced }),
-
            Some(max) => (reached_preferred && synced >= max)
-
                .then_some(SuccessfulOutcome::MaxReplicationFactor { preferred, synced }),
+
        let SuccessCounts { preferred, synced } = self.success_counts();
+
        if self.target.has_preferred_seeds() && preferred >= self.target.preferred_seeds.len() {
+
            Some(SuccessfulOutcome::PreferredNodes {
+
                preferred: self.target.preferred_seeds.len(),
+
                total_nodes_synced: synced,
+
            })
+
        } else {
+
            // The only target to hit is preferred seeds
+
            if !self.target.has_replication_factor() {
+
                return None;
+
            }
+
            let replicas = self.target.replicas();
+
            let min = replicas.lower_bound();
+
            match replicas.upper_bound() {
+
                None => (synced >= min)
+
                    .then_some(SuccessfulOutcome::MinReplicationFactor { preferred, synced }),
+
                Some(max) => (synced >= max)
+
                    .then_some(SuccessfulOutcome::MaxReplicationFactor { preferred, synced }),
+
            }
        }
    }

@@ -224,6 +260,7 @@ impl SuccessCounts {
}

/// Configuration of the [`Announcer`].
+
#[derive(Clone, Debug)]
pub struct AnnouncerConfig {
    local_node: NodeId,
    replicas: ReplicationFactor,
@@ -247,6 +284,8 @@ impl AnnouncerConfig {
            local_node: local,
            replicas,
            preferred_seeds: network.allowed.clone(),
+
            // TODO(finto): we should check if the seeds are synced with instead
+
            // of assuming they haven't been yet.
            synced: BTreeSet::new(),
            unsynced: network.allowed,
        }
@@ -382,7 +421,7 @@ impl Success {
}

/// Error in constructing the [`Announcer`].
-
#[derive(Debug)]
+
#[derive(Debug, PartialEq, Eq)]
pub enum AnnouncerError {
    /// Both sets of already synchronized and un-synchronized nodes were empty
    /// of nodes were empty.
@@ -416,7 +455,7 @@ impl From<AlreadySynced> for AnnouncerError {
    }
}

-
#[derive(Debug)]
+
#[derive(Debug, PartialEq, Eq)]
pub struct AlreadySynced {
    /// The number of preferred nodes that are synchronized.
    preferred: usize,
@@ -474,7 +513,7 @@ impl Progress {
    }
}

-
#[derive(Debug, thiserror::Error)]
+
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
#[non_exhaustive]
#[error("a minimum number of replicas or set of preferred seeds must be provided")]
pub struct TargetError;
@@ -510,22 +549,124 @@ impl Target {
    pub fn replicas(&self) -> &ReplicationFactor {
        &self.replicas
    }
+

+
    /// Check if the target has preferred seeds
+
    pub fn has_preferred_seeds(&self) -> bool {
+
        !self.preferred_seeds.is_empty()
+
    }
+

+
    /// Check that lower bound of the replication is greater than `0`
+
    pub fn has_replication_factor(&self) -> bool {
+
        self.replicas.lower_bound() != 0
+
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SuccessfulOutcome {
-
    MinReplicationFactor { preferred: usize, synced: usize },
-
    MaxReplicationFactor { preferred: usize, synced: usize },
+
    MinReplicationFactor {
+
        preferred: usize,
+
        synced: usize,
+
    },
+
    MaxReplicationFactor {
+
        preferred: usize,
+
        synced: usize,
+
    },
+
    PreferredNodes {
+
        preferred: usize,
+
        total_nodes_synced: usize,
+
    },
}

#[allow(clippy::unwrap_used)]
#[cfg(test)]
mod test {
-
    use crate::test::arbitrary;
+
    use crate::{assert_matches, test::arbitrary};

    use super::*;

    #[test]
+
    fn all_synced_nodes_are_preferred_seeds() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(5..=5);
+

+
        // All preferred seeds, no regular seeds in unsynced
+
        let preferred_seeds = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
+
        let unsynced = preferred_seeds.clone(); // Only preferred seeds to sync with
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            // High target that we won't reach with preferred alone
+
            ReplicationFactor::must_reach(5),
+
            preferred_seeds.clone(),
+
            BTreeSet::new(),
+
            unsynced,
+
        );
+

+
        let mut announcer = Announcer::new(config).unwrap();
+

+
        // Sync with all preferred seeds
+
        let mut synced_count = 0;
+
        let mut result = None;
+
        for &node in &preferred_seeds {
+
            let duration = time::Duration::from_secs(1);
+
            synced_count += 1;
+

+
            match announcer.synced_with(node, duration) {
+
                ControlFlow::Continue(progress) => {
+
                    assert_eq!(
+
                        progress.preferred(),
+
                        synced_count,
+
                        "Preferred count should increment for each preferred seed"
+
                    );
+
                    assert_eq!(
+
                        progress.synced(),
+
                        synced_count,
+
                        "Total synced should equal preferred since all are preferred"
+
                    );
+
                }
+
                ControlFlow::Break(success) => {
+
                    result = Some(success);
+
                    break;
+
                }
+
            }
+
        }
+
        assert_eq!(
+
            result.unwrap().outcome(),
+
            SuccessfulOutcome::PreferredNodes {
+
                preferred: preferred_seeds.len(),
+
                total_nodes_synced: preferred_seeds.len()
+
            },
+
            "Should succeed with PreferredNodes outcome"
+
        );
+
    }
+

+
    #[test]
+
    fn preferred_seeds_already_synced() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(6..=6);
+

+
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let already_synced = preferred_seeds.clone(); // Preferred seeds already synced
+
        let regular_unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(4),
+
            preferred_seeds.clone(),
+
            already_synced.clone(),
+
            regular_unsynced.clone(),
+
        );
+

+
        assert_eq!(
+
            Announcer::new(config).err(),
+
            Some(AnnouncerError::AlreadySynced(AlreadySynced {
+
                preferred: 2,
+
                synced: 2
+
            }))
+
        );
+
    }
+

+
    #[test]
    fn announcer_reached_min_replication_target() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
@@ -546,7 +687,7 @@ mod test {
        let mut success = None;
        let mut successes = 0;

-
        for node in preferred_seeds.iter() {
+
        for node in preferred_seeds.iter().take(1) {
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
@@ -580,7 +721,7 @@ mod test {
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MinReplicationFactor {
-
                preferred: 2,
+
                preferred: 1,
                synced: 3,
            }
        )
@@ -607,21 +748,7 @@ mod test {
        let mut success = None;
        let mut successes = 0;

-
        for node in preferred_seeds.iter() {
-
            let t = time::Duration::from_secs(1);
-
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
-
            successes += 1;
-
            match announcer.synced_with(*node, t) {
-
                ControlFlow::Continue(progress) => {
-
                    assert_eq!(progress.synced(), successes)
-
                }
-
                ControlFlow::Break(stop) => {
-
                    success = Some(stop);
-
                    break;
-
                }
-
            }
-
        }
-

+
        // Don't sync with preferred so that we don't hit that target.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
@@ -641,14 +768,14 @@ mod test {
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MaxReplicationFactor {
-
                preferred: 2,
+
                preferred: 0,
                synced: 6,
            }
        )
    }

    #[test]
-
    fn announcer_must_reach_preferred_seeds() {
+
    fn announcer_preferred_seeds_or_replica_factor() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
@@ -668,6 +795,7 @@ mod test {
        let mut success = None;
        let mut successes = 0;

+
        // Reaches max replication factor, and stops.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
@@ -683,6 +811,8 @@ mod test {
                }
            }
        }
+
        // If we try to continue to drive it forward, we get the extra sync of
+
        // the preferred seed, but it stops immediately.
        for node in preferred_seeds.iter() {
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
@@ -702,14 +832,14 @@ mod test {
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MaxReplicationFactor {
-
                preferred: 2,
-
                synced: 10,
+
                preferred: 1,
+
                synced: 7,
            }
        )
    }

    #[test]
-
    fn announcer_will_minimise_replication_factor() {
+
    fn announcer_reached_preferred_seeds() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
@@ -722,15 +852,14 @@ mod test {
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
-
        let to_sync = announcer.to_sync();
-
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

-
        // Simulate not being able to reach all nodes
-
        for node in to_sync.iter() {
+
        // The preferred seeds then sync, allowing us to reach that part of the
+
        // target
+
        for node in preferred_seeds.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
@@ -749,9 +878,9 @@ mod test {
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
-
            SuccessfulOutcome::MinReplicationFactor {
+
            SuccessfulOutcome::PreferredNodes {
                preferred: 2,
-
                synced: 10,
+
                total_nodes_synced: 2,
            }
        )
    }
@@ -784,6 +913,10 @@ mod test {
                announcer_result = Some(announcer.timed_out());
                break;
            }
+
            // Simulate not being able to reach the preferred seeds
+
            if preferred_seeds.contains(node) {
+
                continue;
+
            }
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
@@ -814,6 +947,288 @@ mod test {
    }

    #[test]
+
    fn announcer_adapts_target_to_reach() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        // Only 3 nodes available
+
        let unsynced = arbitrary::set::<NodeId>(3..=3)
+
            .into_iter()
+
            .collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(5), // Want 5 but only have 3
+
            BTreeSet::new(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+

+
        let announcer = Announcer::new(config).unwrap();
+
        assert_eq!(announcer.target().replicas().lower_bound(), 3);
+
    }
+

+
    #[test]
+
    fn announcer_with_replication_factor_zero_and_preferred_seeds() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(5..=5);
+

+
        let preferred_seeds = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+

+
        // Zero replication factor but with preferred seeds should work
+
        let config = AnnouncerConfig::public(
+
            local,
+
            // Zero replication factor
+
            ReplicationFactor::must_reach(0),
+
            preferred_seeds.clone(),
+
            BTreeSet::new(),
+
            unsynced,
+
        );
+

+
        let mut announcer = Announcer::new(config).unwrap();
+

+
        // Should succeed immediately when we sync with all preferred seeds
+
        for &node in &preferred_seeds {
+
            let duration = time::Duration::from_secs(1);
+
            match announcer.synced_with(node, duration) {
+
                ControlFlow::Continue(_) => {} // Continue until all preferred are synced
+
                ControlFlow::Break(success) => {
+
                    assert_eq!(
+
                        success.outcome(),
+
                        SuccessfulOutcome::PreferredNodes {
+
                            preferred: preferred_seeds.len(),
+
                            total_nodes_synced: preferred_seeds.len()
+
                        },
+
                        "Should succeed with preferred seeds even with zero replication factor"
+
                    );
+
                    return;
+
                }
+
            }
+
        }
+

+
        panic!("Should have succeeded with preferred seeds");
+
    }
+

+
    #[test]
+
    fn announcer_synced_with_unknown_node() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(5..=5);
+

+
        let unsynced = seeds.iter().take(3).copied().collect::<BTreeSet<_>>();
+
        let unknown_node = arbitrary::gen::<NodeId>(100); // Node not in any set
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(1),
+
            BTreeSet::new(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+

+
        let mut announcer = Announcer::new(config).unwrap();
+

+
        // Try to sync with an unknown node
+
        let duration = time::Duration::from_secs(1);
+
        let mut target_reached = false;
+
        match announcer.synced_with(unknown_node, duration) {
+
            ControlFlow::Continue(_) => {}
+
            ControlFlow::Break(success) => {
+
                target_reached = true;
+
                assert_eq!(
+
                    success.outcome(),
+
                    SuccessfulOutcome::MinReplicationFactor {
+
                        preferred: 0,
+
                        synced: 1
+
                    },
+
                    "Should be able to reach target with unknown node"
+
                );
+
            }
+
        }
+

+
        assert!(target_reached);
+
        // Verify the unknown node is now in the synced map
+
        assert!(
+
            announcer.synced.contains_key(&unknown_node),
+
            "Unknown node should be added to synced map"
+
        );
+
    }
+

+
    #[test]
+
    fn synced_with_same_node_multiple_times() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let unsynced = arbitrary::set::<NodeId>(3..=3)
+
            .into_iter()
+
            .collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(2),
+
            BTreeSet::new(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+

+
        let mut announcer = Announcer::new(config).unwrap();
+
        let target_node = *unsynced.iter().next().unwrap();
+

+
        // First sync with the node
+
        let duration1 = time::Duration::from_secs(1);
+
        match announcer.synced_with(target_node, duration1) {
+
            ControlFlow::Continue(progress) => {
+
                assert_eq!(progress.synced(), 1, "First sync should count");
+
                assert_eq!(
+
                    progress.unsynced(),
+
                    unsynced.len() - 1,
+
                    "Should decrease unsynced"
+
                );
+
            }
+
            ControlFlow::Break(_) => panic!("Should not reach target yet"),
+
        }
+

+
        // Sync with the SAME node again with different duration
+
        let duration2 = time::Duration::from_secs(5);
+
        let progress_before_duplicate = announcer.progress();
+
        match announcer.synced_with(target_node, duration2) {
+
            ControlFlow::Continue(progress) => {
+
                // Progress should be UNCHANGED since we already synced with this node
+
                assert_eq!(
+
                    progress.synced(),
+
                    progress_before_duplicate.synced(),
+
                    "Duplicate sync should not change synced count"
+
                );
+
                assert_eq!(
+
                    progress.unsynced(),
+
                    progress_before_duplicate.unsynced(),
+
                    "Duplicate sync should not change unsynced count"
+
                );
+
            }
+
            ControlFlow::Break(_) => panic!("Should not reach target with duplicate sync"),
+
        }
+

+
        // Check that the duration was updated to the latest one
+
        assert_eq!(
+
            announcer.synced[&target_node],
+
            SyncStatus::Synced {
+
                duration: duration2
+
            },
+
            "Duplicate sync should update the duration"
+
        );
+

+
        // Verify the node is no longer in to_sync (should have been removed on first sync)
+
        assert!(
+
            !announcer.to_sync.contains(&target_node),
+
            "Node should not be in to_sync after first sync"
+
        );
+
    }
+

+
    #[test]
+
    fn timed_out_after_reaching_success() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let unsynced = arbitrary::set::<NodeId>(3..=3)
+
            .into_iter()
+
            .collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(2),
+
            BTreeSet::new(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+

+
        let mut announcer = Announcer::new(config).unwrap();
+

+
        // Sync with enough nodes to reach the target
+
        let mut synced_nodes = BTreeMap::new();
+
        for node in unsynced {
+
            let duration = time::Duration::from_secs(1);
+
            synced_nodes.insert(node, SyncStatus::Synced { duration });
+

+
            match announcer.synced_with(node, duration) {
+
                ControlFlow::Continue(_) => continue,
+
                ControlFlow::Break(_) => break, // Reached target
+
            }
+
        }
+

+
        // Now call timed_out even though we reached success
+
        match announcer.timed_out() {
+
            AnnouncerResult::Success(success) => {
+
                // Should return Success since target was reached
+
                assert_eq!(
+
                    success.outcome(),
+
                    SuccessfulOutcome::MinReplicationFactor {
+
                        preferred: 0,
+
                        synced: 2
+
                    },
+
                    "Should return success outcome even when called via timed_out"
+
                );
+
            }
+
            other => panic!("Expected Success via timed_out, got: {other:?}"),
+
        }
+
    }
+

+
    #[test]
+
    fn construct_only_preferred_seeds_provided() {
+
        // Test: preferred_seeds non-empty, synced and unsynced empty
+
        // Expected: preferred seeds should be moved to to_sync, constructor succeeds
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let preferred_seeds = arbitrary::set::<NodeId>(2..=2)
+
            .into_iter()
+
            .collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(1),
+
            preferred_seeds.clone(),
+
            BTreeSet::new(),
+
            BTreeSet::new(),
+
        );
+

+
        let announcer = Announcer::new(config).unwrap();
+

+
        // Constructor should move unsynced preferred seeds to to_sync
+
        assert_eq!(announcer.to_sync, preferred_seeds);
+
        assert_eq!(announcer.target().preferred_seeds(), &preferred_seeds);
+
        assert!(announcer.synced.is_empty());
+
    }
+

+
    #[test]
+
    fn construct_node_appears_in_multiple_input_sets() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let alice = arbitrary::gen::<NodeId>(1);
+
        let bob = arbitrary::gen::<NodeId>(2);
+
        let eve = arbitrary::gen::<NodeId>(3);
+

+
        // alice will appear in synced and unsynced
+
        let synced = [alice].iter().copied().collect::<BTreeSet<_>>();
+
        let unsynced = [alice, bob, eve].iter().copied().collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(2),
+
            BTreeSet::new(),
+
            synced,
+
            unsynced,
+
        );
+

+
        let announcer = Announcer::new(config).unwrap();
+

+
        // synced takes precedence over to_sync when constructing
+
        assert!(
+
            announcer.synced.contains_key(&alice),
+
            "alice should be synced"
+
        );
+
        assert!(
+
            !announcer.to_sync.contains(&alice),
+
            "alice should not appear in to_sync"
+
        );
+
        // bob and eve should appear in to_sync
+
        assert!(
+
            announcer.to_sync.contains(&bob) && announcer.to_sync.contains(&eve),
+
            "Other node should be in to_sync"
+
        );
+
    }
+

+
    #[test]
    fn cannot_construct_announcer() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
@@ -883,4 +1298,320 @@ mod test {
            Err(AnnouncerError::AlreadySynced { .. })
        ));
    }
+

+
    #[test]
+
    fn invariant_progress_should_match_state() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(6..=6);
+

+
        // Set up: 2 already synced, 4 unsynced initially
+
        let already_synced = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(4), // Need 4 total
+
            BTreeSet::new(),                  // No preferred seeds
+
            already_synced.clone(),
+
            unsynced.clone(),
+
        );
+

+
        let mut announcer = Announcer::new(config).unwrap();
+

+
        // No progress made, so values should be the same
+
        assert_eq!(
+
            announcer.progress().unsynced(),
+
            announcer.to_sync().len(),
+
            "Expected unsynced progress to be the same as the number of nodes to sync"
+
        );
+

+
        // Expected: progress.synced() should be the number of already synced nodes
+
        assert_eq!(
+
            announcer.progress().synced(),
+
            already_synced.len(),
+
            "Initial synced count should equal already synced nodes"
+
        );
+

+
        // Now sync with one node and check progress again
+
        let first_unsynced = *unsynced.iter().next().unwrap();
+
        let duration = time::Duration::from_secs(1);
+

+
        match announcer.synced_with(first_unsynced, duration) {
+
            ControlFlow::Continue(progress) => {
+
                assert_eq!(
+
                    progress.synced(),
+
                    already_synced.len() + 1,
+
                    "Synced count should increase by 1"
+
                );
+

+
                assert_eq!(
+
                    progress.unsynced(),
+
                    announcer.to_sync().len(),
+
                    "Unsynced count should equal remaining to_sync length"
+
                );
+

+
                assert_eq!(
+
                    progress.unsynced(),
+
                    unsynced.len() - 1,
+
                    "Unsynced should be original unsynced count minus nodes we've synced"
+
                );
+
            }
+
            ControlFlow::Break(outcome) => {
+
                panic!("Should not have reached target yet: {outcome:?}")
+
            }
+
        }
+

+
        // Invariant:
+
        // synced nodes + unsynced nodes = progress.synced() + progress.unsynced()
+
        let final_progress = announcer.progress();
+
        let expected_total = already_synced.len() + unsynced.len();
+
        let actual_total = final_progress.synced() + final_progress.unsynced();
+

+
        assert_eq!(
+
            actual_total, expected_total,
+
            "synced + unsynced should equal the total nodes we started with"
+
        );
+
    }
+

+
    #[test]
+
    fn local_node_in_preferred_seeds() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let other_seeds = arbitrary::set::<NodeId>(5..=5);
+

+
        // Include local node in preferred seeds
+
        let mut preferred_seeds = other_seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        preferred_seeds.insert(local);
+

+
        let unsynced = other_seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(3),
+
            preferred_seeds.clone(),
+
            BTreeSet::new(),
+
            unsynced.clone(),
+
        );
+

+
        let announcer = Announcer::new(config).unwrap();
+

+
        // Verify local node was removed from target's preferred seeds
+
        assert!(
+
            !announcer.target().preferred_seeds().contains(&local),
+
            "Local node should be removed from preferred seeds in target"
+
        );
+

+
        // Verify local node is not in to_sync
+
        assert!(
+
            !announcer.to_sync().contains(&local),
+
            "Local node should not be in to_sync set"
+
        );
+

+
        // The preferred seeds in the target should be one less than what we passed in
+
        assert_eq!(
+
            announcer.target().preferred_seeds().len(),
+
            preferred_seeds.len() - 1,
+
            "Target should have local node removed from preferred seeds"
+
        );
+
    }
+

+
    #[test]
+
    fn local_node_in_synced_set() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let other_seeds = arbitrary::set::<NodeId>(5..=5);
+

+
        // Include local node in synced set
+
        let mut synced = other_seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        synced.insert(local);
+

+
        let unsynced = other_seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(4),
+
            BTreeSet::new(),
+
            synced.clone(),
+
            unsynced.clone(),
+
        );
+

+
        let announcer = Announcer::new(config).unwrap();
+

+
        // Verify local node is not counted in synced nodes
+
        assert!(
+
            !announcer.synced.contains_key(&local),
+
            "Local node should not be in internal synced map"
+
        );
+

+
        // Progress should reflect only the non-local synced nodes
+
        assert_eq!(
+
            announcer.progress().synced(),
+
            synced.len() - 1,
+
            "Progress should not count local node as synced"
+
        );
+
    }
+

+
    #[test]
+
    fn local_node_in_unsynced_set() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let other_seeds = arbitrary::set::<NodeId>(5..=5);
+

+
        let synced = other_seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+

+
        // Include local node in unsynced set
+
        let mut unsynced = other_seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+
        unsynced.insert(local);
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(4),
+
            BTreeSet::new(),
+
            synced.clone(),
+
            unsynced.clone(),
+
        );
+

+
        let announcer = Announcer::new(config).unwrap();
+

+
        // Verify local node is not in to_sync
+
        assert!(
+
            !announcer.to_sync().contains(&local),
+
            "Local node should not be in to_sync set"
+
        );
+

+
        // The internal to_sync should not contain local node
+
        assert!(
+
            !announcer.to_sync.contains(&local),
+
            "Internal to_sync should not contain local node"
+
        );
+

+
        // Progress unsynced count should not include local node
+
        assert_eq!(
+
            announcer.progress().unsynced(),
+
            unsynced.len() - 1,
+
            "Progress unsynced should not count local node"
+
        );
+
    }
+

+
    #[test]
+
    fn local_node_in_multiple_sets() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let other_seeds = arbitrary::set::<NodeId>(5..=5);
+

+
        // Include local node in ALL sets
+
        let mut preferred_seeds = other_seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        preferred_seeds.insert(local);
+

+
        let mut synced = other_seeds
+
            .iter()
+
            .skip(2)
+
            .take(1)
+
            .copied()
+
            .collect::<BTreeSet<_>>();
+
        synced.insert(local);
+

+
        let mut unsynced = other_seeds.iter().skip(3).copied().collect::<BTreeSet<_>>();
+
        unsynced.insert(local);
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(3),
+
            preferred_seeds.clone(),
+
            synced.clone(),
+
            unsynced.clone(),
+
        );
+

+
        let announcer = Announcer::new(config).unwrap();
+

+
        // Verify local node is completely absent from all internal structures
+
        assert!(
+
            !announcer.target().preferred_seeds().contains(&local),
+
            "Local node should be removed from preferred seeds"
+
        );
+
        assert!(
+
            !announcer.synced.contains_key(&local),
+
            "Local node should not be in synced map"
+
        );
+
        assert!(
+
            !announcer.to_sync().contains(&local),
+
            "Local node should not be in to_sync"
+
        );
+
        assert!(
+
            !announcer.to_sync.contains(&local),
+
            "Local node should not be in internal to_sync"
+
        );
+

+
        // Verify counts are correct (excluding local node from all)
+
        assert_eq!(
+
            announcer.target().preferred_seeds().len(),
+
            preferred_seeds.len() - 1
+
        );
+
        assert_eq!(announcer.progress().synced(), synced.len() - 1);
+
        // The unsynced nodes includes the preferred seeds, since they are not
+
        // in the synced set, and `- 1` from each for the local node
+
        assert_eq!(
+
            announcer.progress().unsynced(),
+
            (unsynced.len() - 1) + (preferred_seeds.len() - 1)
+
        );
+
    }
+

+
    #[test]
+
    fn synced_with_local_node_is_ignored() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let unsynced = arbitrary::set::<NodeId>(3..=3).into_iter().collect();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(2),
+
            BTreeSet::new(),
+
            BTreeSet::new(),
+
            unsynced,
+
        );
+

+
        let mut announcer = Announcer::new(config).unwrap();
+
        let initial_progress = announcer.progress();
+

+
        // Try to sync with the local node - this should be ignored
+
        let duration = time::Duration::from_secs(1);
+
        match announcer.synced_with(local, duration) {
+
            ControlFlow::Continue(progress) => {
+
                // Progress should be unchanged
+
                assert_eq!(
+
                    progress.synced(),
+
                    initial_progress.synced(),
+
                    "Syncing with local node should not change synced count"
+
                );
+
                assert_eq!(
+
                    progress.unsynced(),
+
                    initial_progress.unsynced(),
+
                    "Syncing with local node should not change unsynced count"
+
                );
+
            }
+
            ControlFlow::Break(_) => panic!("Should not reach target by syncing with local node"),
+
        }
+

+
        // Verify local node is still not in synced map
+
        assert!(
+
            !announcer.synced.contains_key(&local),
+
            "Local node should not be added to synced map"
+
        );
+
    }
+

+
    #[test]
+
    fn local_node_only_in_all_sets_results_in_no_seeds_error() {
+
        let local = arbitrary::gen::<NodeId>(0);
+

+
        // Create sets that contain ONLY the local node
+
        let preferred_seeds = [local].iter().copied().collect::<BTreeSet<_>>();
+
        let synced = [local].iter().copied().collect::<BTreeSet<_>>();
+
        let unsynced = [local].iter().copied().collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(1),
+
            preferred_seeds,
+
            synced,
+
            unsynced,
+
        );
+

+
        // After removing local node from all sets, we should get NoSeeds error
+
        assert_matches!(Announcer::new(config), Err(AnnouncerError::NoSeeds));
+
    }
}