use std::{collections::HashSet, thread, time};
use radicle::cob;
use radicle::cob::Title;
use radicle::cob::store::access::{ReadOnly, WriteAs};
use radicle_crypto::test::signer::MockSigner;
use test_log::test;
use radicle::git::raw::ErrorExt as _;
use radicle::node::Event;
use radicle::node::device::Device;
use radicle::node::policy::Scope;
use radicle::node::{Alias, ConnectResult, DEFAULT_TIMEOUT, FetchResult, Handle as _};
use radicle::storage::{
ReadRepository, ReadStorage, RefUpdate, RemoteRepository, SignRepository, ValidateRepository,
WriteRepository, WriteStorage,
};
use radicle::test::fixtures;
use radicle::{assert_matches, rad};
use radicle::{git, issue};
use crate::node::config::Limits;
use crate::node::{Config, ConnectOptions};
use crate::service;
use crate::storage::git::transport;
use crate::test::node::{Node, NodeHandle, converge};
mod config {
use super::*;
use radicle::node::config::{Config, Relay};
/// Relay node config.
pub fn relay(alias: &'static str) -> Config {
Config {
relay: Relay::Always,
..Config::test(Alias::new(alias))
}
}
/// Get the scale or "test size". This is used to scale tests with more
/// data. Defaults to `1`.
pub fn scale() -> usize {
std::env::var("RAD_TEST_SCALE")
.map(|s| {
s.parse()
.expect("repository: invalid value for `RAD_TEST_SCALE`")
})
.unwrap_or(1)
}
}
#[test]
//
// alice -- bob
//
fn test_inventory_sync_basic() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
alice.project("alice", "");
bob.project("bob", "");
let mut alice = alice.spawn();
let bob = bob.spawn();
alice.connect(&bob);
let routes = converge([&alice, &bob]);
assert_eq!(routes.len(), 2);
}
#[test]
//
// alice -- bob -- eve
//
fn test_inventory_sync_bridge() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let mut eve = Node::init(tmp.path(), config::relay("eve"));
alice.project("alice", "");
bob.project("bob", "");
eve.project("eve", "");
let mut alice = alice.spawn();
let mut eve = eve.spawn();
let bob = bob.spawn();
alice.connect(&bob);
eve.connect(&bob);
let routes = converge([&alice, &bob, &eve]);
assert_eq!(routes.len(), 3);
}
#[test]
//
// alice -- bob
// | |
// carol -- eve
//
fn test_inventory_sync_ring() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let mut eve = Node::init(tmp.path(), config::relay("eve"));
let mut carol = Node::init(tmp.path(), Config::test(Alias::new("carol")));
alice.project("alice", "");
bob.project("bob", "");
eve.project("eve", "");
carol.project("carol", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let mut eve = eve.spawn();
let mut carol = carol.spawn();
alice.connect(&bob);
bob.connect(&eve);
eve.connect(&carol);
carol.connect(&alice);
let routes = converge([&alice, &bob, &eve, &carol]);
assert_eq!(routes.len(), 4);
}
#[test]
//
// dave
// |
// eve -- alice -- bob
// |
// carol
//
fn test_inventory_sync_star() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let mut eve = Node::init(tmp.path(), config::relay("eve"));
let mut carol = Node::init(tmp.path(), Config::test(Alias::new("carol")));
let mut dave = Node::init(tmp.path(), Config::test(Alias::new("dave")));
alice.project("alice", "");
bob.project("bob", "");
eve.project("eve", "");
carol.project("carol", "");
dave.project("dave", "");
let alice = alice.spawn();
let mut bob = bob.spawn();
let mut eve = eve.spawn();
let mut carol = carol.spawn();
let mut dave = dave.spawn();
bob.connect(&alice);
eve.connect(&alice);
carol.connect(&alice);
dave.connect(&alice);
let routes = converge([&alice, &bob, &eve, &carol, &dave]);
assert_eq!(routes.len(), 5);
}
#[test]
fn test_replication() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let acme = bob.project("acme", "");
let mut alice = alice.spawn();
let bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
let inventory = alice.storage.repositories().unwrap();
assert!(inventory.is_empty());
let updated = alice.handle.seed(acme, Scope::All).unwrap();
assert!(updated);
let seeds = alice.handle.seeds_for(acme, None).unwrap();
assert!(seeds.is_connected(&bob.id));
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
let updated = match result {
FetchResult::Success { updated, .. } => updated,
FetchResult::Failed { reason } => {
panic!("Fetch failed from {}: {reason}", bob.id);
}
};
assert!(!updated.is_empty());
log::debug!(target: "test", "Fetch complete with {}", bob.id);
let inventory = alice.storage.repositories().unwrap();
let alice_repo = alice.storage.repository(acme).unwrap();
let bob_repo = bob.storage.repository(acme).unwrap();
let alice_refs = alice_repo
.references()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
let bob_refs = bob_repo
.references()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(inventory.first().map(|r| r.rid), Some(acme));
assert_eq!(alice_refs, bob_refs);
assert_matches!(
alice.storage.repository(acme).unwrap().validate(),
Ok(validations) if validations.is_empty()
);
// Ensure that .keep files are deleted upon replication
{
let repo = alice.storage.repository(acme).unwrap();
let pack_dir = repo.path().join("objects").join("pack");
for entry in std::fs::read_dir(pack_dir).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
assert_ne!(
path.extension(),
Some("keep".as_ref()),
"found .keep file after fetch: {path:?}"
);
}
}
}
#[test]
fn test_replication_ref_in_sigrefs() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let acme = bob.project("acme", "");
// Delete one of the signed refs.
bob.storage
.repository_mut(acme)
.unwrap()
.reference(&bob.id, &git::fmt::qualified!("refs/heads/master"))
.unwrap()
.delete()
.unwrap();
let mut alice = alice.spawn();
// At this point, bob will migrate sigrefs, because there only is a
// root commit in his `refs/heads/sigrefs`.
let bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
alice.handle.seed(acme, Scope::All).unwrap();
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert_matches!(result, FetchResult::Success { .. });
// Before automatic migration of sigrefs was introduced,
// alice would still see bob's master branch at this point and we
// would assert `.is_ok()`.
// With automatic migration, refs are signed as bob's node starts
// up, which is after he removes his ref locally, thus we now
// assert `.is_err()`.
assert!(
alice
.storage
.repository(acme)
.unwrap()
.reference(&bob.id, &git::fmt::qualified!("refs/heads/master"))
.is_err(),
"refs/namespaces/{}/refs/heads/master does not exist",
bob.id
);
}
#[test]
fn test_replication_invalid() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let carol = Device::mock();
let acme = bob.project("acme", "");
let repo = bob.storage.repository_mut(acme).unwrap();
let (_, head) = repo.head().unwrap();
let id = repo.identity_head().unwrap();
// Create some unsigned refs for Carol in Bob's storage.
repo.raw()
.reference(
&git::fmt::qualified!("refs/heads/carol").with_namespace(carol.public_key().into()),
head.into(),
true,
&String::default(),
)
.unwrap();
repo.raw()
.reference(
&git::refs::storage::id(carol.public_key()),
id.into(),
true,
&String::default(),
)
.unwrap();
let mut alice = alice.spawn();
let bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
alice.handle.follow(*carol.public_key(), None).unwrap();
alice.handle.seed(acme, Scope::Followed).unwrap();
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
// Fetch is successful despite not fetching Carol's refs, since she isn't a delegate.
assert!(result.is_success());
let repo = alice.storage.repository(acme).unwrap();
let mut remotes = repo.remote_ids().unwrap();
assert_eq!(remotes.next().unwrap().unwrap(), bob.id);
assert!(remotes.next().is_none());
assert!(repo.validate().unwrap().is_empty());
}
#[test]
fn test_migrated_clone() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let acme = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
let updated = bob.handle.seed(acme, Scope::All).unwrap();
assert!(updated);
let result = bob
.handle
.fetch(acme, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
log::debug!(target: "test", "Fetch complete with {}", alice.id);
// Simulate alice deleting the project and cloning it again
{
let path = alice.storage.path().join(acme.canonical());
std::fs::remove_dir_all(path).unwrap();
}
assert!(!alice.storage.contains(&acme).unwrap());
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
let alice_repo = alice.storage.repository(acme).unwrap();
let bob_repo = bob.storage.repository(acme).unwrap();
let alice_refs = alice_repo
.references()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
let bob_refs = bob_repo
.references()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(alice_refs, bob_refs);
assert_matches!(
alice.storage.repository(acme).unwrap().validate(),
Ok(validations) if validations.is_empty()
);
}
#[test]
fn test_dont_fetch_owned_refs() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let acme = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
assert!(bob.handle.seed(acme, Scope::Followed).unwrap());
let result = bob
.handle
.fetch(acme, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
log::debug!(target: "test", "Fetch complete with {}", bob.id);
alice.issue(acme, Title::new("Don't fetch self").unwrap(), "Use ^");
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success())
}
#[test]
fn test_fetch_followed_remotes() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let acme = alice.project("acme", "");
let mut signers = Vec::with_capacity(5);
{
for _ in 0..5 {
let signer = Device::mock();
rad::fork_remote(acme, &alice.id, &signer, &alice.storage).unwrap();
signers.push(signer);
}
}
let mut alice = alice.spawn();
let mut bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
let followed = signers
.iter()
.map(|s| *s.public_key())
.take(2)
.collect::<HashSet<_>>();
assert!(
followed.len() < signers.len(),
"Bob is only trusting a subset of peers"
);
assert!(bob.handle.seed(acme, Scope::Followed).unwrap());
for nid in &followed {
assert!(bob.handle.follow(*nid, None).unwrap());
}
let result = bob
.handle
.fetch(acme, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
log::debug!(target: "test", "Fetch complete with {}", bob.id);
let bob_repo = bob.storage.repository(acme).unwrap();
let bob_remotes = bob_repo
.remote_ids()
.unwrap()
.collect::<Result<HashSet<_>, _>>()
.unwrap();
assert_eq!(bob_remotes.len(), followed.len() + 1);
assert!(bob_remotes.is_superset(&followed));
assert!(bob_remotes.contains(&alice.id));
}
#[test]
fn test_missing_remote() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let acme = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let carol = Device::mock();
alice.connect(&bob);
converge([&alice, &bob]);
assert!(bob.handle.seed(acme, Scope::Followed).unwrap());
assert!(bob.handle.follow(*carol.public_key(), None).unwrap());
let result = bob
.handle
.fetch(acme, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
log::debug!(target: "test", "Fetch complete with {}", bob.id);
rad::fork_remote(acme, &alice.id, &carol, &bob.storage).unwrap();
alice.issue(
acme,
Title::new("Missing Remote").unwrap(),
"Fixing the missing remote issue",
);
let result = bob
.handle
.fetch(acme, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
log::debug!(target: "test", "Fetch complete with {}", bob.id);
}
#[test]
fn test_fetch_preserve_owned_refs() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let acme = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
assert!(bob.handle.seed(acme, Scope::Followed).unwrap());
assert!(bob.handle.follow(alice.id, None).unwrap());
let result = bob
.handle
.fetch(acme, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
log::debug!(target: "test", "Fetch complete with {}", bob.id);
alice.issue(acme, Title::new("Bug").unwrap(), "Bugs, bugs, bugs");
let before = alice
.storage
.repository(acme)
.unwrap()
.references_of(&alice.id)
.unwrap();
// Fetch shouldn't prune any of our own refs.
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
let (updated, _) = result.success().unwrap();
assert_eq!(updated, vec![]);
let after = alice
.storage
.repository(acme)
.unwrap()
.references_of(&alice.id)
.unwrap();
assert_eq!(before, after);
}
#[test]
fn test_clone() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let acme = bob.project("acme", "");
let mut alice = alice.spawn();
let bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
transport::local::register(alice.storage.clone());
let _ = alice.handle.seed(acme, Scope::All).unwrap();
let seeds = alice.handle.seeds_for(acme, None).unwrap();
assert!(seeds.is_connected(&bob.id));
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
rad::fork(acme, &alice.signer, &alice.storage).unwrap();
let working = rad::checkout(
acme,
alice.signer.public_key(),
tmp.path().join("clone"),
&alice.storage,
false,
)
.unwrap();
// Makes test finish faster.
drop(alice);
let head = working.head().unwrap();
let oid = head.target().unwrap();
let (_, canonical) = bob
.storage
.repository(acme)
.unwrap()
.canonical_head()
.unwrap();
assert_eq!(canonical, oid);
// Make sure that bob has refs/rad/id set
assert!(
bob.storage
.repository(acme)
.unwrap()
.identity_head()
.is_ok()
);
}
#[test]
fn test_fetch_up_to_date() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let acme = bob.project("acme", "");
let mut alice = alice.spawn();
let bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
transport::local::register(alice.storage.clone());
let _ = alice.handle.seed(acme, Scope::All).unwrap();
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
// Fetch again! This time, everything's up to date.
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert_matches!(
result.success(),
Some((updates, _fetched)) if updates.iter().all(|update| matches!(update, RefUpdate::Skipped { .. }))
);
}
#[test]
fn test_fetch_unseeded() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let acme = bob.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
transport::local::register(alice.storage.clone());
let _ = alice.handle.seed(acme, Scope::All).unwrap();
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
// Bob stops seeding the repository
assert!(bob.handle.unseed(acme).unwrap());
// Alice attempts to fetch but is unauthorized
let result = alice
.handle
.fetch(acme, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert_matches!(result, FetchResult::Failed { .. });
}
#[test]
fn test_large_fetch() {
let tmp = tempfile::tempdir().unwrap();
let scale = config::scale();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let (repo, _) = fixtures::repository(tmp.path());
fixtures::populate(&repo, scale.max(3));
let rid = alice.project_from("acme", "", &repo);
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let bob_events = bob.handle.events();
bob.handle.seed(rid, Scope::All).unwrap();
alice.connect(&bob);
bob_events
.wait(
|e| {
matches!(e, Event::RefsFetched { updated, .. } if !updated.is_empty()).then_some(())
},
time::Duration::from_secs(9 * scale as u64),
)
.unwrap();
let doc = bob.storage.repository(rid).unwrap().identity_doc().unwrap();
let proj = doc.project().unwrap();
assert_eq!(proj.name(), "acme");
}
#[test]
fn test_concurrent_fetches() {
let tmp = tempfile::tempdir().unwrap();
let scale = config::scale();
let repos = scale.max(4);
let limits = Limits {
// By setting fetch concurrency to one less than the total number of repos,
// we guarantee that at least one fetch will be queued while the others
// are in progress.
fetch_concurrency: (repos - 1).into(),
..Limits::default()
};
let mut bob_repos = HashSet::new();
let mut alice_repos = HashSet::new();
let mut alice = Node::init(
tmp.path(),
radicle::node::config::Config {
limits: limits.clone(),
relay: radicle::node::config::Relay::Always,
..config::relay("alice")
},
);
let mut bob = Node::init(
tmp.path(),
radicle::node::config::Config {
limits,
relay: radicle::node::config::Relay::Always,
..config::relay("bob")
},
);
for i in 0..repos {
// Create a repo for Alice.
let tmp = tempfile::tempdir().unwrap();
let (repo, _) = fixtures::repository(tmp.path());
fixtures::populate(&repo, scale);
let rid = alice.project_from(&format!("alice-{i}"), "", &repo);
alice_repos.insert(rid);
// Create a repo for Bob.
let tmp = tempfile::tempdir().unwrap();
let (repo, _) = fixtures::repository(tmp.path());
fixtures::populate(&repo, scale);
let rid = bob.project_from(&format!("bob-{i}"), "", &repo);
bob_repos.insert(rid);
}
// Clone repositories list for assertions so we don't assert over an empty set.
let all_alice_repos = alice_repos.clone();
let all_bob_repos = bob_repos.clone();
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let alice_events = alice.handle.events();
let bob_events = bob.handle.events();
for rid in &bob_repos {
alice.handle.seed(*rid, Scope::All).unwrap();
}
for rid in &alice_repos {
bob.handle.seed(*rid, Scope::All).unwrap();
}
alice.connect(&bob);
while !bob_repos.is_empty() {
match alice_events.recv().unwrap() {
// We're looking for a `RefsFetched` event, which signals a completed fetch.
// We also ensure that `updated` is not empty, meaning data was actually received.
Event::RefsFetched { rid, updated, .. } if !updated.is_empty() => {
// Once a repo is fetched, remove it from our tracking set.
bob_repos.remove(&rid);
log::debug!(target: "test", "{} fetched {rid} ({} left)",alice.id, bob_repos.len());
}
_ => {}
}
}
while !alice_repos.is_empty() {
match bob_events.recv().unwrap() {
Event::RefsFetched { rid, updated, .. } if !updated.is_empty() => {
// Once a repo is fetched, remove it from our tracking set.
alice_repos.remove(&rid);
log::debug!(target: "test", "{} fetched {rid} ({} left)", bob.id, alice_repos.len());
}
_ => {}
}
}
// Positively assert empty sets, not necessary but proves test was previously broken.
assert!(bob_repos.is_empty());
assert!(alice_repos.is_empty());
for rid in &all_bob_repos {
let doc = alice
.storage
.repository(*rid)
.unwrap()
.identity_doc()
.unwrap();
let proj = doc.project().unwrap();
assert!(proj.name().starts_with("bob"));
}
for rid in &all_alice_repos {
let doc = bob
.storage
.repository(*rid)
.unwrap()
.identity_doc()
.unwrap();
let proj = doc.project().unwrap();
assert!(proj.name().starts_with("alice"));
}
}
#[test]
fn test_connection_crossing() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let alice = alice.spawn();
let bob = bob.spawn();
let preferred = alice.id.max(bob.id);
log::debug!(target: "test", "Preferred peer is {preferred}");
let barrier = std::sync::Arc::new(std::sync::Barrier::new(2));
let b1 = barrier.clone();
let b2 = barrier.clone();
let t1 = thread::spawn({
let mut alice = alice.handle.clone();
move || {
b1.wait();
alice
.connect(bob.id, bob.addr.into(), ConnectOptions::default())
.unwrap()
}
});
let t2 = thread::spawn({
let mut bob = bob.handle.clone();
move || {
b2.wait();
bob.connect(alice.id, alice.addr.into(), ConnectOptions::default())
.unwrap()
}
});
let r1 = t1.join().unwrap();
let r2 = t2.join().unwrap();
// Note that the non-preferred peer will have their outbound connection fail, and this
// could already show up as the result of the call here (but not always).
if preferred == alice.id {
assert_matches!(r1, ConnectResult::Connected);
} else {
assert_matches!(r2, ConnectResult::Connected);
}
let mut iterations = 0;
let (alice_s, bob_s, s1, s2) = loop {
let alice_s = alice.handle.sessions().unwrap();
let bob_s = bob.handle.sessions().unwrap();
let s1 = alice_s.iter().find(|s| s.nid == bob.id).cloned();
let s2 = bob_s.iter().find(|s| s.nid == alice.id).cloned();
if let (Some(s1), Some(s2)) = (s1, s2) {
// Wait until both sessions are fully connected
if s1.state.is_connected() && s2.state.is_connected() {
break (alice_s, bob_s, s1, s2);
}
}
iterations += 1;
if iterations >= 100 {
panic!("Timeout waiting for sessions to connect");
}
thread::sleep(time::Duration::from_millis(50));
};
// We assert that they have opposite link directions.
// In a true simultaneous crossing, the preferred peer wins the Outbound link.
// However, due to OS thread scheduling and reactor event ordering, one peer
// might fully establish the connection before the other even processes the dial command.
// In all valid cases (crossing or sequential), exactly one is Outbound and one is Inbound.
assert_ne!(
s1.link, s2.link,
"One must be Inbound and the other Outbound"
);
assert_eq!(alice_s.len(), 1);
assert_eq!(bob_s.len(), 1);
}
#[test]
/// Alice is going to try to fetch outdated refs of Bob, from Eve. This is a non-fastfoward fetch
/// on the sigrefs branch.
fn test_non_fastforward_sigrefs() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let eve = Node::init(tmp.path(), config::relay("eve"));
let rid = bob.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let mut eve = eve.spawn();
alice.handle.seed(rid, Scope::All).unwrap();
eve.handle.seed(rid, Scope::All).unwrap();
alice.connect(&bob);
alice.connect(&eve);
eve.connect(&bob);
converge([&alice, &bob, &eve]);
// Eve fetches the initial project from Bob.
eve.handle
.fetch(rid, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
// Alice fetches it too.
let old_bob = alice
.handle
.fetch(rid, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
let bob_sigrefs = bob
.storage
.repository(rid)
.unwrap()
.reference_oid(&bob.id, &radicle::storage::refs::SIGREFS_BRANCH)
.unwrap();
let up = old_bob
.find_updated(
&(*radicle::storage::refs::Special::SignedRefs.namespaced(&bob.id)).to_ref_string(),
)
.unwrap();
let old_bob = match up {
RefUpdate::Created { oid, .. } => oid,
RefUpdate::Skipped { oid, .. } => oid,
_ => panic!("rad/sigrefs should have been created or skipped: {up:?}"),
};
assert_eq!(bob_sigrefs, old_bob);
// Log the before Oid value of bob's 'rad/sigrefs', for debugging purposes.
{
let before = alice
.storage
.repository(rid)
.unwrap()
.reference_oid(&bob.id, &radicle::storage::refs::SIGREFS_BRANCH)
.unwrap();
log::debug!(target: "test", "bob's old 'rad/sigrefs': {before}");
}
// Now Eve disconnects from Bob so she doesn't fetch his update.
eve.handle
.command(service::Command::Disconnect(bob.id))
.unwrap();
// Bob updates his refs.
bob.issue(
rid,
Title::new("Updated Sigrefs").unwrap(),
"Updated sigrefs are harshing my vibes",
);
// Alice fetches from Bob.
let new_bob = alice
.handle
.fetch(rid, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
let bob_sigrefs = bob
.storage
.repository(rid)
.unwrap()
.reference_oid(&bob.id, &radicle::storage::refs::SIGREFS_BRANCH)
.unwrap();
let up = new_bob
.find_updated(
&(*radicle::storage::refs::Special::SignedRefs.namespaced(&bob.id)).to_ref_string(),
)
.unwrap();
let new_bob = match up {
RefUpdate::Updated { new, .. } => new,
// FIXME: Really it shouldn't be skipped but let's see what happens
RefUpdate::Skipped { oid, .. } => oid,
_ => panic!("rad/sigrefs should have been updated {up:?}"),
};
assert_eq!(bob_sigrefs, new_bob);
// Log the after Oid value of bob's 'rad/sigrefs', for debugging purposes.
{
let after = alice
.storage
.repository(rid)
.unwrap()
.reference_oid(&bob.id, &radicle::storage::refs::SIGREFS_BRANCH)
.unwrap();
log::debug!(target: "test", "bob's new 'rad/sigrefs': {after}");
}
assert_matches!(
alice.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT, None).unwrap(),
FetchResult::Success { updated, .. }
if updated.iter().all(|u| u.is_skipped())
);
}
#[test]
fn test_outdated_sigrefs() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let eve = Node::init(tmp.path(), config::relay("eve"));
let rid = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let mut eve = eve.spawn();
bob.handle.seed(rid, Scope::All).unwrap();
eve.handle.seed(rid, Scope::All).unwrap();
alice.connect(&bob);
bob.connect(&eve);
eve.connect(&alice);
converge([&alice, &bob, &eve]);
bob.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(bob.storage.contains(&rid).unwrap());
rad::fork(rid, &bob.signer, &bob.storage).unwrap();
eve.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(eve.storage.contains(&rid).unwrap());
rad::fork(rid, &eve.signer, &eve.storage).unwrap();
alice
.handle
.follow(eve.id, Some(Alias::new("eve")))
.unwrap();
alice
.handle
.fetch(rid, eve.id, DEFAULT_TIMEOUT, None)
.unwrap();
let repo = alice.storage.repository(rid).unwrap();
assert!(repo.remote(&eve.id).is_ok());
log::debug!(target: "test", "Bob fetches from Eve..");
assert_matches!(
bob.handle
.fetch(rid, eve.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
let repo = bob.storage.repository(rid).unwrap();
let eve_remote = repo.remote(&eve.id).unwrap();
let old_refs = eve_remote.refs;
// At this stage, Alice and Bob have Eve's fork and Eve does not
// have Bob's fork
let issue_id = eve.issue(
rid,
Title::new("Outdated Sigrefs").unwrap(),
"Outdated sigrefs are harshing my vibes",
);
let repo = eve.storage.repository(rid).unwrap();
let eves_refs = repo.remote(&eve.id).unwrap().refs;
// Get the current state of eve's refs in alice's storage
log::debug!(target: "test", "Alice fetches from Eve..");
assert_matches!(
alice
.handle
.fetch(rid, eve.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
let repo = alice.storage.repository(rid).unwrap();
let issues = issue::Issues::open(&repo, WriteAs::new(&alice.signer)).unwrap();
assert!(
issues.get(&issue_id).unwrap().is_some(),
"Alice did not fetch issue {issue_id}"
);
let eve_remote = repo.remote(&eve.id).unwrap();
let eves_refs_expected = eve_remote.refs;
assert_ne!(eves_refs_expected, old_refs);
assert_eq!(eves_refs_expected, eves_refs);
log::debug!(target: "test", "Alice fetches from Bob..");
alice
.handle
.follow(bob.id, Some(Alias::new("bob")))
.unwrap();
assert_matches!(
alice
.handle
.fetch(rid, bob.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
// Ensure that Eve's refs have not changed after fetching the old refs from Bob.
let repo = alice.storage.repository(rid).unwrap();
let eve_remote = repo.remote(&eve.id).unwrap();
let eves_refs = eve_remote.refs;
assert_ne!(eves_refs, old_refs);
assert_eq!(eves_refs_expected, eves_refs);
}
#[test]
fn test_outdated_delegate_sigrefs() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let eve = Node::init(tmp.path(), config::relay("eve"));
let rid = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let mut eve = eve.spawn();
bob.handle.seed(rid, Scope::All).unwrap();
eve.handle.seed(rid, Scope::All).unwrap();
alice.connect(&bob);
bob.connect(&eve);
eve.connect(&alice);
converge([&alice, &bob, &eve]);
bob.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(bob.storage.contains(&rid).unwrap());
rad::fork(rid, &bob.signer, &bob.storage).unwrap();
eve.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(eve.storage.contains(&rid).unwrap());
rad::fork(rid, &eve.signer, &eve.storage).unwrap();
alice
.handle
.follow(eve.id, Some(Alias::new("eve")))
.unwrap();
alice
.handle
.fetch(rid, eve.id, DEFAULT_TIMEOUT, None)
.unwrap();
let repo = alice.storage.repository(rid).unwrap();
assert!(repo.remote(&eve.id).is_ok());
log::debug!(target: "test", "Bob fetches from Eve..");
assert_matches!(
bob.handle
.fetch(rid, eve.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
let repo = bob.storage.repository(rid).unwrap();
let alice_remote = repo.remote(&alice.id).unwrap();
let old_refs = alice_remote.refs;
// At this stage, Alice and Bob have Eve's fork and Eve does not
// have Bob's fork
alice.issue(
rid,
Title::new("Outdated Sigrefs").unwrap(),
"Outdated sigrefs are harshing my vibes",
);
let repo = alice.storage.repository(rid).unwrap();
let alice_refs = repo.remote(&alice.id).unwrap().refs;
// Get the current state of eve's refs in alice's storage
log::debug!(target: "test", "Alice fetches from Eve..");
assert_matches!(
eve.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
let repo = eve.storage.repository(rid).unwrap();
let alice_remote = repo.remote(&alice.id).unwrap();
let alice_refs_expected = alice_remote.refs;
assert_ne!(alice_refs_expected, old_refs);
assert_eq!(alice_refs_expected, alice_refs);
log::debug!(target: "test", "Alice fetches from Bob..");
eve.handle.follow(bob.id, Some(Alias::new("bob"))).unwrap();
assert_matches!(
eve.handle
.fetch(rid, bob.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
// Ensure that Eve's refs have not changed after fetching the old refs from Bob.
let repo = eve.storage.repository(rid).unwrap();
let alice_remote = repo.remote(&alice.id).unwrap();
let alice_refs = alice_remote.refs;
assert_ne!(alice_refs, old_refs);
assert_eq!(alice_refs_expected, alice_refs);
}
#[test]
fn missing_default_branch() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let rid = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
alice.handle.seed(rid, Scope::All).unwrap();
bob.handle.seed(rid, Scope::All).unwrap();
alice.connect(&bob);
converge([&alice, &bob]);
bob.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(bob.storage.contains(&rid).unwrap());
// Fetching from still works despite not having
// `refs/heads/master`, but has `rad/sigrefs`.
bob.issue(
rid,
Title::new("Hello, Acme").unwrap(),
"Popping in to say hello",
);
alice
.handle
.fetch(rid, bob.id, DEFAULT_TIMEOUT, None)
.unwrap();
{
let repo = bob.storage.repository(rid).unwrap();
assert!(repo.canonical_head().is_ok());
assert!(repo.canonical_identity_doc().is_ok());
assert!(repo.head().is_ok());
}
// If for some reason Alice managed to delete her master reference
{
let repo = alice.storage.repository_mut(rid).unwrap();
let mut r = repo
.backend
.find_reference(&format!("refs/namespaces/{}/refs/heads/master", alice.id))
.unwrap();
r.delete().unwrap();
repo.sign_refs(&alice.signer).unwrap();
}
// Fetching from her will still succeed.
assert_matches!(
bob.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
let repo = bob.storage.repository(rid).unwrap();
// The canonical head cannot be computed, though.
assert!(repo.canonical_head().is_err());
}
#[test]
fn missing_delegate_default_branch() {
use radicle::identity::Identity;
use radicle::storage::git::Repository;
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let seed = Node::init(tmp.path(), config::relay("seed"));
let rid = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let mut seed = seed.spawn();
let bob_events = bob.handle.events();
alice.handle.seed(rid, Scope::All).unwrap();
bob.handle.seed(rid, Scope::All).unwrap();
seed.handle.seed(rid, Scope::All).unwrap();
alice.connect(&seed);
converge([&seed]);
bob.connect(&seed);
bob.handle
.fetch(rid, seed.id, DEFAULT_TIMEOUT, None)
.unwrap();
bob_events
.wait(
|e| {
matches!(e, Event::RefsFetched { updated, .. } if !updated.is_empty()).then_some(())
},
DEFAULT_TIMEOUT,
)
.unwrap();
assert!(bob.storage.contains(&rid).unwrap());
let bob_key = *bob.signer.public_key();
// Helper to assert that Bob's default branch is not in storage
let assert_bobs_default_is_missing = |repo: &Repository| {
let doc = repo.identity_doc().unwrap();
let project = doc.project().unwrap();
let default_branch = repo.reference(
&bob_key,
&radicle::git::refs::branch(project.default_branch()),
);
assert!(matches!(
default_branch,
Err(e) if e.is_not_found()
));
};
// Add Bob as a delegate to the identity document
{
let repo = alice.storage.repository(rid).unwrap();
let mut identity = Identity::load_mut(&repo, &alice.signer).unwrap();
let doc = repo
.identity_doc()
.unwrap()
.doc
.with_edits(|doc| {
doc.delegate(bob.signer.public_key().into());
})
.unwrap();
let rev = identity
.update(Title::new("Add Bob").unwrap(), "", &doc)
.unwrap();
repo.set_identity_head_to(rev).unwrap();
let new = repo.identity_doc().unwrap().doc;
assert!(
new.is_delegate(&bob.signer.public_key().into()),
"Bob must be a delegate after the update"
);
}
// We ensure that Bob does not have the default branch
let repo = bob.storage.repository(rid).unwrap();
assert_bobs_default_is_missing(&repo);
// Create an issue to ensure there are new refs to fetch
let issue = bob.issue(
rid,
Title::new("Delegate Issue").unwrap(),
"Further investigation into delegates",
);
let assert_bobs_issue_exists = |repo: &Repository| {
let issue_ref = radicle::git::refs::storage::cob(
bob.signer.public_key(),
&radicle::cob::issue::TYPENAME,
&issue,
);
assert!(repo.backend.find_reference(issue_ref.as_str()).is_ok(),);
};
// The seed fetches from Bob and checks that:
// a) Bob's default branch is still missing
// b) Bob's issue is there
assert_matches!(
seed.handle
.fetch(rid, bob.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
{
let repo = seed.storage.repository(rid).unwrap();
assert_bobs_default_is_missing(&repo);
assert_bobs_issue_exists(&repo);
}
// Do the same for Alice
assert_matches!(
alice
.handle
.fetch(rid, seed.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
{
let repo = alice.storage.repository(rid).unwrap();
assert_bobs_default_is_missing(&repo);
assert_bobs_issue_exists(&repo);
}
// Check that Bob can still fetch from the seed
assert_matches!(
bob.handle
.fetch(rid, seed.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
}
#[test]
fn test_background_foreground_fetch() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let eve = Node::init(tmp.path(), config::relay("eve"));
let rid = alice.project("acme", "");
let mut alice = alice.spawn();
let alice_events = alice.handle.events();
let mut bob = bob.spawn();
let mut eve = eve.spawn();
bob.handle.seed(rid, Scope::Followed).unwrap();
eve.handle.seed(rid, Scope::Followed).unwrap();
alice.connect(&bob);
alice.connect(&eve);
converge([&alice, &bob, &eve]);
bob.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(bob.storage.contains(&rid).unwrap());
rad::fork(rid, &bob.signer, &bob.storage).unwrap();
eve.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(eve.storage.contains(&rid).unwrap());
rad::fork(rid, &eve.signer, &eve.storage).unwrap();
// Alice fetches Eve's fork and we make note of the sigrefs
alice
.handle
.follow(eve.id, Some(Alias::new("eve")))
.unwrap();
alice
.handle
.fetch(rid, eve.id, DEFAULT_TIMEOUT, None)
.unwrap();
let repo = alice.storage.repository(rid).unwrap();
assert!(repo.remote(&eve.id).is_ok());
let repo = alice.storage.repository(rid).unwrap();
let eve_remote = repo.remote(&eve.id).unwrap();
let old_refs = eve_remote.refs;
// Eve creates an issue, updating their refs, and we make note of
// the new refs
eve.issue(
rid,
Title::new("Outdated Sigrefs").unwrap(),
"Outdated sigrefs are harshing my vibes",
);
let repo = eve.storage.repository(rid).unwrap();
let eves_refs = repo.remote(&eve.id).unwrap().refs;
// Alice follows Bob and they make a new change and announce it,
// this initiates a background fetch for Alice from Bob
alice
.handle
.follow(bob.id, Some(Alias::new("bob")))
.unwrap();
bob.issue(
rid,
Title::new("Concurrent fetches").unwrap(),
"Concurrent fetches are harshing my vibes",
);
bob.handle.announce_refs_for(rid, [bob.id]).unwrap();
alice_events
.wait(
|e| matches!(e, Event::RefsAnnounced { .. }).then_some(()),
DEFAULT_TIMEOUT,
)
.unwrap();
// Alice initiates a fetch from Eve and we ensure that we get the
// updated refs from Eve, and the fetch from Bob should not
// interfere
log::debug!(target: "test", "Alice fetches from Eve..");
assert_matches!(
alice
.handle
.fetch(rid, eve.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
let repo = alice.storage.repository(rid).unwrap();
let eve_remote = repo.remote(&eve.id).unwrap();
let eves_refs_expected = eve_remote.refs;
assert_ne!(eves_refs_expected, old_refs);
assert_eq!(eves_refs_expected, eves_refs);
}
#[test]
/// Alice is offline while Bob pushes some changes to the repo. When Alice reconnects,
/// she is made aware of the changes via the `subscribe` message, and fetches from the seed.
fn test_catchup_on_refs_announcements() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let bob_id = bob.id;
let seed = Node::init(tmp.path(), config::relay("seed"));
let acme = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let mut seed = seed.spawn();
bob.handle.seed(acme, Scope::All).unwrap();
seed.handle.seed(acme, Scope::All).unwrap();
alice.connect(&seed);
seed.has_repository(&acme);
alice.disconnect(&seed);
bob.connect(&seed);
bob.has_repository(&acme);
log::debug!(target: "test", "Bob creating his issue..");
bob.issue(acme, Title::new("Bob's issue").unwrap(), "[..]");
bob.handle.announce_refs_for(acme, [bob.id]).unwrap();
log::debug!(target: "test", "Waiting for seed to fetch Bob's refs from Bob..");
seed.has_remote_of(&acme, &bob.id); // Seed fetches Bob's refs.
bob.disconnect(&seed);
bob.shutdown();
log::debug!(target: "test", "Alice re-connects to the seed..");
alice.connect(&seed);
alice.has_remote_of(&acme, &bob_id);
}
#[test]
fn test_multiple_offline_inits() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let acme = alice.project("acme", "");
let radcliffe = alice.project("radcliffe", "");
let cobs = alice.project("cobs", "");
let projects = [acme, radcliffe, cobs];
let mut alice = alice.spawn();
let mut bob = bob.spawn();
for rid in &projects {
bob.handle.seed(*rid, Scope::All).unwrap();
}
alice.connect(&bob).converge([&bob]);
for repo in bob.storage.repositories().unwrap() {
assert!(projects.contains(&repo.rid), "Bob is missing {}", repo.rid);
}
}
#[test]
fn test_channel_reader_limit() {
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let limits = radicle::node::config::Limits {
fetch_pack_receive: radicle::node::config::FetchPackSizeLimit::bytes(1000),
..radicle::node::config::Limits::default()
};
let bob = Node::init(
tmp.path(),
Config {
limits,
..config::relay("bob")
},
);
let acme = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
let updated = bob.handle.seed(acme, Scope::All).unwrap();
assert!(updated);
let result = bob
.handle
.fetch(acme, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(!result.is_success());
let FetchResult::Failed { reason } = result else {
panic!("fetch result must be failed")
};
// Either gitoxide will error by being unable to consume the packet, or the
// byte limit error will be returned
assert!(
reason.contains("Failed to consume the pack sent by the remote")
|| reason.contains("exceeded number of allowed bytes"),
"actual: {reason}"
);
}
#[test]
fn test_fetch_emits_canonical_ref_update() {
let tmp = tempfile::tempdir().unwrap();
let scale = config::scale();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let (repo, _) = fixtures::repository(tmp.path());
fixtures::populate(&repo, scale.max(3));
let rid = alice.project_from("acme", "", &repo);
let mut alice = alice.spawn();
let mut bob = bob.spawn();
let bob_events = bob.handle.events();
bob.handle.seed(rid, Scope::All).unwrap();
alice.connect(&bob);
let result = bob
.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(result.is_success());
let default_branch: git::fmt::Qualified = {
let repo = alice.storage.repository(rid).unwrap();
let proj = repo.project().unwrap();
git::fmt::lit::refs_heads(proj.default_branch()).into()
};
alice.commit_to(rid, &default_branch);
bob_events
.wait(
|e| {
matches!(e, Event::CanonicalRefUpdated { refname, .. } if *refname == default_branch)
.then_some(())
},
time::Duration::from_secs(9 * scale as u64),
)
.unwrap();
}
#[test]
fn test_non_fastforward_identity_doc() {
use radicle::identity::Identity;
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), Config::test(Alias::new("alice")));
let bob = Node::init(tmp.path(), Config::test(Alias::new("bob")));
let eve = Node::init(tmp.path(), Config::test(Alias::new("eve")));
let alice_laptop = Node::init(tmp.path(), Config::test(Alias::new("alice-laptop")));
let rid = alice.project("acme", "");
let mut alice = alice.spawn();
let mut alice_laptop = alice_laptop.spawn();
let mut bob = bob.spawn();
let bob_events = bob.handle.events();
let mut eve = eve.spawn();
let has_issue = |node: &NodeHandle<MockSigner>, issue: &cob::ObjectId| -> bool {
let repo = node.storage.repository(rid).unwrap();
repo.contains(**issue).unwrap()
};
alice.connect(&alice_laptop);
alice.connect(&bob);
alice.connect(&eve);
eve.connect(&bob);
eve.connect(&alice_laptop);
// Due to permissive relaying, we need to lock down the scope for the RID.
//
// See: [`radicle-protocol::service::Service::relay()`] and
// [`radicle-protocol::service::Service::relay_announcement()`]
alice.handle.seed(rid, Scope::Followed).unwrap();
// Bob and Eve have the same state for the repository
bob.handle.seed(rid, Scope::Followed).unwrap();
bob.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
alice_laptop.handle.seed(rid, Scope::All).unwrap();
alice_laptop
.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
// Alice pushes new references to her laptop
let issue = alice_laptop.issue(
rid,
"Feature #1".parse().unwrap(),
"Implementing new feature",
);
// Eve will fetch these references since her scope is "all"
eve.handle.seed(rid, Scope::All).unwrap();
eve.handle
.fetch(rid, alice_laptop.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(has_issue(&eve, &issue));
bob_events
.wait(
|e| matches!(e, Event::RefsAnnounced { nid, .. } if *nid == eve.id).then_some(()),
DEFAULT_TIMEOUT,
)
.unwrap();
// Alice updates the identity of the document to include her laptop
let (prev, next) = {
let repo = alice.storage.repository(rid).unwrap();
let mut identity = Identity::load_mut(&repo, &alice.signer).unwrap();
let prev = identity.current;
let doc = repo
.identity_doc()
.unwrap()
.doc
.with_edits(|raw| raw.delegate(alice_laptop.id.into()))
.unwrap();
let rev = identity
.update(Title::new("Add Laptop").unwrap(), "", &doc)
.unwrap();
repo.set_identity_head_to(rev).unwrap();
(prev, rev)
};
assert!(!has_issue(&alice, &issue));
// Bob fetches from Alice and we see the identity document was updated.
//
// Bob does not have the issue because Alice does not have the updates from
// Alice's Laptop.
let result = bob
.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(matches!(result, FetchResult::Success { .. }));
assert!(!has_issue(&bob, &issue));
let repo = bob.storage.repository(rid).unwrap();
let identity = Identity::load_mut(&repo, &bob.signer).unwrap();
assert_eq!(identity.current, next);
assert_eq!(identity.parent, Some(prev));
// Bob fetches from Eve, the identity document should remain the same, but
// since Bob now knows that Alice's Laptop is a delegate, the issue should
// be fetched.
bob.handle
.fetch(rid, eve.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(matches!(result, FetchResult::Success { .. }));
assert!(has_issue(&bob, &issue));
let repo = bob.storage.repository(rid).unwrap();
let identity = Identity::load_mut(&repo, &bob.signer).unwrap();
assert_eq!(identity.current, next);
assert_eq!(identity.parent, Some(prev));
}
#[test]
fn test_block_active_connection() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let mut alice = alice.spawn();
let bob = bob.spawn();
alice.connect(&bob);
converge([&alice, &bob]);
let events = alice.handle.events();
assert!(alice.handle.block(bob.id).unwrap());
events
.wait(
|e| matches!(e, Event::PeerDisconnected { nid, .. } if *nid == bob.id).then_some(()),
DEFAULT_TIMEOUT,
)
.unwrap();
let sessions = alice.handle.sessions().unwrap();
assert!(sessions.iter().all(|s| s.nid != bob.id));
}
#[test]
fn test_block_prevents_connection() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let mut alice = alice.spawn();
let mut bob = bob.spawn();
assert!(alice.handle.block(bob.id).unwrap());
let result = alice
.handle
.connect(bob.id, bob.addr.into(), ConnectOptions::default())
.unwrap();
assert_matches!(result, ConnectResult::Disconnected { .. });
let events = alice.handle.events();
bob.connect(&alice);
// Alice receives Bob's inbound connection, but disconnects from him.
events
.wait(
|e| matches!(e, Event::PeerDisconnected { nid, .. } if *nid == bob.id).then_some(()),
time::Duration::from_secs(10),
)
.unwrap();
let sessions = alice.handle.sessions().unwrap();
assert!(sessions.iter().all(|s| s.nid != bob.id));
}
#[test]
fn test_block_prevents_fetch() {
let tmp = tempfile::tempdir().unwrap();
let alice = Node::init(tmp.path(), config::relay("alice"));
let mut bob = Node::init(tmp.path(), config::relay("bob"));
let rid = bob.project("acme", "");
let mut alice = alice.spawn();
let bob = bob.spawn();
assert!(alice.handle.block(bob.id).unwrap());
let result = alice
.handle
.fetch(rid, bob.id, time::Duration::from_secs(5), None)
.unwrap();
assert_matches!(result, FetchResult::Failed { .. });
}
#[test]
fn fetch_does_not_contain_rad_sigrefs_parent() {
use radicle::storage::refs::SIGREFS_PARENT;
let tmp = tempfile::tempdir().unwrap();
let mut alice = Node::init(tmp.path(), config::relay("alice"));
let bob = Node::init(tmp.path(), config::relay("bob"));
let rid = alice.project("acme", "");
let mut alice = alice.spawn();
let mut bob = bob.spawn();
bob.handle.seed(rid, Scope::All).unwrap();
alice.connect(&bob);
converge([&alice, &bob]);
bob.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap();
assert!(bob.storage.contains(&rid).unwrap());
rad::fork(rid, &bob.signer, &bob.storage).unwrap();
let issue_id = alice.issue(
rid,
Title::new("No rad/sigrefs-parent").unwrap(),
"sigrefs are harshing my vibes",
);
let repo = alice.storage.repository(rid).unwrap();
let alice_signed_refs = repo.remote(&alice.id).unwrap().refs;
log::debug!(target: "test", "Bob fetches from Alice..");
assert_matches!(
bob.handle
.fetch(rid, alice.id, DEFAULT_TIMEOUT, None)
.unwrap(),
FetchResult::Success { .. }
);
let repo = bob.storage.repository(rid).unwrap();
let issues = issue::Issues::open(&repo, ReadOnly).unwrap();
assert!(
issues.get(&issue_id).unwrap().is_some(),
"Bob did not fetch issue {issue_id}"
);
let repo = bob.storage.repository(rid).unwrap();
let alice_remote = repo.remote(&alice.id).unwrap();
assert_eq!(alice_signed_refs.refs(), alice_remote.refs());
assert!(alice_remote.refs().get(&SIGREFS_PARENT).is_none());
}