Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-node src tests.rs
mod e2e;

use std::collections::BTreeSet;
use std::default::*;
use std::env;
use std::io;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time;

use radicle::storage::ReadRepository;
use test_log::test;

use radicle::cob;
use radicle::identity::Visibility;
use radicle::node::Link;
use radicle::node::address::Store as _;
use radicle::node::device::Device;
use radicle::node::policy;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
use radicle::storage::RefUpdate;
use radicle::storage::refs::RefsAt;
use radicle::test::arbitrary::r#gen;
use radicle::test::storage::MockRepository;
use radicle_protocol::bounded::BoundedVec;

use crate::collections::{RandomMap, RandomSet};
use crate::identity::RepoId;
use crate::node;
use crate::node::config::*;
use crate::prelude::*;
use crate::prelude::{LocalDuration, Timestamp};
use crate::service::ServiceState as _;
use crate::service::filter::Filter;
use crate::service::io::Io;
use crate::service::message::*;
use crate::service::*;
use crate::storage::ReadStorage;
use crate::storage::git::Storage;
use crate::storage::git::transport::{local, remote};
use crate::storage::refs::SIGREFS_BRANCH;
use crate::test::arbitrary;
use crate::test::assert_matches;
use crate::test::fixtures;
#[allow(unused)]
use crate::test::logger;
use crate::test::peer;
use crate::test::peer::Peer;
use crate::test::simulator;
use crate::test::simulator::{Peer as _, Simulation};

use crate::LocalTime;
use crate::test::storage::MockStorage;
use crate::wire::Decode;
use crate::wire::Encode;
use crate::worker::fetch;
use crate::{git, identity, rad, runtime, service, test};

/// Default number of tests to run when testing things with high variance.
pub const DEFAULT_TEST_CASES: usize = 10;
/// Test cases to run when testing things with high variance.
pub static TEST_CASES: LazyLock<usize> = LazyLock::new(|| {
    env::var("RAD_TEST_CASES")
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or(DEFAULT_TEST_CASES)
});

// NOTE
//
// If you wish to see the logs for a running test, simply add the following line to your test:
//
//      logger::init(log::Level::Debug);
//
// You may then run the test with eg. `cargo test -- --nocapture` to always show output.

#[test]
fn test_inventory_decode() {
    let inventory: Vec<RepoId> = arbitrary::r#gen(300);
    let timestamp: Timestamp = LocalTime::now().into();

    let mut buf = Vec::new();
    inventory.as_slice().encode(&mut buf);
    timestamp.encode(&mut buf);

    let m = InventoryAnnouncement::decode(&mut buf.as_slice()).expect("message decodes");
    assert_eq!(inventory.as_slice(), m.inventory.as_slice());
    assert_eq!(timestamp, m.timestamp);
}

#[test]
fn test_ping_response() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8]);
    let bob = Peer::new("bob", [9, 9, 9, 9]);
    let eve = Peer::new("eve", [7, 7, 7, 7]);

    alice.connect_to(&bob);
    alice.receive(
        bob.id(),
        Message::Ping(Ping {
            ponglen: Ping::MAX_PONG_ZEROES,
            zeroes: ZeroBytes::new(42),
        }),
    );
    assert_matches!(
        alice.messages(bob.id()).next(),
        Some(Message::Pong { zeroes }) if zeroes.len() == Ping::MAX_PONG_ZEROES as usize,
        "respond with correctly formatted pong",
    );

    alice.connect_to(&eve);
    alice.receive(
        eve.id(),
        Message::Ping(Ping {
            ponglen: Ping::MAX_PONG_ZEROES + 1,
            zeroes: ZeroBytes::new(42),
        }),
    );
    assert_matches!(
        alice.messages(eve.id()).next(),
        None,
        "ignore unsupported ping message",
    );
}

#[test]
fn test_disconnecting_unresponsive_peer() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8]);
    let bob = Peer::new("bob", [9, 9, 9, 9]);

    alice.connect_to(&bob);
    assert_eq!(1, alice.sessions().connected().count(), "bob connects");
    alice.elapse(STALE_CONNECTION_TIMEOUT + LocalDuration::from_secs(1));
    alice
        .outbox()
        .find(|m| matches!(m, &Io::Disconnect(addr, _) if addr == bob.id()))
        .expect("disconnect an unresponsive bob");
}

#[test]
fn test_redundant_connect() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8]);
    let bob = Peer::new("bob", [9, 9, 9, 9]);
    let opts = ConnectOptions::default();

    alice.command(Command::Connect(bob.id(), bob.address(), opts.clone()));
    alice.command(Command::Connect(bob.id(), bob.address(), opts.clone()));
    alice.command(Command::Connect(bob.id(), bob.address(), opts));

    // Only one connection attempt is made.
    assert_matches!(
        alice.outbox().filter(|o| matches!(o, Io::Connect { .. })).collect::<Vec<_>>().as_slice(),
        [Io::Connect(id, addr)]
        if *id == bob.id() && *addr == bob.addr()
    );
}

#[test]
fn test_connection_kept_alive() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8]);
    let mut bob = Peer::new("bob", [9, 9, 9, 9]);

    let mut sim = Simulation::new(
        LocalTime::now(),
        alice.rng.clone(),
        simulator::Options::default(),
    )
    .initialize([&mut alice, &mut bob]);

    alice.command(service::Command::Connect(
        bob.id(),
        bob.address(),
        ConnectOptions::default(),
    ));
    sim.run_while([&mut alice, &mut bob], |s| !s.is_settled());
    assert_eq!(1, alice.sessions().connected().count(), "bob connects");

    let mut elapsed: LocalDuration = LocalDuration::from_secs(0);
    let step: LocalDuration = STALE_CONNECTION_TIMEOUT / 10;
    while elapsed < STALE_CONNECTION_TIMEOUT + step {
        alice.elapse(step);
        bob.elapse(step);
        sim.run_while([&mut alice, &mut bob], |s| !s.is_settled());

        elapsed = elapsed + step;
    }

    assert_eq!(1, alice.sessions().len(), "alice remains connected to Bob");
    assert_eq!(1, bob.sessions().len(), "bob remains connected to Alice");
}

#[test]
fn test_outbound_connection() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8]);
    let bob = Peer::new("bob", [9, 9, 9, 9]);
    let eve = Peer::new("eve", [7, 7, 7, 7]);

    alice.connect_to(&bob);
    alice.connect_to(&eve);

    let peers = alice
        .service
        .sessions()
        .connected()
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();

    assert!(peers.contains(&eve.id()));
    assert!(peers.contains(&bob.id()));
}

#[test]
fn test_inbound_connection() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8]);
    let bob = Peer::new("bob", [9, 9, 9, 9]);
    let eve = Peer::new("eve", [7, 7, 7, 7]);

    alice.connect_from(&bob);
    alice.connect_from(&eve);

    let peers = alice
        .service
        .sessions()
        .connected()
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();

    assert!(peers.contains(&eve.id()));
    assert!(peers.contains(&bob.id()));
}

