Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Fetch optimizations & improvements
Merged did:key:z6MksFqX...wzpT opened 2 years ago
  • radicle: Switch error type for repository methods
  • node: Include identity doc in fetch result
  • node: Improvements to test reliability
  • node: Don’t fetch if only our own remote
  • node: Simplify handling of fetch completion
14 files changed +288 -145 2b9a0142 7e13e075
modified radicle-cli/tests/commands.rs
@@ -12,7 +12,7 @@ use radicle::node::{Address, Alias, DEFAULT_TIMEOUT};
use radicle::prelude::RepoId;
use radicle::profile;
use radicle::profile::Home;
-
use radicle::storage::{ReadStorage, RemoteRepository};
+
use radicle::storage::{ReadStorage, RefUpdate, RemoteRepository};
use radicle::test::fixtures;

use radicle_cli_test::TestFormula;
@@ -333,7 +333,7 @@ fn rad_id() {
    let events = alice.handle.events();
    bob.fork(acme, bob.home.path()).unwrap();
    bob.announce(acme, 2, bob.home.path()).unwrap();
-
    alice.has_inventory_of(&acme, &bob.id);
+
    alice.has_remote_of(&acme, &bob.id);

    // Alice must have Bob to try add them as a delegate
    events
@@ -383,12 +383,14 @@ fn rad_id_multi_delegate() {
    eve.connect(&alice).converge([&alice]);

    bob.fork(acme, working.join("bob")).unwrap();
-
    bob.has_inventory_of(&acme, &alice.id);
-
    alice.has_inventory_of(&acme, &bob.id);
+
    bob.has_remote_of(&acme, &alice.id);
+
    alice.has_remote_of(&acme, &bob.id);

    eve.fork(acme, working.join("eve")).unwrap();
-
    eve.has_inventory_of(&acme, &bob.id);
-
    alice.has_inventory_of(&acme, &eve.id);
+
    eve.has_remote_of(&acme, &bob.id);
+
    alice.has_remote_of(&acme, &eve.id);
+
    alice.is_synced_with(&acme, &eve.id);
+
    alice.is_synced_with(&acme, &bob.id);

    // TODO: Have formula with two connected nodes and a tracked project.

@@ -541,7 +543,7 @@ fn rad_id_conflict() {

    bob.fork(acme, working.join("bob")).unwrap();
    bob.announce(acme, 2, bob.home.path()).unwrap();
-
    alice.has_inventory_of(&acme, &bob.id);
+
    alice.has_remote_of(&acme, &bob.id);

    formula(&environment.tmp(), "examples/rad-id-conflict.md")
        .unwrap()
@@ -983,9 +985,9 @@ fn rad_clean() {

    bob.fork(acme, bob.home.path()).unwrap();
    bob.announce(acme, 1, bob.home.path()).unwrap();
-
    bob.has_inventory_of(&acme, &alice.id);
-
    alice.has_inventory_of(&acme, &bob.id);
-
    eve.has_inventory_of(&acme, &alice.id);
+
    bob.has_remote_of(&acme, &alice.id);
+
    alice.has_remote_of(&acme, &bob.id);
+
    eve.has_remote_of(&acme, &alice.id);

    formula(&environment.tmp(), "examples/rad-clean.md")
        .unwrap()
@@ -1117,8 +1119,8 @@ fn rad_clone_all() {
    // Fork and sync repo.
    bob.fork(acme, bob.home.path()).unwrap();
    bob.announce(acme, 2, bob.home.path()).unwrap();
-
    bob.has_inventory_of(&acme, &alice.id);
-
    alice.has_inventory_of(&acme, &bob.id);
+
    bob.has_remote_of(&acme, &alice.id);
+
    alice.has_remote_of(&acme, &bob.id);

    test(
        "examples/rad-clone-all.md",
@@ -1127,7 +1129,7 @@ fn rad_clone_all() {
        [],
    )
    .unwrap();
-
    eve.has_inventory_of(&acme, &bob.id);
+
    eve.has_remote_of(&acme, &bob.id);
}

#[test]
@@ -1509,7 +1511,14 @@ fn test_cob_replication() {
    // Wait for Alice to fetch the clone refs.
    events
        .wait(
-
            |e| matches!(e, Event::RefsFetched { .. }).then_some(()),
+
            |e| {
+
                matches!(
+
                    e,
+
                    Event::RefsFetched { updated, .. }
+
                    if updated.iter().any(|u| matches!(u, RefUpdate::Created { .. }))
+
                )
+
                .then_some(())
+
            },
            time::Duration::from_secs(6),
        )
        .unwrap();
@@ -1634,6 +1643,8 @@ fn rad_sync() {
    bob.routes_to(&[(acme, alice.id)]);
    eve.routes_to(&[(acme, alice.id)]);
    alice.routes_to(&[(acme, alice.id), (acme, eve.id), (acme, bob.id)]);
+
    alice.is_synced_with(&acme, &eve.id);
+
    alice.is_synced_with(&acme, &bob.id);

    test(
        "examples/rad-sync.md",
@@ -1709,12 +1720,18 @@ fn test_replication_via_seed() {
    seed.routes_to(&[(rid, alice.id), (rid, seed.id), (rid, bob.id)]);
    bob.routes_to(&[(rid, alice.id), (rid, seed.id), (rid, bob.id)]);

-
    seed_events
-
        .iter()
-
        .any(|e| matches!(e, Event::RefsFetched { remote, .. } if remote == bob.id));
-
    alice_events
-
        .iter()
-
        .any(|e| matches!(e, Event::RefsFetched { remote, .. } if remote == seed.id));
+
    seed_events.iter().any(|e| {
+
        matches!(
+
            e, Event::RefsFetched { updated, remote, .. }
+
            if remote == bob.id && updated.iter().any(|u| u.is_created())
+
        )
+
    });
+
    alice_events.iter().any(|e| {
+
        matches!(
+
            e, Event::RefsFetched { updated, remote, .. }
+
            if remote == seed.id && updated.iter().any(|u| u.is_created())
+
        )
+
    });

    seed.storage
        .repository(rid)
@@ -1763,13 +1780,13 @@ fn rad_remote() {
    bob.routes_to(&[(rid, alice.id)]);
    bob.fork(rid, bob.home.path()).unwrap();
    bob.announce(rid, 2, bob.home.path()).unwrap();
-
    alice.has_inventory_of(&rid, &bob.id);
+
    alice.has_remote_of(&rid, &bob.id);

    eve.connect(&bob);
    eve.routes_to(&[(rid, alice.id)]);
    eve.fork(rid, eve.home.path()).unwrap();
    eve.announce(rid, 2, eve.home.path()).unwrap();
-
    alice.has_inventory_of(&rid, &eve.id);
+
    alice.has_remote_of(&rid, &eve.id);

    test(
        "examples/rad-remote.md",
@@ -2090,7 +2107,7 @@ fn git_push_diverge() {

    bob.connect(&alice).converge([&alice]);
    bob.fork(acme, working.join("bob")).unwrap();
-
    alice.has_inventory_of(&acme, &bob.id);
+
    alice.has_remote_of(&acme, &bob.id);

    formula(&environment.tmp(), "examples/git/git-push-diverge.md")
        .unwrap()
@@ -2131,7 +2148,7 @@ fn rad_push_and_pull_patches() {

    bob.connect(&alice).converge([&alice]);
    bob.fork(acme, working.join("bob")).unwrap();
-
    alice.has_inventory_of(&acme, &bob.id);
+
    alice.has_remote_of(&acme, &bob.id);

    formula(&environment.tmp(), "examples/rad-push-and-pull-patches.md")
        .unwrap()
modified radicle-httpd/src/api/error.rs
@@ -129,6 +129,9 @@ impl IntoResponse for Error {
            Error::Storage(err) if err.is_not_found() => {
                (StatusCode::NOT_FOUND, Some(err.to_string()))
            }
+
            Error::Repository(err) if err.is_not_found() => {
+
                (StatusCode::NOT_FOUND, Some(err.to_string()))
+
            }
            Error::StorageRef(err) if err.is_not_found() => {
                (StatusCode::NOT_FOUND, Some(err.to_string()))
            }
modified radicle-node/src/service.rs
@@ -50,7 +50,7 @@ use crate::service::message::{
};
use crate::service::policy::{store::Write, Policy, Scope};
use crate::storage;
-
use crate::storage::{refs::RefsAt, Namespaces, ReadRepository, ReadStorage};
+
use crate::storage::{refs::RefsAt, Namespaces, ReadStorage};
use crate::worker::fetch;
use crate::worker::FetchError;
use crate::Link;
@@ -780,18 +780,32 @@ where
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
-
            Command::AnnounceRefs(id, resp) => match self.announce_own_refs(id) {
-
                Ok(refs) => match refs.as_slice() {
-
                    &[refs] => {
-
                        resp.send(refs).ok();
+
            Command::AnnounceRefs(id, resp) => {
+
                let doc = match self.storage.get(id) {
+
                    Ok(Some(doc)) => doc,
+
                    Ok(None) => {
+
                        error!(target: "service", "Error announcing refs: repository {id} not found");
+
                        return;
+
                    }
+
                    Err(e) => {
+
                        error!(target: "service", "Error announcing refs: doc error: {e}");
+
                        return;
+
                    }
+
                };
+

+
                match self.announce_own_refs(id, doc) {
+
                    Ok(refs) => match refs.as_slice() {
+
                        &[refs] => {
+
                            resp.send(refs).ok();
+
                        }
+
                        // SAFETY: Since we passed in one NID, we should get exactly one item back.
+
                        [..] => panic!("Service::command: unexpected refs returned"),
+
                    },
+
                    Err(err) => {
+
                        error!(target: "service", "Error announcing refs: {err}");
                    }
-
                    // SAFETY: Since we passed in one NID, we should get exactly one item back.
-
                    [..] => panic!("Service::command: unexpected refs returned"),
-
                },
-
                Err(err) => {
-
                    error!(target: "service", "Error announcing refs: {err}");
                }
-
            },
+
            }
            Command::AnnounceInventory => {
                if let Err(err) = self
                    .storage
@@ -949,11 +963,45 @@ where
        remote: NodeId,
        result: Result<fetch::FetchResult, FetchError>,
    ) {
-
        let result = match result {
+
        let Some(fetching) = self.fetching.remove(&rid) else {
+
            error!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
+
            return;
+
        };
+
        debug_assert_eq!(fetching.from, remote);
+

+
        if let Some(s) = self.sessions.get_mut(&remote) {
+
            // Mark this RID as fetched for this session.
+
            s.fetched(rid);
+
        }
+

+
        // Notify all fetch subscribers of the fetch result. This is used when the user requests
+
        // a fetch via the CLI, for example.
+
        for sub in &fetching.subscribers {
+
            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");
+

+
            let result = match &result {
+
                Ok(success) => FetchResult::Success {
+
                    updated: success.updated.clone(),
+
                    namespaces: success.namespaces.clone(),
+
                    clone: success.clone,
+
                },
+
                Err(e) => FetchResult::Failed {
+
                    reason: e.to_string(),
+
                },
+
            };
+
            if sub.send(result).is_err() {
+
                error!(target: "service", "Error sending fetch result for {rid} from {remote}..");
+
            } else {
+
                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
+
            }
+
        }
+

+
        match result {
            Ok(fetch::FetchResult {
                updated,
                namespaces,
                clone,
+
                doc,
            }) => {
                info!(target: "service", "Fetched {rid} from {remote} successfully");

@@ -970,72 +1018,37 @@ where
                    updated: updated.clone(),
                });

-
                FetchResult::Success {
-
                    updated,
-
                    namespaces,
-
                    clone,
+
                // Announce our new inventory if this fetch was a full clone.
+
                // Only update and announce inventory for public repositories.
+
                if clone && doc.visibility.is_public() {
+
                    debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");
+

+
                    self.storage.insert(rid);
+
                    self.sync_and_announce_inventory();
+
                }
+

+
                // It's possible for a fetch to succeed but nothing was updated.
+
                if updated.is_empty() {
+
                    debug!(target: "service", "Nothing to announce, no refs were updated..");
+
                } else {
+
                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
+
                    // beyond just knowing that we have added an item to our inventory.
+
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces) {
+
                        error!(target: "service", "Failed to announce new refs: {e}");
+
                    }
                }
            }
            Err(err) => {
-
                let reason = err.to_string();
-
                error!(target: "service", "Fetch failed for {rid} from {remote}: {reason}");
+
                error!(target: "service", "Fetch failed for {rid} from {remote}: {err}");

                // For now, we only disconnect the remote in case of timeout. In the future,
                // there may be other reasons to disconnect.
                if err.is_timeout() {
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
                }
-
                FetchResult::Failed { reason }
-
            }
-
        };
-

-
        let Some(fetching) = self.fetching.remove(&rid) else {
-
            warn!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
-
            return;
-
        };
-
        debug_assert_eq!(fetching.from, remote);
-

-
        if let Some(s) = self.sessions.get_mut(&remote) {
-
            // Mark this RID as fetched for this session.
-
            s.fetched(rid);
-
        }
-

-
        for sub in &fetching.subscribers {
-
            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");
-

-
            if sub.send(result.clone()).is_err() {
-
                error!(target: "service", "Error sending fetch result for {rid} from {remote}..");
-
            } else {
-
                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
-
            }
-
        }
-

-
        if fetching.subscribers.is_empty() {
-
            trace!(target: "service", "No fetch requests found for {rid}..");
-

-
            // We only announce refs here when the fetch wasn't user-requested. This is
-
            // because the user might want to announce his fork, once he has created one,
-
            // or may choose to not announce anything.
-
            match &result {
-
                FetchResult::Success {
-
                    updated,
-
                    namespaces,
-
                    ..
-
                } if !updated.is_empty() => {
-
                    if let Err(e) = self.announce_refs(rid, namespaces.iter().cloned()) {
-
                        error!(target: "service", "Failed to announce new refs: {e}");
-
                    }
-
                }
-
                _ => debug!(target: "service", "Nothing to announce, no refs were updated.."),
            }
        }

-
        // Announce our new inventory if this fetch was a clone.
-
        if let FetchResult::Success { clone: true, .. } = result {
-
            self.storage.insert(rid);
-
            self.sync_and_announce();
-
        }
-

        // We can now try to dequeue another fetch.
        self.dequeue_fetch();
    }
@@ -1410,10 +1423,6 @@ where
                        info!(target: "service", "Routing table updated for {} with seed {announcer}", message.rid);
                    }
                }
-
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
-
                    debug!(target: "service", "Skipping fetch, empty refs announcement for {}", message.rid);
-
                    return Ok(false);
-
                };

                // Update sync status of announcer for this repo.
                if let Some(refs) = message.refs.iter().find(|r| &r.remote == self.nid()) {
@@ -1442,7 +1451,6 @@ where
                        }
                    }
                }
-

                let repo_entry = self.policies.seed_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo seeding configuration",
                );
@@ -1465,9 +1473,20 @@ where
                    );
                    return Ok(relay);
                };
-
                // Finally, if there's anything to fetch, we fetch it from the remote.
-
                self.fetch_refs_at(message.rid, remote.id, refs, FETCH_TIMEOUT, None);

+
                // Finally, if there's anything to fetch, we fetch it from the remote.
+
                if let Some(refs) = NonEmpty::from_vec(
+
                    message
+
                        .refs
+
                        .iter()
+
                        .filter(|r| r.remote != self.node_id()) // Don't fetch our own refs.
+
                        .cloned()
+
                        .collect(),
+
                ) {
+
                    self.fetch_refs_at(message.rid, remote.id, refs, FETCH_TIMEOUT, None);
+
                } else {
+
                    debug!(target: "service", "Skipping fetch, no remote refs in announcement for {}", message.rid);
+
                }
                return Ok(relay);
            }
            AnnouncementMessage::Node(
@@ -1832,8 +1851,8 @@ where
    }

    /// Announce our own refs for the given repo.
-
    fn announce_own_refs(&mut self, rid: RepoId) -> Result<Vec<RefsAt>, Error> {
-
        let refs = self.announce_refs(rid, [self.node_id()])?;
+
    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc<Verified>) -> Result<Vec<RefsAt>, Error> {
+
        let refs = self.announce_refs(rid, doc, [self.node_id()])?;

        // Update refs database with our signed refs branches.
        // This isn't strictly necessary for now, as we only use the database for fetches, and
@@ -1860,15 +1879,15 @@ where
    fn announce_refs(
        &mut self,
        rid: RepoId,
+
        doc: Doc<Verified>,
        remotes: impl IntoIterator<Item = NodeId>,
    ) -> Result<Vec<RefsAt>, Error> {
-
        let repo = self.storage.repository(rid)?;
-
        let doc = repo.identity_doc()?;
        let peers = self.sessions.connected().map(|(_, p)| p);
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;

-
        // Update our local sync status. This is useful for determining if refs were updated while
-
        // the node was stopped.
+
        // Update our sync status for our own refs. This is useful for determining if refs were
+
        // updated while the node was stopped.
+
        // TODO: Move to `announce_own_refs`.
        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
            info!(target: "service", "Announcing local refs for {rid} to peers ({})..", refs.at);

@@ -1892,7 +1911,7 @@ where
        Ok(refs)
    }

-
    fn sync_and_announce(&mut self) {
+
    fn sync_and_announce_inventory(&mut self) {
        match self.sync_inventory() {
            Ok(synced) => {
                // Only announce if our inventory changed.
modified radicle-node/src/test/arbitrary.rs
@@ -1,7 +1,10 @@
+
use std::collections::HashSet;
+

use bloomy::BloomFilter;
use qcheck::Arbitrary;

use crate::crypto;
+
use crate::identity::DocAt;
use crate::node::Alias;
use crate::prelude::{BoundedVec, NodeId, RepoId, Timestamp};
use crate::service::filter::{Filter, FILTER_SIZE_L, FILTER_SIZE_M, FILTER_SIZE_S};
@@ -10,6 +13,7 @@ use crate::service::message::{
    Subscribe, ZeroBytes,
};
use crate::wire::MessageType;
+
use crate::worker::fetch::FetchResult;

pub use radicle::test::arbitrary::*;

@@ -27,6 +31,17 @@ impl Arbitrary for Filter {
    }
}

+
impl Arbitrary for FetchResult {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        FetchResult {
+
            updated: vec![],
+
            namespaces: HashSet::arbitrary(g),
+
            clone: bool::arbitrary(g),
+
            doc: DocAt::arbitrary(g),
+
        }
+
    }
+
}
+

impl Arbitrary for Message {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        let type_id = g
modified radicle-node/src/test/environment.rs
@@ -19,6 +19,7 @@ use radicle::identity::{RepoId, Visibility};
use radicle::node::config::ConnectAddress;
use radicle::node::policy::store as policy;
use radicle::node::routing::Store;
+
use radicle::node::seed::Store as _;
use radicle::node::Database;
use radicle::node::{Alias, POLICIES_DB_FILE};
use radicle::node::{ConnectOptions, Handle as _};
@@ -244,6 +245,14 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
            .unwrap()
    }

+
    /// Get sync status of a repo.
+
    pub fn synced_seeds(&self, rid: &RepoId) -> Vec<node::seed::SyncedSeed> {
+
        let db = Database::reader(self.home.node().join(node::NODE_DB_FILE)).unwrap();
+
        let seeds = db.seeds_for(rid).unwrap();
+

+
        seeds.into_iter().collect::<Result<Vec<_>, _>>().unwrap()
+
    }
+

    /// Wait until this node's routing table matches the remotes.
    pub fn converge<'a>(
        &'a self,
@@ -278,9 +287,23 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
        }
    }

+
    /// Wait until this node is synced with another node, for the given repository.
+
    #[track_caller]
+
    pub fn is_synced_with(&mut self, rid: &RepoId, nid: &NodeId) {
+
        log::debug!(target: "test", "Waiting for {} to be in sync with {nid} for {rid}", self.id);
+

+
        loop {
+
            let seeds = self.handle.seeds(*rid).unwrap();
+
            if seeds.iter().any(|s| s.nid == *nid && s.is_synced()) {
+
                break;
+
            }
+
            thread::sleep(Duration::from_millis(100));
+
        }
+
    }
+

    /// Wait until this node has the inventory of another node.
    #[track_caller]
-
    pub fn has_inventory_of(&self, rid: &RepoId, nid: &NodeId) {
+
    pub fn has_remote_of(&self, rid: &RepoId, nid: &NodeId) {
        log::debug!(target: "test", "Waiting for {} to have {rid}/{nid}", self.id);
        let events = self.handle.events();

modified radicle-node/src/test/simulator.rs
@@ -19,7 +19,8 @@ use crate::prelude::{Address, RepoId};
use crate::service::io::Io;
use crate::service::{DisconnectReason, Event, Message, NodeId};
use crate::storage::Namespaces;
-
use crate::storage::WriteStorage;
+
use crate::storage::{ReadRepository, WriteStorage};
+
use crate::test::arbitrary;
use crate::test::peer::Service;
use crate::worker::{fetch, FetchError};
use crate::Link;
@@ -409,20 +410,30 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            .extend(msgs.into_iter().map(|m| (from, p.node_id(), m)));
                    }
                    Input::Fetched(rid, nid, result) => {
-
                        let result = Rc::try_unwrap(result).unwrap();
+
                        let mut result = Rc::try_unwrap(result).unwrap();
                        let repo = match p.storage().repository_mut(rid) {
                            Ok(repo) => repo,
                            Err(e) if e.is_not_found() => p.storage().create(rid).unwrap(),
                            Err(e) => panic!("Failed to open repository: {e}"),
                        };
-
                        match &result {
-
                            Ok(fetch::FetchResult { namespaces, .. }) => {
-
                                radicle::test::fetch(
-
                                    &repo,
-
                                    &nid,
-
                                    Namespaces::Followed(namespaces.clone()),
-
                                )
-
                                .unwrap();
+

+
                        match &mut result {
+
                            Ok(fetch::FetchResult {
+
                                namespaces,
+
                                updated,
+
                                doc,
+
                                ..
+
                            }) => {
+
                                *updated =
+
                                    radicle::test::fetch(&repo, &nid, Namespaces::All).unwrap();
+
                                *namespaces = updated
+
                                    .iter()
+
                                    .map(|r| {
+
                                        NodeId::from_namespaced(&r.name().to_namespaced().unwrap())
+
                                            .unwrap()
+
                                    })
+
                                    .collect();
+
                                *doc = repo.identity_doc().unwrap();
                            }
                            Err(err) => panic!("Error fetching: {err}"),
                        }
@@ -657,6 +668,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                                        Namespaces::All => HashSet::new(),
                                    },
                                    clone: true,
+
                                    doc: arbitrary::gen(1),
                                })),
                            ),
                        },
modified radicle-node/src/tests.rs
@@ -1344,6 +1344,7 @@ fn test_queued_fetch_max_capacity() {
    let rid1 = *repo_keys.next().unwrap();
    let rid2 = *repo_keys.next().unwrap();
    let rid3 = *repo_keys.next().unwrap();
+
    let doc = storage.repos.get(&rid1).unwrap().doc.clone();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
    let bob = Peer::new("bob", [8, 8, 8, 8]);

@@ -1372,14 +1373,14 @@ fn test_queued_fetch_max_capacity() {
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
-
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::default()));
+
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::new(doc.clone())));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid2);
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
-
    alice.fetched(rid2, bob.id, Ok(fetch::FetchResult::default()));
+
    alice.fetched(rid2, bob.id, Ok(fetch::FetchResult::new(doc)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid3);
}
@@ -1453,6 +1454,7 @@ fn test_queued_fetch_from_ann_same_rid() {
            }],
            namespaces: [carol.id()].into_iter().collect(),
            clone: false,
+
            doc: arbitrary::gen(1),
        }),
    );
    // Now the 1st fetch is done, but the 2nd and 3rd fetches are redundant.
