Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-fetch src stage.rs
// Weird lint, see <https://github.com/rust-lang/rust-clippy/issues/14275>
#![allow(clippy::doc_overindented_list_items)]

//! The Radicle fetch protocol can be split into two actions: `clone`
//! and `pull`. Each of these actions will interact with the server in
//! multiple stages, where each stage will perform a single roundtrip
//! of fetching. These stages are encapsulated in the
//! [`ProtocolStage`] trait.
//!
//! ### Clone
//!
//! A `clone` is split into three stages:
//!
//!   1. [`CanonicalId`]: fetches the canonical `refs/rad/id` to use
//!      as an anchor for the rest of the fetch, i.e. provides initial
//!      delegate data for the repository.
//!   2. [`SpecialRefs`]: fetches the special references, `rad/id` and
//!      `rad/sigrefs`, for each configured namespace, i.e. followed
//!      and delegate peers if the scope is "followed" and all peers is the
//!      scope is all.
//!   3. [`DataRefs`]: fetches the `Oid`s for each reference listed in
//!      the `rad/sigrefs` for each fetched peer in the
//!      [`SpecialRefs`] stage. Additionally, any references that have
//!      been removed from `rad/sigrefs` are marked for deletion.
//!
//! ### Pull
//!
//! A `pull` is split into two stages:
//!
//!   1. [`SpecialRefs`]: see above.
//!   2. [`DataRefs`]: see above.

use std::collections::{BTreeMap, BTreeSet, HashSet};

use bstr::{BStr, BString};
use either::Either;
use gix_protocol::handshake::Ref;
use nonempty::NonEmpty;
use radicle::crypto::PublicKey;
use radicle::git::fmt::{Component, Namespaced, Qualified, refname};
use radicle::storage::ReadRepository;
use radicle::storage::git::Repository;
use radicle::storage::refs::{RefsAt, Special};

use crate::git::refs::{Policy, Update, Updates};
use crate::policy::BlockList;
use crate::refs::{ReceivedRef, ReceivedRefname};
use crate::sigrefs::RemoteRefs;
use crate::state::FetchState;
use crate::transport::WantsHaves;
use crate::{policy, refs};

pub mod error {
    use radicle::crypto::PublicKey;
    use radicle::git::fmt::RefString;
    use thiserror::Error;

    use crate::transport::WantsHavesError;

    #[derive(Debug, Error)]
    pub enum Layout {
        #[error("missing required refs: {0:?}")]
        MissingRequiredRefs(Vec<String>),
        #[error("expected threshold of {threshold} of references, missing: {missing:?}")]
        InsufficientRefs {
            threshold: usize,
            missing: Vec<String>,
        },
    }

    #[derive(Debug, Error)]
    pub enum Prepare {
        #[error(transparent)]
        References(#[from] radicle::storage::Error),
        #[error("verification of rad/id for {remote} failed")]
        Verification {
            remote: PublicKey,
            #[source]
            err: Box<dyn std::error::Error + Send + Sync + 'static>,
        },
    }

    #[derive(Debug, Error)]
    pub enum WantsHaves {
        #[error(transparent)]
        WantsHavesAdd(#[from] WantsHavesError),
        #[error("expected namespaced ref {0}")]
        NotNamespaced(RefString),
    }
}

/// A `ref-prefix` used in the `ls-refs` step of the fetch protocol.
///
/// Since the Radicle protocol only wants to filter by very specific references,
/// this type captures the possible reference prefixes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum RefPrefix {
    /// Represents `refs/rad/id`.
    RadId,
    /// Represents `"refs/namespaces/<namespace>/refs/rad/id"`.
    NamespacedRadId { namespace: PublicKey },
    /// Represents `"refs/namespaces/<namespace>/refs/rad/sigrefs"`.
    NamespacedRadSigrefs { namespace: PublicKey },
    /// Represents `"refs/namespaces"`
    AllNamespaces,
}

impl RefPrefix {
    /// Convert the [`RefPrefix`] into its equivalent [`BString`].
    ///
    /// See the [`RefPrefix`] variants for their [`BString`] values.
    pub fn into_bstring(self) -> BString {
        match self {
            RefPrefix::RadId => refs::REFS_RAD_ID.as_bstr().into(),
            RefPrefix::NamespacedRadId { namespace } => {
                radicle::git::refs::storage::id(&namespace).as_bstr().into()
            }
            RefPrefix::NamespacedRadSigrefs { namespace } => {
                radicle::git::refs::storage::sigrefs(&namespace)
                    .as_bstr()
                    .into()
            }
            RefPrefix::AllNamespaces => "refs/namespaces".into(),
        }
    }

