// Radish alpha · rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
// Radicle Heartwood Protocol & Stack (Radicle, Git)
// heartwood/crates/radicle-protocol/src/fetcher/test/state/command/fetch.rs
use std::time::Duration;

use radicle::test::arbitrary;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::state::{command, event};
use crate::fetcher::test::state::helpers;
use crate::fetcher::{ActiveFetch, FetcherState};
use crate::fetcher::{FetchConfig, RefsToFetch};

/// On a freshly created state, a fetch request is answered with `Started`
/// and is recorded as the active fetch for the requested repository.
#[test]
fn fetch_start_first_fetch_for_node() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let wanted = helpers::gen_refs(2);
    let config = FetchConfig::default();

    let request = command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: wanted.clone(),
        config,
    };
    let outcome = state.fetch(request);

    // The event carries the same repository, node, refs and config as the command.
    let started = event::Fetch::Started {
        rid: repo_1,
        from: node_a,
        refs: wanted.clone(),
        config,
    };
    assert_eq!(outcome, started);

    // The state now reports this fetch as the active one for `repo_1`.
    let in_flight = ActiveFetch {
        from: node_a,
        refs: wanted,
    };
    assert_eq!(state.get_active_fetch(&repo_1), Some(&in_flight));
}

/// Two different repositories requested from the same node are both started
/// when the state is built with `helpers::config(2, 10)` (the first argument
/// is presumably the concurrency limit).
#[test]
fn fetch_different_repo_same_node_within_capacity() {
    let mut state = FetcherState::new(helpers::config(2, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let repo_2: RepoId = arbitrary::r#gen(1);
    let config = FetchConfig::default();

    // Every request in this test comes from `node_a` and differs only by repository.
    let mut fetch_repo = |rid: RepoId| {
        state.fetch(command::Fetch {
            from: node_a,
            rid,
            refs: helpers::gen_refs(1),
            config,
        })
    };

    let first = fetch_repo(repo_1);
    assert!(matches!(first, event::Fetch::Started { .. }));

    let second = fetch_repo(repo_2);
    assert!(matches!(second, event::Fetch::Started { rid, .. } if rid == repo_2));

    // Both repositories have an active fetch at the same time.
    for rid in [repo_1, repo_2] {
        assert!(state.get_active_fetch(&rid).is_some());
    }
}

/// A fetch for a repository that already has an active fetch from another
/// node is answered with `Queued`, and the original fetch stays the active one.
#[test]
fn fetch_same_repo_different_nodes_queues_second() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let node_b: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let refs_1 = helpers::gen_refs(1);
    let config = FetchConfig::default();

    let event1 = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: refs_1.clone(),
        config,
    });
    assert!(matches!(event1, event::Fetch::Started { .. }));

    // Same repo from a different node - gets queued since repo_1 is already active.
    // This is the last use of `refs_1`, so it is moved instead of cloned.
    let event2 = state.fetch(command::Fetch {
        from: node_b,
        rid: repo_1,
        refs: refs_1,
        config,
    });

    assert!(
        matches!(event2, event::Fetch::Queued { rid, from } if rid == repo_1 && from == node_b)
    );

    // Only node_a's fetch is active. Comparing the whole `Option` checks both
    // presence and source node in a single assertion, and a failure prints the
    // actual value instead of panicking on `unwrap`.
    assert_eq!(
        state.get_active_fetch(&repo_1).map(|active| *active.from()),
        Some(node_a)
    );
}

/// Issuing the same fetch (same node, repository and refs) a second time
/// while the first has not been completed yields `AlreadyFetching`.
#[test]
fn fetch_duplicate_returns_already_fetching() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let wanted = helpers::gen_refs(2);
    let config = FetchConfig::default();

    // Produces an identical command on every call.
    let request = || command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: wanted.clone(),
        config,
    };

    // The first request is issued only to set up the state; its event is not inspected.
    state.fetch(request());
    let outcome = state.fetch(request());

    let expected = event::Fetch::AlreadyFetching {
        rid: repo_1,
        from: node_a,
    };
    assert_eq!(outcome, expected);
}

