Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Fetch-related improvements
Merged did:key:z6MksFqX...wzpT opened 2 years ago

See commits.

8 files changed +121 -103 51e64cfa bc247dff
modified radicle-cli/src/commands/sync.rs
@@ -11,7 +11,7 @@ use radicle::node::AliasStore;
use radicle::node::Seed;
use radicle::node::{FetchResult, FetchResults, Handle as _, Node, SyncStatus};
use radicle::prelude::{NodeId, Profile, RepoId};
-
use radicle::storage::ReadStorage;
+
use radicle::storage::{ReadStorage, RemoteRepository};
use radicle_term::Element;

use crate::node::SyncReporting;
@@ -388,6 +388,16 @@ fn announce_refs(
            "nothing to announce, repository {rid} is not available locally"
        ));
    };
+
    if let Err(e) = repo.remote(&profile.public_key) {
+
        if e.is_not_found() {
+
            term::print(term::format::italic(
+
                "Nothing to announce, you don't have a fork of this repository.",
+
            ));
+
            return Ok(());
+
        } else {
+
            return Err(anyhow!("failed to load local fork of {rid}: {e}"));
+
        }
+
    }

    crate::node::announce(
        &repo,
modified radicle-node/src/service.rs
@@ -830,17 +830,32 @@ where
        }
    }

-
    /// Initiate an outgoing fetch for some repository, based on
-
    /// another node's announcement.
+
    /// Initiate an outgoing fetch for some repository, based on another node's announcement.
+
    /// Returns `true` if the fetch was initiated and `false` if it was skipped.
    fn fetch_refs_at(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs: NonEmpty<RefsAt>,
+
        scope: Scope,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
-
    ) {
-
        self._fetch(rid, from, refs.into(), timeout, channel)
+
    ) -> bool {
+
        match self.refs_status_of(rid, refs, &scope) {
+
            Ok(status) => {
+
                if status.want.is_empty() {
+
                    debug!(target: "service", "Skipping  fetch for {rid}, all refs are already in storage");
+
                } else {
+
                    self._fetch(rid, from, status.want, timeout, channel);
+
                    return true;
+
                }
+
            }
+
            Err(e) => {
+
                error!(target: "service", "Error getting the refs status of {rid}: {e}");
+
            }
+
        }
+
        // We didn't try to fetch anything.
+
        false
    }

    /// Initiate an outgoing fetch for some repository.
@@ -947,12 +962,7 @@ where
            refs_at: refs_at.clone(),
            subscribers: vec![],
        });
-
        let namespaces = self.policies.namespaces_for(&self.storage, &rid)?;
-

-
        self.outbox
-
            .fetch(session, rid, namespaces, refs_at, timeout);
-

-
        debug!(target: "service", "Fetch initiated for {rid} with {}..", session.id);
+
        self.outbox.fetch(session, rid, refs_at, timeout);

        Ok(fetching)
    }
@@ -1077,19 +1087,12 @@ where
                .seed_policy(&rid)
                .expect("Service::dequeue_fetch: error accessing repo seeding configuration");

-
            match self.refs_status_of(rid, refs_at, &repo_entry.scope) {
-
                Ok(status) => {
-
                    if let Some(refs) = NonEmpty::from_vec(status.fresh) {
-
                        self.fetch_refs_at(rid, from, refs, FETCH_TIMEOUT, channel);
-
                        return;
-
                    } else {
-
                        trace!(target: "service", "Skipping dequeued fetch for {rid}, all refs are already in local storage");
-
                    }
-
                }
-
                Err(e) => {
-
                    error!(target: "service", "Error getting the refs status of {rid}: {e}");
-
                    return;
+
            if let Some(refs) = NonEmpty::from_vec(refs_at) {
+
                if self.fetch_refs_at(rid, from, refs, repo_entry.scope, FETCH_TIMEOUT, channel) {
+
                    break;
                }
+
            } else {
+
                self.fetch(rid, from, FETCH_TIMEOUT, channel);
            }
        }
    }