    /// Convert the [`RefPrefix`] into its equivalent [`RefSpec`].
    ///
    /// See the [`RefPrefix`] variants for their [`BString`] values.
    ///
    /// # Panics
    ///
    /// This will panic if the reference as a [`BString`] value no longer parses
    /// in the upstream [`gix_refspec`] crate.
    ///
    /// [`RefSpec`]: gix_refspec::RefSpec
    pub fn as_refspec(&self) -> gix_refspec::RefSpec {
        use gix_refspec::parse::Operation;
        let parse = |spec: &BStr| -> gix_refspec::RefSpec {
            gix_refspec::parse(spec, Operation::Fetch)
                .expect("RefPrefix should be valid refspec")
                .to_owned()
        };

        match self {
            RefPrefix::RadId => parse(refs::REFS_RAD_ID.as_bstr()),
            RefPrefix::NamespacedRadId { namespace } => {
                parse(radicle::git::refs::storage::id(namespace).as_bstr())
            }
            RefPrefix::NamespacedRadSigrefs { namespace } => {
                parse(radicle::git::refs::storage::sigrefs(namespace).as_bstr())
            }
            RefPrefix::AllNamespaces => parse(BStr::new("refs/namespaces")),
        }
    }
}

/// A [`ProtocolStage`] describes a single roundtrip with the Radicle
/// node that is serving the data.
///
/// The stages are used as input for [`crate::FetchState::step`] and
/// are called in the order that they are listed here, .i.e:
///
///   1. `ls_refs`: asks the server for the provided reference
///       prefixes.
///   2. `ref_filter`: filter the advertised refs to the set required
///       for inspection.
///   3. `pre_validate`: before fetching the data, ensure the server
///       advertised the references that are required.
///   4. `wants_haves`: build the set of `want`s and `have`s to send
///       to the server.
///   5. `prepare_updates`: prepares the set of updates to update the
///      refdb (in-memory and production).
pub(crate) trait ProtocolStage {
    /// If and how to perform `ls-refs`.
    fn ls_refs(&self) -> Option<NonEmpty<RefPrefix>>;

    /// Filter a remote-advertised [`Ref`].
    ///
    /// Return `Some` if the ref should be considered, `None` otherwise. This
    /// method may be called with the response of `ls-refs`, the `wanted-refs`
    /// of a `fetch` response, or both.
    fn ref_filter(&self, r: Ref) -> Option<ReceivedRef>;

    /// Validate that all advertised refs conform to an expected layout.
    ///
    /// The supplied `refs` are `ls-ref`-advertised refs filtered
    /// through [`ProtocolStage::ref_filter`].
    fn pre_validate(&self, refs: &[ReceivedRef]) -> Result<(), error::Layout>;

    /// Assemble the `want`s and `have`s for a `fetch`, retaining the refs which
    /// would need updating after the `fetch` succeeds.
    ///
    /// The `refs` are the advertised refs from executing `ls-refs`, filtered
    /// through [`ProtocolStage::ref_filter`].
    fn wants_haves(
        &self,
        refdb: &Repository,
        refs: &[ReceivedRef],
    ) -> Result<WantsHaves, error::WantsHaves> {
        let mut wants_haves = WantsHaves::default();
        wants_haves.add(
            refdb,
            refs.iter().map(|recv| (recv.to_qualified(), recv.tip)),
        )?;
        Ok(wants_haves)
    }

