Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: fetch refs announcements
Fintan Halpenny committed 2 years ago
commit dd16356e4e1a53d26dcaa1c07da44faa080cac0d
parent d27cd0db0c4d5ae4c2f21ab9c4354e532268ed64
7 files changed +204 -40
modified radicle-fetch/src/lib.rs
@@ -17,6 +17,7 @@ pub use transport::Transport;
use std::io;

use radicle::crypto::PublicKey;
+
use radicle::storage::refs::RefsAt;
use state::FetchState;
use thiserror::Error;

@@ -49,6 +50,7 @@ pub fn pull<S>(
    handle: &mut Handle<S>,
    limit: FetchLimit,
    remote: PublicKey,
+
    refs_at: Option<Vec<RefsAt>>,
) -> Result<FetchResult, Error>
where
    S: transport::ConnectionStream,
@@ -65,7 +67,7 @@ where
    // N.b. ensure that we ignore the local peer's key.
    handle.blocked.extend([local]);
    state
-
        .run(handle, &handshake, limit, remote)
+
        .run(handle, &handshake, limit, remote, refs_at)
        .map_err(Error::Protocol)
}

@@ -101,6 +103,6 @@ where
        .map_err(|e| Error::from(state::error::Protocol::from(e)))?;

    state
-
        .run(handle, &handshake, limit, remote)
+
        .run(handle, &handshake, limit, remote, None)
        .map_err(Error::Protocol)
}
modified radicle-fetch/src/stage.rs
@@ -36,6 +36,7 @@ use nonempty::NonEmpty;
use radicle::crypto::PublicKey;
use radicle::git::{refname, Component, Namespaced, Qualified};
use radicle::storage::git::Repository;
+
use radicle::storage::refs::{RefsAt, Special};
use radicle::storage::ReadRepository;

use crate::git::refs::{Policy, Update, Updates};
@@ -294,6 +295,91 @@ impl ProtocolStage for SpecialRefs {
    }
}

+
/// The [`ProtocolStage`] for fetching announce `rad/sigrefs`.
+
///
+
/// This step will ask for the `rad/sigrefs` for the remotes of
+
/// `refs_at`.
+
#[derive(Debug)]
+
pub struct SigrefsAt {
+
    /// The set of nodes that should be blocked from fetching.
+
    pub blocked: BlockList,
+
    /// The node that is being fetched from.
+
    pub remote: PublicKey,
+
    /// The set of remotes and the newly announced `Oid` for their
+
    /// `rad/sigrefs`.
+
    pub refs_at: Vec<RefsAt>,
+
    /// The set of delegates to be fetched, with the local node
+
    /// removed in the case of a `pull`.
+
    pub delegates: BTreeSet<PublicKey>,
+
    /// The data limit for this stage of fetching.
+
    pub limit: u64,
+
}
+

+
impl ProtocolStage for SigrefsAt {
+
    fn ls_refs(&self) -> Option<NonEmpty<BString>> {
+
        // N.b. the `Oid`s are known but the `rad/sigrefs` are still
+
        // asked for to mark them for updating the fetch state.
+
        NonEmpty::collect(self.refs_at.iter().map(|refs_at| {
+
            BString::from(radicle::git::refs::storage::sigrefs(&refs_at.remote).to_string())
+
        }))
+
    }
+

+
    // We only asked for `rad/sigrefs` so we should only get
+
    // `rad/sigrefs`.
+
    fn ref_filter(&self, r: Ref) -> Option<ReceivedRef> {
+
        let (refname, tip) = refs::unpack_ref(r).ok()?;
+
        match refname {
+
            ReceivedRefname::Namespaced { remote, .. } if self.blocked.is_blocked(&remote) => None,
+
            ReceivedRefname::Namespaced {
+
                suffix: Either::Left(Special::SignedRefs),
+
                ..
+
            } => Some(ReceivedRef::new(tip, refname)),
+
            ReceivedRefname::Namespaced { .. } | ReceivedRefname::RadId => None,
+
        }
+
    }
+

+
    fn pre_validate(&self, _refs: &[ReceivedRef]) -> Result<(), error::Layout> {
+
        Ok(())
+
    }
+

+
    fn wants_haves(
+
        &self,
+
        refdb: &Repository,
+
        refs: &[ReceivedRef],
+
    ) -> Result<WantsHaves, error::WantsHaves> {
+
        let mut wants_haves = WantsHaves::default();
+
        let sigrefs = self
+
            .refs_at
+
            .iter()
+
            .map(|RefsAt { remote, at }| (Special::SignedRefs.namespaced(remote), *at));
+
        wants_haves.add(refdb, sigrefs)?;
+
        wants_haves.add(
+
            refdb,
+
            refs.iter().map(|recv| (recv.to_qualified(), recv.tip)),
+
        )?;
+
        Ok(wants_haves)
+
    }
+

+
    fn prepare_updates<'a>(
+
        &self,
+
        _s: &FetchState,
+
        _repo: &Repository,
+
        _refs: &'a [ReceivedRef],
+
    ) -> Result<Updates<'a>, error::Prepare> {
