Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
radicle: Announcer can exit on preferred seeds or replication factor
Fintan Halpenny committed 8 months ago
commit 8a6e55502c9be752978a6429a22761b2ba10bbda
parent d147094812207130b05dcf66ef79bd113caadce2
2 files changed +142 -44
modified crates/radicle-cli/src/commands/sync.rs
@@ -822,13 +822,18 @@ fn display_success<'a>(
}

fn print_announcer_result(result: &sync::AnnouncerResult, verbose: bool) {
+
    use sync::announce::SuccessfulOutcome::*;
    match result {
        sync::AnnouncerResult::Success(success) if verbose => {
            // N.b. Printing how many seeds were synced with is printed
            // elsewhere
            match success.outcome() {
-
                sync::announce::SuccessfulOutcome::MinReplicationFactor { preferred, synced }
-
                | sync::announce::SuccessfulOutcome::MaxReplicationFactor { preferred, synced } => {
+
                MinReplicationFactor { preferred, synced }
+
                | MaxReplicationFactor { preferred, synced }
+
                | PreferredNodes {
+
                    preferred,
+
                    total_nodes_synced: synced,
+
                } => {
                    if preferred == 0 {
                        term::success!("Synced {} seed(s)", term::format::positive(synced));
                    } else {
modified crates/radicle/src/node/sync/announce.rs
@@ -33,7 +33,7 @@ impl Announcer {
    ///   - [`AnnouncerError::Target`]: the target has no preferred seeds and no
    ///     replicas
    pub fn new(mut config: AnnouncerConfig) -> Result<Self, AnnouncerError> {
-
        // N.b. ensure that local node is none of the sets
+
        // N.b. ensure that local node is in none of the sets
        config.preferred_seeds.remove(&config.local_node);
        config.synced.remove(&config.local_node);
        config.unsynced.remove(&config.local_node);
@@ -78,6 +78,14 @@ impl Announcer {
                SuccessfulOutcome::MaxReplicationFactor { preferred, synced } => {
                    Err(AlreadySynced { preferred, synced }.into())
                }
+
                SuccessfulOutcome::PreferredNodes {
+
                    preferred,
+
                    total_nodes_synced,
+
                } => Err(AlreadySynced {
+
                    preferred,
+
                    synced: total_nodes_synced,
+
                }
+
                .into()),
            },
        }
    }
@@ -154,7 +162,7 @@ impl Announcer {
    /// Get the [`Progress`] of the [`Announcer`].
    pub fn progress(&self) -> Progress {
        let SuccessCounts { preferred, synced } = self.success_counts();
-
        let unsynced = self.to_sync.len().saturating_sub(synced);
+
        let unsynced = self.to_sync.len();
        Progress {
            preferred,
            synced,
@@ -175,16 +183,22 @@ impl Announcer {

    fn is_target_reached(&self) -> Option<SuccessfulOutcome> {
        let SuccessCounts { preferred, synced } = self.success_counts();
-
        let reached_preferred = self.target.preferred_seeds.is_empty()
-
            || preferred >= self.target.preferred_seeds.len();
-

-
        let replicas = self.target.replicas();
-
        let min = replicas.lower_bound();
-
        match replicas.upper_bound() {
-
            None => (reached_preferred && synced >= min)
-
                .then_some(SuccessfulOutcome::MinReplicationFactor { preferred, synced }),
-
            Some(max) => (reached_preferred && synced >= max)
-
                .then_some(SuccessfulOutcome::MaxReplicationFactor { preferred, synced }),
+

+
        if !self.target.preferred_seeds.is_empty() && preferred >= self.target.preferred_seeds.len()
+
        {
+
            Some(SuccessfulOutcome::PreferredNodes {
+
                preferred: self.target.preferred_seeds.len(),
+
                total_nodes_synced: synced,
+
            })
+
        } else {
+
            let replicas = self.target.replicas();
+
            let min = replicas.lower_bound();
+
            match replicas.upper_bound() {
+
                None => (synced >= min)
+
                    .then_some(SuccessfulOutcome::MinReplicationFactor { preferred, synced }),
+
                Some(max) => (synced >= max)
+
                    .then_some(SuccessfulOutcome::MaxReplicationFactor { preferred, synced }),
+
            }
        }
    }

@@ -224,6 +238,7 @@ impl SuccessCounts {
}

/// Configuration of the [`Announcer`].
+
#[derive(Clone, Debug)]
pub struct AnnouncerConfig {
    local_node: NodeId,
    replicas: ReplicationFactor,
@@ -247,6 +262,8 @@ impl AnnouncerConfig {
            local_node: local,
            replicas,
            preferred_seeds: network.allowed.clone(),
+
            // TODO(finto): we should check if the seeds are synced with instead
+
            // of assuming they haven't been yet.
            synced: BTreeSet::new(),
            unsynced: network.allowed,
        }
@@ -514,8 +531,18 @@ impl Target {

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SuccessfulOutcome {
-
    MinReplicationFactor { preferred: usize, synced: usize },
-
    MaxReplicationFactor { preferred: usize, synced: usize },
+
    MinReplicationFactor {
+
        preferred: usize,
+
        synced: usize,
+
    },
+
    MaxReplicationFactor {
+
        preferred: usize,
+
        synced: usize,
+
    },
+
    PreferredNodes {
+
        preferred: usize,
+
        total_nodes_synced: usize,
+
    },
}

#[allow(clippy::unwrap_used)]
@@ -546,7 +573,7 @@ mod test {
        let mut success = None;
        let mut successes = 0;

-
        for node in preferred_seeds.iter() {
+
        for node in preferred_seeds.iter().take(1) {
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
@@ -580,7 +607,7 @@ mod test {
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MinReplicationFactor {
-
                preferred: 2,
+
                preferred: 1,
                synced: 3,
            }
        )
@@ -607,21 +634,7 @@ mod test {
        let mut success = None;
        let mut successes = 0;

-
        for node in preferred_seeds.iter() {
-
            let t = time::Duration::from_secs(1);
-
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
-
            successes += 1;
-
            match announcer.synced_with(*node, t) {
-
                ControlFlow::Continue(progress) => {
-
                    assert_eq!(progress.synced(), successes)
-
                }
-
                ControlFlow::Break(stop) => {
-
                    success = Some(stop);
-
                    break;
-
                }
-
            }
-
        }
-

+
        // Don't sync with preferred so that we don't hit that target.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
@@ -641,14 +654,14 @@ mod test {
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MaxReplicationFactor {
-
                preferred: 2,
+
                preferred: 0,
                synced: 6,
            }
        )
    }

    #[test]
-
    fn announcer_must_reach_preferred_seeds() {
+
    fn announcer_preferred_seeds_or_replica_factor() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
@@ -668,6 +681,7 @@ mod test {
        let mut success = None;
        let mut successes = 0;

+
        // Reaches max replication factor, and stops.
        for node in unsynced.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
@@ -683,6 +697,8 @@ mod test {
                }
            }
        }
+
        // If we try to continue to drive it forward, we get the extra sync of
+
        // the preferred seed, but it stops immediately.
        for node in preferred_seeds.iter() {
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
@@ -702,14 +718,14 @@ mod test {
        assert_eq!(
            success.as_ref().unwrap().outcome(),
            SuccessfulOutcome::MaxReplicationFactor {
-
                preferred: 2,
-
                synced: 10,
+
                preferred: 1,
+
                synced: 7,
            }
        )
    }

    #[test]
-
    fn announcer_will_minimise_replication_factor() {
+
    fn announcer_reached_preferred_seeds() {
        let local = arbitrary::gen::<NodeId>(0);
        let seeds = arbitrary::set::<NodeId>(10..=10);
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
@@ -722,15 +738,14 @@ mod test {
            unsynced.clone(),
        );
        let mut announcer = Announcer::new(config).unwrap();
-
        let to_sync = announcer.to_sync();
-
        assert_eq!(to_sync, unsynced.union(&preferred_seeds).copied().collect());

        let mut synced_result = BTreeMap::new();
        let mut success = None;
        let mut successes = 0;

-
        // Simulate not being able to reach all nodes
-
        for node in to_sync.iter() {
+
        // The preferred seeds then sync, allowing us to reach that part of the
+
        // target
+
        for node in preferred_seeds.iter() {
            assert_ne!(*node, local);
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
@@ -749,9 +764,9 @@ mod test {
        assert_eq!(*success.as_ref().unwrap().synced(), synced_result);
        assert_eq!(
            success.as_ref().unwrap().outcome(),
-
            SuccessfulOutcome::MinReplicationFactor {
+
            SuccessfulOutcome::PreferredNodes {
                preferred: 2,
-
                synced: 10,
+
                total_nodes_synced: 2,
            }
        )
    }
@@ -784,6 +799,10 @@ mod test {
                announcer_result = Some(announcer.timed_out());
                break;
            }
+
            // Simulate not being able to reach the preferred seeds
+
            if preferred_seeds.contains(node) {
+
                continue;
+
            }
            let t = time::Duration::from_secs(1);
            synced_result.insert(*node, SyncStatus::Synced { duration: t });
            successes += 1;
@@ -883,4 +902,78 @@ mod test {
            Err(AnnouncerError::AlreadySynced { .. })
        ));
    }
+

+
    #[test]
+
    fn invariant_progress_should_match_state() {
+
        let local = arbitrary::gen::<NodeId>(0);
+
        let seeds = arbitrary::set::<NodeId>(6..=6);
+

+
        // Set up: 2 already synced, 4 unsynced initially
+
        let already_synced = seeds.iter().take(2).copied().collect::<BTreeSet<_>>();
+
        let unsynced = seeds.iter().skip(2).copied().collect::<BTreeSet<_>>();
+

+
        let config = AnnouncerConfig::public(
+
            local,
+
            ReplicationFactor::must_reach(4), // Need 4 total
+
            BTreeSet::new(),                  // No preferred seeds
+
            already_synced.clone(),
+
            unsynced.clone(),
+
        );
+

+
        let mut announcer = Announcer::new(config).unwrap();
+

+
        // No progress made, so values should be the same
+
        assert_eq!(
+
            announcer.progress().unsynced(),
+
            announcer.to_sync().len(),
+
            "Expected unsynced progress to be the same as the number of nodes to sync"
+
        );
+

+
        // Expected: progress.synced() should be the number of already synced nodes
+
        assert_eq!(
+
            announcer.progress().synced(),
+
            already_synced.len(),
+
            "Initial synced count should equal already synced nodes"
+
        );
+

+
        // Now sync with one node and check progress again
+
        let first_unsynced = *unsynced.iter().next().unwrap();
+
        let duration = time::Duration::from_secs(1);
+

+
        match announcer.synced_with(first_unsynced, duration) {
+
            ControlFlow::Continue(progress) => {
+
                assert_eq!(
+
                    progress.synced(),
+
                    already_synced.len() + 1,
+
                    "Synced count should increase by 1"
+
                );
+

+
                assert_eq!(
+
                    progress.unsynced(),
+
                    announcer.to_sync().len(),
+
                    "Unsynced count should equal remaining to_sync length"
+
                );
+

+
                assert_eq!(
+
                    progress.unsynced(),
+
                    unsynced.len() - 1,
+
                    "Unsynced should be original unsynced count minus nodes we've synced"
+
                );
+
            }
+
            ControlFlow::Break(outcome) => {
+
                panic!("Should not have reached target yet: {outcome:?}")
+
            }
+
        }
+

+
        // Invariant:
+
        // synced nodes + unsynced nodes = progress.synced() + progress.unsynced()
+
        let final_progress = announcer.progress();
+
        let expected_total = already_synced.len() + unsynced.len();
+
        let actual_total = final_progress.synced() + final_progress.unsynced();
+

+
        assert_eq!(
+
            actual_total, expected_total,
+
            "synced + unsynced should equal the total nodes we started with"
+
        );
+
    }
}