#[test]
fn test_persistent_peer_connect() {
    use indexmap::IndexSet;

    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
    let connect = IndexSet::<ConnectAddress>::from_iter([
        (bob.id(), bob.address()).into(),
        (eve.id(), eve.address()).into(),
    ]);
    let mut alice = Peer::config(
        "alice",
        [7, 7, 7, 7],
        MockStorage::empty(),
        peer::Config {
            config: Config {
                connect,
                ..Config::new(node::Alias::new("alice"))
            },
            ..peer::Config::default()
        },
    )
    .initialized();

    let outbox = alice.outbox().collect::<Vec<_>>();
    outbox
        .iter()
        .find(|o| matches!(o, Io::Connect(a, _) if *a == bob.id()))
        .unwrap();
    outbox
        .iter()
        .find(|o| matches!(o, Io::Connect(a, _) if *a == eve.id()))
        .unwrap();
}

#[test]
fn test_inventory_sync() {
    let tmp = tempfile::tempdir().unwrap();
    let mut alice = Peer::with_storage(
        "alice",
        [7, 7, 7, 7],
        Storage::open(tmp.path().join("alice"), fixtures::user()).unwrap(),
    );
    let bob_signer = Device::mock();
    let bob_storage = fixtures::storage(tmp.path().join("bob"), &bob_signer).unwrap();
    let bob = Peer::with_storage("bob", [8, 8, 8, 8], bob_storage);
    let now = LocalTime::now().into();
    let repos = bob.inventory().into_iter().collect::<Vec<_>>();

    alice.connect_to(&bob);
    alice.receive(
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: repos.clone().try_into().unwrap(),
                timestamp: now,
            },
            bob.signer(),
        ),
    );

    for proj in &repos {
        let seeds = alice.database().routing().get(proj).unwrap();
        assert!(seeds.contains(&bob.node_id()));
    }
}

#[test]
fn test_inventory_pruning() {
    struct Test {
        limits: Limits,
        /// Number of projects by peer
        peer_projects: Vec<usize>,
        wait_time: LocalDuration,
        expected_routing_table_size: usize,
    }
    let tests = [
        // All zero
        Test {
            limits: Limits {
                routing_max_size: 0.into(),
                routing_max_age: LocalDuration::from_secs(0).into(),
                ..Limits::default()
            },
            peer_projects: vec![10; 5],
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
            expected_routing_table_size: 0,
        },
        // All entries are too young to expire.
        Test {
            limits: Limits {
                routing_max_size: 0.into(),
                routing_max_age: LimitRoutingMaxAge::from(LocalDuration::from_mins(7 * 24 * 60)),
                ..Limits::default()
            },
            peer_projects: vec![10; 5],
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
            expected_routing_table_size: 0,
        },
        // All entries remain because the table is unconstrained.
        Test {
            limits: Limits {
                routing_max_size: 50.into(),
                routing_max_age: LocalDuration::from_mins(0).into(),
                ..Limits::default()
            },
            peer_projects: vec![10; 5],
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
            expected_routing_table_size: 50,
        },
        // Some entries are pruned because the table is constrained.
        Test {
            limits: Limits {
                routing_max_size: 25.into(),
                routing_max_age: LocalDuration::from_mins(7 * 24 * 60).into(),
                ..Limits::default()
            },
            peer_projects: vec![10; 5],
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
            expected_routing_table_size: 25,
        },
    ];

    for test in tests {
        let mut alice = Peer::config(
            "alice",
            [7, 7, 7, 7],
            MockStorage::empty(),
            peer::Config {
                config: Config {
                    limits: test.limits,
                    ..Config::new(node::Alias::new("alice"))
                },
                ..peer::Config::default()
            },
        )
        .initialized();

        let bob = Peer::config(
            "bob",
            [8, 8, 8, 8],
            MockStorage::empty(),
            peer::Config {
                local_time: alice.local_time(),
                ..peer::Config::default()
            },
        )
        .initialized();

        // Tell Alice about the amazing projects available
        alice.connect_to(&bob);
        for num_projs in test.peer_projects {
            let peer = Peer::new("other", [9, 9, 9, 9]);

            alice.receive(bob.id(), peer.node_announcement());
            alice.receive(
                bob.id(),
                Message::inventory(
                    InventoryAnnouncement {
                        inventory: test::arbitrary::vec::<RepoId>(num_projs)
                            .try_into()
                            .unwrap(),
                        timestamp: bob.local_time().into(),
                    },
                    peer.signer(),
                ),
            );
        }

        // Wait for things to happen
        assert!(test.wait_time > PRUNE_INTERVAL, "pruning must be triggered");
        alice.elapse(test.wait_time);

        assert_eq!(
            test.expected_routing_table_size,
            alice.database().routing().len().unwrap()
        );
    }
}

#[test]
fn test_seeding() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let proj_id: identity::RepoId = test::arbitrary::r#gen(1);

    let (cmd, receiver) = Command::seed(proj_id, policy::Scope::default());
    alice.command(cmd);
    let policy_change = receiver
        .recv()
        .map_err(runtime::HandleError::from)
        .unwrap()
        .unwrap();
    assert!(policy_change);
    assert!(alice.policies().is_seeding(&proj_id).unwrap());

    let (cmd, receiver) = Command::unseed(proj_id);
    alice.command(cmd);
    let policy_change = receiver
        .recv()
        .map_err(runtime::HandleError::from)
        .unwrap()
        .unwrap();
    assert!(policy_change);
    assert!(!alice.policies().is_seeding(&proj_id).unwrap());
}

#[test]
fn test_inventory_relay_bad_timestamp() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let two_hours = 3600 * 1000 * 2;
    let timestamp = alice.timestamp() + two_hours;

    alice.connect_to(&bob);
    alice.receive(
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: BoundedVec::new(),
                timestamp,
            },
            bob.signer(),
        ),
    );
    assert_matches!(
        alice.outbox().next(),
        Some(Io::Disconnect(addr, DisconnectReason::Session(session::Error::InvalidTimestamp(t))))
        if addr == bob.id() && t == timestamp
    );
}

#[test]
fn test_announcement_rebroadcast() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);

    alice.connect_to(&bob);
    alice.connect_from(&eve);
    alice.outbox().for_each(drop);

    log::debug!(target: "test", "Receiving gossips..");

    let received = test::gossip::messages(6, alice.local_time(), MAX_TIME_DELTA);
    for msg in received.iter().cloned() {
        alice.receive(bob.id(), msg);
    }

    alice.receive(
        eve.id(),
        Message::Subscribe(Subscribe {
            filter: Filter::default(),
            since: Timestamp::MIN,
            until: Timestamp::MAX,
        }),
    );

    let relayed = alice.messages(eve.id()).collect::<BTreeSet<_>>();
    let received = received
        .into_iter()
        .chain(Some(bob.node_announcement()))
        .collect::<BTreeSet<_>>();

    assert_eq!(relayed.len(), received.len());
    assert_eq!(relayed, received);
}

#[test]
fn test_announcement_rebroadcast_duplicates() {
    let mut carol = Peer::new("carol", [4, 4, 4, 4]);
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
    let rids = arbitrary::set::<RepoId>(3..=3);

    carol.init();
    alice.connect_to(&bob);
    alice.receive(bob.id, carol.node_announcement());

    // These are not expected to be relayed.
    let stale = {
        let mut anns = BTreeSet::new();

        for _ in 0..5 {
            carol.elapse(LocalDuration::from_mins(1));

            anns.insert(carol.inventory_announcement());
            anns.insert(carol.node_announcement());
        }
        anns
    };

    // These are expected to be relayed.
    let expected = {
        let mut anns = BTreeSet::new();

        carol.elapse(LocalDuration::from_mins(1));
        anns.insert(carol.inventory_announcement());
        anns.insert(carol.node_announcement());
        anns.insert(bob.node_announcement());

        for rid in rids {
            alice.seed(&rid, policy::Scope::All).unwrap();
            anns.insert(carol.refs_announcement(rid));
            anns.insert(bob.refs_announcement(rid));
        }
        anns
    };

    let mut all = stale.iter().chain(expected.iter()).collect::<Vec<_>>();
    fastrand::shuffle(&mut all);

    // Alice receives all messages out of order.
    for ann in all {
        alice.receive(bob.id, ann.clone());
    }

    // Alice relays just the expected ones back to Eve.
    alice.connect_from(&eve);
    alice.receive(
        eve.id(),
        Message::Subscribe(Subscribe {
            filter: Filter::default(),
            since: Timestamp::MIN,
            until: Timestamp::MAX,
        }),
    );

    let relayed = alice.messages(eve.id()).collect::<BTreeSet<_>>();

    assert_eq!(relayed.len(), 9);
    assert_eq!(relayed, expected);
}