/// A second request for the same repository from the same node, but with a
/// separately generated set of refs, is answered with `Queued`.
#[test]
fn fetch_same_repo_different_refs_enqueues() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let initial_refs = helpers::gen_refs(1);
    let other_refs = helpers::gen_refs(2);
    let config = FetchConfig::default();

    // Node, repository and config are fixed; only the refs vary between requests.
    let mut fetch_refs = |refs: RefsToFetch| {
        state.fetch(command::Fetch {
            from: node_a,
            rid: repo_1,
            refs,
            config,
        })
    };

    // The first request is issued only to set up the state; its event is not inspected.
    fetch_refs(initial_refs);
    let outcome = fetch_refs(other_refs);

    let expected = event::Fetch::Queued {
        rid: repo_1,
        from: node_a,
    };
    assert_eq!(outcome, expected);
}

/// With `helpers::config(1, 10)` and one fetch already issued for `repo_1`,
/// a fetch for a second repository from the same node is answered with
/// `Queued` and does not become active.
#[test]
fn fetch_at_capacity_enqueues() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let repo_2: RepoId = arbitrary::r#gen(1);
    let config = FetchConfig::default();

    // First request; its event is not inspected.
    let occupying = command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
        config,
    };
    state.fetch(occupying);

    // Second request, for a different repository.
    let overflow = command::Fetch {
        from: node_a,
        rid: repo_2,
        refs: helpers::gen_refs(1),
        config,
    };
    let outcome = state.fetch(overflow);

    let queued = event::Fetch::Queued {
        rid: repo_2,
        from: node_a,
    };
    assert_eq!(outcome, queued);

    // Only the first repository has an active fetch.
    assert!(state.get_active_fetch(&repo_1).is_some());
    assert!(state.get_active_fetch(&repo_2).is_none());
}

/// With `helpers::config(1, 2)`, the fourth distinct fetch from one node is
/// rejected with `QueueAtCapacity`, which hands back the rejected command's
/// fields together with `capacity: 2`.
#[test]
fn fetch_queue_rejected_capacity_reached() {
    let mut state = FetcherState::new(helpers::config(1, 2));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let repo_2: RepoId = arbitrary::r#gen(1);
    let repo_3: RepoId = arbitrary::r#gen(1);
    let repo_4: RepoId = arbitrary::r#gen(1);
    let config = FetchConfig::default();

    // `repo_1` fills the concurrency limit; `repo_2` and `repo_3` fill the
    // queue (capacity 2).
    for rid in [repo_1, repo_2, repo_3] {
        state.fetch(command::Fetch {
            from: node_a,
            rid,
            refs: helpers::gen_refs(1),
            config,
        });
    }

    // One more request exceeds the queue capacity.
    let rejected_refs = helpers::gen_refs(1);
    let outcome = state.fetch(command::Fetch {
        from: node_a,
        rid: repo_4,
        refs: rejected_refs.clone(),
        config,
    });

    let expected = event::Fetch::QueueAtCapacity {
        rid: repo_4,
        from: node_a,
        refs: rejected_refs,
        config,
        capacity: 2,
    };
    assert_eq!(outcome, expected);
}

/// Queuing the same repository twice from the same node yields `Queued` both
/// times, and the entry dequeued afterwards holds as many refs as the two
/// requests combined.
#[test]
fn fetch_queue_merges_already_queued() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let repo_2: RepoId = arbitrary::r#gen(1);
    let first_batch = helpers::gen_refs(1);
    let second_batch = helpers::gen_refs(1);
    let config = FetchConfig::default();

    // A fetch for `repo_1` is issued first, so requests for `repo_2` get queued.
    state.fetch(command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: helpers::gen_refs(1),
        config,
    });

    let mut queue_repo_2 = |refs: RefsToFetch| {
        state.fetch(command::Fetch {
            from: node_a,
            rid: repo_2,
            refs,
            config,
        })
    };
    queue_repo_2(first_batch.clone());

    // Second fetch for the same queued repo - should merge refs.
    let merged = queue_repo_2(second_batch.clone());

    // The merge is still reported as `Queued`.
    let queued_event = event::Fetch::Queued {
        rid: repo_2,
        from: node_a,
    };
    assert_eq!(merged, queued_event);

    // Complete the `repo_1` fetch, then pull the queued entry for inspection.
    state.fetched(command::Fetched {
        from: node_a,
        rid: repo_1,
    });
    let queued = state.dequeue(&node_a).unwrap();
    assert_eq!(queued.rid, repo_2);

    // `queued.refs` should be the union of both sets of refs, so its length is
    // expected to be the sum of the two request lengths.
    let first_len = first_batch.len().unwrap();
    let second_len = second_batch.len().unwrap();
    let combined_len = first_len.saturating_add(second_len.into());
    assert_eq!(queued.refs.len(), Some(combined_len));
}

