Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
Various fixes to node and CLI
Merged did:key:z6MksFqX...wzpT opened 2 years ago

See commits.

9 files changed +262 -115 921f9f2d 15d17098
modified radicle-cli/src/commands/node.rs
@@ -1,4 +1,5 @@
use std::ffi::OsString;
+use std::path::PathBuf;
use std::time;

use anyhow::anyhow;
@@ -38,6 +39,7 @@ Usage
Start options

    --foreground         Start the node in the foreground
+    --path <path>        Start node binary at path (default: radicle-node)
    --verbose, -v        Verbose output

Routing options
@@ -79,6 +81,7 @@ pub enum Operation {
    Start {
        foreground: bool,
        verbose: bool,
+        path: PathBuf,
        options: Vec<OsString>,
    },
    Logs {
@@ -118,6 +121,7 @@ impl Args for Options {
        let mut lines: usize = 60;
        let mut count: usize = usize::MAX;
        let mut timeout = time::Duration::MAX;
+        let mut path = None;
        let mut verbose = false;

        while let Some(arg) = parser.next()? {
@@ -166,6 +170,10 @@ impl Args for Options {
                Long("verbose") | Short('v') if matches!(op, Some(OperationName::Start)) => {
                    verbose = true;
                }
+                Long("path") if matches!(op, Some(OperationName::Start)) => {
+                    let val = parser.value()?;
+                    path = Some(PathBuf::from(val));
+                }
                Short('n') if matches!(op, Some(OperationName::Logs)) => {
                    lines = parser.value()?.parse()?;
                }
@@ -191,6 +199,7 @@ impl Args for Options {
                foreground,
                verbose,
                options,
+                path: path.unwrap_or(PathBuf::from("radicle-node")),
            },
            OperationName::Status => Operation::Status,
            OperationName::Sessions => Operation::Sessions,
@@ -226,9 +235,10 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {
        Operation::Start {
            foreground,
            options,
+            path,
            verbose,
        } => {
-            control::start(node, !foreground, verbose, options, &profile)?;
+            control::start(node, !foreground, verbose, options, &path, &profile)?;
        }
        Operation::Status => {
            control::status(&node, &profile)?;
modified radicle-cli/src/commands/node/control.rs
@@ -1,7 +1,7 @@
use std::ffi::OsString;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
-use std::{process, thread, time};
+use std::{path::Path, process, thread, time};

use localtime::LocalTime;

@@ -21,6 +21,7 @@ pub fn start(
    daemon: bool,
    verbose: bool,
    mut options: Vec<OsString>,
+    cmd: &Path,
    profile: &Profile,
) -> anyhow::Result<()> {
    if node.is_running() {
@@ -54,7 +55,7 @@ pub fn start(
            .create(true)
            .open(profile.home.node().join("node.log"))?;

-        let child = process::Command::new("radicle-node")
+        let child = process::Command::new(cmd)
            .args(options)
            .envs(envs)
            .stdin(process::Stdio::null())
@@ -94,7 +95,7 @@ pub fn start(
            }
        }
    } else {
-        let mut child = process::Command::new("radicle-node")
+        let mut child = process::Command::new(cmd)
            .args(options)
            .envs(envs)
            .spawn()?;
modified radicle-node/src/service.rs
@@ -264,6 +264,8 @@ struct QueuedFetch {
    rid: RepoId,
    /// Peer being fetched from.
    from: NodeId,
+    /// Refs being fetched.
+    refs_at: Vec<RefsAt>,
    /// Result channel.
    channel: Option<chan::Sender<FetchResult>>,
}
@@ -791,8 +793,9 @@ where
        from: NodeId,
        refs: NonEmpty<RefsAt>,
        timeout: time::Duration,
+        channel: Option<chan::Sender<FetchResult>>,
    ) {
-        self._fetch(rid, from, refs.into(), timeout, None)
+        self._fetch(rid, from, refs.into(), timeout, channel)
    }

    /// Initiate an outgoing fetch for some repository.
@@ -814,7 +817,7 @@ where
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
    ) {
-        match self.try_fetch(rid, &from, refs_at, timeout) {
+        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
            Ok(fetching) => {
                if let Some(c) = channel {
                    fetching.subscribe(c);
@@ -832,12 +835,22 @@ where
                    }
                } else {
                    debug!(target: "service", "Queueing fetch for {rid} with {from}..");
-                    self.queue.push_back(QueuedFetch { rid, from, channel });
+                    self.queue.push_back(QueuedFetch {
+                        rid,
+                        refs_at,
+                        from,
+                        channel,
+                    });
                }
            }
            Err(TryFetchError::SessionCapacityReached) => {
                debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
-                self.queue.push_back(QueuedFetch { rid, from, channel });
+                self.queue.push_back(QueuedFetch {
+                    rid,
+                    refs_at,
+                    from,
+                    channel,
+                });
            }
            Err(e) => {
                if let Some(c) = channel {
@@ -850,6 +863,7 @@ where
        }
    }

+    // TODO: Buffer/throttle fetches.
    fn try_fetch(
        &mut self,
        rid: RepoId,
@@ -863,6 +877,8 @@ where
        };
        let fetching = self.fetching.entry(rid);

+        trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");
+
        if let Entry::Occupied(fetching) = fetching {
            // We're already fetching this repo from some peer.
            return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
@@ -991,10 +1007,40 @@ where
    /// 1. The RID was already being fetched.
    /// 2. The session was already at fetch capacity.
    pub fn dequeue_fetch(&mut self) {
-        if let Some(QueuedFetch { rid, from, channel }) = self.queue.pop_front() {
+        while let Some(QueuedFetch {
+            rid,
+            from,
+            refs_at,
+            channel,
+        }) = self.queue.pop_front()
+        {
            debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");

-            self.fetch(rid, from, FETCH_TIMEOUT, channel);
+            // If no refs are specified, always do a full fetch.
+            if refs_at.is_empty() {
+                self.fetch(rid, from, FETCH_TIMEOUT, channel);
+                return;
+            }
+
+            let repo_entry = self
+                .policies
+                .seed_policy(&rid)
+                .expect("Service::dequeue_fetch: error accessing repo seeding configuration");
+
+            match self.refs_status_of(rid, refs_at, &repo_entry.scope) {
+                Ok(status) => {
+                    if let Some(refs) = NonEmpty::from_vec(status.fresh) {
+                        self.fetch_refs_at(rid, from, refs, FETCH_TIMEOUT, channel);
+                        return;
+                    } else {
+                        debug!(target: "service", "Skipping dequeued fetch for {rid}, all refs are already in local storage");
+                    }
+                }
+                Err(e) => {
+                    error!(target: "service", "Error getting the refs status of {rid}: {e}");
+                    return;
+                }
+            }
        }
    }

@@ -1218,7 +1264,7 @@ where
        match self.db.gossip_mut().announced(announcer, announcement) {
            Ok(fresh) => {
                if !fresh {
-                    trace!(target: "service", "Ignoring stale inventory announcement from {announcer} (t={})", self.time());
+                    trace!(target: "service", "Ignoring stale announcement from {announcer} (t={})", self.time());
                    return Ok(false);
                }
            }
@@ -1330,13 +1376,15 @@ where
                    }
                }

-                // TODO: Buffer/throttle fetches.
                let repo_entry = self.policies.seed_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo seeding configuration",
                );
-
                if repo_entry.policy == Policy::Allow {
-                    let (fresh, stale) = match self.refs_status_of(message, &repo_entry.scope) {
+                    let (fresh, stale) = match self.refs_status_of(
+                        message.rid,
+                        message.refs.clone().into(),
+                        &repo_entry.scope,
+                    ) {
                        Ok(RefsStatus { fresh, stale }) => (fresh, stale),
                        Err(e) => {
                            error!(target: "service", "Failed to check refs status: {e}");
@@ -1392,7 +1440,9 @@ where
                        // Finally, if there's anything to fetch, we fetch it from the
                        // remote.
                        if let Some(fresh) = NonEmpty::from_vec(fresh) {
-                            self.fetch_refs_at(message.rid, remote.id, fresh, FETCH_TIMEOUT);
+                            self.fetch_refs_at(message.rid, remote.id, fresh, FETCH_TIMEOUT, None);
+                        } else {
+                            debug!(target: "service", "Skipping fetch, all refs of {} are already in local storage", message.rid);
                        }
                    } else {
                        trace!(
@@ -1479,14 +1529,14 @@ where
    /// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
    fn refs_status_of(
        &self,
-        message: &RefsAnnouncement,
+        rid: RepoId,
+        refs: Vec<RefsAt>,
        scope: &policy::Scope,
    ) -> Result<RefsStatus, Error> {
-        let mut refs = message.refs_status(&self.storage)?;
+        let mut refs = RefsStatus::new(rid, refs, &self.storage)?;

        // First, check the freshness.
        if refs.fresh.is_empty() {
-            debug!(target: "service", "All refs of {} are already in local storage", &message.rid);
            return Ok(refs);
        }

@@ -1494,7 +1544,7 @@ where
        match scope {
            policy::Scope::All => Ok(refs),
            policy::Scope::Followed => {
-                match self.policies.namespaces_for(&self.storage, &message.rid) {
+                match self.policies.namespaces_for(&self.storage, &rid) {
                    Ok(Namespaces::All) => Ok(refs),
                    Ok(Namespaces::Followed(mut followed)) => {
                        // Get the set of followed nodes except self.
modified radicle-node/src/service/message.rs
@@ -176,6 +176,34 @@ pub struct RefsStatus {
}

impl RefsStatus {
+    /// Get the set of `fresh` and `stale` `RefsAt`'s for the given
+    /// announcement.
+    pub fn new<S: ReadStorage>(
+        rid: RepoId,
+        refs: Vec<RefsAt>,
+        storage: S,
+    ) -> Result<RefsStatus, storage::Error> {
+        let repo = match storage.repository(rid) {
+            // If the repo doesn't exist, we consider this
+            // announcement "fresh", since we obviously don't
+            // have the refs.
+            Err(e) if e.is_not_found() => {
+                return Ok(RefsStatus {
+                    fresh: refs.clone(),
+                    stale: Vec::new(),
+                })
+            }
+            Err(e) => return Err(e),
+            Ok(r) => r,
+        };
+
+        let mut status = RefsStatus::default();
+        for theirs in refs.iter() {
+            status.insert(*theirs, &repo)?;
+        }
+        Ok(status)
+    }
+
    fn insert<S: ReadRepository>(
        &mut self,
        theirs: RefsAt,
@@ -216,32 +244,6 @@ impl RefsStatus {
    }
}

-impl RefsAnnouncement {
-    /// Get the set of `fresh` and `stale` `RefsAt`'s for the given
-    /// announcement.
-    pub fn refs_status<S: ReadStorage>(&self, storage: S) -> Result<RefsStatus, storage::Error> {
-        let repo = match storage.repository(self.rid) {
-            // If the repo doesn't exist, we consider this
-            // announcement "fresh", since we obviously don't
-            // have the refs.
-            Err(e) if e.is_not_found() => {
-                return Ok(RefsStatus {
-                    fresh: self.refs.clone().into(),
-                    stale: Vec::new(),
-                })
-            }
-            Err(e) => return Err(e),
-            Ok(r) => r,
-        };
-
-        let mut status = RefsStatus::default();
-        for theirs in self.refs.iter() {
-            status.insert(*theirs, &repo)?;
-        }
-        Ok(status)
-    }
-}
-
/// Node announcing its inventory to the network.
/// This should be the whole inventory every time.
#[derive(Debug, Clone, PartialEq, Eq)]
modified radicle-node/src/test/peer.rs
@@ -11,7 +11,7 @@ use radicle::node::address::Store as _;
use radicle::node::Database;
use radicle::node::{address, Alias, ConnectOptions};
use radicle::rad;
-use radicle::storage::refs::RefsAt;
+use radicle::storage::refs::{RefsAt, SignedRefsAt};
use radicle::storage::{ReadRepository, RemoteRepository};
use radicle::Storage;

@@ -319,14 +319,22 @@ where
            }
        }

-
        let ann = AnnouncementMessage::from(RefsAnnouncement {
+
        self.announcement(RefsAnnouncement {
            rid,
            refs,
            timestamp: self.timestamp(),
-
        });
-
        let msg = ann.signed(self.signer());
+
        })
+
    }

-
        msg.into()
+
    pub fn announcement(&self, ann: impl Into<AnnouncementMessage>) -> Message {
+
        ann.into().signed(self.signer()).into()
+
    }
+

+
    pub fn signed_refs_at(&self, refs: Refs, at: radicle::git::Oid) -> SignedRefsAt {
+
        SignedRefsAt {
+
            sigrefs: refs.signed(self.signer()).unwrap(),
+
            at,
+
        }
    }

    pub fn connect_from(&mut self, peer: &Self) {
modified radicle-node/src/tests.rs
@@ -13,6 +13,7 @@ use radicle::node::address::Store;
use radicle::node::routing::Store as _;
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
use radicle::storage::refs::RefsAt;
+use radicle::storage::RefUpdate;

use crate::collections::{RandomMap, RandomSet};
use crate::crypto::test::signer::MockSigner;
@@ -38,7 +39,6 @@ use crate::test::peer;
use crate::test::peer::Peer;
use crate::test::simulator;
use crate::test::simulator::{Peer as _, Simulation};
-use crate::test::storage as mock_storage;
use crate::test::storage::MockStorage;
use crate::wire::Decode;
use crate::wire::Encode;
@@ -763,37 +763,24 @@ fn test_refs_announcement_followed() {

    // Create MockStorage for Alice and Bob. Both will have repo with `rid`.
    let storage_alice = arbitrary::nonempty_storage(1);
-    let rid = *storage_alice.inventory.keys().next().unwrap();
+    let rid = *storage_alice.repos.keys().next().unwrap();
    let storage_bob = storage_alice.clone();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage_alice);
    let mut bob = Peer::with_storage("bob", [8, 8, 8, 8], storage_bob);

-
    let refs = arbitrary::gen::<Refs>(8);
-
    let sigref_at = arbitrary::oid();
-
    let signed_refs = refs.signed(bob.signer()).unwrap();
    let node_id = alice.id;
-
    alice.storage_mut().insert_remote(
-
        rid,
+
    alice.storage_mut().repo_mut(&rid).remotes.insert(
        node_id,
-
        mock_storage::refs::SignedRefsAt {
-
            at: sigref_at,
-
            sigrefs: signed_refs,
-
        },
+
        bob.signed_refs_at(arbitrary::gen::<Refs>(8), arbitrary::oid()),
    );

    // Generate some refs for Bob under their own node_id.
-
    let refs = arbitrary::gen::<Refs>(8);
-
    let sigref_at = arbitrary::oid();
-
    let signed_refs = refs.signed(bob.signer()).unwrap();
+
    let sigrefs = bob.signed_refs_at(arbitrary::gen::<Refs>(8), arbitrary::oid());
    let node_id = bob.id;
-
    bob.storage_mut().insert_remote(
-
        rid,
-
        node_id,
-
        mock_storage::refs::SignedRefsAt {
-
            at: sigref_at,
-
            sigrefs: signed_refs,
-
        },
-
    );
+
    bob.storage_mut()
+
        .repo_mut(&rid)
+
        .remotes
+
        .insert(node_id, sigrefs);

    // Alice uses Scope::Followed, and did not track Bob yet.
    alice.connect_to(&bob);
@@ -827,7 +814,7 @@ fn test_refs_announcement_followed() {
#[test]
fn test_refs_announcement_no_subscribe() {
    let storage = arbitrary::nonempty_storage(1);
-    let rid = *storage.inventory.keys().next().unwrap();
+    let rid = *storage.repos.keys().next().unwrap();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
@@ -1330,7 +1317,7 @@ fn test_fetch_missing_inventory_on_schedule() {
#[test]
fn test_queued_fetch_max_capacity() {
    let storage = arbitrary::nonempty_storage(3);
-    let mut repo_keys = storage.inventory.keys();
+    let mut repo_keys = storage.repos.keys();
    let rid1 = *repo_keys.next().unwrap();
    let rid2 = *repo_keys.next().unwrap();
    let rid3 = *repo_keys.next().unwrap();
@@ -1375,9 +1362,77 @@ fn test_queued_fetch_max_capacity() {
}

#[test]
-
fn test_queued_fetch_same_rid() {
+
fn test_queued_fetch_from_ann_same_rid() {
+
    let storage = arbitrary::nonempty_storage(3);
+
    let mut repo_keys = storage.repos.keys();
+
    let rid = *repo_keys.next().unwrap();
+
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
+
    let bob = Peer::new("bob", [8, 8, 8, 8]);
+
    let eve = Peer::new("eve", [9, 9, 9, 9]);
+
    let carol = Peer::new("carol", [10, 10, 10, 10]);
+
    let oid = arbitrary::oid();
+
    let ann = RefsAnnouncement {
+
        rid,
+
        refs: vec![RefsAt {
+
            remote: carol.id(),
+
            at: oid,
+
        }]
+
        .try_into()
+
        .unwrap(),
+
        timestamp: bob.timestamp(),
+
    };
+

+
    logger::init(log::Level::Trace);
+

+
    alice.seed(&rid, policy::Scope::All).unwrap();
+
    alice.connect_to(&bob);
+
    alice.connect_to(&eve);
+
    alice.connect_to(&carol);
+

+
    // Send the first announcement.
+
    alice.receive(bob.id, bob.announcement(ann.clone()));
+
    // Send the 2nd announcement that will be queued.
+
    alice.receive(eve.id, eve.announcement(ann.clone()));
+
    // Send the 3rd announcement that will be queued.
+
    alice.receive(carol.id, carol.announcement(ann));
+

+
    // The first fetch is initiated.
+
    assert_matches!(alice.fetches().next(), Some((rid_, nid_, _)) if rid_ == rid && nid_ == bob.id);
+
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
+
    assert_matches!(alice.outbox().next(), None);
+

+
    // Have enough time pass that Alice sends a "ping" to Bob.
+
    alice.elapse(KEEP_ALIVE_DELTA);
+

+
    let refname = carol
+
        .id()
+
        .to_namespace()
+
        .join(git::refname!("refs/sigrefs"));
+

+
    // Finish the 1st fetch.
+
    alice.storage_mut().repo_mut(&rid).remotes.insert(
+
        carol.id(),
+
        carol.signed_refs_at(arbitrary::gen::<Refs>(1), oid),
+
    );
+
    alice.fetched(
+
        rid,
+
        bob.id,
+
        Ok(fetch::FetchResult {
+
            updated: vec![RefUpdate::Created {
+
                name: refname.clone(),
+
                oid,
+
            }],
+
            namespaces: [carol.id()].into_iter().collect(),
+
        }),
+
    );
+
    // Now the 1st fetch is done, but the 2nd and 3rd fetches are redundant.
+
    assert_matches!(alice.fetches().next(), None);
+
}
+

+
#[test]
+
fn test_queued_fetch_from_command_same_rid() {
    let storage = arbitrary::nonempty_storage(3);
-    let mut repo_keys = storage.inventory.keys();
+    let mut repo_keys = storage.repos.keys();
    let rid1 = *repo_keys.next().unwrap();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
@@ -1604,9 +1659,9 @@ fn prop_inventory_exchange_dense() {
        let mut routing = RandomMap::with_hasher(rng.clone().into());

        for (inv, peer) in &[
-            (alice_inv.inventory, alice.node_id()),
-            (bob_inv.inventory, bob.node_id()),
-            (eve_inv.inventory, eve.node_id()),
+            (alice_inv.repos, alice.node_id()),
+            (bob_inv.repos, bob.node_id()),
+            (eve_inv.repos, eve.node_id()),
        ] {
            for id in inv.keys() {
                routing
modified radicle/src/node.rs
@@ -1,4 +1,5 @@
#![allow(clippy::type_complexity)]
+#![allow(clippy::collapsible_if)]
mod features;

pub mod address;
@@ -897,13 +898,13 @@ impl Node {
        rid: RepoId,
        seeds: impl IntoIterator<Item = NodeId>,
        timeout: time::Duration,
-        mut callback: impl FnMut(AnnounceEvent, &[PublicKey]) -> ControlFlow<()>,
+        mut callback: impl FnMut(AnnounceEvent, &HashSet<PublicKey>) -> ControlFlow<()>,
    ) -> Result<AnnounceResult, Error> {
        let events = self.subscribe(timeout)?;
        let refs = self.announce_refs(rid)?;

        let mut unsynced = seeds.into_iter().collect::<BTreeSet<_>>();
-        let mut synced = Vec::new();
+        let mut synced = HashSet::new();
        let mut timeout: Vec<NodeId> = Vec::new();

        callback(AnnounceEvent::Announced, &synced);
@@ -915,10 +916,15 @@ impl Node {
                    rid: rid_,
                    at,
                }) if rid == rid_ && refs.at == at => {
+                    log::debug!(target: "radicle", "Received {e:?}");
+
                    unsynced.remove(&remote);
-                    synced.push(remote);
-                    if callback(AnnounceEvent::RefsSynced { remote }, &synced).is_break() {
-                        break;
+                    // We can receive synced events from nodes we didn't directly announce to,
+                    // and it's possible to receive duplicates as well.
+                    if synced.insert(remote) {
+                        if callback(AnnounceEvent::RefsSynced { remote }, &synced).is_break() {
+                            break;
+                        }
                    }
                }
                Ok(_) => {}
@@ -933,7 +939,10 @@ impl Node {
                break;
            }
        }
-        Ok(AnnounceResult { timeout, synced })
+        Ok(AnnounceResult {
+            timeout,
+            synced: synced.into_iter().collect(),
+        })
    }
}

modified radicle/src/test/arbitrary.rs
@@ -1,4 +1,4 @@
-
use std::collections::{BTreeMap, BTreeSet, HashSet};
+
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::hash::Hash;
use std::ops::RangeBounds;
use std::str::FromStr;
@@ -68,7 +68,15 @@ pub fn vec<T: Eq + Arbitrary>(size: usize) -> Vec<T> {
pub fn nonempty_storage(size: usize) -> MockStorage {
    let mut storage = gen::<MockStorage>(size);
    for _ in 0..size {
-
        storage.inventory.insert(gen::<RepoId>(1), gen::<DocAt>(1));
+
        let id = gen::<RepoId>(1);
+
        storage.repos.insert(
+
            id,
+
            MockRepository {
+
                id,
+
                doc: gen::<DocAt>(1),
+
                remotes: HashMap::new(),
+
            },
+
        );
    }
    storage
}
modified radicle/src/test/storage.rs
@@ -16,12 +16,11 @@ use super::fixtures;
#[derive(Clone, Debug)]
pub struct MockStorage {
    pub path: PathBuf,
-
    pub inventory: HashMap<RepoId, DocAt>,
    pub info: git::UserInfo,

    /// All refs keyed by RID.
    /// Each value is a map of refs keyed by node Id (public key).
-
    pub remotes: HashMap<RepoId, HashMap<NodeId, refs::SignedRefsAt>>,
+
    pub repos: HashMap<RepoId, MockRepository>,
}

impl MockStorage {
@@ -29,18 +28,30 @@ impl MockStorage {
        Self {
            path: PathBuf::default(),
            info: fixtures::user(),
-
            inventory: inventory.into_iter().collect(),
-
            remotes: HashMap::new(),
+
            repos: inventory
+
                .into_iter()
+
                .map(|(id, doc)| {
+
                    (
+
                        id,
+
                        MockRepository {
+
                            id,
+
                            doc,
+
                            remotes: HashMap::new(),
+
                        },
+
                    )
+
                })
+
                .collect(),
        }
    }

-
    pub fn empty() -> Self {
-
        Self::new(Vec::new())
+
    pub fn repo_mut(&mut self, rid: &RepoId) -> &mut MockRepository {
+
        self.repos
+
            .get_mut(rid)
+
            .expect("MockStorage::repo_mut: repository does not exist")
    }

-
    /// Add a remote `node` with `signed_refs` for the repo `rid`.
-
    pub fn insert_remote(&mut self, rid: RepoId, node: NodeId, refs: refs::SignedRefsAt) {
-
        self.remotes.entry(rid).or_default().insert(node, refs);
+
    pub fn empty() -> Self {
+
        Self::new(Vec::new())
    }
}

@@ -60,23 +71,18 @@ impl ReadStorage for MockStorage {
    }

    fn contains(&self, rid: &RepoId) -> Result<bool, RepositoryError> {
-
        Ok(self.inventory.contains_key(rid))
+
        Ok(self.repos.contains_key(rid))
    }

    fn inventory(&self) -> Result<Inventory, Error> {
-
        Ok(self.inventory.keys().cloned().collect::<Vec<_>>())
+
        Ok(self.repos.keys().cloned().collect::<Vec<_>>())
    }

    fn repository(&self, rid: RepoId) -> Result<Self::Repository, Error> {
-
        let doc = self
-
            .inventory
+
        self.repos
            .get(&rid)
-
            .ok_or_else(|| Error::Io(io::Error::from(io::ErrorKind::NotFound)))?;
-
        Ok(MockRepository {
-
            id: rid,
-
            doc: doc.clone(),
-
            remotes: self.remotes.get(&rid).cloned().unwrap_or_default(),
-
        })
+
            .ok_or_else(|| Error::Io(io::Error::from(io::ErrorKind::NotFound)))
+
            .cloned()
    }
}

@@ -84,12 +90,10 @@ impl WriteStorage for MockStorage {
    type RepositoryMut = MockRepository;

    fn repository_mut(&self, rid: RepoId) -> Result<Self::RepositoryMut, Error> {
-
        let doc = self.inventory.get(&rid).unwrap();
-
        Ok(MockRepository {
-
            id: rid,
-
            doc: doc.clone(),
-
            remotes: self.remotes.get(&rid).cloned().unwrap_or_default(),
-
        })
+
        self.repos
+
            .get(&rid)
+
            .ok_or(Error::Io(io::ErrorKind::NotFound.into()))
+
            .cloned()
    }

    fn create(&self, _rid: RepoId) -> Result<Self::RepositoryMut, Error> {
@@ -103,9 +107,9 @@ impl WriteStorage for MockStorage {

#[derive(Clone, Debug)]
pub struct MockRepository {
-
    id: RepoId,
-
    doc: DocAt,
-
    remotes: HashMap<NodeId, refs::SignedRefsAt>,
+
    pub id: RepoId,
+
    pub doc: DocAt,
+
    pub remotes: HashMap<NodeId, refs::SignedRefsAt>,
}

impl MockRepository {