    /// Prepare the [`Updates`] based on the received `refs`.
    ///
    /// These updates can then be used to update the refdb.
    fn prepare_updates<'a>(
        &self,
        s: &FetchState,
        repo: &Repository,
        refs: &'a [ReceivedRef],
    ) -> Result<Updates<'a>, error::Prepare>;
}

/// The [`ProtocolStage`] for performing an initial clone from a `remote`.
///
/// This step asks for the canonical `refs/rad/id` reference, which
/// allows us to use it as an anchor for the following steps.
#[derive(Debug)]
pub struct CanonicalId {
    pub remote: PublicKey,
    #[allow(dead_code)]
    pub limit: u64,
}

impl ProtocolStage for CanonicalId {
    fn ls_refs(&self) -> Option<NonEmpty<RefPrefix>> {
        Some(NonEmpty::new(RefPrefix::RadId))
    }

    fn ref_filter(&self, r: Ref) -> Option<ReceivedRef> {
        match refs::unpack_ref(r).ok()? {
            (
                refname @ ReceivedRefname::Namespaced {
                    suffix: Either::Left(_),
                    ..
                },
                tip,
            ) => Some(ReceivedRef::new(tip, refname)),
            (ReceivedRefname::RadId, tip) => Some(ReceivedRef::new(tip, ReceivedRefname::RadId)),
            _ => None,
        }
    }

    fn pre_validate(&self, refs: &[ReceivedRef]) -> Result<(), error::Layout> {
        // Ensures that we fetched the canonical 'refs/rad/id'
        ensure_refs(
            [BString::from(refs::REFS_RAD_ID.as_bstr())]
                .into_iter()
                .collect(),
            refs.iter()
                .map(|r| r.to_qualified().to_string().into())
                .collect(),
        )
    }

    fn prepare_updates<'a>(
        &self,
        s: &FetchState,
        repo: &Repository,
        refs: &'a [ReceivedRef],
    ) -> Result<Updates<'a>, error::Prepare> {
        // SAFETY: checked by `pre_validate` that the `refs/rad/id`
        // was received
        let verified = repo
            .identity_doc_at(
                *s.canonical_rad_id()
                    .expect("ensure we got canonicdal 'rad/id' ref"),
            )
            .map_err(|err| error::Prepare::Verification {
                remote: self.remote,
                err: Box::new(err),
            })?;
        if verified.is_delegate(&self.remote.into()) {
            let is_delegate = |remote: &PublicKey| verified.is_delegate(&remote.into());
            Ok(Updates::build(
                refs.iter()
                    .filter_map(|r| r.as_special_ref_update(is_delegate)),
            ))
        } else {
            Ok(Updates::default())
        }
    }
}

/// The [`ProtocolStage`] for fetching special refs from the set of
/// remotes in `followed` and `delegates`.
///
/// This step asks for all followed and delegate remote's `rad/id` and
/// `rad/sigrefs`, iff the scope is [`policy::Scope::Followed`].
/// Otherwise, it asks for all namespaces.
///
/// It ensures that all delegate refs were fetched.
#[derive(Debug)]
pub struct SpecialRefs {
    /// The set of nodes that should be blocked from fetching.
    pub blocked: BlockList,
    /// The node that is being fetched from.
    #[allow(dead_code)]
    pub remote: PublicKey,
    /// The set of nodes to be fetched.
    pub followed: policy::Allowed,
    /// The set of delegates to be fetched, with the local node
    /// removed in the case of a `pull`.
    pub delegates: BTreeSet<PublicKey>,
    /// The threshold of delegates that needs to be fetched.
    pub threshold: usize,
    /// The data limit for this stage of fetching.
    #[allow(dead_code)]
    pub limit: u64,
}

impl ProtocolStage for SpecialRefs {
    fn ls_refs(&self) -> Option<NonEmpty<RefPrefix>> {
        match &self.followed {
            policy::Allowed::All => Some(NonEmpty::new(RefPrefix::AllNamespaces)),
            policy::Allowed::Followed { remotes } => NonEmpty::collect(
                remotes
                    .iter()
                    .chain(self.delegates.iter())
                    .flat_map(|remote| {
                        [
                            RefPrefix::NamespacedRadSigrefs { namespace: *remote },
                            RefPrefix::NamespacedRadId { namespace: *remote },
                        ]
                    }),
            ),
        }
    }

