Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: multistep fetching logic
Fintan Halpenny committed 3 years ago
commit db0ceddf6b36d58e33d476b6cc15f871d1282514
parent d73d48584846f445c60c7370a15f7dd89ba2801e
7 files changed +725 -152
modified radicle-node/src/runtime.rs
@@ -185,7 +185,6 @@ impl<G: Signer + Ecdh + 'static> Runtime<G> {
        }

        let pool = worker::Pool::with(
-
            id,
            worker_recv,
            handle.clone(),
            worker::Config {
modified radicle-node/src/tests/e2e.rs
@@ -171,7 +171,7 @@ fn test_replication() {
            panic!("Fetch failed from {}: {reason}", bob.id);
        }
    };
-
    assert_eq!(*updated, vec![]);
+
    assert!(!updated.is_empty());

    log::debug!(target: "test", "Fetch complete with {}", bob.id);

modified radicle-node/src/worker.rs
@@ -1,3 +1,5 @@
+
mod fetch;
+

use std::io::{prelude::*, BufReader};
use std::ops::ControlFlow;
use std::thread::JoinHandle;
@@ -8,9 +10,9 @@ use cyphernet::Ecdh;
use netservices::tunnel::Tunnel;
use netservices::{AsConnection, NetSession, SplitIo};

-
use radicle::crypto::{PublicKey, Signer};
+
use radicle::crypto::Signer;
use radicle::identity::{Id, IdentityError};
-
use radicle::storage::{Namespaces, ReadRepository, RefUpdate, WriteRepository, WriteStorage};
+
use radicle::storage::{Namespaces, ReadRepository, RefUpdate};
use radicle::{git, Storage};
use reactor::poller::popol;

@@ -54,6 +56,12 @@ pub enum FetchError {
    Upload(#[from] UploadError),
    #[error("remote aborted fetch")]
    RemoteAbortedFetch,
+
    #[error(transparent)]
+
    StagingInit(#[from] fetch::error::Init),
+
    #[error(transparent)]
+
    StagingTransition(#[from] fetch::error::Transition),
+
    #[error(transparent)]
+
    StagingTransfer(#[from] fetch::error::Transfer),
}

impl FetchError {
@@ -100,7 +108,6 @@ pub struct TaskResult<G: Signer + Ecdh> {

/// A worker that replicates git objects.
struct Worker<G: Signer + Ecdh> {
-
    local: PublicKey,
    storage: Storage,
    tasks: chan::Receiver<Task<G>>,
    daemon: net::SocketAddr,
@@ -153,27 +160,15 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
        drain: Vec<u8>,
        mut session: WireSession<G>,
    ) -> (WireSession<G>, Result<Vec<RefUpdate>, FetchError>) {
-
        let rid = fetch.rid;
        match &fetch.direction {
            FetchDirection::Initiator { namespaces } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", fetch.rid);

-
                let mut tunnel =
-
                    match Tunnel::with(session, net::SocketAddr::from(([0, 0, 0, 0], 0))) {
-
                        Ok(tunnel) => tunnel,
-
                        Err((session, err)) => return (session, Err(err.into())),
-
                    };
-
                let result = self.fetch(rid, namespaces, &mut tunnel);
-
                let mut session = tunnel.into_session();
-

+
                let (session, result) = self.fetch(fetch.rid, namespaces, session);
                if let Err(err) = &result {
                    log::error!(target: "worker", "Fetch error: {err}");
                }
-
                log::debug!(target: "worker", "Sending `done` packet to remote..");

-
                if let Err(err) = pktline::done(&mut session) {
-
                    log::error!(target: "worker", "Fetch error: error sending `done` packet: {err}");
-
                }
                (session, result)
            }
            FetchDirection::Responder => {
@@ -190,70 +185,13 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                    }
                };
                let mut pktline_r = pktline::Reader::new(drain, &mut stream_r);
-

-
                match self.upload_pack(fetch, &mut pktline_r, &mut stream_w) {
-
                    Ok(()) => {
-
                        log::debug!(target: "worker", "Upload of {} to {} exited successfully", fetch.rid, fetch.remote);
-

-
                        (WireSession::from_split_io(stream_r, stream_w), Ok(vec![]))
-
                    }
-
                    Err(err) => {
-
                        log::error!(target: "worker", "Upload error for {rid}: {err}");
-

-
                        // If we exited without receiving a `done` packet, wait for it here.
-
                        // It's possible that the daemon exited first, or the remote crashed.
-
                        log::debug!(target: "worker", "Waiting for `done` packet from remote..");
-
                        let mut header = [0; pktline::HEADER_LEN];
-

-
                        // Set the read timeout for the `done` packet to twice the configured
-
                        // value that is used for the fetching (initiator) side.
-
                        //
-
                        // This is because the uploader always waits for the `done` packet;
-
                        // so in case the fetch is aborted by the uploader, eg. if
-
                        // it can't connect with the daemon, it will wait long enough for the
-
                        // fetcher to timeout before timing out itself, and will thus receive
-
                        // the `done` packet.
-
                        pktline_r
-
                            .stream()
-
                            .as_connection()
-
                            .set_read_timeout(Some(self.timeout * 2))
-
                            .ok();
-

-
                        loop {
-
                            match pktline_r.read_done_pktline(&mut header) {
-
                                Ok(()) => {
-
                                    log::debug!(target: "worker", "Received `done` packet from remote");
-

-
                                    // If we get the `done` packet, we exit with the original
-
                                    // error.
-
                                    return (
-
                                        WireSession::from_split_io(stream_r, stream_w),
-
                                        Err(err.into()),
-
                                    );
-
                                }
-
                                Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
-
                                    // If we get some other packet, because the fetch request
-
                                    // is still sending stuff, we simply keep reading until we
-
                                    // get a `done` packet.
-
                                    continue;
-
                                }
-
                                Err(_) => {
-
                                    // If we get any other error, eg. a timeout, we abort.
-
                                    log::error!(
-
                                        target: "worker",
-
                                        "Upload of {} to {} aborted: missing `done` packet from remote",
-
                                        fetch.rid,
-
                                        fetch.remote
-
                                    );
-
                                    return (
-
                                        WireSession::from_split_io(stream_r, stream_w),
-
                                        Err(FetchError::RemoteAbortedFetch),
-
                                    );
-
                                }
-
                            }
-
                        }
-
                    }
-
                }
+
                // Nb. two fetches are expected to happen, one for
+
                // the `rad` refs, followed by the refs listed in
+
                // signed refs.
+
                let result = self.upload_pack(fetch, &mut pktline_r, &mut stream_w);
+
                let result =
+
                    result.and_then(|_| self.upload_pack(fetch, &mut pktline_r, &mut stream_w));
+
                (WireSession::from_split_io(stream_r, stream_w), result)
            }
        }
    }
@@ -262,84 +200,99 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
        &self,
        rid: Id,
        namespaces: &Namespaces,
-
        tunnel: &mut Tunnel<WireSession<G>>,
-
    ) -> Result<Vec<RefUpdate>, FetchError> {
-
        let repo = match self.storage.repository_mut(rid) {
-
            Ok(r) => Ok(r),
-
            Err(e) if e.is_not_found() => self.storage.create(rid),
-
            Err(e) => Err(e),
-
        }?;
-
        let tunnel_addr = tunnel.local_addr()?;
-
        let mut cmd = process::Command::new("git");
-
        cmd.current_dir(repo.path())
-
            .env_clear()
-
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
-
            .envs(git::env::GIT_DEFAULT_CONFIG)
-
            .args(["-c", "protocol.version=2"])
-
            .arg("fetch")
-
            .arg("--verbose");
+
        session: WireSession<G>,
+
    ) -> (WireSession<G>, Result<Vec<RefUpdate>, FetchError>) {
+
        let staging = match fetch::StagingPhaseInitial::new(&self.storage, rid, namespaces.clone())
+
        {
+
            Ok(staging) => staging,
+
            Err(err) => return (session, Err(err.into())),
+
        };

-
        match namespaces {
-
            Namespaces::All => {
-
                // We should not prune in this case, because it would mean that namespaces that
-
                // don't exit on the remote would be deleted locally.
-
            }
-
            Namespaces::One(_) => {
-
                // TODO: Make sure we verify before pruning, as pruning may get us into
-
                // a state we can't roll back.
-
                cmd.arg("--prune");
-
            }
-
            Namespaces::Many(_) => {
-
                // Same case as All
-
            }
-
        }
+
        let session = match self.tunnel_fetch(&staging.repo, staging.refspecs(), session) {
+
            (session, Ok(())) => session,
+
            (session, Err(err)) => return (session, Err(err)),
+
        };

-
        if self.atomic {
-
            // Enable atomic fetch. Only works with Git 2.31 and later.
-
            cmd.arg("--atomic");
-
        }
+
        let staging = match staging.into_final().map_err(FetchError::from) {
+
            Ok(staging) => staging,
+
            Err(e) => return (session, Err(e)),
+
        };

-
        // Ignore our own remote when fetching
-
        let mut fetchspecs = namespaces.as_fetchspecs();
-
        fetchspecs.push(format!("^refs/namespaces/{}/*", self.local));
+
        let (session, res) = self.tunnel_fetch(&staging.repo, staging.refspecs(), session);

-
        cmd.arg(format!("git://{tunnel_addr}/{}", repo.id.canonical()))
-
            .args(&fetchspecs)
-
            .stdout(process::Stdio::piped())
-
            .stderr(process::Stdio::piped())
-
            .stdin(process::Stdio::piped());
+
        if let Err(e) = res {
+
            return (session, Err(e));
+
        }

-
        log::debug!(target: "worker", "Running command: {:?}", cmd);
+
        (session, staging.transfer().map_err(FetchError::from))
+
    }

-
        let mut child = cmd.spawn()?;
-
        let stderr = child.stderr.take().unwrap();
+
    fn upload_pack(
+
        &self,
+
        fetch: &Fetch,
+
        pktline_r: &mut pktline::Reader<WireReader>,
+
        stream_w: &mut WireWriter<G>,
+
    ) -> Result<Vec<RefUpdate>, FetchError> {
+
        match self._upload_pack(fetch, pktline_r, stream_w) {
+
            Ok(()) => {
+
                log::debug!(target: "worker", "Upload of {} to {} exited successfully", fetch.rid, fetch.remote);

-
        thread::Builder::new().name(self.name.clone()).spawn(|| {
-
            for line in BufReader::new(stderr).lines().flatten() {
-
                log::debug!(target: "worker", "Git: {}", line);
+
                Ok(vec![])
+
            }
+
            Err(err) => {
+
                log::error!(target: "worker", "Upload error for {}: {err}", fetch.rid);
+

+
                // If we exited without receiving a `done` packet, wait for it here.
+
                // It's possible that the daemon exited first, or the remote crashed.
+
                log::debug!(target: "worker", "Waiting for `done` packet from remote..");
+
                let mut header = [0; pktline::HEADER_LEN];
+

+
                // Set the read timeout for the `done` packet to twice the configured
+
                // value that is used for the fetching (initiator) side.
+
                //
+
                // This is because the uploader always waits for the `done` packet;
+
                // so in case the fetch is aborted by the uploader, eg. if
+
                // it can't connect with the daemon, it will wait long enough for the
+
                // fetcher to timeout before timing out itself, and will thus receive
+
                // the `done` packet.
+
                pktline_r
+
                    .stream()
+
                    .as_connection()
+
                    .set_read_timeout(Some(self.timeout * 2))
+
                    .ok();
+

+
                loop {
+
                    match pktline_r.read_done_pktline(&mut header) {
+
                        Ok(()) => {
+
                            log::debug!(target: "worker", "Received `done` packet from remote");
+

+
                            // If we get the `done` packet, we exit with the original
+
                            // error.
+
                            return Err(err.into());
+
                        }
+
                        Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
+
                            // If we get some other packet, because the fetch request
+
                            // is still sending stuff, we simply keep reading until we
+
                            // get a `done` packet.
+
                            continue;
+
                        }
+
                        Err(_) => {
+
                            // If we get any other error, eg. a timeout, we abort.
+
                            log::error!(
+
                                target: "worker",
+
                                "Upload of {} to {} aborted: missing `done` packet from remote",
+
                                fetch.rid,
+
                                fetch.remote
+
                            );
+
                            return Err(FetchError::RemoteAbortedFetch);
+
                        }
+
                    }
+
                }
            }
-
        })?;
-

-
        let _ = tunnel.tunnel_once(popol::Poller::new(), self.timeout)?;
-

-
        // TODO: Parse fetch output to return updates.
-
        let result = child.wait()?;
-
        if result.success() {
-
            log::debug!(target: "worker", "Fetch for {} exited successfully", rid);
-
            let head = repo.set_head()?;
-
            log::debug!(target: "worker", "Head for {} set to {head}", rid);
-
            let head = repo.set_identity_head()?;
-
            log::debug!(target: "worker", "'refs/rad/id' for {} set to {head}", rid);
-
            Ok(vec![])
-
        } else {
-
            log::error!(target: "worker", "Fetch for {} failed", rid);
-
            Err(FetchError::CommandFailed {
-
                code: result.code().unwrap_or(1),
-
            })
        }
    }

-
    fn upload_pack(
+
    fn _upload_pack(
        &self,
        fetch: &Fetch,
        stream_r: &mut pktline::Reader<WireReader>,
@@ -403,6 +356,92 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
            }
        }
    }
+

+
    fn tunnel_fetch<Specs>(
+
        &self,
+
        repo: &fetch::StagedRepository,
+
        specs: Specs,
+
        session: WireSession<G>,
+
    ) -> (WireSession<G>, Result<(), FetchError>)
+
    where
+
        Specs: fetch::AsRefspecs,
+
    {
+
        let mut tunnel = match Tunnel::with(session, net::SocketAddr::from(([0, 0, 0, 0], 0))) {
+
            Ok(tunnel) => tunnel,
+
            Err((session, err)) => return (session, Err(err.into())),
+
        };
+
        let res = self._fetch(repo, specs, &mut tunnel);
+

+
        let mut session = tunnel.into_session();
+

+
        log::debug!(target: "worker", "Sending `done` packet to remote..");
+
        if let Err(err) = pktline::done(&mut session) {
+
            log::error!(target: "worker", "Fetch error: error sending `done` packet: {err}");
+
        }
+
        (session, res)
+
    }
+

+
    fn _fetch<S>(
+
        &self,
+
        repo: &storage::git::Repository,
+
        specs: S,
+
        tunnel: &mut Tunnel<WireSession<G>>,
+
    ) -> Result<(), FetchError>
+
    where
+
        S: fetch::AsRefspecs,
+
    {
+
        let rid = repo.id;
+
        let tunnel_addr = tunnel.local_addr()?;
+
        let mut cmd = process::Command::new("git");
+
        cmd.current_dir(repo.path())
+
            .env_clear()
+
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
+
            .envs(git::env::GIT_DEFAULT_CONFIG)
+
            .args(["-c", "protocol.version=2"])
+
            .arg("fetch")
+
            .arg("--verbose");
+

+
        if self.atomic {
+
            // Enable atomic fetch. Only works with Git 2.31 and later.
+
            cmd.arg("--atomic");
+
        }
+

+
        let fetchspecs = specs
+
            .into_refspecs()
+
            .into_iter()
+
            .map(|spec| spec.to_string())
+
            .collect::<Vec<_>>();
+

+
        cmd.arg(format!("git://{tunnel_addr}/{}", repo.id.canonical()))
+
            .args(&fetchspecs)
+
            .stdout(process::Stdio::piped())
+
            .stderr(process::Stdio::piped())
+
            .stdin(process::Stdio::piped());
+

+
        log::debug!(target: "worker", "Running command: {:?}", cmd);
+

+
        let mut child = cmd.spawn()?;
+
        let stderr = child.stderr.take().unwrap();
+

+
        thread::Builder::new().name(self.name.clone()).spawn(|| {
+
            for line in BufReader::new(stderr).lines().flatten() {
+
                log::debug!(target: "worker", "Git: {}", line);
+
            }
+
        })?;
+

+
        let _ = tunnel.tunnel_once(popol::Poller::new(), self.timeout)?;
+

+
        let result = child.wait()?;
+
        if result.success() {
+
            log::debug!(target: "worker", "Fetch for {} exited successfully", rid);
+
            Ok(())
+
        } else {
+
            log::error!(target: "worker", "Fetch for {} failed", rid);
+
            Err(FetchError::CommandFailed {
+
                code: result.code().unwrap_or(1),
+
            })
+
        }
+
    }
}

/// A pool of workers. One thread is allocated for each worker.
@@ -413,7 +452,6 @@ pub struct Pool {
impl Pool {
    /// Create a new worker pool with the given parameters.
    pub fn with<G: Signer + Ecdh + 'static>(
-
        local: PublicKey,
        tasks: chan::Receiver<Task<G>>,
        handle: Handle<G>,
        config: Config,
@@ -421,7 +459,6 @@ impl Pool {
        let mut pool = Vec::with_capacity(config.capacity);
        for _ in 0..config.capacity {
            let worker = Worker {
-
                local,
                tasks: tasks.clone(),
                handle: handle.clone(),
                storage: config.storage.clone(),
added radicle-node/src/worker/fetch.rs
@@ -0,0 +1,320 @@
+
mod refspecs;
+
pub use refspecs::{AsRefspecs, Refspec, SpecialRefs};
+

+
pub mod error;
+

+
use std::collections::BTreeMap;
+
use std::ops::Deref;
+

+
use nonempty::NonEmpty;
+

+
use radicle::crypto::{PublicKey, Unverified, Verified};
+
use radicle::git::url;
+
use radicle::prelude::{Doc, Id};
+
use radicle::storage::git::Repository;
+
use radicle::storage::refs::{SignedRefs, IDENTITY_BRANCH};
+
use radicle::storage::{Namespaces, RefUpdate, Remote, RemoteId};
+
use radicle::storage::{ReadRepository, ReadStorage, WriteRepository, WriteStorage};
+
use radicle::{git, storage, Storage};
+

+
/// The initial phase of staging a fetch from a remote.
+
///
+
/// The [`StagingPhaseInitial::refspecs`] generated are to fetch the
+
/// `rad/id` and/or `rad/sigrefs` references from the remote end.
+
///
+
/// It is then expected to convert this into [`StagingPhaseFinal`]
+
/// using [`StagingPhaseInitial::into_final`] to continue the rest of the
+
/// references.
+
pub struct StagingPhaseInitial<'a> {
+
    /// The inner [`Repository`] for staging fetches into.
+
    pub(super) repo: StagedRepository,
+
    /// The original [`Storage`] we are finalising changes into.
+
    production: &'a Storage,
+
    /// The `Namespaces` passed by the fetching caller.
+
    namespaces: Namespaces,
+
    _tmp: tempfile::TempDir,
+
}
+

+
/// Indicates whether the inner [`Repository`] is being cloned into
+
/// or fetched into.
+
pub enum StagedRepository {
+
    Cloning(Repository),
+
    Fetching(Repository),
+
}
+

+
impl Deref for StagedRepository {
+
    type Target = Repository;
+

+
    fn deref(&self) -> &Self::Target {
+
        match self {
+
            StagedRepository::Cloning(repo) => repo,
+
            StagedRepository::Fetching(repo) => repo,
+
        }
+
    }
+
}
+

+
/// The second, and final, phase of staging a fetch from a remote.
+
///
+
/// The [`StagingPhaseFinal::refspecs`] generated are to fetch any follow-up
+
/// references after the fetch on [`StagingPhaseInitial`]. This may be all the
+
/// delegate's references in the case of cloning the new repository,
+
/// or it could be fetching the latest updates in the case of fetching
+
/// an existing repository.
+
///
+
/// It is then expected to finalise the process by transferring the
+
/// fetched references into the production storage, via
+
/// [`StagingPhaseFinal::transfer`].
+
pub struct StagingPhaseFinal<'a> {
+
    /// The inner [`Repository`] for staging fetches into.
+
    pub(super) repo: StagedRepository,
+
    /// The original [`Storage`] we are finalising changes into.
+
    production: &'a Storage,
+
    /// The remotes that the fetch is being performed for. These are
+
    /// discovered after performing the fetch for [`StagingPhaseInitial`].
+
    remotes: NonEmpty<RemoteId>,
+
    _tmp: tempfile::TempDir,
+
}
+

+
enum VerifiedRemote {
+
    Failed {
+
        reason: String,
+
    },
+
    Success {
+
        // Nb. unused but we want to ensure that we verify the identity
+
        _doc: Doc<Verified>,
+
        remote: Remote<Verified>,
+
    },
+
}
+

+
impl<'a> StagingPhaseInitial<'a> {
+
    /// Construct a [`StagingPhaseInitial`] which sets up its
+
    /// [`StagedRepository`] in a new, temporary directory.
+
    pub fn new(
+
        production: &'a Storage,
+
        rid: Id,
+
        namespaces: Namespaces,
+
    ) -> Result<Self, error::Init> {
+
        let tmp = tempfile::TempDir::new()?;
+
        log::debug!(target: "worker", "Staging fetch in {:?}", tmp.path());
+
        let staging = Storage::open(tmp.path())?;
+
        let repo = Self::repository(&staging, production, rid)?;
+
        Ok(Self {
+
            repo,
+
            production,
+
            namespaces,
+
            _tmp: tmp,
+
        })
+
    }
+

+
    /// Return the fetch refspecs for fetching the necessary `rad`
+
    /// references.
+
    pub fn refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        let id = git::PatternString::from(IDENTITY_BRANCH.clone().into_refstring());
+
        match self.repo {
+
            StagedRepository::Cloning(_) => Refspec {
+
                src: id.clone(),
+
                dst: id,
+
                force: false,
+
            }
+
            .into_refspecs(),
+
            StagedRepository::Fetching(_) => SpecialRefs(self.namespaces.clone()).into_refspecs(),
+
        }
+
    }
+

+
    /// Convert the [`StagingPhaseInitial`] into [`StagingPhaseFinal`] to continue
+
    /// the fetch process.
+
    pub fn into_final(self) -> Result<StagingPhaseFinal<'a>, error::Transition> {
+
        let remotes = match &self.repo {
+
            StagedRepository::Cloning(repo) => {
+
                log::debug!(target: "worker", "Loading remotes for clone");
+
                let oid = ReadRepository::identity_head(repo)?;
+
                log::trace!(target: "worker", "Loading 'rad/id' @ {oid}");
+
                let (doc, _) = Doc::<Unverified>::load_at(oid, repo)?;
+
                let doc = doc.verified()?;
+
                doc.delegates.map(PublicKey::from)
+
            }
+
            StagedRepository::Fetching(repo) => {
+
                log::debug!(target: "worker", "Loading remotes for fetching");
+
                match self.namespaces.clone() {
+
                    // Nb. Namespaces::One is not constructed in
+
                    // namespaces_for so it's safe to just bundle this
+
                    // with Namespaces::All
+
                    Namespaces::One(_) | Namespaces::All => {
+
                        let mut remotes = repo.delegates()?.map(PublicKey::from);
+
                        remotes.extend(repo.remote_ids()?.collect::<Result<Vec<_>, _>>()?);
+
                        remotes
+
                    }
+
                    Namespaces::Many(remotes) => remotes,
+
                }
+
            }
+
        };
+

+
        Ok(StagingPhaseFinal {
+
            repo: self.repo,
+
            production: self.production,
+
            remotes,
+
            _tmp: self._tmp,
+
        })
+
    }
+

+
    fn repository(
+
        staging: &Storage,
+
        production: &Storage,
+
        rid: Id,
+
    ) -> Result<StagedRepository, error::Setup> {
+
        match production.contains(&rid) {
+
            Ok(true) => {
+
                let url = url::File::new(production.path_of(&rid)).to_string();
+
                log::debug!(target: "worker", "Setting up fetch for existing repository: {}", url);
+
                let to = storage::git::paths::repository(&staging, &rid);
+
                let copy = git::raw::build::RepoBuilder::new()
+
                    .bare(true)
+
                    .clone_local(git::raw::build::CloneLocal::Local)
+
                    .clone(&url, &to)?;
+

+
                Ok(StagedRepository::Fetching(Repository {
+
                    id: rid,
+
                    backend: copy,
+
                }))
+
            }
+
            Ok(false) => {
+
                log::debug!(target: "worker", "Setting up clone for new repository {}", rid);
+
                let repo = staging.create(rid)?;
+
                Ok(StagedRepository::Cloning(repo))
+
            }
+
            Err(e) => Err(e.into()),
+
        }
+
    }
+
}
+

+
impl<'a> StagingPhaseFinal<'a> {
+
    /// Return the fetch refspecs for fetching the necessary
+
    /// references.
+
    pub fn refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        match self.repo {
+
            StagedRepository::Cloning(_) => Namespaces::Many(self.remotes.clone()).as_refspecs(),
+
            StagedRepository::Fetching(_) => {
+
                self.remotes().fold(Vec::new(), |mut specs, remote| {
+
                    specs.extend(remote.as_refspecs());
+
                    specs
+
                })
+
            }
+
        }
+
    }
+

+
    /// Finalise the fetching process via the following steps.
+
    ///
+
    /// Verify all `rad/id` and `rad/sigrefs` from fetched
+
    /// remotes. Any remotes that fail will be ignored and not fetched
+
    /// into the production repository.
+
    ///
+
    /// For each remote that verifies, fetch from the staging storage
+
    /// into the production storage using the refspec:
+
    ///
+
    /// ```text
+
    /// refs/namespaces/<remote>/*:refs/namespaces/<remote>/*
+
    /// ```
+
    ///
+
    /// All references that were updated are returned as a
+
    /// [`RefUpdate`].
+
    pub fn transfer(self) -> Result<Vec<RefUpdate>, error::Transfer> {
+
        let verifications = self.verify();
+
        let production = match &self.repo {
+
            StagedRepository::Cloning(repo) => self.production.create(repo.id)?,
+
            StagedRepository::Fetching(repo) => self.production.repository(repo.id)?,
+
        };
+
        let url = url::File::new(self.repo.path().to_path_buf()).to_string();
+
        let mut remote = production.backend.remote_anonymous(&url)?;
+
        let mut updates = Vec::new();
+
        log::debug!(target: "worker", "running transfer fetch");
+
        let callbacks = ref_updates(&mut updates);
+
        {
+
            let specs = verifications
+
                .into_iter()
+
                .filter_map(|(remote, verified)| match verified {
+
                    VerifiedRemote::Failed { reason } => {
+
                        log::warn!(
+
                            target: "worker",
+
                            "{remote} failed to verify, will not fetch any further refs: {reason}",
+
                        );
+
                        None
+
                    }
+
                    VerifiedRemote::Success { remote, .. } => {
+
                        let ns = remote.id.to_namespace().with_pattern(git::refspec::STAR);
+
                        Some(
+
                            Refspec {
+
                                src: ns.clone(),
+
                                dst: ns,
+
                                force: false,
+
                            }
+
                            .to_string(),
+
                        )
+
                    }
+
                })
+
                .collect::<Vec<_>>();
+
            log::debug!(target: "worker", "Transferring staging to production {url}");
+
            let mut opts = git::raw::FetchOptions::default();
+
            opts.remote_callbacks(callbacks);
+
            opts.prune(git::raw::FetchPrune::On);
+
            remote.fetch(&specs, Some(&mut opts), None)?;
+
        }
+
        let head = production.set_head()?;
+
        log::debug!(target: "worker", "Head for {} set to {head}", production.id);
+
        let head = production.set_identity_head()?;
+
        log::debug!(target: "worker", "'refs/rad/id' for {} set to {head}", production.id);
+
        Ok(updates)
+
    }
+

+
    fn remotes(&self) -> impl Iterator<Item = Remote> + '_ {
+
        self.remotes
+
            .iter()
+
            .filter_map(|remote| match SignedRefs::load(remote, self.repo.deref()) {
+
                Ok(refs) => Some(Remote::new(*remote, refs)),
+
                Err(err) => {
+
                    log::warn!(target: "worker", "{remote} failed rad/sigrefs verification: {err}");
+
                    None
+
                }
+
            })
+
    }
+

+
    fn verify(&self) -> BTreeMap<RemoteId, VerifiedRemote> {
+
        self.remotes
+
            .iter()
+
            .map(|remote| {
+
                let verification = match (
+
                    self.repo.identity_doc_of(remote),
+
                    SignedRefs::load(remote, self.repo.deref()),
+
                ) {
+
                    (Ok(doc), Ok(refs)) => VerifiedRemote::Success {
+
                        _doc: doc,
+
                        remote: Remote::new(*remote, refs),
+
                    },
+
                    (Err(e), _) => VerifiedRemote::Failed {
+
                        reason: e.to_string(),
+
                    },
+
                    (_, Err(e)) => VerifiedRemote::Failed {
+
                        reason: e.to_string(),
+
                    },
+
                };
+
                (*remote, verification)
+
            })
+
            .collect()
+
    }
+
}
+

