Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Always check refs status before fetching
cloudhead committed 2 years ago
commit f2fe0242e1387bfa0106c6509e0608c95e90803f
parent 36808234a630ff2729977d1102fecd94f472537f
2 files changed +66 -54
modified radicle-node/src/service.rs
@@ -830,17 +830,32 @@ where
        }
    }

-
    /// Initiate an outgoing fetch for some repository, based on
-
    /// another node's announcement.
+
    /// Initiate an outgoing fetch for some repository, based on another node's announcement.
+
    /// Returns `true` if the fetch was initiated and `false` if it was skipped.
    fn fetch_refs_at(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs: NonEmpty<RefsAt>,
+
        scope: Scope,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
-
    ) {
-
        self._fetch(rid, from, refs.into(), timeout, channel)
+
    ) -> bool {
+
        match self.refs_status_of(rid, refs, &scope) {
+
            Ok(status) => {
+
                if status.want.is_empty() {
+
                    debug!(target: "service", "Skipping  fetch for {rid}, all refs are already in storage");
+
                } else {
+
                    self._fetch(rid, from, status.want, timeout, channel);
+
                    return true;
+
                }
+
            }
+
            Err(e) => {
+
                error!(target: "service", "Error getting the refs status of {rid}: {e}");
+
            }
+
        }
+
        // We didn't try to fetch anything.
+
        false
    }

    /// Initiate an outgoing fetch for some repository.
@@ -942,6 +957,7 @@ where
            return Err(TryFetchError::SessionCapacityReached);
        }

+
        let remotes = refs_at.len();
        let fetching = fetching.or_insert(FetchState {
            from,
            refs_at: refs_at.clone(),
@@ -952,7 +968,10 @@ where
        self.outbox
            .fetch(session, rid, namespaces, refs_at, timeout);

-
        debug!(target: "service", "Fetch initiated for {rid} with {}..", session.id);
+
        debug!(
+
            target: "service",
+
            "Fetch initiated for {rid} with {} ({remotes} remote(s))..", session.id
+
        );

        Ok(fetching)
    }
@@ -1077,19 +1096,12 @@ where
                .seed_policy(&rid)
                .expect("Service::dequeue_fetch: error accessing repo seeding configuration");

-
            match self.refs_status_of(rid, refs_at, &repo_entry.scope) {
-
                Ok(status) => {
-
                    if let Some(refs) = NonEmpty::from_vec(status.fresh) {
-
                        self.fetch_refs_at(rid, from, refs, FETCH_TIMEOUT, channel);
-
                        return;
-
                    } else {
-
                        trace!(target: "service", "Skipping dequeued fetch for {rid}, all refs are already in local storage");
-
                    }
-
                }
-
                Err(e) => {
-
                    error!(target: "service", "Error getting the refs status of {rid}: {e}");
-
                    return;
+
            if let Some(refs) = NonEmpty::from_vec(refs_at) {
+
                if self.fetch_refs_at(rid, from, refs, repo_entry.scope, FETCH_TIMEOUT, channel) {
+
                    break;
                }
+
            } else {
+
                self.fetch(rid, from, FETCH_TIMEOUT, channel);
            }
        }
    }
@@ -1475,15 +1487,15 @@ where
                };

                // Finally, if there's anything to fetch, we fetch it from the remote.
-
                if let Some(refs) = NonEmpty::from_vec(
-
                    message
-
                        .refs
-
                        .iter()
-
                        .filter(|r| r.remote != self.node_id()) // Don't fetch our own refs.
-
                        .cloned()
-
                        .collect(),
-
                ) {
-
                    self.fetch_refs_at(message.rid, remote.id, refs, FETCH_TIMEOUT, None);
+
                if let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) {
+
                    self.fetch_refs_at(
+
                        message.rid,
+
                        remote.id,
+
                        refs,
+
                        repo_entry.scope,
+
                        FETCH_TIMEOUT,
+
                        None,
+
                    );
                } else {
                    debug!(target: "service", "Skipping fetch, no remote refs in announcement for {}", message.rid);
                }