#[test]
fn test_announcement_rebroadcast_timestamp_filtered() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);

    alice.connect_to(&bob);

    let delta = LocalDuration::from_mins(10);
    let first = test::gossip::messages(3, alice.local_time() - delta, LocalDuration::from_secs(0));
    let second = test::gossip::messages(3, alice.local_time(), LocalDuration::from_secs(0));
    let third = test::gossip::messages(3, alice.local_time() + delta, LocalDuration::from_secs(0));

    // Alice receives three batches of messages.
    for msg in first
        .iter()
        .chain(second.iter())
        .chain(third.iter())
        .cloned()
    {
        alice.receive(bob.id(), msg);
    }

    // Eve subscribes to messages within the period of the second batch only.
    alice.connect_from(&eve);
    alice.receive(
        eve.id(),
        Message::Subscribe(Subscribe {
            filter: Filter::default(),
            since: alice.local_time().into(),
            until: (alice.local_time() + delta).into(),
        }),
    );

    let relayed = alice.relayed(eve.id()).collect::<BTreeSet<_>>();
    let second = second
        .into_iter()
        .chain(Some(bob.node_announcement()))
        .collect::<BTreeSet<_>>();

    assert_eq!(relayed.len(), second.len());
    assert_eq!(relayed, second);
}

#[test]
fn test_announcement_relay() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let mut bob = Peer::new("bob", [8, 8, 8, 8]);
    let mut eve = Peer::new("eve", [9, 9, 9, 9]);

    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice
        .receive(bob.id(), bob.inventory_announcement())
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_))
    );

    alice.receive(bob.id(), bob.inventory_announcement());
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "Another inventory with the same timestamp is ignored"
    );

    bob.elapse(LocalDuration::from_mins(1));
    alice
        .receive(bob.id(), bob.inventory_announcement())
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "Another inventory with a fresher timestamp is relayed"
    );

    alice
        .receive(bob.id(), bob.node_announcement())
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "A node announcement with the same timestamp as the inventory is relayed"
    );

    alice
        .receive(bob.id(), bob.node_announcement())
        .elapse(service::GOSSIP_INTERVAL);
    assert!(alice.messages(eve.id()).next().is_none(), "Only once");

    alice
        .receive(eve.id(), eve.node_announcement())
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(bob.id()).next(),
        Some(Message::Announcement(_)),
        "A node announcement from Eve is relayed to Bob"
    );
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "But not back to Eve"
    );

    eve.elapse(LocalDuration::from_mins(1));
    alice
        .receive(bob.id(), eve.node_announcement())
        .elapse(service::GOSSIP_INTERVAL);
    assert!(
        alice.messages(bob.id()).next().is_none(),
        "Bob already know about this message, since he sent it"
    );
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "Eve already know about this message, since she signed it"
    );
}

#[test]
fn test_refs_announcement_relay_public() {
    let tmp = tempfile::tempdir().unwrap();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], MockStorage::empty());
    let eve = Peer::with_storage(
        "eve",
        [8, 8, 8, 8],
        Storage::open(tmp.path().join("eve"), fixtures::user()).unwrap(),
    );

    let bob = {
        let mut rng = fastrand::Rng::new();
        let signer = Device::mock_rng(&mut rng);
        let storage = fixtures::storage(tmp.path().join("bob"), &signer).unwrap();

        Peer::config(
            "bob",
            [9, 9, 9, 9],
            storage,
            peer::Config {
                signer,
                rng,
                ..peer::Config::default()
            },
        )
        .initialized()
    };
    let bob_inv = bob.inventory().into_iter().collect::<Vec<_>>();

    alice.seed(&bob_inv[0], policy::Scope::All).unwrap();
    alice.seed(&bob_inv[1], policy::Scope::All).unwrap();
    alice.seed(&bob_inv[2], policy::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.receive(eve.id(), Message::Subscribe(Subscribe::all()));
    alice
        .receive(bob.id(), bob.refs_announcement(bob_inv[0]))
        .elapse(service::GOSSIP_INTERVAL);

    // Pretend Alice cloned Bob's repos.
    let repos = r#gen::<[MockRepository; 3]>(1);
    for (i, mut repo) in repos.into_iter().enumerate() {
        repo.doc.doc = repo
            .doc
            .doc
            .with_edits(|doc| {
                doc.visibility = Visibility::Public; // Public repos are always gossiped.
            })
            .unwrap();
        alice.storage_mut().repos.insert(bob_inv[i], repo);
    }
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "A refs announcement from Bob is relayed to Eve"
    );

    alice
        .receive(bob.id(), bob.refs_announcement(bob_inv[0]))
        .elapse(service::GOSSIP_INTERVAL);
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "The same ref announcement is not relayed"
    );

    alice
        .receive(bob.id(), bob.refs_announcement(bob_inv[1]))
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "But a different one is"
    );

    alice
        .receive(bob.id(), bob.refs_announcement(bob_inv[2]))
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "And a third one is as well"
    );
}

#[test]
fn test_refs_announcement_relay_private() {
    let tmp = tempfile::tempdir().unwrap();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], MockStorage::empty());
    let eve = Peer::with_storage(
        "eve",
        [8, 8, 8, 8],
        Storage::open(tmp.path().join("eve"), fixtures::user()).unwrap(),
    );

    let bob = {
        let mut rng = fastrand::Rng::new();
        let signer = Device::mock_rng(&mut rng);
        let storage = fixtures::storage(tmp.path().join("bob"), &signer).unwrap();

        Peer::config(
            "bob",
            [9, 9, 9, 9],
            storage,
            peer::Config {
                signer,
                rng,
                ..peer::Config::default()
            },
        )
        .initialized()
    };
    let bob_inv = bob.inventory().into_iter().collect::<Vec<_>>();

    alice.seed(&bob_inv[0], policy::Scope::All).unwrap();
    alice.seed(&bob_inv[1], policy::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.receive(eve.id(), Message::Subscribe(Subscribe::all()));

    // The first repo is not visible to Eve.
    let repo1 = {
        let mut repo = r#gen::<MockRepository>(1);
        repo.doc.doc = repo
            .doc
            .doc
            .with_edits(|doc| {
                doc.visibility = Visibility::Private { allow: [].into() };
            })
            .unwrap();
        repo
    };
    alice.storage_mut().repos.insert(bob_inv[0], repo1);

    // The second repo is visible to Eve.
    let repo2 = {
        let mut repo = r#gen::<MockRepository>(1);
        repo.doc.doc = repo
            .doc
            .doc
            .with_edits(|doc| {
                doc.visibility = Visibility::Private {
                    allow: [eve.id.into()].into(),
                };
            })
            .unwrap();
        repo
    };
    alice.storage_mut().repos.insert(bob_inv[1], repo2);
    alice.elapse(service::GOSSIP_INTERVAL);
    alice.messages(eve.id()).for_each(drop);
    alice
        .receive(bob.id(), bob.refs_announcement(bob_inv[0]))
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        None,
        "The first ref announcement is not relayed to Eve"
    );

    alice
        .receive(bob.id(), bob.refs_announcement(bob_inv[1]))
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(Announcement {
            message: AnnouncementMessage::Refs(_),
            ..
        })),
        "The second ref announcement is relayed to Eve"
    );
}

