// Radicle Heartwood Protocol & Stack
// Repository: rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
// File: heartwood/crates/radicle-node/src/worker/fetch.rs
use radicle::cob::store::access::ReadOnly;
use radicle::storage::git::TempRepository;
pub(crate) use radicle_protocol::worker::fetch::error;

use std::collections::BTreeSet;
use std::str::FromStr;

use localtime::LocalTime;

use radicle::cob::TypedId;
use radicle::crypto::PublicKey;
use radicle::prelude::NodeId;
use radicle::prelude::RepoId;
use radicle::storage::git::Repository;
use radicle::storage::refs::RefsAt;
use radicle::storage::{
    ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, RepositoryError,
    WriteRepository as _,
};
use radicle::{Storage, cob, git, node};
use radicle_fetch::git::refs::Applied;
use radicle_fetch::{Allowed, BlockList};
pub use radicle_protocol::worker::fetch::{FetchResult, UpdatedCanonicalRefs};

use super::channels::ChannelsFlush;

/// A fetch handle, distinguishing between fetching into a brand-new
/// repository (clone) and fetching into an existing one (pull).
pub enum Handle {
    /// The repository does not exist locally yet: fetch into a
    /// [`TempRepository`] which is moved into place on success.
    Clone {
        handle: radicle_fetch::Handle<TempRepository, ChannelsFlush>,
    },
    /// The repository already exists locally: fetch into it directly.
    Pull {
        handle: radicle_fetch::Handle<Repository, ChannelsFlush>,
        // Notifications are only produced for pulls, not clones.
        notifications: node::notifications::StoreWriter,
    },
}

impl Handle {
    /// Create a fetch handle for `rid`.
    ///
    /// If the repository already exists in `storage`, a [`Handle::Pull`]
    /// is returned; otherwise a temporary repository is created and a
    /// [`Handle::Clone`] is returned.
    pub fn new(
        rid: RepoId,
        local: PublicKey,
        storage: &Storage,
        follow: Allowed,
        blocked: BlockList,
        channels: ChannelsFlush,
        notifications: node::notifications::StoreWriter,
    ) -> Result<Self, error::Handle> {
        let exists = storage.contains(&rid)?;
        if exists {
            let repo = storage.repository(rid)?;
            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
            Ok(Handle::Pull {
                handle,
                notifications,
            })
        } else {
            let repo = storage.temporary_repository(rid)?;
            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
            Ok(Handle::Clone { handle })
        }
    }