@@ -1685,31 +1697,30 @@ where
    fn refs_status_of(
        &self,
        rid: RepoId,
-
        refs: Vec<RefsAt>,
+
        refs: NonEmpty<RefsAt>,
        scope: &policy::Scope,
    ) -> Result<RefsStatus, Error> {
        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
-
        // First, check the freshness.
-
        if refs.fresh.is_empty() {
+
        // Check that there's something we want.
+
        if refs.want.is_empty() {
            return Ok(refs);
        }
-
        // Second, check the scope.
-
        match scope {
-
            policy::Scope::All => Ok(refs),
-
            policy::Scope::Followed => {
-
                match self.policies.namespaces_for(&self.storage, &rid) {
-
                    Ok(Namespaces::All) => Ok(refs),
-
                    Ok(Namespaces::Followed(mut followed)) => {
-
                        // Get the set of followed nodes except self.
-
                        followed.remove(self.nid());
-
                        refs.fresh.retain(|r| followed.contains(&r.remote));
-

-
                        Ok(refs)
-
                    }
-
                    Err(e) => Err(e.into()),
+
        // Check scope.
+
        let mut refs = match scope {
+
            policy::Scope::All => refs,
+
            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
+
                Ok(Namespaces::All) => refs,
+
                Ok(Namespaces::Followed(followed)) => {
+
                    refs.want.retain(|r| followed.contains(&r.remote));
+
                    refs
                }
-
            }
-
        }
+
                Err(e) => return Err(e.into()),
+
            },
+
        };
+
        // Remove our own remote, we don't want to fetch that.
+
        refs.want.retain(|r| r.remote != self.node_id());
+

+
        Ok(refs)
    }

    /// Set of initial messages to send to a peer.
modified radicle-node/src/service/message.rs
@@ -1,5 +1,6 @@
use std::{fmt, io, mem};

+
use nonempty::NonEmpty;
use radicle::git;
use radicle::storage::refs::RefsAt;

@@ -167,14 +168,14 @@ pub struct RefsAnnouncement {
#[derive(Default)]
pub struct RefsStatus {
    /// The `rad/sigrefs` was missing or it's ahead of the local
-
    /// `rad/sigrefs`.
-
    pub fresh: Vec<RefsAt>,
-
    /// The `rad/sigrefs` has been seen before.
-
    pub stale: Vec<RefsAt>,
+
    /// `rad/sigrefs`. We want it.
+
    pub want: Vec<RefsAt>,
+
    /// The `rad/sigrefs` has been seen before. We already have it.
+
    pub have: Vec<RefsAt>,
}

impl RefsStatus {
-
    /// Get the set of `fresh` and `stale` `RefsAt`'s for the given
+
    /// Get the set of `want` and `have` `RefsAt`'s for the given
    /// announcement.
    ///
    /// Nb. We use the refs database as a cache for quick lookups. This does *not* check
@@ -183,7 +184,7 @@ impl RefsStatus {
    /// and old refs announcements will be discarded due to their lower timestamps.
    pub fn new<D: node::refs::Store>(
        rid: RepoId,
-
        refs: Vec<RefsAt>,
+
        refs: NonEmpty<RefsAt>,
        db: &D,
    ) -> Result<RefsStatus, storage::Error> {
        let mut status = RefsStatus::default();
@@ -202,13 +203,13 @@ impl RefsStatus {
        match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
            Ok(Some((ours, _))) => {
                if theirs.at != ours {
-
                    self.fresh.push(theirs);
+
                    self.want.push(theirs);
                } else {
-
                    self.stale.push(theirs);
+
                    self.have.push(theirs);
                }
            }
            Ok(None) => {
-
                self.fresh.push(theirs);
+
                self.want.push(theirs);
            }
            Err(e) => {
                log::warn!(