Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Add test for message amplification
cloudhead committed 1 year ago
commit 3075a399e9fba16700421171d5bdc4960561b432
parent 1a8b2f5e2cfebf7ea9eb02ec681190eaa7a9ee25
1 file changed +152 -1
modified radicle-node/src/tests.rs
@@ -2,19 +2,23 @@ mod e2e;

use std::collections::BTreeSet;
use std::default::*;
+
use std::env;
use std::io;
use std::sync::Arc;
use std::time;

use crossbeam_channel as chan;
use netservices::Direction as Link;
+
use once_cell::sync::Lazy;
use radicle::identity::Visibility;
-
use radicle::node::address::Store;
+
use radicle::node::address::Store as _;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
use radicle::storage::refs::RefsAt;
use radicle::storage::RefUpdate;
+
use radicle::test::arbitrary::gen;
+
use radicle::test::storage::MockRepository;

use crate::collections::{RandomMap, RandomSet};
use crate::crypto::test::signer::MockSigner;
@@ -41,6 +45,7 @@ use crate::test::peer;
use crate::test::peer::Peer;
use crate::test::simulator;
use crate::test::simulator::{Peer as _, Simulation};
+

use crate::test::storage::MockStorage;
use crate::wire::Decode;
use crate::wire::Encode;
@@ -49,6 +54,16 @@ use crate::worker::fetch;
use crate::LocalTime;
use crate::{git, identity, rad, runtime, service, test};

+
/// Default number of tests to run when testing things with high variance.
+
pub const DEFAULT_TEST_CASES: usize = 10;
+
/// Test cases to run when testing things with high variance.
+
pub static TEST_CASES: Lazy<usize> = Lazy::new(|| {
+
    env::var("RAD_TEST_CASES")
+
        .ok()
+
        .and_then(|s| s.parse::<usize>().ok())
+
        .unwrap_or(DEFAULT_TEST_CASES)
+
});
+

// NOTE
//
// If you wish to see the logs for a running test, simply add the following line to your test:
@@ -1745,3 +1760,139 @@ fn prop_inventory_exchange_dense() {
        .tests(20)
        .quickcheck(property as fn(MockStorage, MockStorage, MockStorage));
}
+

+
#[test]
+
fn test_announcement_message_amplification() {
+
    let mut results = Vec::new();
+

+
    while results.len() < *TEST_CASES {
+
        let mut alice = Peer::new("alice", [7, 7, 7, 7]);
+
        let mut bob = Peer::new("bob", [8, 8, 8, 8]);
+
        let mut eve = Peer::new("eve", [9, 9, 9, 9]);
+
        let mut zod = Peer::new("zod", [5, 5, 5, 5]);
+
        let mut tom = Peer::new("tom", [4, 4, 4, 4]);
+
        let mut sim = Simulation::new(
+
            LocalTime::now(),
+
            alice.rng.clone(),
+
            simulator::Options {
+
                latency: 0..1, // 0 - 1s
+
                failure_rate: 0.,
+
            },
+
        );
+
        let rid = gen::<RepoId>(1);
+

+
        // Fully-connected network.
+
        alice.command(Command::Connect(
+
            bob.id,
+
            bob.address(),
+
            ConnectOptions::default(),
+
        ));
+
        alice.command(Command::Connect(
+
            eve.id,
+
            eve.address(),
+
            ConnectOptions::default(),
+
        ));
+
        alice.command(Command::Connect(
+
            zod.id,
+
            zod.address(),
+
            ConnectOptions::default(),
+
        ));
+
        alice.command(Command::Connect(
+
            tom.id,
+
            tom.address(),
+
            ConnectOptions::default(),
+
        ));
+
        bob.command(Command::Connect(
+
            eve.id,
+
            eve.address(),
+
            ConnectOptions::default(),
+
        ));
+
        bob.command(Command::Connect(
+
            zod.id,
+
            zod.address(),
+
            ConnectOptions::default(),
+
        ));
+
        bob.command(Command::Connect(
+
            tom.id,
+
            tom.address(),
+
            ConnectOptions::default(),
+
        ));
+
        eve.command(Command::Connect(
+
            zod.id,
+
            zod.address(),
+
            ConnectOptions::default(),
+
        ));
+
        eve.command(Command::Connect(
+
            tom.id,
+
            tom.address(),
+
            ConnectOptions::default(),
+
        ));
+
        zod.command(Command::Connect(
+
            tom.id,
+
            tom.address(),
+
            ConnectOptions::default(),
+
        ));
+

+
        // Let the nodes connect to each other.
+
        sim.run_while([&mut alice, &mut bob, &mut eve, &mut zod, &mut tom], |s| {
+
            s.elapsed() < LocalDuration::from_mins(3)
+
        });
+

+
        // Ensure nodes are all connected, otherwise skip this test run.
+
        if alice.sessions().connected().count() != 4 {
+
            continue;
+
        }
+
        if bob.sessions().connected().count() != 4 {
+
            continue;
+
        }
+
        if eve.sessions().connected().count() != 4 {
+
            continue;
+
        }
+
        if zod.sessions().connected().count() != 4 {
+
            continue;
+
        }
+
        if tom.sessions().connected().count() != 4 {
+
            continue;
+
        }
+

+
        let (tx, _) = chan::bounded(1);
+
        let timestamp = (*alice.clock()).into();
+
        alice
+
            .storage_mut()
+
            .repos
+
            .insert(rid, gen::<MockRepository>(1));
+
        alice.command(Command::UpdateInventory(rid, tx));
+
        alice.command(Command::AnnounceInventory);
+

+
        sim.run_while([&mut alice, &mut bob, &mut eve, &mut zod, &mut tom], |s| {
+
            s.elapsed() < LocalDuration::from_mins(3)
+
        });
+

+
        // Count how many copies of Alice's inventory message have been received by peers.
+
        let received = sim.messages().iter().filter(|m| {
+
            matches!(
+
                m,
+
                (_, _, Message::Announcement(Announcement {
+
                    node,
+
                    message: AnnouncementMessage::Inventory(i),
+
                    ..
+
                }))
+
                if node == &alice.id && i.inventory.to_vec() == vec![rid] && i.timestamp == timestamp
+
            )
+
        });
+
        results.push(received.count());
+
    }
+
    // Calculate the average amplification factor based on all simulation runs.
+
    let avg = results.iter().sum::<usize>() as f64 / results.len() as f64;
+
    // Amplification is total divided by minimum, ie. it's a relative metric.
+
    let amp = avg / 4.;
+

+
    // The worse case scenario is (n - 1)^2 messages received for one message announced.
+
    // In the above case of 5 nodes, this is 4 * 4 = 16 messages. This is an amplification of 4.0.
+
    // The best case is an amplification of 1.0, ie. each node receives the message once only.
+
    //
+
    // By using delayed message propagation though, we can bring this down closer to the minimum.
+
    log::debug!(target: "test", "Average message amplification: {amp}");
+

+
    assert!(amp < 4., "Amplification factor of {amp} is too high");
+
}