    fn ref_filter(&self, r: Ref) -> Option<ReceivedRef> {
        let (refname, tip) = refs::unpack_ref(r).ok()?;
        match refname {
            // N.b. ensure that any blocked peers are filtered since
            // `Scope::All` can ls for them
            ReceivedRefname::Namespaced { remote, .. } if self.blocked.is_blocked(&remote) => None,
            ReceivedRefname::Namespaced { ref suffix, .. } if suffix.is_left() => {
                Some(ReceivedRef::new(tip, refname))
            }
            ReceivedRefname::Namespaced { .. } | ReceivedRefname::RadId => None,
        }
    }

    fn pre_validate(&self, refs: &[ReceivedRef]) -> Result<(), error::Layout> {
        ensure_threshold(
            self.delegates
                .iter()
                .filter(|id| !self.blocked.is_blocked(id))
                .map(|id| {
                    // N.b. we asked for the rad/id but do not need to ensure it
                    BString::from(radicle::git::refs::storage::sigrefs(id).to_string())
                })
                .collect(),
            refs.iter()
                .filter_map(|r| r.name.to_namespaced())
                .map(|r| r.to_string().into())
                .collect(),
            self.threshold,
        )
    }

    fn prepare_updates<'a>(
        &self,
        _s: &FetchState,
        _repo: &Repository,
        refs: &'a [ReceivedRef],
    ) -> Result<Updates<'a>, error::Prepare> {
        special_refs_updates(&self.delegates, &self.blocked, refs)
    }
}

/// The [`ProtocolStage`] for fetching announce `rad/sigrefs`.
///
/// This step will ask for the `rad/sigrefs` for the remotes of
/// `refs_at`.
#[derive(Debug)]
pub struct SigrefsAt {
    /// The set of nodes that should be blocked from fetching.
    pub blocked: BlockList,
    /// The node that is being fetched from.
    #[allow(dead_code)]
    pub remote: PublicKey,
    /// The set of remotes and the newly announced `Oid` for their
    /// `rad/sigrefs`.
    pub refs_at: Vec<RefsAt>,
    /// The set of delegates to be fetched, with the local node
    /// removed in the case of a `pull`.
    pub delegates: BTreeSet<PublicKey>,
    /// The data limit for this stage of fetching.
    #[allow(dead_code)]
    pub limit: u64,
}

impl ProtocolStage for SigrefsAt {
    fn ls_refs(&self) -> Option<NonEmpty<RefPrefix>> {
        // N.b. the `Oid`s are known but the `rad/sigrefs` are still
        // asked for to mark them for updating the fetch state.
        NonEmpty::collect(
            self.refs_at
                .iter()
                .map(|refs_at| RefPrefix::NamespacedRadSigrefs {
                    namespace: refs_at.remote,
                }),
        )
    }

    // We only asked for `rad/sigrefs` so we should only get
    // `rad/sigrefs`.
    fn ref_filter(&self, r: Ref) -> Option<ReceivedRef> {
        let (refname, tip) = refs::unpack_ref(r).ok()?;
        match refname {
            ReceivedRefname::Namespaced { remote, .. } if self.blocked.is_blocked(&remote) => None,
            ReceivedRefname::Namespaced {
                suffix: Either::Left(Special::SignedRefs),
                ..
            } => Some(ReceivedRef::new(tip, refname)),
            ReceivedRefname::Namespaced { .. } | ReceivedRefname::RadId => None,
        }
    }

    fn pre_validate(&self, _refs: &[ReceivedRef]) -> Result<(), error::Layout> {
        Ok(())
    }