@@ -1496,14 +1498,14 @@ fn test_queued_fetch_from_command_same_rid() {
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
-
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::default()));
+
    alice.fetched(rid1, bob.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == eve.id);
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
-
    alice.fetched(rid1, eve.id, Ok(fetch::FetchResult::default()));
+
    alice.fetched(rid1, eve.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == carol.id);
}
@@ -1557,7 +1559,7 @@ fn test_refs_synced_event() {
}

#[test]
-
fn test_push_and_pull() {
+
fn test_init_and_seed() {
    let tempdir = tempfile::tempdir().unwrap();

    let storage_alice = Storage::open(
@@ -1624,19 +1626,22 @@ fn test_push_and_pull() {
    assert!(bob.get(proj_id).unwrap().is_none());

    // Bob seeds Alice's project.
-
    let (sender, _) = chan::bounded(1);
+
    let (sender, receiver) = chan::bounded(1);
    bob.command(service::Command::Seed(
        proj_id,
        policy::Scope::default(),
        sender,
    ));
+
    assert!(receiver.recv().unwrap());
+

    // Eve seeds Alice's project.
-
    let (sender, _) = chan::bounded(1);
+
    let (sender, receiver) = chan::bounded(1);
    eve.command(service::Command::Seed(
        proj_id,
        policy::Scope::default(),
        sender,
    ));
+
    assert!(receiver.recv().unwrap());

    let (send, _) = chan::bounded(1);
    // Alice announces her inventory.
@@ -1647,21 +1652,24 @@ fn test_push_and_pull() {

    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

-
    // TODO: Refs should be compared between the two peers.
+
    log::debug!(target: "test", "Simulation is over");

-
    assert!(eve.storage().get(proj_id).unwrap().is_some());
-
    assert!(bob.storage().get(proj_id).unwrap().is_some());
+
    // TODO: Refs should be compared between the two peers.

+
    log::debug!(target: "test", "Waiting for {} to fetch {} from {}..", bob.id, proj_id,eve.id);
    bob_events
        .iter()
        .find(|e| {
            matches!(
                e,
                service::Event::RefsFetched { remote, .. }
-
                if *remote == eve.node_id(),
+
                if *remote == eve.node_id()
            )
        })
        .expect("Bob fetched from Eve");
+

+
    assert!(eve.storage().get(proj_id).unwrap().is_some());
+
    assert!(bob.storage().get(proj_id).unwrap().is_some());
}

#[test]
modified radicle-node/src/tests/e2e.rs
@@ -938,7 +938,8 @@ fn test_non_fastforward_sigrefs() {

    assert_matches!(
        alice.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap(),
-
        FetchResult::Success { updated, .. } if updated.is_empty()
+
        FetchResult::Success { updated, .. }
+
        if updated.iter().all(|u| u.is_skipped())
    );
}

modified radicle-node/src/worker/fetch.rs
@@ -6,6 +6,7 @@ use std::str::FromStr;
use localtime::LocalTime;

use radicle::crypto::PublicKey;
+
use radicle::identity::DocAt;
use radicle::prelude::RepoId;
use radicle::storage::refs::RefsAt;
use radicle::storage::{
@@ -16,14 +17,27 @@ use radicle_fetch::{Allowed, BlockList, FetchLimit};

use super::channels::ChannelsFlush;

-
#[derive(Debug, Default)]
+
#[derive(Debug, Clone)]
pub struct FetchResult {
-
    /// The set of updates references.
+
    /// The set of updated references.
    pub updated: Vec<RefUpdate>,
    /// The set of remote namespaces that were updated.
    pub namespaces: HashSet<PublicKey>,
    /// The fetch was a full clone.
    pub clone: bool,
+
    /// Identity doc of fetched repo.
+
    pub doc: DocAt,
+
}
+

+
impl FetchResult {
+
    pub fn new(doc: DocAt) -> Self {
+
        Self {
+
            updated: vec![],
+
            namespaces: HashSet::new(),
+
            clone: false,
+
            doc,
+
        }
+
    }
}

pub enum Handle {
@@ -129,6 +143,7 @@ impl Handle {
                Ok(FetchResult {
                    updated: applied.updated,
                    namespaces: remotes.into_iter().collect(),
+
                    doc: repo.identity_doc()?,
                    clone,
                })
            }
modified radicle-remote-helper/src/lib.rs
@@ -54,9 +54,9 @@ pub enum Error {
    /// Invalid reference name.
    #[error("invalid ref: {0}")]
    InvalidRef(#[from] radicle::git::fmt::Error),
-
    /// Storage error.
+
    /// Repository error.
    #[error(transparent)]
-
    Storage(#[from] radicle::storage::Error),
+
    Repository(#[from] radicle::storage::RepositoryError),
    /// Fetch error.
    #[error(transparent)]
    Fetch(#[from] fetch::Error),
modified radicle/src/storage.rs
@@ -105,6 +105,17 @@ pub enum RepositoryError {
    Refs(#[from] refs::Error),
}

+
impl RepositoryError {
+
    pub fn is_not_found(&self) -> bool {
+
        match self {
+
            Self::Storage(e) if e.is_not_found() => true,
+
            Self::Git(e) if git_ext::is_not_found_err(e) => true,
+
            Self::GitExt(git_ext::Error::NotFound(_)) => true,
+
            _ => false,
+
        }
+
    }
+
}
+

/// Storage error.
#[derive(Error, Debug)]
pub enum Error {
@@ -218,6 +229,21 @@ impl RefUpdate {
            RefUpdate::Skipped { name, .. } => name.as_refstr(),
        }
    }
+

+
    /// Is it an update.
+
    pub fn is_updated(&self) -> bool {
+
        matches!(self, RefUpdate::Updated { .. })
+
    }
+

+
    /// Is it a create.
+
    pub fn is_created(&self) -> bool {
+
        matches!(self, RefUpdate::Created { .. })
+
    }
+

+
    /// Is it a skip.
+
    pub fn is_skipped(&self) -> bool {
+
        matches!(self, RefUpdate::Skipped { .. })
+
    }
}

impl fmt::Display for RefUpdate {
@@ -377,13 +403,13 @@ pub trait ReadStorage {
    /// Insert this repository into the inventory.
    fn insert(&self, rid: RepoId);
    /// Open or create a read-only repository.
-
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, Error>;
+
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, RepositoryError>;
    /// Get a repository's identity if it exists.
    fn get(&self, rid: RepoId) -> Result<Option<Doc<Verified>>, RepositoryError> {
        match self.repository(rid) {
            Ok(repo) => Ok(Some(repo.identity_doc()?.into())),
            Err(e) if e.is_not_found() => Ok(None),
-
            Err(e) => Err(e.into()),
+
            Err(e) => Err(e),
        }
    }
}
@@ -393,7 +419,7 @@ pub trait WriteStorage: ReadStorage {
    type RepositoryMut: WriteRepository;

    /// Open a read-write repository.
-
    fn repository_mut(&self, rid: RepoId) -> Result<Self::RepositoryMut, Error>;
+
    fn repository_mut(&self, rid: RepoId) -> Result<Self::RepositoryMut, RepositoryError>;
    /// Create a read-write repository.
    fn create(&self, rid: RepoId) -> Result<Self::RepositoryMut, Error>;

@@ -642,7 +668,7 @@ where
        self.deref().get(rid)
    }

-
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, Error> {
+
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, RepositoryError> {
        self.deref().repository(rid)
    }
}
@@ -654,7 +680,7 @@ where
{
    type RepositoryMut = S::RepositoryMut;

-
    fn repository_mut(&self, rid: RepoId) -> Result<Self::RepositoryMut, Error> {
+
    fn repository_mut(&self, rid: RepoId) -> Result<Self::RepositoryMut, RepositoryError> {
        self.deref().repository_mut(rid)
    }

modified radicle/src/storage/git.rs
@@ -145,7 +145,7 @@ impl ReadStorage for Storage {
        }
    }

-
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, Error> {
+
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, RepositoryError> {
        Repository::open(paths::repository(self, &rid), rid)
    }
}
@@ -153,7 +153,7 @@ impl ReadStorage for Storage {
impl WriteStorage for Storage {
    type RepositoryMut = Repository;

-
    fn repository_mut(&self, rid: RepoId) -> Result<Self::RepositoryMut, Error> {
+
    fn repository_mut(&self, rid: RepoId) -> Result<Self::RepositoryMut, RepositoryError> {
        Repository::open(paths::repository(self, &rid), rid)
    }

@@ -295,7 +295,7 @@ impl Storage {
        })
    }

-
    pub fn inspect(&self) -> Result<(), Error> {
+
    pub fn inspect(&self) -> Result<(), RepositoryError> {
        for r in self.repositories()? {
            let rid = r.rid;
            let repo = self.repository(rid)?;
@@ -386,7 +386,7 @@ pub enum Validation {

impl Repository {
    /// Open an existing repository.
-
    pub fn open<P: AsRef<Path>>(path: P, id: RepoId) -> Result<Self, Error> {
+
    pub fn open<P: AsRef<Path>>(path: P, id: RepoId) -> Result<Self, RepositoryError> {
        let backend = git2::Repository::open_bare(path.as_ref())?;

        Ok(Self { id, backend })
modified radicle/src/test.rs
@@ -59,11 +59,11 @@ pub fn fetch<W: WriteRepository>(

    repo.set_identity_head()?;
    repo.set_head()?;
+

    let validations = repo.validate()?;
    if !validations.is_empty() {
        return Err(crate::storage::FetchError::Validation { validations });
    }
-

    Ok(updates)
}

modified radicle/src/test/storage.rs
@@ -81,10 +81,12 @@ impl ReadStorage for MockStorage {

    fn insert(&self, _rid: RepoId) {}

-
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, Error> {
+
    fn repository(&self, rid: RepoId) -> Result<Self::Repository, RepositoryError> {
        self.repos
            .get(&rid)
-
            .ok_or_else(|| Error::Io(io::Error::from(io::ErrorKind::NotFound)))
+
            .ok_or_else(|| {
+
                RepositoryError::Storage(Error::Io(io::Error::from(io::ErrorKind::NotFound)))
+
            })
            .cloned()
    }
}
@@ -92,10 +94,12 @@ impl ReadStorage for MockStorage {
impl WriteStorage for MockStorage {
    type RepositoryMut = MockRepository;

-
    fn repository_mut(&self, rid: RepoId) -> Result<Self::RepositoryMut, Error> {
+
    fn repository_mut(&self, rid: RepoId) -> Result<Self::RepositoryMut, RepositoryError> {
        self.repos
            .get(&rid)
-
            .ok_or(Error::Io(io::ErrorKind::NotFound.into()))
+
            .ok_or(RepositoryError::Storage(Error::Io(io::Error::from(
+
                io::ErrorKind::NotFound,
+
            ))))
            .cloned()
    }