/// Re-queuing a repository with `RefsToFetch::All` after it was queued with
/// generated refs leaves the dequeued entry with `RefsToFetch::All`.
#[test]
fn fetch_queue_merge_empty_refs_fetches_all() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let repo_2: RepoId = arbitrary::r#gen(1);
    let generated_refs = helpers::gen_refs(2);
    let config = FetchConfig::default();

    // All requests come from `node_a` with the same config; events are not inspected.
    let mut fetch = |rid: RepoId, refs: RefsToFetch| {
        state.fetch(command::Fetch {
            from: node_a,
            rid,
            refs,
            config,
        })
    };

    // A fetch for `repo_1` goes first, so the `repo_2` requests below get queued.
    fetch(repo_1, helpers::gen_refs(1));
    // Queue `repo_2` with the generated refs.
    fetch(repo_2, generated_refs);
    // Queue `repo_2` again, this time asking for everything.
    fetch(repo_2, RefsToFetch::All);

    // Complete the `repo_1` fetch, then verify the queued entry became "fetch all".
    state.fetched(command::Fetched {
        from: node_a,
        rid: repo_1,
    });
    let queued = state.dequeue(&node_a).unwrap();
    assert_eq!(queued.rid, repo_2);
    assert_eq!(queued.refs, RefsToFetch::All);
}

/// When a repository is queued twice with different timeouts, the dequeued
/// entry reports the longer of the two.
#[test]
fn fetch_queue_merge_takes_longer_timeout() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let repo_2: RepoId = arbitrary::r#gen(1);
    let short_timeout = Duration::from_secs(10);
    let long_timeout = Duration::from_secs(60);
    let config = FetchConfig::default();
    let short_config = config.with_timeout(short_timeout);
    let long_config = config.with_timeout(long_timeout);

    let requests = [
        // Issued first, so the `repo_2` requests below get queued.
        (repo_1, short_config),
        // Queue with short timeout.
        (repo_2, short_config),
        // Queue again with longer timeout.
        (repo_2, long_config),
    ];
    for (rid, config) in requests {
        state.fetch(command::Fetch {
            from: node_a,
            rid,
            refs: helpers::gen_refs(1),
            config,
        });
    }

    // Complete the `repo_1` fetch, then check which timeout the queued entry kept.
    state.fetched(command::Fetched {
        from: node_a,
        rid: repo_1,
    });
    let queued = state.dequeue(&node_a).unwrap();
    assert_eq!(queued.config.timeout(), long_timeout);
}

/// Once a fetch has been reported as completed via `fetched`, the same
/// request issued again is answered with `Started`.
#[test]
fn fetch_after_previous_completed() {
    let mut state = FetcherState::new(helpers::config(1, 10));
    let node_a: NodeId = arbitrary::r#gen(1);
    let repo_1: RepoId = arbitrary::r#gen(1);
    let wanted = helpers::gen_refs(1);
    let config = FetchConfig::default();

    // Produces the same command for both rounds.
    let request = || command::Fetch {
        from: node_a,
        rid: repo_1,
        refs: wanted.clone(),
        config,
    };

    // First round: issue the fetch and report it as completed.
    state.fetch(request());
    state.fetched(command::Fetched {
        from: node_a,
        rid: repo_1,
    });

    // Second round: the identical request is issued again.
    let outcome = state.fetch(request());

    assert!(matches!(outcome, event::Fetch::Started { .. }));
}