@@ -1408,6 +1411,11 @@ where
                    refs: message.refs.to_vec(),
                    timestamp: message.timestamp,
                });
+
                // Empty announcements can be safely ignored.
+
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
+
                    debug!(target: "service", "Skipping fetch, no refs in announcement for {}", message.rid);
+
                    return Ok(false);
+
                };
                // We update inventories when receiving ref announcements, as these could come
                // from a new repository being initialized.
                if let Ok(result) =
@@ -1425,7 +1433,12 @@ where
                }

                // Update sync status of announcer for this repo.
-
                if let Some(refs) = message.refs.iter().find(|r| &r.remote == self.nid()) {
+
                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
+
                    debug!(
+
                        target: "service",
+
                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
+
                        message.rid, refs.at, message.timestamp
+
                    );
                    match self.db.seeds_mut().synced(
                        &message.rid,
                        announcer,
@@ -1444,6 +1457,12 @@ where
                                    remote: *announcer,
                                    at: refs.at,
                                });
+
                            } else {
+
                                debug!(
+
                                    target: "service",
+
                                    "Sync status of {announcer} was not updated for {}",
+
                                    message.rid,
+
                                );
                            }
                        }
                        Err(e) => {
@@ -1473,20 +1492,15 @@ where
                    );
                    return Ok(relay);
                };
-

-
                // Finally, if there's anything to fetch, we fetch it from the remote.
-
                if let Some(refs) = NonEmpty::from_vec(
-
                    message
-
                        .refs
-
                        .iter()
-
                        .filter(|r| r.remote != self.node_id()) // Don't fetch our own refs.
-
                        .cloned()
-
                        .collect(),
-
                ) {
-
                    self.fetch_refs_at(message.rid, remote.id, refs, FETCH_TIMEOUT, None);
-
                } else {
-
                    debug!(target: "service", "Skipping fetch, no remote refs in announcement for {}", message.rid);
-
                }
+
                // Finally, start the fetch.