    fn wants_haves(
        &self,
        refdb: &Repository,
        refs: &[ReceivedRef],
    ) -> Result<WantsHaves, error::WantsHaves> {
        let mut wants_haves = WantsHaves::default();
        let sigrefs = self
            .refs_at
            .iter()
            .map(|RefsAt { remote, at }| (Special::SignedRefs.namespaced(remote), *at));
        wants_haves.add(refdb, sigrefs)?;
        wants_haves.add(
            refdb,
            refs.iter().map(|recv| (recv.to_qualified(), recv.tip)),
        )?;
        Ok(wants_haves)
    }

    fn prepare_updates<'a>(
        &self,
        _s: &FetchState,
        _repo: &Repository,
        _refs: &'a [ReceivedRef],
    ) -> Result<Updates<'a>, error::Prepare> {
        let mut updates = Updates::default();
        for RefsAt { remote, at } in self.refs_at.iter() {
            if let Some(up) =
                refs::special_update(remote, &Either::Left(Special::SignedRefs), *at, |remote| {
                    self.delegates.contains(remote)
                })
            {
                updates.add(*remote, up);
            }
        }
        Ok(updates)
    }
}

/// The [`ProtocolStage`] for fetching data refs from the set of
/// remotes in `trusted`.
///
/// All refs that are listed in the `remotes` sigrefs are checked
/// against our refdb/odb to build a set of `wants` and `haves`. The
/// `wants` will then be fetched from the server side to receive those
/// particular objects.
///
/// Those refs and objects are then prepared for updating, removing
/// any that were found to exist before the latest fetch.
#[derive(Debug)]
pub struct DataRefs {
    /// The set of signed references from each remote that was
    /// fetched.
    remotes: RemoteRefs,
}

impl DataRefs {
    pub(crate) fn new(remotes: RemoteRefs) -> Self {
        Self { remotes }
    }

    pub(crate) fn into_inner(self) -> RemoteRefs {
        self.remotes
    }
}

impl ProtocolStage for DataRefs {
    // We don't need to ask for refs since we have all reference names
    // and `Oid`s in `rad/sigrefs`.
    fn ls_refs(&self) -> Option<NonEmpty<RefPrefix>> {
        None
    }

    // Since we don't ask for refs, we don't need to filter them.
    fn ref_filter(&self, _: Ref) -> Option<ReceivedRef> {
        None
    }

    // Since we don't ask for refs, we don't need to validate them.
    fn pre_validate(&self, _refs: &[ReceivedRef]) -> Result<(), error::Layout> {
        Ok(())
    }

    // We ignore the `ReceivedRef`s since we are using the `remotes`
    // as the source for refnames and `Oid`s.
    fn wants_haves(
        &self,
        refdb: &Repository,
        _refs: &[ReceivedRef],
    ) -> Result<WantsHaves, error::WantsHaves> {
        let mut wants_haves = WantsHaves::default();

        for (remote, result) in self.remotes.iter() {
            let Ok(Some(refs)) = result else {
                continue;
            };
            wants_haves.add(
                refdb,
                refs.iter().filter_map(|(refname, tip)| {
                    let refname = Qualified::from_refstr(refname)
                        .map(|refname| refname.with_namespace(Component::from(remote)))?;
                    Some((refname, *tip))
                }),
            )?;
        }

        Ok(wants_haves)
    }

    fn prepare_updates<'a>(
        &self,
        _s: &FetchState,
        repo: &Repository,
        _refs: &'a [ReceivedRef],
    ) -> Result<Updates<'a>, error::Prepare> {
        let mut updates = Updates::default();

        for (remote, result) in &self.remotes {
            let Ok(Some(refs)) = result else {
                continue;
            };
            let mut signed = HashSet::with_capacity(refs.refs().len());
            for (name, tip) in refs.iter() {
                let tracking: Namespaced<'_> = Qualified::from_refstr(name)
                    .and_then(|q| refs::ReceivedRefname::remote(*remote, q).to_namespaced())
                    .expect("we checked sigrefs well-formedness in wants_refs already");
                signed.insert(tracking.clone());
                updates.add(
                    *remote,
                    Update::Direct {
                        name: tracking,
                        target: *tip,
                        no_ff: Policy::Allow,
                    },
                );
            }

            // Prune refs not in signed
            let prefix_rad = refname!("refs/rad");
            for (name, target) in repo.references_of(remote)? {
                // 'rad/' refs are never subject to pruning
                if name.starts_with(prefix_rad.as_str()) {
                    continue;
                }

                let name = Qualified::from_refstr(name)
                    .expect("BUG: reference is guaranteed to be Qualified")
                    .with_namespace(Component::from(remote));

                if !signed.contains(&name) {
                    updates.add(
                        *remote,
                        Update::Prune {
                            name,
                            prev: either::Left(target),
                        },
                    );
                }
            }
        }

        Ok(updates)
    }
}