/// Even if Alice is not tracking Bob, Alice will fetch Bob's refs for a repo she doesn't have.
#[test]
fn test_refs_announcement_fetch_trusted_no_inventory() {
    let tmp = tempfile::tempdir().unwrap();
    let mut alice = Peer::with_storage(
        "alice",
        [7, 7, 7, 7],
        Storage::open(tmp.path().join("alice"), fixtures::user()).unwrap(),
    );
    let bob = {
        let mut rng = fastrand::Rng::new();
        let signer = Device::mock_rng(&mut rng);
        let storage = fixtures::storage(tmp.path().join("bob"), &signer).unwrap();

        Peer::config(
            "bob",
            [9, 9, 9, 9],
            storage,
            peer::Config {
                signer,
                rng,
                ..peer::Config::default()
            },
        )
        .initialized()
    };
    let bob_inv = bob.inventory();
    let rid = bob_inv.iter().next().unwrap();

    alice.seed(rid, policy::Scope::Followed).unwrap();
    alice.connect_to(&bob);

    // Alice receives Bob's refs.
    alice.receive(bob.id(), bob.refs_announcement(*rid));

    // Alice fetches Bob's refs as this is a new repo.
    assert_matches!(alice.outbox().next(), Some(Io::Fetch { .. }));
}

/// Alice and Bob both have the same repo.
///
/// First, Alice will not fetch from Bob's `RefsAnnouncement` as Alice does not
/// track Bob as `Followed`.
///
/// Later Alice follows Bob, and will be able to fetch Bob's refs.
#[test]
fn test_refs_announcement_followed() {
    // Create MockStorage for Alice and Bob. Both will have repo with `rid`.
    let storage_alice = arbitrary::nonempty_storage(1);
    let rid = *storage_alice.repos.keys().next().unwrap();
    let storage_bob = storage_alice.clone();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage_alice);
    let mut bob = Peer::with_storage("bob", [8, 8, 8, 8], storage_bob);

    let node_id = alice.id;
    let repo = alice.storage_mut().repo_mut(&rid);
    let root = repo.identity_root().unwrap();
    let sigrefs_at = bob.signed_refs_at(root);

    repo.remotes.insert(node_id, sigrefs_at);

    // Generate some refs for Bob under their own node_id.
    let sigrefs_at = bob.signed_refs_at(root);
    let node_id = bob.id;
    bob.init();
    bob.storage_mut()
        .repo_mut(&rid)
        .remotes
        .insert(node_id, sigrefs_at);

    // Alice uses Scope::Followed, and did not track Bob yet.
    alice.connect_to(&bob);
    alice.seed(&rid, policy::Scope::Followed).unwrap();

    // Alice receives Bob's refs
    alice.receive(bob.id(), bob.refs_announcement(rid));

    // Alice does not fetch as Alice is not tracking Bob.
    assert!(
        alice.messages(bob.id()).next().is_none(),
        "Alice is not tracking bob yet."
    );

    // Alice starts to track Bob.
    let (cmd, receiver) = Command::follow(bob.id, Some(node::Alias::new("bob")));
    alice.command(cmd);
    let policy_change = receiver
        .recv()
        .map_err(runtime::HandleError::from)
        .unwrap()
        .unwrap();
    assert!(policy_change);

    // Bob announces refs again.
    bob.elapse(LocalDuration::from_mins(1)); // Make sure our announcement is fresh.
    alice.receive(bob.id(), bob.refs_announcement(rid));
    assert_matches!(alice.outbox().next(), Some(Io::Fetch { .. }));
}