+
        let mut updates = Updates::default();
+
        for RefsAt { remote, at } in self.refs_at.iter() {
+
            if let Some(up) =
+
                refs::special_update(remote, &Either::Left(Special::SignedRefs), *at, |remote| {
+
                    self.delegates.contains(remote)
+
                })
+
            {
+
                updates.add(*remote, up);
+
            }
+
        }
+
        Ok(updates)
+
    }
+
}
+

/// The [`ProtocolStage`] for fetching data refs from the set of
/// remotes in `trusted`.
///
modified radicle-fetch/src/state.rs
@@ -7,6 +7,7 @@ use radicle::identity::{Doc, DocError};

use radicle::prelude::Verified;
use radicle::storage;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::{
    git::Validation, Remote, RemoteId, RemoteRepository, Remotes, ValidateRepository, Validations,
};
@@ -271,6 +272,85 @@ impl FetchState {
        Ok(fetched)
    }

+
    /// Fetch the set of special refs, depending on `refs_at`.
+
    ///
+
    /// If `refs_at` is `Some`, then run the [`SigrefsAt`] stage,
+
    /// which specifically fetches `rad/sigrefs` which are listed in
+
    /// `refs_at`.
+
    ///
+
    /// If `refs_at` is `None`, then run the [`SpecialRefs`] stage,
+
    /// which fetches `rad/sigrefs` and `rad/id` from all tracked and
+
    /// delegate peers (scope dependent).
+
    ///
+
    /// The resulting [`sigrefs::RemoteRefs`] will be the set of
+
    /// `rad/sigrefs` of the fetched remotes.
+
    fn run_special_refs<S>(
+
        &mut self,
+
        handle: &mut Handle<S>,
+
        handshake: &handshake::Outcome,
+
        delegates: BTreeSet<PublicKey>,
+
        limit: &FetchLimit,
+
        remote: PublicKey,
+
        refs_at: Option<Vec<RefsAt>>,
+
    ) -> Result<sigrefs::RemoteRefs, error::Protocol>
+
    where
+
        S: transport::ConnectionStream,