    /// Perform the fetch from `remote`, consuming the handle.
    ///
    /// For clones, the temporary repository is moved into `storage` on
    /// success and cleaned up on failure. On a successful fetch this
    /// also sets the identity head and `HEAD`, computes canonical
    /// references, posts notifications (pulls only), and updates the
    /// COB and sigrefs caches.
    pub fn fetch<D: node::refs::Store>(
        self,
        rid: RepoId,
        storage: &Storage,
        cache: &mut cob::cache::StoreWriter,
        refsdb: &mut D,
        config: radicle_fetch::Config,
        remote: PublicKey,
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Fetch> {
        let (result, clone, notifs) = match self {
            Self::Clone { mut handle } => {
                log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
                match radicle_fetch::clone(&mut handle, config, remote) {
                    Err(err) => {
                        // Remove the temporary repository; the clone failed.
                        handle.into_inner().cleanup();
                        return Err(err.into());
                    }
                    Ok(result) => {
                        if result.is_success() {
                            // Move the temporary repository into permanent storage.
                            handle.into_inner().mv(storage.path_of(&rid))?;
                        } else {
                            handle.into_inner().cleanup();
                        }
                        (result, true, None)
                    }
                }
            }
            Self::Pull {
                mut handle,
                notifications,
            } => {
                log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
                let result = radicle_fetch::pull(&mut handle, config, remote, refs_at)?;
                (result, false, Some(notifications))
            }
        };

        for rejected in result.rejected() {
            log::debug!(target: "worker", "Rejected update for {}", rejected.refname())
        }

        match result {
            radicle_fetch::FetchResult::Failed {
                threshold,
                delegates,
                validations,
            } => {
                // Fatal: validations did not meet the delegate threshold.
                for fail in validations.iter() {
                    log::warn!(target: "worker", "Validation error: {fail}");
                }
                Err(error::Fetch::Validation {
                    threshold,
                    delegates: delegates.into_iter().map(|key| key.to_string()).collect(),
                })
            }
            radicle_fetch::FetchResult::Success {
                applied,
                remotes,
                validations,
            } => {
                // Non-fatal validation errors are only logged.
                for warn in validations {
                    log::debug!(target: "worker", "Validation error: {warn}");
                }

                // N.b. We do not go through handle for this since the cloning handle
                // points to a repository that is temporary and gets moved by [`mv`].
                let repo = storage.repository(rid)?;
                repo.set_identity_head()?;
                match repo.set_head_to_default_branch() {
                    Ok(()) => {
                        log::trace!(target: "worker", "Set HEAD successfully");
                    }
                    Err(RepositoryError::Quorum(e)) => {
                        // Lacking a quorum for HEAD is tolerated; the fetch
                        // itself still succeeded.
                        log::warn!(target: "worker", "Fetch could not set HEAD for {rid}: {e}")
                    }
                    Err(e) => return Err(e.into()),
                }

                // Canonical-ref computation is best-effort: failure is
                // logged and an empty update set is reported.
                let canonical = match set_canonical_refs(&repo, &applied) {
                    Ok(updates) => updates.unwrap_or_default(),
                    Err(e) => {
                        log::warn!(target: "worker", "Failed to set canonical references for {rid}: {e}");
                        UpdatedCanonicalRefs::default()
                    }
                };

                // Notifications are only posted for pulls, not clones.
                if let Some(mut store) = notifs {
                    // Only create notifications for repos that we have
                    // contributed to in some way, otherwise our inbox will
                    // be flooded by all the repos we are seeding.
                    if repo.remote(&storage.info().key).is_ok() {
                        notify(&rid, &applied, &mut store)?;
                    }
                }

                cache_cobs(&rid, &applied.updated, &repo, cache)?;
                cache_refs(&rid, &applied.updated, refsdb)?;

                Ok(FetchResult {
                    updated: applied.updated,
                    canonical,
                    namespaces: remotes.into_iter().collect(),
                    doc: repo.identity_doc()?,
                    clone,
                })
            }
        }
    }
}

// Post notifications for the given refs.
/// Post inbox notifications for the given applied ref updates.
///
/// Administrative references (signed refs, the identity branch and the
/// identity root) and patch head branches are excluded, as are skipped
/// updates. Individual store failures are logged, not propagated.
fn notify(
    rid: &RepoId,
    refs: &radicle_fetch::git::refs::Applied<'static>,
    store: &mut node::notifications::StoreWriter,
) -> Result<(), error::Fetch> {
    let timestamp = LocalTime::now();

    'updates: for update in refs.updated.iter() {
        if let Some(namespaced) = update.name().to_namespaced() {
            let unprefixed = namespaced.strip_namespace();
            // Don't notify about signed refs.
            if unprefixed == *git::refs::storage::SIGREFS_BRANCH {
                continue 'updates;
            }
            // Don't notify about a peer's identity branch pointer: the
            // identity COB itself already yields a notification.
            if unprefixed == *git::refs::storage::IDENTITY_BRANCH {
                continue 'updates;
            }
            // Don't notify about a peer's identity root pointer; it is
            // only used for sigref verification.
            if unprefixed == *git::refs::storage::IDENTITY_ROOT {
                continue 'updates;
            }
            // Don't notify about patch branches: patch COB updates
            // already produce notifications.
            if let Some(suffix) = unprefixed.strip_prefix(git::fmt::refname!("refs/heads/patches"))
            {
                if radicle::cob::ObjectId::from_str(suffix.as_str()).is_ok() {
                    continue 'updates;
                }
            }
        }
        // Skipped refs produce no notification.
        if matches!(update, RefUpdate::Skipped { .. }) {
            continue;
        }
        if let Err(e) = store.insert(rid, update, timestamp) {
            log::debug!(
                target: "worker",
                "Failed to update notification store for {rid}: {e}"
            );
        }
    }
    Ok(())
}

