Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
fetch: Prune remotes with sigrefs failures
Fintan Halpenny committed 1 month ago
commit 725ced09d54b4a86a863f0772dcef8749d5f476f
parent 6967bf8
4 files changed +91 -85
modified crates/radicle-fetch/src/sigrefs.rs
@@ -1,5 +1,5 @@
use std::collections::{BTreeMap, BTreeSet};
-
use std::ops::{Deref, Not as _};
+
use std::ops::Not as _;

use radicle::storage::git::Repository;
pub use radicle::storage::refs::SignedRefsAt;
@@ -36,13 +36,6 @@ pub(crate) enum DelegateStatus<T = ()> {
    NonDelegate { remote: PublicKey, data: T },
}

-
impl DelegateStatus {
-
    /// Construct a `DelegateStatus` without any data.
-
    pub fn empty(remote: PublicKey, delegates: &BTreeSet<PublicKey>) -> Self {
-
        Self::new((), remote, delegates)
-
    }
-
}
-

impl<T> DelegateStatus<T> {
    pub fn new(data: T, remote: PublicKey, delegates: &BTreeSet<PublicKey>) -> Self {
        if delegates.contains(&remote) {
@@ -51,42 +44,6 @@ impl<T> DelegateStatus<T> {
            Self::NonDelegate { remote, data }
        }
    }
-

-
    /// Construct a `DelegateStatus` with [`SignedRefsAt`] signed reference
-
    /// data, if it can be found in `repo`.
-
    pub fn load<R, S>(
-
        self,
-
        cached: &Cached<R, S>,
-
    ) -> Result<
-
        DelegateStatus<Option<SignedRefsAt>>,
-
        radicle::storage::refs::sigrefs::read::error::Read,
-
    >
-
    where
-
        R: AsRef<Repository>,
-
    {
-
        let remote = *self.remote();
-
        self.traverse(|_| cached.load(&remote))
-
    }
-

-
    fn remote(&self) -> &PublicKey {
-
        match self {
-
            Self::Delegate { remote, .. } => remote,
-
            Self::NonDelegate { remote, .. } => remote,
-
        }
-
    }
-

-
    fn traverse<U, E>(self, f: impl FnOnce(T) -> Result<U, E>) -> Result<DelegateStatus<U>, E> {
-
        match self {
-
            Self::Delegate { remote, data } => Ok(DelegateStatus::Delegate {
-
                remote,
-
                data: f(data)?,
-
            }),
-
            Self::NonDelegate { remote, data } => Ok(DelegateStatus::NonDelegate {
-
                remote,
-
                data: f(data)?,
-
            }),
-
        }
-
    }
}

pub(crate) fn validate(
@@ -102,45 +59,52 @@ pub(crate) fn validate(
///
/// Construct using [`RemoteRefs::load`].
#[derive(Debug, Default)]
-
pub struct RemoteRefs(BTreeMap<PublicKey, SignedRefsAt>);
+
pub struct RemoteRefs(
+
    pub(super)  BTreeMap<
+
        PublicKey,
+
        Result<Option<SignedRefsAt>, radicle::storage::refs::sigrefs::read::error::Read>,
+
    >,
+
);

impl RemoteRefs {
    /// Load the sigrefs for each remote in `remotes`.
-
    ///
-
    /// If the sigrefs are missing for a given remote, regardless of delegate
-
    /// status, then that remote is filtered out.
    pub(crate) fn load<'a, R, S>(
        cached: &Cached<R, S>,
        remotes: impl Iterator<Item = &'a PublicKey>,
-
    ) -> Result<Self, error::RemoteRefs>
+
    ) -> Self
    where
        R: AsRef<Repository>,
    {
-
        remotes
-
            .filter_map(|id| match cached.load(id) {
-
                Ok(None) => None,
-
                Ok(Some(sr)) => Some(Ok((id, sr))),
-
                Err(e) => Some(Err(e)),
-
            })
-
            .try_fold(RemoteRefs::default(), |mut acc, remote_refs| {
-
                let (id, sigrefs) = remote_refs?;
-
                acc.0.insert(*id, sigrefs);
-
                Ok(acc)
-
            })
+
        Self(
+
            remotes
+
                .map(|remote| (*remote, cached.load(remote)))
+
                .collect(),
+
        )
    }
-
}

-
impl Deref for RemoteRefs {
-
    type Target = BTreeMap<PublicKey, SignedRefsAt>;
+
    pub(crate) fn len(&self) -> usize {
+
        self.0.len()
+
    }

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
+
    pub(crate) fn into_inner(
+
        self,
+
    ) -> BTreeMap<
+
        PublicKey,
+
        Result<Option<SignedRefsAt>, radicle::storage::refs::sigrefs::read::error::Read>,
+
    > {
+
        self.0
    }
}

impl<'a> IntoIterator for &'a RemoteRefs {
-
    type Item = <&'a BTreeMap<PublicKey, SignedRefsAt> as IntoIterator>::Item;
-
    type IntoIter = <&'a BTreeMap<PublicKey, SignedRefsAt> as IntoIterator>::IntoIter;
+
    type Item = <&'a BTreeMap<
+
        PublicKey,
+
        Result<Option<SignedRefsAt>, radicle::storage::refs::sigrefs::read::error::Read>,
+
    > as IntoIterator>::Item;
+
    type IntoIter = <&'a BTreeMap<
+
        PublicKey,
+
        Result<Option<SignedRefsAt>, radicle::storage::refs::sigrefs::read::error::Read>,
+
    > as IntoIterator>::IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        self.0.iter()
modified crates/radicle-fetch/src/stage.rs
@@ -45,7 +45,7 @@ use radicle::storage::ReadRepository;
use crate::git::refs::{Policy, Update, Updates};
use crate::policy::BlockList;
use crate::refs::{ReceivedRef, ReceivedRefname};
-
use crate::sigrefs;
+
use crate::sigrefs::RemoteRefs;
use crate::state::FetchState;
use crate::transport::WantsHaves;
use crate::{policy, refs};
@@ -482,12 +482,18 @@ pub struct DataRefs {
    pub remote: PublicKey,
    /// The set of signed references from each remote that was
    /// fetched.
-
    pub remotes: sigrefs::RemoteRefs,
+
    pub remotes: RemoteRefs,
    /// The data limit for this stage of fetching.
    #[allow(dead_code)]
    pub limit: u64,
}

+
impl DataRefs {
+
    pub(crate) fn into_remote_refs(self) -> RemoteRefs {
+
        self.remotes
+
    }
+
}
+

impl ProtocolStage for DataRefs {
    // We don't need to ask for refs since we have all reference names
    // and `Oid`s in `rad/sigrefs`.
@@ -514,10 +520,13 @@ impl ProtocolStage for DataRefs {
    ) -> Result<WantsHaves, error::WantsHaves> {
        let mut wants_haves = WantsHaves::default();

-
        for (remote, loaded) in &self.remotes {
+
        for (remote, result) in self.remotes.into_iter() {
+
            let Ok(Some(refs)) = result else {
+
                continue;
+
            };
            wants_haves.add(
                refdb,
-
                loaded.refs().iter().filter_map(|(refname, tip)| {
+
                refs.iter().filter_map(|(refname, tip)| {
                    let refname = Qualified::from_refstr(refname)
                        .map(|refname| refname.with_namespace(Component::from(remote)))?;
                    Some((refname, *tip))
@@ -536,7 +545,10 @@ impl ProtocolStage for DataRefs {
    ) -> Result<Updates<'a>, error::Prepare> {
        let mut updates = Updates::default();

-
        for (remote, refs) in &self.remotes {
+
        for (remote, result) in &self.remotes {
+
            let Ok(Some(refs)) = result else {
+
                continue;
+
            };
            let mut signed = HashSet::with_capacity(refs.refs().len());
            for (name, tip) in refs.iter() {
                let tracking: Namespaced<'_> = Qualified::from_refstr(name)
modified crates/radicle-fetch/src/state.rs
@@ -313,7 +313,7 @@ impl FetchState {
                self.run_stage(handle, handshake, &sigrefs_at)?;
                let remotes = refs_at.iter().map(|r| &r.remote);

-
                let signed_refs = sigrefs::RemoteRefs::load(&self.as_cached(handle), remotes)?;
+
                let signed_refs = sigrefs::RemoteRefs::load(&self.as_cached(handle), remotes);
                Ok(signed_refs)
            }
            None => {
@@ -333,7 +333,7 @@ impl FetchState {
                let signed_refs = sigrefs::RemoteRefs::load(
                    &self.as_cached(handle),
                    fetched.iter().chain(delegates.iter()),
-
                )?;
+
                );
                Ok(signed_refs)
            }
        }
@@ -418,6 +418,7 @@ impl FetchState {
            remote,
            refs_at,
        )?;
+

        log::debug!(
            "Fetched data for {} remote(s) ({}ms)",
            signed_refs.len(),
@@ -429,10 +430,10 @@ impl FetchState {
            remotes: signed_refs,
            limit: limit.refs,
        };
-
        self.run_stage(handle, handshake, &data_refs)?;
+
        let fetched = self.run_stage(handle, handshake, &data_refs)?;
        log::debug!(
            "Fetched data refs for {} remotes ({}ms)",
-
            data_refs.remotes.len(),
+
            fetched.len(),
            start.elapsed().as_millis()
        );

@@ -450,7 +451,8 @@ impl FetchState {
        // remotes from the tips, thus not updating the production Git
        // repository.
        let mut failures = sigrefs::Validations::default();
-
        let signed_refs = data_refs.remotes;
+

+
        let signed_refs = data_refs.into_remote_refs();

        // We may prune fetched remotes, so we keep track of
        // non-pruned, fetched remotes here.
@@ -469,21 +471,26 @@ impl FetchState {

        // TODO(finto): this might read better if it got its own
        // private function.
-
        for remote in signed_refs.keys() {
-
            if handle.is_blocked(remote) {
+
        for (remote, refs) in signed_refs.into_inner() {
+
            if handle.is_blocked(&remote) {
                log::trace!("Skipping blocked remote {remote}");
                continue;
            }

-
            let remote = sigrefs::DelegateStatus::empty(*remote, &delegates)
-
                .load(&self.as_cached(handle))?;
+
            let remote = sigrefs::DelegateStatus::new(refs, remote, &delegates);
            match remote {
-
                sigrefs::DelegateStatus::NonDelegate { remote, data: None } => {
+
                sigrefs::DelegateStatus::NonDelegate {
+
                    remote,
+
                    data: Ok(None),
+
                } => {
                    log::debug!("Pruning non-delegate {remote} tips, missing 'rad/sigrefs'");
                    failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
                    self.prune(&remote);
                }
-
                sigrefs::DelegateStatus::Delegate { remote, data: None } => {
+
                sigrefs::DelegateStatus::Delegate {
+
                    remote,
+
                    data: Ok(None),
+
                } => {
                    log::debug!("Pruning delegate {remote} tips, missing 'rad/sigrefs'");
                    failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
                    self.prune(&remote);
@@ -495,9 +502,26 @@ impl FetchState {
                    valid_delegates.remove(&remote);
                    failed_delegates.insert(remote);
                }
+
                sigrefs::DelegateStatus::Delegate {
+
                    remote,
+
                    data: Err(err),
+
                }
+
                | sigrefs::DelegateStatus::NonDelegate {
+
                    remote,
+
                    data: Err(err),
+
                } => {
+
                    log::debug!("Pruning {remote} tips due to: {err}");
+
                    self.prune(&remote);
+
                    valid_delegates.remove(&remote);
+
                    failed_delegates.insert(remote);
+
                    failures.push(sigrefs::Validation::Read {
+
                        remote,
+
                        source: err,
+
                    });
+
                }
                sigrefs::DelegateStatus::NonDelegate {
                    remote,
-
                    data: Some(sigrefs),
+
                    data: Ok(Some(sigrefs)),
                } => {
                    if let Some(SignedRefsAt { at, .. }) =
                        SignedRefsAt::load(remote, handle.repository())?
@@ -527,7 +551,7 @@ impl FetchState {
                }
                sigrefs::DelegateStatus::Delegate {
                    remote,
-
                    data: Some(sigrefs),
+
                    data: Ok(Some(sigrefs)),
                } => {
                    if let Some(SignedRefsAt { at, .. }) =
                        SignedRefsAt::load(remote, handle.repository())?
modified crates/radicle/src/storage/git.rs
@@ -410,6 +410,12 @@ pub enum Validation {
    },
    #[error("missing `refs/namespaces/{0}/refs/rad/sigrefs`")]
    MissingRadSigRefs(RemoteId),
+
    #[error("failed to read `refs/namespaces/{remote}/refs/rad/sigrefs`: {source}")]
+
    Read {
+
        remote: RemoteId,
+
        #[source]
+
        source: crate::storage::refs::sigrefs::read::error::Read,
+
    },
}

impl Repository {