Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: use ls-remote when in fetching mode
Fintan Halpenny committed 3 years ago
commit 694bd345d920e4e8549a36e6dff3afb9d69e6e26
parent 5a48cdf507884eca0850beccfa2aa0df94af1e95
3 files changed +221 -74
modified radicle-node/src/worker.rs
@@ -2,7 +2,7 @@ mod channels;
mod fetch;
mod tunnel;

-
use std::collections::HashSet;
+
use std::collections::{BTreeSet, HashSet};
use std::io::{prelude::*, BufReader};
use std::ops::ControlFlow;
use std::thread::JoinHandle;
@@ -13,7 +13,7 @@ use crossbeam_channel as chan;
use radicle::identity::Id;
use radicle::prelude::NodeId;
use radicle::storage::{Namespaces, ReadRepository, RefUpdate};
-
use radicle::{git, Storage};
+
use radicle::{git, storage, Storage};

use crate::runtime::Handle;
use crate::wire::StreamId;
@@ -230,35 +230,48 @@ impl Worker {
        mut channels: Channels,
    ) -> Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError> {
        let staging = fetch::StagingPhaseInitial::new(&self.storage, rid, namespaces.clone())?;
-
        match self._fetch(
-
            &staging.repo,
-
            remote,
-
            staging.refspecs(),
-
            stream,
-
            &mut channels,
-
        ) {
-
            Ok(()) => log::debug!(target: "worker", "Initial fetch for {rid} exited successfully"),
-
            Err(e) => match (&staging.repo, e) {
-
                // When fetching, if the error comes from `git-fetch` returning an error, we
-
                // keep going because it could be due to a rejected ref (eg. on `rad/sigrefs`),
-
                // and that is non-fatal.
-
                (fetch::StagedRepository::Fetching(_), e @ FetchError::CommandFailed { .. }) => {
-
                    log::warn!(target: "worker", "Initial fetch for {rid} returned an error: {e}");
-
                    log::warn!(target: "worker", "It's possible that some of the refs were rejected..");
+
        let refs = if staging.repo.is_cloning() {
+
            match self._fetch(
+
                &staging.repo,
+
                staging.repo.is_cloning(),
+
                remote,
+
                staging.refspecs(),
+
                stream,
+
                &mut channels,
+
            ) {
+
                Ok(_) => {
+
                    log::debug!(target: "worker", "Initial fetch for {rid} exited successfully")
                }
-
                // When cloning, any error is fatal, since we'll end up with an empty repository.
-
                // Likewise, when fetching, if the error is coming from some other place, we
-
                // abort the fetch.
-
                (fetch::StagedRepository::Cloning(_) | fetch::StagedRepository::Fetching(_), e) => {
+
                Err(e) => {
                    log::error!(target: "worker", "Initial fetch for {rid} failed: {e}");
                    return Err(e);
                }
-
            },
-
        }
+
            }
+

+
            // TODO(finto): when cloning we simply fetch the special
+
            // rad refs from the remote side, however, when the
+
            // repository already exists we need to `ls-remote` (see
+
            // below). The result of the ls-remote is a BTreeSet of
+
            // refs and so we need to return an empty set here so that we
+
            // can pass them into `into_final`. This seems like a
+
            // code smell to me due to bad boundaries between the
+
            // logic in this module and the logic in the fetch module.
+
            BTreeSet::new()
+
        } else {
+
            self.ls_refs(
+
                &staging.repo,
+
                staging.ls_remote_refs(),
+
                remote,
+
                stream,
+
                &mut channels,
+
            )?
+
        };
+

+
        let staging = staging.into_final(refs)?;

-
        let staging = staging.into_final()?;
        match self._fetch(
            &staging.repo,
+
            staging.repo.is_cloning(),
            remote,
            staging.refspecs(),
            stream,
@@ -370,9 +383,86 @@ impl Worker {
        })
    }

-
    fn _fetch<S>(
+
    /// List the refs advertised by `remote` by running `git ls-remote`
    /// through a tunnel, and return the namespaced ones.
    ///
    /// Each item of `namespaces` is appended to the `git ls-remote` command
    /// line as an extra argument. The command runs in the staged repository's
    /// directory with Git protocol version 2 requested.
    ///
    /// Returns the set of refs from the command's stdout that parse as
    /// namespaced ref names. Lines with no fields, refs that are not
    /// namespaced, and invalid ref names are logged and skipped.
    ///
    /// # Errors
    ///
    /// Returns [`FetchError::CommandFailed`] when `git` exits unsuccessfully
    /// (with code `1` if no exit code is available). Failures from creating
    /// or running the tunnel, spawning the process or the logging thread,
    /// and waiting on the child are propagated via `?`.
    fn ls_refs(
        &self,
        repo: &fetch::StagedRepository,
+
        namespaces: impl IntoIterator<Item = git::PatternString>,
+
        remote: NodeId,
+
        stream: StreamId,
+
        channels: &mut Channels,
+
    ) -> Result<BTreeSet<git::Namespaced<'static>>, FetchError> {
+
        let mut tunnel = Tunnel::with(channels, stream, self.nid, remote, self.handle.clone())?;
+
        let tunnel_addr = tunnel.local_addr();
+
        let mut cmd = process::Command::new("git");
+
        // Start from an empty environment: only `PATH` and `GIT_TRACE*` are
        // forwarded from this process, then `GIT_DEFAULT_CONFIG` is applied.
        // The URL points `git` at the tunnel's local address.
        cmd.current_dir(repo.path())
+
            .env_clear()
+
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
+
            .envs(git::env::GIT_DEFAULT_CONFIG)
+
            .args(["-c", "protocol.version=2"])
+
            .arg("ls-remote")
+
            .arg(format!("git://{tunnel_addr}/{}", repo.id.canonical()));
+

+
        for ns in namespaces.into_iter() {
+
            cmd.arg(ns.as_str());
+
        }
+

+
        cmd.stdout(process::Stdio::piped())
+
            .stderr(process::Stdio::piped())
+
            .stdin(process::Stdio::piped());
+

+
        log::debug!(target: "worker", "Running command: {:?}", cmd);
+

+
        let mut child = cmd.spawn()?;
+
        // Both handles were configured as piped above, so `take()` yields `Some`.
        let stderr = child.stderr.take().unwrap();
+
        let stdout = child.stdout.take().unwrap();
+

+
        // Forward git's stderr to the debug log on a separate thread.
        // Lines that fail to read are dropped by `flatten()`.
        thread::Builder::new().name(self.name.clone()).spawn(|| {
+
            for line in BufReader::new(stderr).lines().flatten() {
+
                log::debug!(target: "worker", "Git: {}", line);
+
            }
+
        })?;
+

+
        // Presumably relays traffic between the local `git` process and the
        // remote until done or `self.timeout` elapses — confirm in `tunnel`.
        tunnel.run(self.timeout)?;
+

+
        // NOTE(review): stdout is piped but only read after `wait()` returns.
        // If `ls-remote` prints more than the pipe can buffer, the child may
        // block on write and this wait may not return — confirm, or drain
        // stdout before/while waiting.
        let result = child.wait()?;
+
        if result.success() {
+
            let mut refs = BTreeSet::new();
+

+
            for line in BufReader::new(stdout).lines().flatten() {
+
                log::debug!(target: "worker", "Git: {line}");
+
                // The ref name is taken as the last whitespace-separated
                // field of the line; `None` means the line had no fields.
                let r = match line.split_whitespace().next_back() {
+
                    Some(r) => r,
+
                    None => {
+
                        log::trace!(target: "worker", "Git: ls-remote returned unexpected format {line}");
+
                        continue;
+
                    }
+
                };
+
                match git::RefString::try_from(r) {
+
                    Ok(r) => {
+
                        // Only namespaced refs are collected; others are logged.
                        if let Some(ns) = r.to_namespaced() {
+
                            refs.insert(ns.to_owned());
+
                        } else {
+
                            log::debug!(target: "worker", "Git: non-namespaced ref '{r}'")
+
                        }
+
                    }
+
                    Err(err) => {
+
                        log::warn!(target: "worker", "Git: invalid refname '{r}' {err}")
+
                    }
+
                }
+
            }
+

+
            Ok(refs)
+
        } else {
+
            Err(FetchError::CommandFailed {
+
                // No exit code is available (e.g. presumably when killed by a
                // signal); report `1` in that case.
                code: result.code().unwrap_or(1),
+
            })
+
        }
+
    }
+

+
    fn _fetch<S>(
+
        &self,
+
        repo: &storage::git::Repository,
+
        is_cloning: bool,
        remote: NodeId,
        specs: S,
        stream: StreamId,
@@ -402,11 +492,11 @@ impl Worker {
            .into_refspecs()
            .into_iter()
            // Filter out our own refs, if we aren't cloning.
-
            .filter(|fs| repo.is_cloning() || !fs.dst.starts_with(namespace.as_str()))
+
            .filter(|fs| is_cloning || !fs.dst.starts_with(namespace.as_str()))
            .map(|spec| spec.to_string())
            .collect::<Vec<_>>();

-
        if !repo.is_cloning() {
+
        if !is_cloning {
            // Make sure we don't fetch our own refs via a glob pattern.
            fetchspecs.push(format!("^refs/namespaces/{}/*", self.nid));
        }
modified radicle-node/src/worker/fetch.rs
@@ -3,11 +3,11 @@ pub use refspecs::{AsRefspecs, Refspec, SpecialRefs};

pub mod error;

-
use std::collections::{BTreeMap, HashSet};
+
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::ops::Deref;

use radicle::crypto::{PublicKey, Unverified, Verified};
-
use radicle::git::url;
+
use radicle::git::{url, Namespaced};
use radicle::prelude::{Doc, Id, NodeId};
use radicle::storage::git::Repository;
use radicle::storage::refs::IDENTITY_BRANCH;
@@ -29,7 +29,7 @@ pub struct StagingPhaseInitial<'a> {
    /// The original [`Storage`] we are finalising changes into.
    production: &'a Storage,
    /// The `Namespaces` passed by the fetching caller.
-
    namespaces: Namespaces,
+
    pub(super) namespaces: Namespaces,
    _tmp: tempfile::TempDir,
}

@@ -51,8 +51,36 @@ impl Deref for StagedRepository {

    fn deref(&self) -> &Self::Target {
        match self {
-
            StagedRepository::Cloning(repo) => repo,
-
            StagedRepository::Fetching(repo) => repo,
+
            Self::Cloning(repo) => repo,
+
            Self::Fetching(repo) => repo,
+
        }
+
    }
+
}
+

+
/// The staged repository held by [`StagingPhaseFinal`], together with the
/// data used to compute the refspecs for the final fetch.
pub enum FinalStagedRepository {
+
    /// The repository is being cloned. Refspecs are derived from the
    /// `trusted` set of remotes.
    Cloning {
+
        repo: Repository,
+
        trusted: HashSet<NodeId>,
+
    },
+
    /// The repository already exists and is being fetched into. Refspecs
    /// are derived from the explicit set of namespaced `refs`.
    Fetching {
+
        repo: Repository,
+
        refs: BTreeSet<Namespaced<'static>>,
+
    },
+
}
+

+
impl FinalStagedRepository {
+
    /// Returns `true` if this is the [`FinalStagedRepository::Cloning`] variant.
    pub fn is_cloning(&self) -> bool {
+
        matches!(self, Self::Cloning { .. })
+
    }
+
}
+

+
// Gives access to the inner `Repository` of either variant, so repository
// methods can be called directly on a `FinalStagedRepository`.
impl Deref for FinalStagedRepository {
+
    type Target = Repository;
+

+
    fn deref(&self) -> &Self::Target {
+
        match self {
+
            Self::Cloning { repo, .. } => repo,
+
            Self::Fetching { repo, .. } => repo,
        }
    }
}
@@ -70,13 +98,9 @@ impl Deref for StagedRepository {
/// [`StagingPhaseFinal::transfer`].
pub struct StagingPhaseFinal<'a> {
    /// The inner [`Repository`] for staging fetches into.
-
    pub(super) repo: StagedRepository,
+
    pub(super) repo: FinalStagedRepository,
    /// The original [`Storage`] we are finalising changes into.
    production: &'a Storage,
-
    /// The delegates and tracked remotes that the fetch is being
-
    /// performed for. These are passed through from the
-
    /// [`StagingPhaseInitial::namespaces`], if the variant is `Trusted`.
-
    trusted: HashSet<RemoteId>,
    _tmp: tempfile::TempDir,
}

@@ -128,15 +152,34 @@ impl<'a> StagingPhaseInitial<'a> {
        }
    }

+
    /// Return the ref patterns to list on the remote side, based on
    /// [`StagingPhaseInitial::namespaces`].
    ///
    /// For `Namespaces::All` this is the single pattern `refs/namespaces/*`.
    /// For `Namespaces::Trusted` there is one pattern per trusted namespace,
    /// built by joining the namespace onto `refs/namespaces` and appending a
    /// `*` pattern (presumably `refs/namespaces/<ns>/*` — confirm against
    /// `with_pattern`).
    pub fn ls_remote_refs(&self) -> Vec<git::PatternString> {
+
        match &self.namespaces {
+
            Namespaces::All => {
+
                vec![git::refspec::pattern!("refs/namespaces/*")]
+
            }
+
            Namespaces::Trusted(trusted) => trusted
+
                .iter()
+
                .map(|ns| {
+
                    git::refname!("refs/namespaces")
+
                        .join(git::Component::from(ns))
+
                        .with_pattern(git::refspec::STAR)
+
                })
+
                .collect::<Vec<_>>(),
+
        }
+
    }
+

    /// Convert the [`StagingPhaseInitial`] into [`StagingPhaseFinal`] to continue
    /// the fetch process.
-
    pub fn into_final(self) -> Result<StagingPhaseFinal<'a>, error::Transition> {
-
        let trusted = match &self.repo {
+
    pub fn into_final(
+
        self,
+
        refs: BTreeSet<Namespaced<'static>>,
+
    ) -> Result<StagingPhaseFinal<'a>, error::Transition> {
+
        let repo = match self.repo {
            StagedRepository::Cloning(repo) => {
-
                log::debug!(target: "worker", "Loading remotes for clone of {}", self.repo.id);
-
                let oid = ReadRepository::identity_head(repo)?;
+
                log::debug!(target: "worker", "Loading remotes for clone of {}", repo.id);
+
                let oid = ReadRepository::identity_head(&repo)?;
                log::trace!(target: "worker", "Loading 'rad/id' @ {oid}");
-
                let (doc, _) = Doc::<Unverified>::load_at(oid, repo)?;
+
                let (doc, _) = Doc::<Unverified>::load_at(oid, &repo)?;
                let doc = doc.verified()?;
                let mut trusted = match self.namespaces.clone() {
                    Namespaces::All => HashSet::new(),
@@ -144,26 +187,14 @@ impl<'a> StagingPhaseInitial<'a> {
                };
                let delegates = doc.delegates.map(PublicKey::from);
                trusted.extend(delegates);
-
                trusted
-
            }
-
            StagedRepository::Fetching(repo) => {
-
                log::debug!(target: "worker", "Loading remotes for fetching of {}", self.repo.id);
-
                match self.namespaces.clone() {
-
                    Namespaces::All => {
-
                        let mut trusted = repo.remote_ids()?.collect::<Result<HashSet<_>, _>>()?;
-
                        trusted.extend(repo.delegates()?.map(PublicKey::from));
-
                        trusted
-
                    }
-

-
                    Namespaces::Trusted(trusted) => trusted,
-
                }
+
                FinalStagedRepository::Cloning { repo, trusted }
            }
+
            StagedRepository::Fetching(repo) => FinalStagedRepository::Fetching { repo, refs },
        };

        Ok(StagingPhaseFinal {
-
            repo: self.repo,
+
            repo,
            production: self.production,
-
            trusted,
            _tmp: self._tmp,
        })
    }
@@ -218,14 +249,11 @@ impl<'a> StagingPhaseFinal<'a> {
    /// Return the fetch refspecs for fetching the necessary
    /// references.
    pub fn refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
-
        match self.repo {
-
            StagedRepository::Cloning(_) => Namespaces::Trusted(self.trusted.clone()).as_refspecs(),
-
            StagedRepository::Fetching(_) => {
-
                self.remotes().fold(Vec::new(), |mut specs, remote| {
-
                    specs.extend(remote.as_refspecs());
-
                    specs
-
                })
+
        match &self.repo {
+
            FinalStagedRepository::Cloning { trusted, .. } => {
+
                Namespaces::Trusted(trusted.clone()).as_refspecs()
            }
+
            FinalStagedRepository::Fetching { refs, .. } => refs.as_refspecs(),
        }
    }

@@ -245,10 +273,10 @@ impl<'a> StagingPhaseFinal<'a> {
    /// All references that were updated are returned as a
    /// [`RefUpdate`].
    pub fn transfer(self) -> Result<(Vec<RefUpdate>, HashSet<NodeId>), error::Transfer> {
-
        let verifications = self.verify();
+
        let verifications = self.verify()?;
        let production = match &self.repo {
-
            StagedRepository::Cloning(repo) => self.production.create(repo.id)?,
-
            StagedRepository::Fetching(repo) => self.production.repository(repo.id)?,
+
            FinalStagedRepository::Cloning { repo, .. } => self.production.create(repo.id)?,
+
            FinalStagedRepository::Fetching { repo, .. } => self.production.repository(repo.id)?,
        };
        let url = url::File::new(self.repo.path().to_path_buf()).to_string();
        let mut remote = production.backend.remote_anonymous(&url)?;
@@ -373,14 +401,22 @@ impl<'a> StagingPhaseFinal<'a> {
        Ok((updates, remotes))
    }

-
    fn remotes(&self) -> impl Iterator<Item = Remote> + '_ {
-
        self.trusted
-
            .iter()
-
            .filter_map(|remote| self.repo.remote(remote).ok())
+
    /// Return an iterator over the remotes of the staged repository.
    ///
    /// When cloning, only the `trusted` remotes are looked up, and any that
    /// fail to load are silently dropped. When fetching, every remote of the
    /// staged repository is returned, and entries that fail to load are
    /// skipped.
    ///
    /// The iterator is boxed because the two arms produce different
    /// iterator types.
    ///
    /// # Errors
    ///
    /// Propagates the error from `repo.remotes()` in the fetching case.
    fn remotes(&self) -> Result<Box<dyn Iterator<Item = Remote> + '_>, git::raw::Error> {
+
        match &self.repo {
+
            FinalStagedRepository::Cloning { trusted, .. } => Ok(Box::new(
+
                trusted
+
                    .iter()
+
                    .filter_map(|remote| self.repo.remote(remote).ok()),
+
            )),
+
            FinalStagedRepository::Fetching { repo, .. } => Ok(Box::new(
+
                // Each item is a `Result` of a pair; keep only the second
                // element of the successful ones.
                repo.remotes()?.filter_map(|r| r.ok().map(|(_, r)| r)),
+
            )),
+
        }
    }

-
    fn verify(&self) -> BTreeMap<RemoteId, VerifiedRemote> {
-
        self.remotes()
+
    fn verify(&self) -> Result<BTreeMap<RemoteId, VerifiedRemote>, git::raw::Error> {
+
        Ok(self
+
            .remotes()?
            .map(|remote| {
                let remote_id = remote.id;
                let verification = match self.repo.identity_doc_of(&remote_id) {
@@ -400,7 +436,7 @@ impl<'a> StagingPhaseFinal<'a> {
                };
                (remote_id, verification)
            })
-
            .collect()
+
            .collect())
    }
}

modified radicle-node/src/worker/fetch/refspecs.rs
@@ -1,3 +1,4 @@
+
use std::collections::BTreeSet;
use std::fmt;
use std::fmt::Write as _;

@@ -159,3 +160,23 @@ impl AsRefspecs for Remote {
            .collect()
    }
}
+

+
// Refspecs for an explicit set of namespaced refs: each ref is fetched into
// the same name on the local side.
impl<'a> AsRefspecs for BTreeSet<git::Namespaced<'a>> {
+
    /// Build one refspec per ref in the set, with `src` and `dst` both set
    /// to the full namespaced ref name.
    ///
    /// `force` is set for every ref except those whose name, with the
    /// namespace stripped, equals `IDENTITY_BRANCH` or `SIGREFS_BRANCH`.
    fn as_refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        // The reserved branch names that must not be force-updated.
        let reserved = [(*IDENTITY_BRANCH).clone(), (*SIGREFS_BRANCH).clone()]
+
            .into_iter()
+
            .collect::<BTreeSet<_>>();
+
        self.iter()
+
            .map(|r| {
+
                // Only force ordinary refs.
+
                let suffix = r.strip_namespace();
+
                let force = !reserved.contains(&suffix);
+
                Refspec {
+
                    src: r.clone().to_ref_string().into(),
+
                    dst: r.clone().to_ref_string().into(),
+
                    force,
+
                }
+
            })
+
            .collect()
+
    }
+
}