/// Cache certain ref updates in our database.
/// Cache certain ref updates in our database.
///
/// Only `rad/sigrefs` references are cached; everything else is
/// ignored. The first parse or database error abandons caching for the
/// remainder of this fetch (logged, not propagated).
fn cache_refs<D>(repo: &RepoId, refs: &[RefUpdate], db: &mut D) -> Result<(), node::refs::Error>
where
    D: node::refs::Store,
{
    let now = LocalTime::now();

    for update in refs {
        let name = update.name();
        let (namespace, qualified) = match radicle::git::parse_ref_namespaced(name) {
            Ok(parts) => parts,
            Err(e) => {
                log::debug!(target: "worker", "Git reference is invalid: {name:?}: {e}");
                log::debug!(target: "worker", "Skipping refs caching for fetch of {repo}");
                break;
            }
        };
        // Only cache `rad/sigrefs`.
        if qualified != *git::refs::storage::SIGREFS_BRANCH {
            continue;
        }
        log::trace!(target: "node", "Updating cache for {name} in {repo}");

        let outcome = match update {
            RefUpdate::Updated { new, .. } => db.set(repo, &namespace, &qualified, *new, now),
            RefUpdate::Created { oid, .. } => db.set(repo, &namespace, &qualified, *oid, now),
            RefUpdate::Deleted { .. } => db.delete(repo, &namespace, &qualified),
            RefUpdate::Skipped { .. } => continue,
        };

        if let Err(e) = outcome {
            log::debug!(target: "worker", "Failed to update git refs cache for {name:?}: {e}");
            log::debug!(target: "worker", "Skipping refs caching for fetch of {repo}");
            break;
        }
    }
    Ok(())
}

/// Write new `RefUpdate`s that are related a `Patch` or an `Issue`
/// COB to the COB cache.
fn cache_cobs<S, C>(
    rid: &RepoId,
    refs: &[RefUpdate],
    storage: &S,
    cache: &mut C,
) -> Result<(), error::Cache>
where
    S: ReadRepository + cob::Store<Namespace = NodeId>,
    C: cob::cache::Update<cob::issue::Issue> + cob::cache::Update<cob::patch::Patch>,
    C: cob::cache::Remove<cob::issue::Issue> + cob::cache::Remove<cob::patch::Patch>,
{
    let mut issues = cob::store::Store::<cob::issue::Issue, _, _>::open(storage, ReadOnly)?;
    let mut patches = cob::store::Store::<cob::patch::Patch, _, _>::open(storage, ReadOnly)?;

    for update in refs {
        match update {
            RefUpdate::Updated { name, .. }
            | RefUpdate::Created { name, .. }
            | RefUpdate::Deleted { name, .. } => match name.to_namespaced() {
                Some(name) => {
                    let Some(identifier) = cob::TypedId::from_namespaced(&name)? else {
                        continue;
                    };
                    if identifier.is_issue() {
                        update_or_remove(&mut issues, cache, rid, identifier)?;
                    } else if identifier.is_patch() {
                        update_or_remove(&mut patches, cache, rid, identifier)?;
                    } else {
                        // Unknown COB, don't cache.
                        continue;
                    }
                }
                None => continue,
            },
            RefUpdate::Skipped { .. } => { /* Do nothing */ }
        }
    }

    Ok(())
}