#[test]
fn test_refs_announcement_no_subscribe() {
    let storage = arbitrary::nonempty_storage(1);
    let rid = *storage.repos.keys().next().unwrap();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
    let id = arbitrary::r#gen(1);

    alice.seed(&id, policy::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.receive(bob.id(), bob.refs_announcement(rid));

    assert!(alice.messages(eve.id()).next().is_none());
}

#[test]
fn test_refs_announcement_offline() {
    let tmp = tempfile::tempdir().unwrap();
    let mut alice = {
        let signer = Device::mock();
        let storage = fixtures::storage(tmp.path().join("alice"), &signer).unwrap();

        Peer::config(
            "alice",
            [7, 7, 7, 7],
            storage,
            peer::Config {
                signer,
                ..peer::Config::default()
            },
        )
    };
    let mut bob = Peer::new("bob", [8, 8, 8, 8]);

    // Make sure alice's service wasn't initialized before.
    assert_eq!(*alice.clock(), LocalTime::default());

    alice.initialize();
    alice.connect_to(&bob);
    alice.receive(bob.id, Message::Subscribe(Subscribe::all()));

    let mut inv = alice.inventory();
    let rid = *inv.iter().next().unwrap();

    bob.seed(&rid, policy::Scope::All).unwrap();

    // Alice announces the refs of all projects since she hasn't announced refs for these projects
    // yet.
    for msg in alice.messages(bob.id()) {
        assert_matches!(
            msg,
            Message::Announcement(Announcement {
                node,
                message: AnnouncementMessage::Refs(RefsAnnouncement {
                    rid,
                    ..
                }),
                ..
            })
            if node == alice.id && inv.remove(&rid)
        );
    }

    // Create an issue without telling the node.
    let repo = alice.storage().repository(rid).unwrap();
    let old_refs = RefsAt::new(&repo, alice.id).unwrap();
    let mut issues = radicle::issue::Cache::no_cache(&repo, alice.signer()).unwrap();
    issues
        .create(
            cob::Title::new("Issue while offline!").unwrap(),
            "",
            &[],
            &[],
            [],
        )
        .unwrap();
    let new_refs = RefsAt::new(&repo, alice.id).unwrap();
    assert_ne!(old_refs, new_refs);

    // Now we restart Alice's node. It should pick up that something's changed in storage.
    alice.elapse(LocalDuration::from_secs(60));
    alice
        .database_mut()
        .addresses_mut()
        .remove(&bob.id)
        .unwrap(); // Make sure we don't reconnect automatically.
    alice.disconnected(
        bob.id,
        Link::Outbound,
        &DisconnectReason::Session(session::Error::Timeout),
    );
    alice.outbox().for_each(drop);
    alice.restart();
    alice.connect_to(&bob);
    alice.receive(
        bob.id,
        Message::Subscribe(Subscribe {
            filter: Filter::default(),
            since: alice.timestamp(),
            until: Timestamp::MAX,
        }),
    );

    let anns = alice
        .messages(bob.id())
        .filter_map(|m| match m {
            Message::Announcement(Announcement {
                message: AnnouncementMessage::Refs(ann),
                ..
            }) if ann.rid == rid => Some(ann),
            _ => None,
        })
        .collect::<Vec<_>>();

    assert_eq!(anns.len(), 1);
    assert_eq!(anns.first().unwrap().rid, rid);
    assert_eq!(anns.first().unwrap().refs.first().unwrap().at, new_refs.at);
}

#[test]
fn test_inventory_relay() {
    // Topology is eve <-> alice <-> bob
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
    let inv = BoundedVec::try_from(arbitrary::vec(1)).unwrap();
    let now = LocalTime::now().into();

    // Inventory from Bob relayed to Eve.
    alice.init();
    alice.wake(); // Run all periodic tasks now so they don't trigger later.
    alice.connect_to(&bob);
    alice.connect_from(&eve);
    alice
        .receive(
            bob.id(),
            Message::inventory(
                InventoryAnnouncement {
                    inventory: inv.clone(),
                    timestamp: now,
                },
                bob.signer(),
            ),
        )
        .elapse(service::GOSSIP_INTERVAL);

    assert_matches!(
        alice.inventory_announcements(eve.id()).next(),
        Some(Message::Announcement(Announcement {
            node,
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
            ..
        }))
        if node == bob.node_id() && timestamp == now
    );
    assert_matches!(
        alice.inventory_announcements(bob.id()).next(),
        None,
        "The inventory is not sent back to Bob"
    );

    alice
        .receive(
            bob.id(),
            Message::inventory(
                InventoryAnnouncement {
                    inventory: inv.clone(),
                    timestamp: now,
                },
                bob.signer(),
            ),
        )
        .elapse(service::GOSSIP_INTERVAL);

    assert_matches!(
        alice.inventory_announcements(eve.id()).next(),
        None,
        "Sending the same inventory again doesn't trigger a relay"
    );

    alice
        .receive(
            bob.id(),
            Message::inventory(
                InventoryAnnouncement {
                    inventory: inv.clone(),
                    timestamp: now + 1,
                },
                bob.signer(),
            ),
        )
        .elapse(service::GOSSIP_INTERVAL);

    assert_matches!(
        alice.inventory_announcements(eve.id()).next(),
        Some(Message::Announcement(Announcement {
            node,
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
            ..
        }))
        if node == bob.node_id() && timestamp == now + 1,
        "Sending a new inventory does trigger the relay"
    );

    // Inventory from Eve relayed to Bob.
    alice
        .receive(
            eve.id(),
            Message::inventory(
                InventoryAnnouncement {
                    inventory: inv,
                    timestamp: now,
                },
                eve.signer(),
            ),
        )
        .elapse(service::GOSSIP_INTERVAL);

    assert_matches!(
        alice.inventory_announcements(bob.id()).next(),
        Some(Message::Announcement(Announcement {
            node,
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
            ..
        }))
        if node == eve.node_id() && timestamp == now
    );
}

#[test]
fn test_persistent_peer_reconnect_attempt() {
    use indexmap::IndexSet;

    let mut bob = Peer::new("bob", [8, 8, 8, 8]);
    let mut eve = Peer::new("eve", [9, 9, 9, 9]);
    let mut alice = Peer::config(
        "alice",
        [7, 7, 7, 7],
        MockStorage::empty(),
        peer::Config {
            config: Config {
                connect: IndexSet::from_iter([
                    (bob.id(), bob.address()).into(),
                    (eve.id(), eve.address()).into(),
                ]),
                ..Config::new(node::Alias::new("alice"))
            },
            ..peer::Config::default()
        },
    )
    .initialized();

    let mut sim = Simulation::new(
        LocalTime::now(),
        alice.rng.clone(),
        simulator::Options::default(),
    )
    .initialize([&mut alice, &mut bob, &mut eve]);

    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

    let ips = alice
        .sessions()
        .connected()
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();
    assert!(ips.contains(&bob.id()));
    assert!(ips.contains(&eve.id()));

    // ... Negotiated ...
    //
    // Now let's disconnect a peer.

    // A non-transient disconnect, such as one due to peer misbehavior will still trigger a
    // a reconnection, since this is a persistent peer.
    let reason = DisconnectReason::Session(session::Error::Misbehavior);

    for _ in 0..3 {
        alice.disconnected(bob.id(), Link::Outbound, &reason);
        alice.elapse(service::MAX_RECONNECTION_DELTA);
        alice
            .outbox()
            .find(|io| matches!(io, Io::Connect(a, _) if a == &bob.id()))
            .unwrap();

        alice.attempted(bob.id(), bob.address());
    }
}

#[test]
fn test_persistent_peer_reconnect_success() {
    use indexmap::IndexSet;

    let bob = Peer::with_storage("bob", [9, 9, 9, 9], MockStorage::empty());
    let mut alice = Peer::config(
        "alice",
        [7, 7, 7, 7],
        MockStorage::empty(),
        peer::Config {
            config: Config {
                connect: IndexSet::from_iter([(bob.id, bob.addr()).into()]),
                ..Config::new(node::Alias::new("alice"))
            },
            ..peer::Config::default()
        },
    )
    .initialized();
    alice.connect_to(&bob);

    // A transient error such as this will cause Alice to attempt a reconnection.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
    alice.disconnected(
        bob.id(),
        Link::Outbound,
        &DisconnectReason::Connection(error),
    );
    alice.elapse(service::MIN_RECONNECTION_DELTA);
    alice.elapse(service::MIN_RECONNECTION_DELTA); // Trigger a second wakeup to test idempotence.

    alice
        .outbox()
        .find_map(|o| match o {
            Io::Connect(id, _) => Some(id),
            _ => None,
        })
        .expect("Alice attempts a re-connection");

    alice.attempted(bob.id(), bob.addr());
    alice.connected(bob.id(), bob.addr(), Link::Outbound);
}

#[test]
fn test_maintain_connections() {
    // Peers alice starts out connected to.
    let connected = vec![
        Peer::new("connected", [8, 8, 8, 1]),
        Peer::new("connected", [8, 8, 8, 2]),
        Peer::new("connected", [8, 8, 8, 3]),
    ];
    // Peers alice will connect to once the others disconnect.
    let mut unconnected = vec![
        Peer::new("unconnected", [9, 9, 9, 1]),
        Peer::new("unconnected", [9, 9, 9, 2]),
        Peer::new("unconnected", [9, 9, 9, 3]),
    ];

    let mut alice = Peer::new("alice", [7, 7, 7, 7]);

    for peer in connected.iter() {
        alice.connect_to(peer);
    }
    assert_eq!(
        connected.len(),
        alice.sessions().len(),
        "alice should be connected to the first set of peers"
    );
    // We now import the other addresses.
    alice.import_addresses(&unconnected);

    // A non-transient error such as this will cause Alice to attempt a different peer.
    let error = session::Error::Misbehavior;
    for peer in connected.iter() {
        alice.disconnected(peer.id(), Link::Outbound, &DisconnectReason::Session(error));

        let id = alice
            .outbox()
            .find_map(|o| match o {
                Io::Connect(id, _) => Some(id),
                _ => None,
            })
            .expect("Alice connects to a new peer");
        assert_ne!(id, peer.id());
        unconnected.retain(|p| p.id() != id);
    }
    assert!(
        unconnected.is_empty(),
        "alice should connect to all unconnected peers"
    );
}

#[test]
fn test_maintain_connections_transient() {
    // Peers alice starts out connected to.
    let connected = vec![
        Peer::new("connected", [8, 8, 8, 1]),
        Peer::new("connected", [8, 8, 8, 2]),
        Peer::new("connected", [8, 8, 8, 3]),
    ];
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);

    for peer in connected.iter() {
        alice.connect_to(peer);
    }
    // A transient error such as this will cause Alice to attempt a reconnection.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
    for peer in connected.iter() {
        alice.disconnected(
            peer.id(),
            Link::Outbound,
            &DisconnectReason::Connection(error.clone()),
        );
        alice
            .outbox()
            .find(|o| matches!(o, Io::Connect(id, _) if id == &peer.id()))
            .unwrap();
    }
}