+
fn ref_updates(updates: &mut Vec<RefUpdate>) -> git::raw::RemoteCallbacks<'_> {
+
    let mut callbacks = git::raw::RemoteCallbacks::new();
+
    callbacks.update_tips(|name, old, new| {
+
        if let Ok(name) = git::RefString::try_from(name) {
+
            if name.to_namespaced().is_some() {
+
                updates.push(RefUpdate::from(name, old, new));
+
                // Returning `true` ensures the process is not aborted.
+
                return true;
+
            }
+
        }
+
        log::warn!(target: "worker", "Invalid ref `{}` detected; aborting fetch", name);
+

+
        false
+
    });
+
    callbacks
+
}
added radicle-node/src/worker/fetch/error.rs
@@ -0,0 +1,45 @@
+
use std::io;
+

+
use thiserror::Error;
+

+
use radicle::{git, identity, storage, storage::refs};
+

+
#[derive(Debug, Error)]
+
pub enum Init {
+
    #[error(transparent)]
+
    Io(#[from] io::Error),
+
    #[error(transparent)]
+
    Setup(#[from] Setup),
+
}
+

+
#[derive(Debug, Error)]
+
pub enum Setup {
+
    #[error(transparent)]
+
    Git(#[from] git::raw::Error),
+
    #[error(transparent)]
+
    Identity(#[from] identity::IdentityError),
+
    #[error(transparent)]
+
    Storage(#[from] storage::Error),
+
}
+

+
#[derive(Debug, Error)]
+
pub enum Transfer {
+
    #[error(transparent)]
+
    Git(#[from] git::raw::Error),
+
    #[error(transparent)]
+
    Identity(#[from] identity::IdentityError),
+
    #[error(transparent)]
+
    Storage(#[from] storage::Error),
+
}
+

+
#[derive(Debug, Error)]
+
pub enum Transition {
+
    #[error(transparent)]
+
    Doc(#[from] identity::doc::DocError),
+
    #[error(transparent)]
+
    Git(#[from] git::raw::Error),
+
    #[error(transparent)]
+
    Identity(#[from] identity::IdentityError),
+
    #[error(transparent)]
+
    Refs(#[from] refs::Error),
+
}
added radicle-node/src/worker/fetch/refspecs.rs
@@ -0,0 +1,170 @@
+
use std::fmt;
+
use std::fmt::Write as _;
+

+
use radicle::crypto::PublicKey;
+
use radicle::git;
+
use radicle::git::refs::storage::{IDENTITY_BRANCH, SIGREFS_BRANCH};
+
use radicle::storage;
+
use radicle::storage::git::NAMESPACES_GLOB;
+
use radicle::storage::{Namespaces, Remote};
+

+
/// A Git [refspec]: a source pattern, a destination pattern, and a flag
/// saying whether the update is forced.
///
/// When `T` and `U` can be viewed as [`git::PatternStr`], the refspec is
/// displayed as `src:dst`, prefixed with `+` if `force` is set.
///
/// [refspec]: https://git-scm.com/book/en/v2/Git-Internals-The-Refspec
// TODO(finto): this should go into radicle-git-ext/git-ref-format
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Refspec<T, U> {
    /// Source side of the refspec.
    pub src: T,
    /// Destination side of the refspec.
    pub dst: U,
    /// Whether the refspec is rendered with a leading `+`.
    pub force: bool,
}
+

+
impl<T, U> fmt::Display for Refspec<T, U>
+
where
+
    T: AsRef<git::PatternStr>,
+
    U: AsRef<git::PatternStr>,
+
{
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        if self.force {
+
            f.write_char('+')?;
+
        }
+
        write!(f, "{}:{}", self.src.as_ref(), self.dst.as_ref())
+
    }
+
}
+

+
/// Radicle special refs, i.e. `refs/rad/*`.
+
pub struct SpecialRefs(pub(super) Namespaces);
+

+
impl AsRefspecs for SpecialRefs {
+
    fn as_refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        match &self.0 {
+
            Namespaces::All => {
+
                let id = NAMESPACES_GLOB.join(&*IDENTITY_BRANCH);
+
                let sigrefs = NAMESPACES_GLOB.join(&*SIGREFS_BRANCH);
+
                vec![
+
                    Refspec {
+
                        src: id.clone(),
+
                        dst: id,
+
                        force: false,
+
                    },
+
                    Refspec {
+
                        src: sigrefs.clone(),
+
                        dst: sigrefs,
+
                        force: false,
+
                    },
+
                ]
+
            }
+
            Namespaces::One(pk) => rad_refs(pk),
+
            Namespaces::Many(pks) => pks.iter().flat_map(rad_refs).collect(),
+
        }
+
    }
+

+
    fn into_refspecs(self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        self.as_refspecs()
+
    }
+
}
+

+
fn rad_refs(pk: &PublicKey) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
    let ns = pk.to_namespace();
+
    let id = git::PatternString::from(ns.join(&*IDENTITY_BRANCH));
+
    let id = Refspec {
+
        src: id.clone(),
+
        dst: id,
+
        force: false,
+
    };
+
    let sigrefs = git::PatternString::from(ns.join(&*SIGREFS_BRANCH));
+
    let sigrefs = Refspec {
+
        src: sigrefs.clone(),
+
        dst: sigrefs,
+
        force: false,
+
    };
+
    vec![id, sigrefs]
+
}
+

+
/// A conversion trait for producing a set of Git [`Refspec`]s.
+
pub trait AsRefspecs
+
where
+
    Self: Sized,
+
{
+
    /// Convert the borrowed data into a set of [`Refspec`]s.
+
    fn as_refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>>;
+

+
    /// Convert the owned data into a set of [`Refspec`]s.
+
    ///
+
    /// Nb. The default implementation uses
+
    /// [`AsRefspecs::as_refspecs`], which may clone data.
+
    fn into_refspecs(self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        self.as_refspecs()
+
    }
+
}
+

+
impl AsRefspecs for Namespaces {
+
    fn as_refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        match self {
+
            Namespaces::All => vec![Refspec {
+
                src: (*storage::git::NAMESPACES_GLOB).clone(),
+
                dst: (*storage::git::NAMESPACES_GLOB).clone(),
+
                force: false,
+
            }],
+
            Namespaces::One(pk) => {
+
                let ns = pk.to_namespace().with_pattern(git::refspec::STAR);
+
                vec![Refspec {
+
                    src: ns.clone(),
+
                    dst: ns,
+
                    force: false,
+
                }]
+
            }
+
            Namespaces::Many(pks) => pks
+
                .iter()
+
                .map(|pk| {
+
                    let ns = pk.to_namespace().with_pattern(git::refspec::STAR);
+
                    Refspec {
+
                        src: ns.clone(),
+
                        dst: ns,
+
                        force: false,
+
                    }
+
                })
+
                .collect(),
+
        }
+
    }
+
}
+

+
impl AsRefspecs for Refspec<git::PatternString, git::PatternString> {
    /// A single refspec converts to a one-element list holding a clone of it.
    fn as_refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
        self.clone().into_refspecs()
    }

    /// A single refspec converts to a one-element list holding itself.
    fn into_refspecs(self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
        let mut specs = Vec::with_capacity(1);
        specs.push(self);
        specs
    }
}
+

+
impl<T: AsRefspecs> AsRefspecs for Vec<T> {
+
    fn as_refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        self.iter().flat_map(AsRefspecs::as_refspecs).collect()
+
    }
+

+
    fn into_refspecs(self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        self.into_iter()
+
            .flat_map(AsRefspecs::into_refspecs)
+
            .collect()
+
    }
+
}
+

+
impl AsRefspecs for Remote {
+
    fn as_refspecs(&self) -> Vec<Refspec<git::PatternString, git::PatternString>> {
+
        let ns = self.id.to_namespace();
+
        // Nb. the references in Refs are expected to be Qualified
+
        self.refs
+
            .iter()
+
            .map(|(name, _)| {
+
                let name = git::PatternString::from(ns.join(name));
+
                Refspec {
+
                    src: name.clone(),
+
                    dst: name,
+
                    force: true,
+
                }
+
            })
+
            .collect()
+
    }
+
}
modified radicle/src/git.rs
@@ -19,7 +19,9 @@ pub use ext::Oid;
pub use git2 as raw;
pub use git_ref_format as fmt;
pub use git_ref_format::{
-
    component, lit, name, qualified, refname, Component, Namespaced, Qualified, RefStr, RefString,
+
    component, lit, name, qualified, refname, refspec,
+
    refspec::{PatternStr, PatternString},
+
    Component, Namespaced, Qualified, RefStr, RefString,
};
pub use radicle_git_ext as ext;
pub use storage::git::transport::local::Url;