/// Update or remove a cache entry.
/// Update or remove a cache entry.
///
/// When the COB loads successfully from storage, the cache entry is
/// refreshed; when it is missing or fails to load, the entry is
/// evicted from the cache instead.
fn update_or_remove<T, Repo, C>(
    store: &mut cob::store::Store<T, Repo, ReadOnly>,
    cache: &mut C,
    rid: &RepoId,
    tid: TypedId,
) -> Result<(), error::Cache>
where
    T: cob::Evaluate<Repo> + cob::store::Cob + cob::store::CobWithType,
    Repo: cob::Store<Namespace = NodeId> + ReadRepository,
    C: cob::cache::Update<T> + cob::cache::Remove<T>,
{
    // Normalize "not found" and "failed to load" into `None`, so both
    // fall through to eviction below.
    let loaded = match store.get(&tid.id) {
        Ok(found) => found,
        Err(e) => {
            log::debug!(target: "fetch", "Failed to load COB {tid} from storage: {e}");
            None
        }
    };

    if let Some(obj) = loaded {
        // Object loaded correctly, update cache.
        return cache.update(rid, &tid.id, &obj).map(|_| ()).map_err(|e| {
            error::Cache::Update {
                id: tid.id,
                type_name: tid.type_name,
                err: e.into(),
            }
        });
    }

    // The object has either been removed entirely from the repository,
    // or it failed to load. So we also remove it from the cache.
    cob::cache::Remove::<T>::remove(cache, &tid.id)
        .map(|_| ())
        .map_err(|e| error::Cache::Remove {
            id: tid.id,
            type_name: tid.type_name,
            err: Box::new(e),
        })?;

    Ok(())
}

/// Compute and set canonical (non-namespaced) references for refs
/// touched by this fetch.
///
/// For every namespace-stripped refname that was created or updated in
/// `applied`, the identity document's canonical-ref rules are
/// consulted; when a quorum is reached, the canonical reference is
/// force-updated to point at the quorum object. Per-ref failures are
/// logged and skipped so one bad ref doesn't abort the rest.
///
/// Returns the set of canonical references that were updated.
fn set_canonical_refs(
    repo: &Repository,
    applied: &Applied,
) -> Result<Option<UpdatedCanonicalRefs>, error::Canonical> {
    let identity = repo.identity()?;
    let rules = identity.doc().canonical_refs()?.rules().clone();

    let mut updated_refs = UpdatedCanonicalRefs::default();
    // De-duplicated set of namespace-stripped refnames that were created
    // or updated. Deletions don't trigger canonical recomputation here.
    let refnames = applied
        .updated
        .iter()
        .filter_map(|update| match update {
            RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => {
                let name = name.clone().into_qualified()?;
                let name = name.to_namespaced()?;
                Some(name.strip_namespace())
            }
            _ => None,
        })
        .collect::<BTreeSet<_>>();

    for name in refnames {
        // No rule matches this ref: nothing canonical to compute.
        let canonical = match rules.canonical(name.clone(), repo) {
            Some(canonical) => canonical,
            None => continue,
        };

        let canonical = match canonical.find_objects() {
            Err(err) => {
                log::warn!(target: "worker", "Failed to find objects for canonical computation of `{name}`: {err}");
                continue;
            }
            Ok(canonical) => canonical,
        };

        match canonical.quorum() {
            Err(err) => {
                log::warn!(
                    target: "worker",
                    "Failed to calculate canonical reference `{name}`: {err}",
                );
                continue;
            }
            Ok(git::canonical::Quorum {
                refname, object, ..
            }) => {
                let oid = object.id();
                // N.b. the `true` flag force-updates the reference.
                // (The previous `refname.clone().as_str()` allocated a
                // throwaway copy just to borrow a `&str`; borrowing
                // `refname` directly is enough.)
                if let Err(e) = repo.backend.reference(
                    refname.as_str(),
                    oid.into(),
                    true,
                    "set-canonical-reference from fetch (radicle)",
                ) {
                    log::warn!(
                        target: "worker",
                        "Failed to set canonical reference {refname}->{oid}: {e}"
                    );
                } else {
                    updated_refs.updated(refname, oid);
                }
            }
        }
    }
    Ok(Some(updated_refs))
}