+
    {
+
        match refs_at {
+
            Some(refs_at) => {
+
                let (must, may): (BTreeSet<PublicKey>, BTreeSet<PublicKey>) = refs_at
+
                    .iter()
+
                    .map(|refs_at| refs_at.remote)
+
                    .partition(|id| delegates.contains(id));
+

+
                let sigrefs_at = stage::SigrefsAt {
+
                    remote,
+
                    delegates,
+
                    refs_at,
+
                    blocked: handle.blocked.clone(),
+
                    limit: limit.special,
+
                };
+
                log::trace!(target: "fetch", "{sigrefs_at:?}");
+
                self.run_stage(handle, handshake, &sigrefs_at)?;
+

+
                let signed_refs = sigrefs::RemoteRefs::load(
+
                    &self.as_cached(handle),
+
                    sigrefs::Select {
+
                        must: &must,
+
                        may: &may,
+
                    },
+
                )?;
+
                Ok(signed_refs)
+
            }
+
            None => {
+
                let tracked = handle.tracked();
+
                log::trace!(target: "fetch", "Tracked nodes {:?}", tracked);
+
                let special_refs = stage::SpecialRefs {
+
                    blocked: handle.blocked.clone(),
+
                    remote,
+
                    delegates: delegates.clone(),
+
                    tracked,
+
                    limit: limit.special,
+
                };
+
                log::trace!(target: "fetch", "{special_refs:?}");
+
                let fetched = self.run_stage(handle, handshake, &special_refs)?;
+

+
                let signed_refs = sigrefs::RemoteRefs::load(
+
                    &self.as_cached(handle),
+
                    sigrefs::Select {
+
                        must: &delegates,
+
                        may: &fetched
+
                            .iter()
+
                            .filter(|id| !delegates.contains(id))
+
                            .copied()
+
                            .collect(),
+
                    },
+
                )?;
+
                Ok(signed_refs)
+
            }
+
        }
+
    }
+

    /// The finalization of the protocol exchange is as follows:
    ///
    ///   1. Load the canonical `rad/id` to use as the anchor for
