Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Simplify handling of fetch completion
cloudhead committed 2 years ago
commit 7e13e0759fb66fd91d71be3b88c9c5688407ec9c
parent fd4f5fff7dcba2c67921c298b5624e75da7306f5
8 files changed +79 -84
modified radicle-cli/tests/commands.rs
@@ -389,6 +389,8 @@ fn rad_id_multi_delegate() {
    eve.fork(acme, working.join("eve")).unwrap();
    eve.has_remote_of(&acme, &bob.id);
    alice.has_remote_of(&acme, &eve.id);
+
    alice.is_synced_with(&acme, &eve.id);
+
    alice.is_synced_with(&acme, &bob.id);

    // TODO: Have formula with two connected nodes and a tracked project.

modified radicle-node/src/service.rs
@@ -963,7 +963,40 @@ where
        remote: NodeId,
        result: Result<fetch::FetchResult, FetchError>,
    ) {
-
        let result = match result {
+
        let Some(fetching) = self.fetching.remove(&rid) else {
+
            error!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
+
            return;
+
        };
+
        debug_assert_eq!(fetching.from, remote);
+

+
        if let Some(s) = self.sessions.get_mut(&remote) {
+
            // Mark this RID as fetched for this session.
+
            s.fetched(rid);
+
        }
+

+
        // Notify all fetch subscribers of the fetch result. This is used when the user requests
+
        // a fetch via the CLI, for example.
+
        for sub in &fetching.subscribers {
+
            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");
+

+
            let result = match &result {
+
                Ok(success) => FetchResult::Success {
+
                    updated: success.updated.clone(),
+
                    namespaces: success.namespaces.clone(),
+
                    clone: success.clone,
+
                },
+
                Err(e) => FetchResult::Failed {
+
                    reason: e.to_string(),
+
                },
+
            };
+
            if sub.send(result).is_err() {
+
                error!(target: "service", "Error sending fetch result for {rid} from {remote}..");
+
            } else {
+
                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
+
            }
+
        }
+

+
        match result {
            Ok(fetch::FetchResult {
                updated,
                namespaces,
@@ -985,81 +1018,35 @@ where
                    updated: updated.clone(),
                });

-
                FetchResult::Success {
-
                    updated,
-
                    namespaces,
-
                    clone,
-
                    doc,
+
                // Announce our new inventory if this fetch was a full clone.
+
                // Only update and announce inventory for public repositories.
+
                if clone && doc.visibility.is_public() {
+
                    debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");
+

+
                    self.storage.insert(rid);
+
                    self.sync_and_announce_inventory();
+
                }
+

+
                // It's possible for a fetch to succeed but nothing was updated.
+
                if updated.is_empty() {
+
                    debug!(target: "service", "Nothing to announce, no refs were updated..");
+
                } else {
+
                    // Finally, announce the refs. This is useful for nodes to know what we've synced,
+
                    // beyond just knowing that we have added an item to our inventory.
+
                    if let Err(e) = self.announce_refs(rid, doc.into(), namespaces) {
+
                        error!(target: "service", "Failed to announce new refs: {e}");
+
                    }
                }
            }
            Err(err) => {
-
                let reason = err.to_string();
-
                error!(target: "service", "Fetch failed for {rid} from {remote}: {reason}");
+
                error!(target: "service", "Fetch failed for {rid} from {remote}: {err}");

                // For now, we only disconnect the remote in case of timeout. In the future,
                // there may be other reasons to disconnect.
                if err.is_timeout() {
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
                }
-
                FetchResult::Failed { reason }
-
            }
-
        };
-

-
        let Some(fetching) = self.fetching.remove(&rid) else {
-
            warn!(target: "service", "Received unexpected fetch result for {rid}, from {remote}");
-
            return;
-
        };
-
        debug_assert_eq!(fetching.from, remote);
-

-
        if let Some(s) = self.sessions.get_mut(&remote) {
-
            // Mark this RID as fetched for this session.
-
            s.fetched(rid);
-
        }
-

-
        for sub in &fetching.subscribers {
-
            debug!(target: "service", "Found existing fetch request from {remote}, sending result..");
-

-
            if sub.send(result.clone()).is_err() {
-
                error!(target: "service", "Error sending fetch result for {rid} from {remote}..");
-
            } else {
-
                debug!(target: "service", "Sent fetch result for {rid} from {remote}..");
-
            }
-
        }
-

-
        if fetching.subscribers.is_empty() {
-
            trace!(target: "service", "No fetch requests found for {rid}..");
-

-
            // TODO: Combine this with the below.
-
            // We only announce refs here when the fetch wasn't user-requested. This is
-
            // because the user might want to announce his fork, once he has created one,
-
            // or may choose to not announce anything.
-
            match &result {
-
                FetchResult::Success {
-
                    updated,
-
                    namespaces,
-
                    doc,
-
                    ..
-
                } if !updated.is_empty() => {
-
                    if let Err(e) =
-
                        self.announce_refs(rid, doc.clone().into(), namespaces.iter().cloned())
-
                    {
-
                        error!(target: "service", "Failed to announce new refs: {e}");
-
                    }
-
                }
-
                _ => debug!(target: "service", "Nothing to announce, no refs were updated.."),
-
            }
-
        }
-

-
        // Announce our new inventory if this fetch was a clone.
-
        if let FetchResult::Success {
-
            doc, clone: true, ..
-
        } = result
-
        {
-
            // Only add inventory for public repositories.
-
            if doc.visibility.is_public() {
-
                self.storage.insert(rid);
            }
-
            self.sync_and_announce();
        }

        // We can now try to dequeue another fetch.
@@ -1898,8 +1885,9 @@ where
        let peers = self.sessions.connected().map(|(_, p)| p);
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;

-
        // Update our local sync status. This is useful for determining if refs were updated while
-
        // the node was stopped.
+
        // Update our sync status for our own refs. This is useful for determining if refs were
+
        // updated while the node was stopped.
+
        // TODO: Move to `announce_own_refs`.
        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
            info!(target: "service", "Announcing local refs for {rid} to peers ({})..", refs.at);

@@ -1923,7 +1911,7 @@ where
        Ok(refs)
    }

-
    fn sync_and_announce(&mut self) {
+
    fn sync_and_announce_inventory(&mut self) {
        match self.sync_inventory() {
            Ok(synced) => {
                // Only announce if our inventory changed.
modified radicle-node/src/test/handle.rs
@@ -5,9 +5,8 @@ use std::time;

use radicle::git;
use radicle::storage::refs::RefsAt;
-
use radicle::test::arbitrary;

-
use crate::identity::{DocAt, RepoId};
+
use crate::identity::RepoId;
use crate::node::{Alias, Config, ConnectOptions, ConnectResult, Event, FetchResult, Seeds};
use crate::runtime::HandleError;
use crate::service::policy;
@@ -63,7 +62,6 @@ impl radicle::node::Handle for Handle {
            updated: vec![],
            namespaces: HashSet::new(),
            clone: false,
-
            doc: arbitrary::gen::<DocAt>(1),
        })
    }

modified radicle-node/src/tests.rs
@@ -1652,21 +1652,24 @@ fn test_init_and_seed() {

    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

-
    // TODO: Refs should be compared between the two peers.
+
    log::debug!(target: "test", "Simulation is over");

-
    assert!(eve.storage().get(proj_id).unwrap().is_some());
-
    assert!(bob.storage().get(proj_id).unwrap().is_some());
+
    // TODO: Refs should be compared between the two peers.

+
    log::debug!(target: "test", "Waiting for {} to fetch {} from {}..", bob.id, proj_id,eve.id);
    bob_events
        .iter()
        .find(|e| {
            matches!(
                e,
                service::Event::RefsFetched { remote, .. }
-
                if *remote == eve.node_id(),
+
                if *remote == eve.node_id()
            )
        })
        .expect("Bob fetched from Eve");
+

+
    assert!(eve.storage().get(proj_id).unwrap().is_some());
+
    assert!(bob.storage().get(proj_id).unwrap().is_some());
}

#[test]
modified radicle-node/src/tests/e2e.rs
@@ -938,7 +938,8 @@ fn test_non_fastforward_sigrefs() {

    assert_matches!(
        alice.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap(),
-
        FetchResult::Success { updated, .. } if updated.is_empty()
+
        FetchResult::Success { updated, .. }
+
        if updated.iter().all(|u| u.is_skipped())
    );
}

modified radicle/src/identity/doc.rs
@@ -130,7 +130,7 @@ impl Deref for Payload {
}

/// A verified identity document at a specific commit.
-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocAt {
    /// The commit at which this document exists.
    pub commit: Oid,
modified radicle/src/node.rs
@@ -29,7 +29,7 @@ use serde_json as json;

use crate::crypto::PublicKey;
use crate::git;
-
use crate::identity::{DocAt, RepoId};
+
use crate::identity::RepoId;
use crate::profile;
use crate::storage::refs::RefsAt;
use crate::storage::RefUpdate;
@@ -686,7 +686,6 @@ pub enum FetchResult {
        updated: Vec<RefUpdate>,
        namespaces: HashSet<NodeId>,
        clone: bool,
-
        doc: DocAt,
    },
    // TODO: Create enum for reason.
    Failed {
@@ -719,14 +718,13 @@ impl FetchResult {
    }
}

-
impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>, bool, DocAt), S>> for FetchResult {
-
    fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>, bool, DocAt), S>) -> Self {
+
impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>> for FetchResult {
+
    fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>) -> Self {
        match value {
-
            Ok((updated, namespaces, clone, doc)) => Self::Success {
+
            Ok((updated, namespaces, clone)) => Self::Success {
                updated,
                namespaces,
                clone,
-
                doc,
            },
            Err(err) => Self::Failed {
                reason: err.to_string(),
modified radicle/src/storage.rs
@@ -239,6 +239,11 @@ impl RefUpdate {
    pub fn is_created(&self) -> bool {
        matches!(self, RefUpdate::Created { .. })
    }
+

+
    /// Is it a skip.
+
    pub fn is_skipped(&self) -> bool {
+
        matches!(self, RefUpdate::Skipped { .. })
+
    }
}

impl fmt::Display for RefUpdate {