#[test]
fn test_maintain_connections_failed_attempt() {
    let eve = Peer::new("eve", [9, 9, 9, 9]);
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let reason =
        DisconnectReason::Connection(Arc::new(io::Error::from(io::ErrorKind::ConnectionReset)));

    // Make sure Alice knows about Eve.
    alice.connect_to(&eve);
    alice.disconnected(eve.id(), Link::Outbound, &reason);
    alice
        .outbox()
        .find(|o| matches!(o, Io::Connect(id, _) if id == &eve.id))
        .expect("Alice attempts Eve");
    alice.attempted(eve.id, eve.addr());

    // Disconnect Eve and make sure Alice doesn't try to re-connect immediately.
    alice.disconnected(eve.id(), Link::Outbound, &reason);
    assert_matches!(
        alice.outbox().find(|o| matches!(o, Io::Connect(_, _))),
        None
    );

    // Now pass some time and try again.
    alice.elapse(MAX_RECONNECTION_DELTA);
    alice
        .outbox()
        .find(|o| matches!(o, Io::Connect(id, _) if id == &eve.id))
        .expect("Alice attempts Eve again");

    // Disconnect Eve and make sure Alice doesn't try to re-connect immediately.
    alice.disconnected(eve.id(), Link::Outbound, &reason);
    assert!(!alice.outbox().any(|o| matches!(o, Io::Connect(_, _))));
    // Or even after some short time..
    alice.elapse(MIN_RECONNECTION_DELTA);
    assert!(!alice.outbox().any(|o| matches!(o, Io::Connect(_, _))));
}

#[test]
fn test_seed_repo_subscribe() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let rid = arbitrary::r#gen::<RepoId>(1);

    alice.connect_to(&bob);
    let (cmd, recv) = Command::seed(rid, policy::Scope::default());
    alice.command(cmd);
    assert!(recv.recv().unwrap().unwrap());

    assert_matches!(
        alice.messages(bob.id).next(),
        Some(Message::Subscribe(Subscribe {
            filter,
            since,
            ..
        })) if since == alice.timestamp() && filter.contains(&rid)
    );
}

#[test]
fn test_fetch_missing_inventory_on_gossip() {
    let rid = arbitrary::r#gen::<RepoId>(1);
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let now = LocalTime::now();

    alice.seed(&rid, node::policy::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.receive(
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: vec![rid].try_into().unwrap(),
                timestamp: now.into(),
            },
            bob.signer(),
        ),
    );
    alice
        .outbox()
        .find(|m| matches!(m, Io::Fetch { rid: other, .. } if other == &rid))
        .unwrap();
}

#[test]
fn test_fetch_missing_inventory_on_schedule() {
    let rid = arbitrary::r#gen::<RepoId>(1);
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let now = LocalTime::now();

    alice.seed(&rid, node::policy::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.receive(
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: vec![rid].try_into().unwrap(),
                timestamp: now.into(),
            },
            bob.signer(),
        ),
    );
    alice.fetched(
        rid,
        bob.id,
        Err(radicle_protocol::worker::FetchError::Io(
            io::ErrorKind::ConnectionReset.into(),
        )),
    );
    alice.outbox().for_each(drop);
    alice.elapse(service::SYNC_INTERVAL);
    alice
        .outbox()
        .find(|m| matches!(m, Io::Fetch { rid: other, .. } if other == &rid))
        .unwrap();
}