// N.b. the `delegates` are the delegates of the repository, with the
// potential removal of the local peer in the case of a `pull`.
fn special_refs_updates<'a>(
    delegates: &BTreeSet<PublicKey>,
    blocked: &BlockList,
    refs: &'a [ReceivedRef],
) -> Result<Updates<'a>, error::Prepare> {
    use either::Either::*;

    let grouped = refs
        .iter()
        .filter_map(|r| match &r.name {
            refs::ReceivedRefname::Namespaced { remote, suffix } => {
                (!blocked.is_blocked(remote)).then_some((remote, r.tip, suffix.clone()))
            }
            refs::ReceivedRefname::RadId => None,
        })
        .fold(
            BTreeMap::<PublicKey, Vec<_>>::new(),
            |mut acc, (remote_id, tip, name)| {
                acc.entry(*remote_id).or_default().push((tip, name));
                acc
            },
        );

    let mut updates = Updates::default();

    for (remote_id, refs) in grouped {
        let mut tips_inner = Vec::with_capacity(2);
        for (tip, suffix) in &refs {
            match &suffix {
                Left(refs::Special::Id) => {
                    if let Some(u) = refs::special_update(&remote_id, suffix, *tip, |remote| {
                        delegates.contains(remote)
                    }) {
                        tips_inner.push(u);
                    }
                }

                Left(refs::Special::SignedRefs) => {
                    if let Some(u) = refs::special_update(&remote_id, suffix, *tip, |remote| {
                        delegates.contains(remote)
                    }) {
                        tips_inner.push(u);
                    }
                }

                Right(_) => continue,
            }
        }

        updates.append(remote_id, tips_inner);
    }

    Ok(updates)
}

fn ensure_refs<T>(required: BTreeSet<T>, wants: BTreeSet<T>) -> Result<(), error::Layout>
where
    T: Ord + ToString,
{
    if wants.is_empty() {
        return Ok(());
    }

    let diff = required.difference(&wants).collect::<Vec<_>>();

    if diff.is_empty() {
        Ok(())
    } else {
        Err(error::Layout::MissingRequiredRefs(
            diff.into_iter().map(|ns| ns.to_string()).collect(),
        ))
    }
}

fn ensure_threshold<T>(
    wants: BTreeSet<T>,
    haves: BTreeSet<T>,
    threshold: usize,
) -> Result<(), error::Layout>
where
    T: Ord + ToString,
    T: std::fmt::Debug,
{
    // N.b. there's no threshold to meet. This generally means that
    // the local peer is a delegate and the original threshold is 1,
    // so they don't require the other peer.
    if threshold == 0 {
        return Ok(());
    }

    if wants.is_empty() {
        return Ok(());
    }

    if haves.len() < threshold {
        let missing = wants
            .difference(&haves)
            .map(|ns| ns.to_string())
            .collect::<Vec<_>>();
        return Err(error::Layout::InsufficientRefs { threshold, missing });
    }

    Ok(())
}

#[cfg(test)]
mod test {
    use super::RefPrefix;

    /// Ensure that the call to [`RefPrefix::as_refspec`] does not panic
    #[test]
    fn valid_refspecs() {
        let namespace = "z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk"
            .parse()
            .unwrap();
        let prefixes = [
            RefPrefix::AllNamespaces,
            RefPrefix::RadId,
            RefPrefix::NamespacedRadId { namespace },
            RefPrefix::NamespacedRadSigrefs { namespace },
        ];

        for prefix in prefixes {
            prefix.as_refspec();
        }
    }
}