@@ -293,6 +373,7 @@ impl FetchState {
        handshake: &handshake::Outcome,
        limit: FetchLimit,
        remote: PublicKey,
+
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Protocol>
    where
        S: transport::ConnectionStream,
@@ -317,31 +398,14 @@ impl FetchState {

        log::trace!(target: "fetch", "Identity delegates {delegates:?}");

-
        let tracked = handle.tracked();
-
        log::trace!(target: "fetch", "Tracked nodes {:?}", tracked);
-

-
        let special_refs = stage::SpecialRefs {
-
            blocked: handle.blocked.clone(),
+
        let signed_refs = self.run_special_refs(
+
            handle,
+
            handshake,
+
            delegates.clone(),
+
            &limit,
            remote,
-
            delegates: delegates.clone(),
-
            tracked,
-
            limit: limit.special,
-
        };
-
        log::trace!(target: "fetch", "{special_refs:?}");
-
        let fetched = self.run_stage(handle, handshake, &special_refs)?;
-

-
        let signed_refs = sigrefs::RemoteRefs::load(
-
            &self.as_cached(handle),
-
            sigrefs::Select {
-
                must: &delegates,
-
                may: &fetched
-
                    .iter()
-
                    .filter(|id| !delegates.contains(id))
-
                    .copied()
-
                    .collect(),
-
            },
+
            refs_at,
        )?;
-
        log::trace!(target: "fetch", "{signed_refs:?}");

        let data_refs = stage::DataRefs {
            remote,
@@ -360,6 +424,12 @@ impl FetchState {
        let mut failures = sigrefs::Validations::default();
        let signed_refs = data_refs.remotes;

+
        // We may prune fetched remotes, so we keep track of
+
        // non-pruned, fetched remotes here.
+
        let mut remotes = BTreeSet::new();
+

+
        // TODO(finto): this might read better if it got its own
+
        // private function.
        for remote in signed_refs.keys() {
            if handle.is_blocked(remote) {
                continue;
@@ -403,6 +473,8 @@ impl FetchState {
                        );
                        self.prune(&remote);
                        warnings.append(warns);
+
                    } else {
+
                        remotes.insert(remote);
                    }
                }
                sigrefs::DelegateStatus::Delegate {
@@ -429,6 +501,8 @@ impl FetchState {
                        log::warn!(target: "fetch", "Pruning delegate {remote} tips, due to validation failures");
                        self.prune(&remote);
                        failures.append(fails)
+
                    } else {
+
                        remotes.insert(remote);
                    }
                }
            }
@@ -448,7 +522,7 @@ impl FetchState {
            )?;
            Ok(FetchResult::Success {
                applied,
-
                remotes: fetched,
+
                remotes,
                warnings,
            })
        } else {
modified radicle-node/src/service/io.rs
@@ -1,8 +1,7 @@
-
use std::collections::{HashMap, VecDeque};
+
use std::collections::VecDeque;
use std::time;

use log::*;
-
use radicle::git;
use radicle::storage::refs::RefsAt;

use crate::prelude::*;
@@ -29,8 +28,8 @@ pub enum Io {
        remote: NodeId,
        /// Namespaces being fetched.
        namespaces: Namespaces,
-
        /// If a refs announcements was made.
-
        refs_at: Option<HashMap<NodeId, git::Oid>>,
+
        /// If the node is fetching specific `rad/sigrefs`.
+
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
        timeout: time::Duration,
    },
@@ -92,13 +91,7 @@ impl Outbox {
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) {
-
        let refs_at = {
-
            let refs = refs_at
-
                .into_iter()
-
                .map(|RefsAt { remote, at }| (remote, at))
-
                .collect::<HashMap<_, _>>();
-
            (!refs.is_empty()).then_some(refs)
-
        };
+
        let refs_at = (!refs_at.is_empty()).then_some(refs_at);
        self.io.push_back(Io::Fetch {
            rid,
            namespaces,
modified radicle-node/src/wire/protocol.rs
@@ -828,6 +828,7 @@ where
                    rid,
                    remote,
                    timeout,
+
                    refs_at,
                    ..
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");
@@ -850,6 +851,7 @@ where
                        fetch: FetchRequest::Initiator {
                            rid,
                            remote,
+
                            refs_at,
                            timeout,
                        },
                        stream,
modified radicle-node/src/worker.rs
@@ -11,6 +11,7 @@ use crossbeam_channel as chan;

use radicle::identity::Id;
use radicle::prelude::NodeId;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadRepository, ReadStorage};
use radicle::{crypto, git, Storage};
use radicle_fetch::FetchLimit;
@@ -95,6 +96,8 @@ pub enum FetchRequest {
        rid: Id,
        /// Remote peer we are interacting with.
        remote: NodeId,
+
        /// If this fetch is for a particular set of `rad/sigrefs`.
+
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
        timeout: time::Duration,
    },
@@ -215,11 +218,12 @@ impl Worker {
            FetchRequest::Initiator {
                rid,
                remote,
+
                refs_at,
                // TODO: nowhere to use this currently
                timeout: _timeout,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", rid);
-
                let result = self.fetch(rid, remote, channels);
+
                let result = self.fetch(rid, remote, refs_at, channels);
                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote } => {
@@ -265,6 +269,7 @@ impl Worker {
        &mut self,
        rid: Id,
        remote: NodeId,
+
        refs_at: Option<Vec<RefsAt>>,
        channels: channels::ChannelsFlush,
    ) -> Result<fetch::FetchResult, FetchError> {
        let FetchConfig {
@@ -292,7 +297,7 @@ impl Worker {
            channels,
        )?;

-
        Ok(handle.fetch(rid, &self.storage, *limit, remote)?)
+
        Ok(handle.fetch(rid, &self.storage, *limit, remote, refs_at)?)
    }
}

modified radicle-node/src/worker/fetch.rs
@@ -6,6 +6,7 @@ use radicle::crypto::PublicKey;
use radicle::git::UserInfo;
use radicle::prelude::Id;
use radicle::storage::git::Repository;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadStorage as _, RefUpdate, WriteRepository as _};
use radicle::Storage;
use radicle_fetch::{BlockList, FetchLimit, Tracked};
@@ -59,6 +60,7 @@ impl Handle {
        storage: &Storage,
        limit: FetchLimit,
        remote: PublicKey,
+
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Fetch> {
        let result = match self {
            Self::Clone { mut handle, tmp } => {
@@ -69,7 +71,7 @@ impl Handle {
            }
            Self::Pull { mut handle } => {
                log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
-
                radicle_fetch::pull(&mut handle, limit, remote)?
+
                radicle_fetch::pull(&mut handle, limit, remote, refs_at)?
            }
        };