#[test]
fn test_queued_fetch_max_capacity() {
    let storage = arbitrary::nonempty_storage(3);
    let mut repo_keys = storage.repos.keys();
    let rid1 = *repo_keys.next().unwrap();
    let rid2 = *repo_keys.next().unwrap();
    let rid3 = *repo_keys.next().unwrap();
    let doc = storage.repos.get(&rid1).unwrap().doc.clone();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
    let bob = Peer::new("bob", [8, 8, 8, 8]);

    alice.connect_to(&bob);

    // Send the first fetch.
    let (cmd, _recv1) = Command::fetch(rid1, bob.id, DEFAULT_TIMEOUT, None);
    alice.command(cmd);

    // Send the 2nd fetch that will be queued.
    let (cmd, _recv2) = Command::fetch(rid2, bob.id, DEFAULT_TIMEOUT, None);
    alice.command(cmd);

    // Send the 3rd fetch that will be queued.
    let (cmd, _recv3) = Command::fetch(rid3, bob.id, DEFAULT_TIMEOUT, None);
    alice.command(cmd);

    // The first fetch is initiated.
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid1);
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
    assert_matches!(alice.outbox().next(), None);

    // Have enough time pass that Alice sends a "ping" to Bob.
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::new(doc.clone())));

    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    assert_eq!(alice.fetches().next(), Some((rid2, bob.id)));
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
    alice.fetched(rid2, bob.id, Ok(fetch::FetchResult::new(doc)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
    assert_eq!(alice.fetches().next(), Some((rid3, bob.id)));
}

#[test]
fn test_queued_fetch_from_ann_same_rid() {
    let storage = arbitrary::nonempty_storage(1); // We're testing both public and private repos.
    let mut repo_keys = storage.repos.keys();
    let rid = *repo_keys.next().unwrap();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
    let carol = Peer::new("carol", [10, 10, 10, 10]);
    let oid = arbitrary::oid();
    let ann = RefsAnnouncement {
        rid,
        refs: vec![RefsAt {
            remote: carol.id(),
            at: oid,
        }]
        .try_into()
        .unwrap(),
        timestamp: bob.timestamp(),
    };

    alice.seed(&rid, policy::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.connect_to(&carol);

    // Send the first announcement.
    alice.receive(bob.id, bob.announcement(ann.clone()));
    // Send the 2nd announcement that will be queued.
    alice.receive(eve.id, eve.announcement(ann.clone()));
    // Send the 3rd announcement that will be queued.
    alice.receive(carol.id, carol.announcement(ann));

    // The first fetch is initiated.
    assert_matches!(alice.fetches().next(), Some((rid_, nid_)) if rid_ == rid && nid_ == bob.id);
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
    assert_matches!(alice.fetches().next(), None);

    // Have enough time pass that Alice sends a "ping" to Bob.
    alice.elapse(KEEP_ALIVE_DELTA);

    let refname = carol
        .id()
        .to_namespace()
        .join(git::fmt::refname!("refs/sigrefs"));

    // Finish the 1st fetch.
    // Ensure the ref is in the storage and cache.
    let repo = alice.storage_mut().repo_mut(&rid);
    let sigrefs_at = carol.signed_refs_at(repo.identity_root().unwrap());
    repo.remotes.insert(carol.id(), sigrefs_at);
    alice
        .database_mut()
        .refs_mut()
        .set(&rid, &carol.id, &SIGREFS_BRANCH, oid, LocalTime::now())
        .unwrap();
    alice.fetched(
        rid,
        bob.id,
        Ok(fetch::FetchResult {
            updated: vec![RefUpdate::Created {
                name: refname.clone(),
                oid,
            }],
            canonical: fetch::UpdatedCanonicalRefs::default(),
            namespaces: [carol.id()].into_iter().collect(),
            clone: false,
            doc: arbitrary::r#gen(1),
        }),
    );
    // Now the 1st fetch is done, but the 2nd and 3rd fetches are redundant.
    assert_matches!(alice.fetches().next(), None);
}

#[test]
fn test_queued_fetch_from_command_same_rid() {
    let storage = arbitrary::nonempty_storage(3);
    let mut repo_keys = storage.repos.keys();
    let rid1 = *repo_keys.next().unwrap();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
    let carol = Peer::new("carol", [10, 10, 10, 10]);

    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.connect_to(&carol);

    // Send the first fetch.
    let (cmd, _recv1) = Command::fetch(rid1, bob.id, DEFAULT_TIMEOUT, None);
    alice.command(cmd);

    // Send the 2nd fetch that will be queued.
    let (cmd, _recv2) = Command::fetch(rid1, eve.id, DEFAULT_TIMEOUT, None);
    alice.command(cmd);

    // Send the 3rd fetch that will be queued.
    let (cmd, _recv3) = Command::fetch(rid1, carol.id, DEFAULT_TIMEOUT, None);
    alice.command(cmd);

    // Peers Alice will fetch from.
    let mut peers = [bob.id, eve.id, carol.id]
        .into_iter()
        .collect::<BTreeSet<_>>();

    // The first fetch is initiated.
    let (rid, nid) = alice.fetches().next().unwrap();
    assert_eq!(rid, rid1);
    assert!(peers.remove(&nid));

    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
    assert_matches!(alice.outbox().next(), None);

    // Have enough time pass that Alice sends a "ping" to Bob.
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
    alice.fetched(rid1, nid, Ok(arbitrary::r#gen::<fetch::FetchResult>(1)));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    let (rid, nid) = alice.fetches().next().unwrap();
    assert_eq!(rid, rid1);
    assert!(peers.remove(&nid));

    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
    alice.fetched(rid1, nid, Ok(arbitrary::r#gen::<fetch::FetchResult>(1)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && peers.remove(&nid));
    // All fetches were initiated.
    assert!(peers.is_empty());
}

#[test]
fn test_refs_synced_event() {
    let temp = tempfile::tempdir().unwrap();
    let storage = Storage::open(temp.path(), fixtures::user()).unwrap();
    let mut alice = Peer::with_storage("alice", [8, 8, 8, 8], storage.clone());
    let bob = Peer::new("bob", [9, 9, 9, 9]);
    let eve = Peer::with_storage("eve", [7, 7, 7, 7], storage);
    let acme = alice.project("acme", "");
    let events = alice.events();
    let ann = AnnouncementMessage::from(RefsAnnouncement {
        rid: acme,
        refs: vec![RefsAt::new(&alice.storage().repository(acme).unwrap(), alice.id).unwrap()]
            .try_into()
            .unwrap(),
        timestamp: bob.timestamp(),
    });
    let msg = ann.signed(bob.signer());

    alice.seed(&acme, policy::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.receive(bob.id, Message::Announcement(msg));

    events
        .wait(
            |e| {
                matches!(
                    e,
                    Event::RefsSynced { remote, rid, .. }
                    if rid == &acme && remote == &bob.id
                )
                .then_some(())
            },
            time::Duration::from_secs(3),
        )
        .unwrap();

    // Now a relayed announcement.
    alice.receive(bob.id, eve.node_announcement());
    alice.receive(bob.id, eve.refs_announcement(acme));

    events
        .wait(
            |e| matches!(e, Event::RefsSynced { remote, .. } if remote == &eve.id).then_some(()),
            time::Duration::from_secs(3),
        )
        .unwrap();
}

#[test]
fn test_init_and_seed() {
    let tempdir = tempfile::tempdir().unwrap();

    let storage_alice = Storage::open(
        tempdir.path().join("alice").join("storage"),
        fixtures::user(),
    )
    .unwrap();
    let (repo, _) = fixtures::repository(tempdir.path().join("working"));
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage_alice);

    let storage_bob =
        Storage::open(tempdir.path().join("bob").join("storage"), fixtures::user()).unwrap();
    let mut bob = Peer::with_storage("bob", [8, 8, 8, 8], storage_bob);

    let storage_eve =
        Storage::open(tempdir.path().join("eve").join("storage"), fixtures::user()).unwrap();
    let mut eve = Peer::with_storage("eve", [9, 9, 9, 9], storage_eve);

    remote::mock::register(&alice.node_id(), alice.storage().path());
    remote::mock::register(&eve.node_id(), eve.storage().path());
    remote::mock::register(&bob.node_id(), bob.storage().path());
    local::register(alice.storage().clone());

    // Alice and Bob connect to Eve.
    alice.command(service::Command::Connect(
        eve.id(),
        eve.address(),
        ConnectOptions::default(),
    ));
    bob.command(service::Command::Connect(
        eve.id(),
        eve.address(),
        ConnectOptions::default(),
    ));

    // Alice creates a new project.
    let (proj_id, _, _) = rad::init(
        &repo,
        "alice".try_into().unwrap(),
        "alice's repo",
        git::fmt::refname!("master"),
        Visibility::default(),
        alice.signer(),
        alice.storage(),
    )
    .unwrap();

    let mut sim = Simulation::new(
        LocalTime::now(),
        alice.rng.clone(),
        simulator::Options::default(),
    )
    .initialize([&mut alice, &mut bob, &mut eve]);

    let bob_events = bob.events();

    // Neither Eve nor Bob have Alice's project for now.
    assert!(eve.get(proj_id).unwrap().is_none());
    assert!(bob.get(proj_id).unwrap().is_none());

    // Bob seeds Alice's project.
    let (cmd, receiver) = service::Command::seed(proj_id, policy::Scope::default());
    bob.command(cmd);
    assert!(receiver.recv().unwrap().unwrap());

    // Eve seeds Alice's project.
    let (cmd, receiver) = service::Command::seed(proj_id, policy::Scope::default());
    eve.command(cmd);
    assert!(receiver.recv().unwrap().unwrap());

    // We now expect Eve to fetch Alice's project from Alice.
    // Then we expect Bob to fetch Alice's project from Eve.
    alice.elapse(LocalDuration::from_secs(1)); // Make sure our announcement is fresh.
    let (cmd, _) = service::Command::add_inventory(proj_id);
    alice.command(cmd);

    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

    log::debug!(target: "test", "Simulation is over");

    // TODO: Refs should be compared between the two peers.

    log::debug!(target: "test", "Waiting for {} to fetch {} from {}..", bob.id, proj_id,eve.id);
    bob_events
        .iter()
        .find(|e| {
            matches!(
                e,
                radicle::node::events::Event::RefsFetched { remote, .. }
                if *remote == eve.node_id()
            )
        })
        .expect("Bob fetched from Eve");

    assert!(eve.storage().get(proj_id).unwrap().is_some());
    assert!(bob.storage().get(proj_id).unwrap().is_some());
}

#[test]
fn prop_inventory_exchange_dense() {
    fn property(alice_inv: MockStorage, bob_inv: MockStorage, eve_inv: MockStorage) {
        let rng = fastrand::Rng::new();
        let alice = Peer::with_storage(
            "alice",
            [7, 7, 7, 7],
            alice_inv
                .clone()
                .map(|doc| doc.visibility = Visibility::Public),
        );
        let mut bob = Peer::with_storage(
            "bob",
            [8, 8, 8, 8],
            bob_inv
                .clone()
                .map(|doc| doc.visibility = Visibility::Public),
        );
        let mut eve = Peer::with_storage(
            "eve",
            [9, 9, 9, 9],
            eve_inv
                .clone()
                .map(|doc| doc.visibility = Visibility::Public),
        );
        let mut routing = RandomMap::with_hasher(rng.clone().into());

        for (inv, peer) in &[
            (alice_inv.repos, alice.node_id()),
            (bob_inv.repos, bob.node_id()),
            (eve_inv.repos, eve.node_id()),
        ] {
            for id in inv.keys() {
                routing
                    .entry(*id)
                    .or_insert_with(|| RandomSet::with_hasher(rng.clone().into()))
                    .insert(*peer);
            }
        }

        // Fully-connected.
        bob.command(Command::Connect(
            alice.id(),
            alice.address(),
            ConnectOptions::default(),
        ));
        bob.command(Command::Connect(
            eve.id(),
            eve.address(),
            ConnectOptions::default(),
        ));
        eve.command(Command::Connect(
            alice.id(),
            alice.address(),
            ConnectOptions::default(),
        ));

        let mut peers: RandomMap<_, _> = [
            (alice.node_id(), alice),
            (bob.node_id(), bob),
            (eve.node_id(), eve),
        ]
        .into_iter()
        .collect();
        let mut simulator = Simulation::new(LocalTime::now(), rng, simulator::Options::default())
            .initialize(peers.values_mut());

        simulator.run_while(peers.values_mut(), |s| !s.is_settled());

        for (proj_id, remotes) in &routing {
            for peer in peers.values() {
                let lookup = peer.lookup(*proj_id).unwrap();

                if lookup.local.is_some() {
                    peer.get(*proj_id)
                        .expect("There are no errors querying storage")
                        .expect("The project is available locally");
                } else {
                    for remote in &lookup.remote {
                        peers[remote]
                            .get(*proj_id)
                            .expect("There are no errors querying storage")
                            .expect("The project is available remotely");
                    }
                    assert!(
                        !lookup.remote.is_empty(),
                        "There are remote locations for the project"
                    );
                    assert_eq!(
                        &lookup.remote.into_iter().collect::<RandomSet<_>>(),
                        remotes,
                        "The remotes match the global routing table"
                    );
                }
            }
        }
    }
    qcheck::QuickCheck::new()
        .r#gen(qcheck::Gen::new(5))
        .tests(20)
        .quickcheck(property as fn(MockStorage, MockStorage, MockStorage));
}

#[test]
fn test_announcement_message_amplification() {
    let mut results = Vec::new();
    let mut rng = fastrand::Rng::new();

    while results.len() < *TEST_CASES {
        let mut alice = Peer::new("alice", [7, 7, 7, 7]);
        let mut bob = Peer::new("bob", [8, 8, 8, 8]);
        let mut eve = Peer::new("eve", [9, 9, 9, 9]);
        let mut zod = Peer::new("zod", [5, 5, 5, 5]);
        let mut tom = Peer::new("tom", [4, 4, 4, 4]);
        let mut sim = Simulation::new(
            LocalTime::now(),
            alice.rng.clone(),
            simulator::Options {
                latency: 0..1, // 0 - 1s
                failure_rate: 0.,
            },
        );
        let rid = r#gen::<RepoId>(1);

        // Make sure the node gossip intervals are not accidentally synchronized.
        alice.elapse(LocalDuration::from_millis(
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
        ));
        bob.elapse(LocalDuration::from_millis(
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
        ));
        eve.elapse(LocalDuration::from_millis(
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
        ));
        zod.elapse(LocalDuration::from_millis(
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
        ));
        tom.elapse(LocalDuration::from_millis(
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
        ));

        // Fully-connected network.
        alice.command(Command::Connect(
            bob.id,
            bob.address(),
            ConnectOptions::default(),
        ));
        alice.command(Command::Connect(
            eve.id,
            eve.address(),
            ConnectOptions::default(),
        ));
        alice.command(Command::Connect(
            zod.id,
            zod.address(),
            ConnectOptions::default(),
        ));
        alice.command(Command::Connect(
            tom.id,
            tom.address(),
            ConnectOptions::default(),
        ));
        bob.command(Command::Connect(
            eve.id,
            eve.address(),
            ConnectOptions::default(),
        ));
        bob.command(Command::Connect(
            zod.id,
            zod.address(),
            ConnectOptions::default(),
        ));
        bob.command(Command::Connect(
            tom.id,
            tom.address(),
            ConnectOptions::default(),
        ));
        eve.command(Command::Connect(
            zod.id,
            zod.address(),
            ConnectOptions::default(),
        ));
        eve.command(Command::Connect(
            tom.id,
            tom.address(),
            ConnectOptions::default(),
        ));
        zod.command(Command::Connect(
            tom.id,
            tom.address(),
            ConnectOptions::default(),
        ));

        // Let the nodes connect to each other.
        sim.run_while([&mut alice, &mut bob, &mut eve, &mut zod, &mut tom], |s| {
            s.elapsed() < LocalDuration::from_mins(3)
        });

        // Ensure nodes are all connected, otherwise skip this test run.
        if alice.sessions().connected().count() != 4 {
            continue;
        }
        if bob.sessions().connected().count() != 4 {
            continue;
        }
        if eve.sessions().connected().count() != 4 {
            continue;
        }
        if zod.sessions().connected().count() != 4 {
            continue;
        }
        if tom.sessions().connected().count() != 4 {
            continue;
        }

        let timestamp = (*alice.clock()).into();
        alice
            .storage_mut()
            .repos
            .insert(rid, r#gen::<MockRepository>(1));
        let (cmd, _) = Command::add_inventory(rid);
        alice.command(cmd);

        sim.run_while([&mut alice, &mut bob, &mut eve, &mut zod, &mut tom], |s| {
            s.elapsed() < LocalDuration::from_mins(3)
        });

        // Make sure they have the routing table entry.
        for node in [&bob, &eve, &zod, &tom] {
            assert!(
                node.service
                    .database()
                    .routing()
                    .get(&rid)
                    .unwrap()
                    .contains(&alice.id)
            );
        }

        // Count how many copies of Alice's inventory message have been received by peers.
        let received = sim.messages().iter().filter(|m| {
            matches!(
                m,
                (_, _, Message::Announcement(Announcement {
                    node,
                    message: AnnouncementMessage::Inventory(i),
                    ..
                }))
                if node == &alice.id && i.inventory.to_vec() == vec![rid] && i.timestamp == timestamp
            )
        });
        results.push(received.count());
    }
    // Calculate the average amplification factor based on all simulation runs.
    let avg = results.iter().sum::<usize>() as f64 / results.len() as f64;
    // Amplification is total divided by minimum, ie. it's a relative metric.
    let amp = avg / 4.;

    // The worse case scenario is (n - 1)^2 messages received for one message announced.
    // In the above case of 5 nodes, this is 4 * 4 = 16 messages. This is an amplification of 4.0.
    // The best case is an amplification of 1.0, ie. each node receives the message once only.
    //
    // By using delayed message propagation though, we can bring this down closer to the minimum.
    log::debug!(target: "test", "Average message amplification: {amp}");

    assert!(amp < 2., "Amplification factor of {amp} is too high");
    assert!(amp >= 1., "Amplification can't be lower than 1");
}