+
                self.fetch_refs_at(
+
                    message.rid,
+
                    remote.id,
+
                    refs,
+
                    repo_entry.scope,
+
                    FETCH_TIMEOUT,
+
                    None,
+
                );
                return Ok(relay);
            }
            AnnouncementMessage::Node(
@@ -1685,31 +1699,30 @@ where
    fn refs_status_of(
        &self,
        rid: RepoId,
-
        refs: Vec<RefsAt>,
+
        refs: NonEmpty<RefsAt>,
        scope: &policy::Scope,
    ) -> Result<RefsStatus, Error> {
        let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
-
        // First, check the freshness.
-
        if refs.fresh.is_empty() {
+
        // Check that there's something we want.
+
        if refs.want.is_empty() {
            return Ok(refs);
        }
-
        // Second, check the scope.
-
        match scope {
-
            policy::Scope::All => Ok(refs),
-
            policy::Scope::Followed => {
-
                match self.policies.namespaces_for(&self.storage, &rid) {
-
                    Ok(Namespaces::All) => Ok(refs),
-
                    Ok(Namespaces::Followed(mut followed)) => {
-
                        // Get the set of followed nodes except self.
-
                        followed.remove(self.nid());
-
                        refs.fresh.retain(|r| followed.contains(&r.remote));
-

-
                        Ok(refs)
-
                    }
-
                    Err(e) => Err(e.into()),
+
        // Check scope.
+
        let mut refs = match scope {
+
            policy::Scope::All => refs,
+
            policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
+
                Ok(Namespaces::All) => refs,
+
                Ok(Namespaces::Followed(followed)) => {
+
                    refs.want.retain(|r| followed.contains(&r.remote));
+
                    refs
                }
-
            }
-
        }
+
                Err(e) => return Err(e.into()),
+
            },
+
        };
+
        // Remove our own remote, we don't want to fetch that.
+
        refs.want.retain(|r| r.remote != self.node_id());
+

+
        Ok(refs)
    }

    /// Set of initial messages to send to a peer.
@@ -1889,7 +1902,11 @@ where
        // updated while the node was stopped.
        // TODO: Move to `announce_own_refs`.
        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
-
            info!(target: "service", "Announcing local refs for {rid} to peers ({})..", refs.at);
+
            info!(
+
                target: "service",
+
                "Announcing own refs for {rid} to peers ({}) (t={})..",
+
                refs.at, ann.timestamp()
+
            );

            if let Err(e) = self
                .db
modified radicle-node/src/service/io.rs
@@ -7,7 +7,6 @@ use radicle::storage::refs::RefsAt;
use crate::prelude::*;
use crate::service::session::Session;
use crate::service::Link;
-
use crate::storage::Namespaces;

use super::gossip;
use super::message::{Announcement, AnnouncementMessage};
@@ -27,8 +26,6 @@ pub enum Io {
        rid: RepoId,
        /// Remote node being fetched from.
        remote: NodeId,
-
        /// Namespaces being fetched.
-
        namespaces: Namespaces,
        /// If the node is fetching specific `rad/sigrefs`.
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
@@ -123,20 +120,28 @@ impl Outbox {

    pub fn fetch(
        &mut self,
-
        remote: &mut Session,
+
        peer: &mut Session,
        rid: RepoId,
-
        namespaces: Namespaces,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) {
-
        remote.fetching(rid);
+
        peer.fetching(rid);

        let refs_at = (!refs_at.is_empty()).then_some(refs_at);
+

+
        if let Some(refs_at) = &refs_at {
+
            debug!(
+
                target: "service",
+
                "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
+
            );
+
        } else {
+
            debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
+
        }
+

        self.io.push_back(Io::Fetch {
            rid,
-
            namespaces,
            refs_at,
-
            remote: remote.id,
+
            remote: peer.id,
            timeout,
        });
    }
modified radicle-node/src/service/message.rs
@@ -1,5 +1,6 @@
use std::{fmt, io, mem};

+
use nonempty::NonEmpty;
use radicle::git;
use radicle::storage::refs::RefsAt;

@@ -167,14 +168,14 @@ pub struct RefsAnnouncement {
#[derive(Default)]
pub struct RefsStatus {
    /// The `rad/sigrefs` was missing or it's ahead of the local
-
    /// `rad/sigrefs`.
-
    pub fresh: Vec<RefsAt>,
-
    /// The `rad/sigrefs` has been seen before.
-
    pub stale: Vec<RefsAt>,
+
    /// `rad/sigrefs`. We want it.
+
    pub want: Vec<RefsAt>,
+
    /// The `rad/sigrefs` has been seen before. We already have it.
+
    pub have: Vec<RefsAt>,
}

impl RefsStatus {
-
    /// Get the set of `fresh` and `stale` `RefsAt`'s for the given
+
    /// Get the set of `want` and `have` `RefsAt`'s for the given
    /// announcement.
    ///
    /// Nb. We use the refs database as a cache for quick lookups. This does *not* check
@@ -183,7 +184,7 @@ impl RefsStatus {
    /// and old refs announcements will be discarded due to their lower timestamps.
    pub fn new<D: node::refs::Store>(
        rid: RepoId,
-
        refs: Vec<RefsAt>,
+
        refs: NonEmpty<RefsAt>,
        db: &D,
    ) -> Result<RefsStatus, storage::Error> {
        let mut status = RefsStatus::default();
@@ -202,13 +203,13 @@ impl RefsStatus {
        match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
            Ok(Some((ours, _))) => {
                if theirs.at != ours {
-
                    self.fresh.push(theirs);
+
                    self.want.push(theirs);
                } else {
-
                    self.stale.push(theirs);
+
                    self.have.push(theirs);
                }
            }
            Ok(None) => {
-
                self.fresh.push(theirs);
+
                self.want.push(theirs);
            }
            Err(e) => {
                log::warn!(
modified radicle-node/src/test/peer.rs
@@ -28,7 +28,7 @@ use crate::service::policy::{Policy, Scope};
use crate::service::*;
use crate::storage::git::transport::remote;
use crate::storage::Inventory;
-
use crate::storage::{Namespaces, RemoteId, WriteStorage};
+
use crate::storage::{RemoteId, WriteStorage};
use crate::test::storage::MockStorage;
use crate::test::{arbitrary, fixtures, simulator};
use crate::wire::MessageType;
@@ -476,16 +476,10 @@ where
    }

    /// Get a draining iterator over the peer's I/O outbox, which only returns fetches.
-
    pub fn fetches(&mut self) -> impl Iterator<Item = (RepoId, NodeId, Namespaces)> + '_ {
+
    pub fn fetches(&mut self) -> impl Iterator<Item = (RepoId, NodeId)> + '_ {
        iter::from_fn(|| self.service.outbox().next()).filter_map(|io| {
-
            if let Io::Fetch {
-
                rid,
-
                remote,
-
                namespaces,
-
                ..
-
            } = io
-
            {
-
                Some((rid, remote, namespaces))
+
            if let Io::Fetch { rid, remote, .. } = io {
+
                Some((rid, remote))
            } else {
                None
            }
modified radicle-node/src/test/simulator.rs
@@ -627,12 +627,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    );
                }
            }
-
            Io::Fetch {
-
                rid,
-
                remote,
-
                namespaces,
-
                ..
-
            } => {
+
            Io::Fetch { rid, remote, .. } => {
                log::info!(
                    target: "sim",
                    "{:05} {} ~> {} ({}): Fetch outgoing",
@@ -663,10 +658,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                                remote,
                                Rc::new(Ok(fetch::FetchResult {
                                    updated: vec![],
-
                                    namespaces: match namespaces {
-
                                        Namespaces::Followed(hs) => hs,
-
                                        Namespaces::All => HashSet::new(),
-
                                    },
+
                                    namespaces: HashSet::new(),
                                    clone: true,
                                    doc: arbitrary::gen(1),
                                })),
modified radicle-node/src/tests.rs
@@ -1365,7 +1365,7 @@ fn test_queued_fetch_max_capacity() {
    alice.command(Command::Fetch(rid3, bob.id, DEFAULT_TIMEOUT, send3));

    // The first fetch is initiated.
-
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid1);
+
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid1);
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
    assert_matches!(alice.outbox().next(), None);

@@ -1375,14 +1375,14 @@ fn test_queued_fetch_max_capacity() {
    // Finish the 1st fetch.
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::new(doc.clone())));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid2);
+
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid2);
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
    alice.fetched(rid2, bob.id, Ok(fetch::FetchResult::new(doc)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid3);
+
    assert_matches!(alice.fetches().next(), Some((rid, _)) if rid == rid3);
}

#[test]
@@ -1421,7 +1421,7 @@ fn test_queued_fetch_from_ann_same_rid() {
    alice.receive(carol.id, carol.announcement(ann));

    // The first fetch is initiated.
-
    assert_matches!(alice.fetches().next(), Some((rid_, nid_, _)) if rid_ == rid && nid_ == bob.id);
+
    assert_matches!(alice.fetches().next(), Some((rid_, nid_)) if rid_ == rid && nid_ == bob.id);
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
    assert_matches!(alice.fetches().next(), None);

@@ -1490,7 +1490,7 @@ fn test_queued_fetch_from_command_same_rid() {
    alice.command(Command::Fetch(rid1, carol.id, DEFAULT_TIMEOUT, send3));

    // The first fetch is initiated.
-
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == bob.id);
+
    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && nid == bob.id);
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
    assert_matches!(alice.outbox().next(), None);

@@ -1500,14 +1500,14 @@ fn test_queued_fetch_from_command_same_rid() {
    // Finish the 1st fetch.
    alice.fetched(rid1, bob.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == eve.id);
+
    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && nid == eve.id);
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
    alice.fetched(rid1, eve.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
-
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == carol.id);
+
    assert_matches!(alice.fetches().next(), Some((rid, nid)) if rid == rid1 && nid == carol.id);
}

#[test]
modified radicle-node/src/wire/protocol.rs
@@ -964,7 +964,6 @@ where
                    remote,
                    timeout,
                    refs_at,
-
                    ..
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");