Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-fetch src transport.rs
pub(crate) mod fetch;
pub(crate) mod ls_refs;

use std::collections::BTreeSet;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use bstr::BString;
use gix_features::progress::prodash::progress;
use gix_protocol::Handshake;
use gix_protocol::handshake;
use gix_transport::Protocol;
use gix_transport::Service;
use gix_transport::client;
use radicle::git::Oid;
use radicle::git::fmt::Qualified;
use radicle::storage::git::Repository;
use thiserror::Error;

use crate::git::packfile::Keepfile;
use crate::git::repository;
use crate::stage::RefPrefix;

/// Open a reader and writer stream to pass to the ls-refs and fetch
/// processes for communicating during their respective protocols.
///
/// `Transport` calls [`ConnectionStream::open`] again for every
/// operation it performs (handshake, ls-refs, fetch, and when
/// signalling EOF), so implementations must be able to hand out the
/// two halves repeatedly.
pub trait ConnectionStream {
    /// The readable half of the stream.
    type Read: io::Read;
    /// The writable half of the stream. It must also be able to signal
    /// EOF to the remote, see [`SignalEof`].
    type Write: io::Write + SignalEof;

    /// Borrow the read and write halves of the stream, in that order.
    fn open(&mut self) -> (&mut Self::Read, &mut Self::Write);
}

/// The ability to signal EOF to the server side so that it can stop
/// serving for this fetch request.
///
/// Required of the write half of every [`ConnectionStream`].
pub trait SignalEof {
    /// Since the git protocol is tunneled over an existing
    /// connection, we can't signal the end of the protocol via the
    /// usual means, which is to close the connection. Git also
    /// doesn't have any special message we can send to signal the end
    /// of the protocol.
    ///
    /// Hence, there's no other way for the server to know that we're
    /// done sending requests than to send a special message outside
    /// the git protocol. This message can then be processed by the
    /// remote worker to end the protocol. We use the special "eof"
    /// control message for this.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] if the implementation fails to send
    /// the signal.
    fn eof(&mut self) -> io::Result<()>;
}

/// Configuration for running a Git `handshake`, `ls-refs`, or
/// `fetch`.
///
/// Every operation opens `stream` and wraps the two halves in a new
/// `Connection` for `repo`.
pub struct Transport<S> {
    /// Directory handed to the pack writer when running a fetch.
    git_dir: PathBuf,
    /// Repository path used for each connection. `Transport::new`
    /// ensures that it starts with `/`.
    repo: BString,
    /// Provider of the read and write halves used by every operation.
    stream: S,
}

/// Errors returned by the `ls_refs` and `fetch` operations of
/// `Transport`.
#[derive(Error, Debug)]
pub enum Error {
    /// Running `ls-refs` against the remote failed.
    #[error("gix ls-refs error: {0}")]
    LsRefs(#[from] gix_protocol::ls_refs::Error),
    /// Running `fetch` against the remote failed.
    #[error("gix fetch error: {0}")]
    Fetch(#[from] gix_protocol::fetch::Error),
    /// The fetch finished without producing a packfile.
    #[error("empty or no packfile received")]
    Empty,
    /// A wanted object could not be found in the index of the
    /// received packfile.
    #[error("wanted object not found: {0}")]
    NotFound(Oid),
    /// The index of the received packfile could not be opened.
    #[error("gix pack index error: {0}")]
    PackIndex(#[from] gix_pack::index::init::Error),
}

impl<S> Transport<S>
where
    S: ConnectionStream,
{
    pub fn new(git_dir: PathBuf, mut repo: BString, stream: S) -> Self {
        let repo = if repo.starts_with(b"/") {
            repo
        } else {
            let mut path = BString::new(b"/".to_vec());
            path.append(&mut repo);
            path
        };
        Self {
            git_dir,
            repo,
            stream,
        }
    }

    /// Perform the handshake with the server side.
    #[allow(clippy::result_large_err)]
    pub(crate) fn handshake(&mut self) -> Result<Handshake, handshake::Error> {
        log::trace!("Performing handshake for {}", self.repo);
        let (read, write) = self.stream.open();
        gix_protocol::handshake(
            &mut Connection::new(read, write, self.repo.clone()),
            Service::UploadPack,
            |_| Ok(None),
            vec![],
            &mut progress::Discard,
        )
    }

    /// Perform ls-refs with the server side.
    pub(crate) fn ls_refs(
        &mut self,
        prefixes: impl IntoIterator<Item = RefPrefix>,
        handshake: &Handshake,
    ) -> Result<Vec<handshake::Ref>, Error> {
        let prefixes = prefixes.into_iter().collect::<BTreeSet<_>>();
        let (read, write) = self.stream.open();
        Ok(ls_refs::run(
            ls_refs::Config {
                prefixes,
                repo: self.repo.clone(),
            },
            handshake,
            Connection::new(read, write, self.repo.clone()),
            &mut progress::Discard,
        )?)
    }

    /// Perform the fetch with the server side.
    pub(crate) fn fetch(
        &mut self,
        wants_haves: WantsHaves,
        interrupt: Arc<AtomicBool>,
        handshake: &Handshake,
    ) -> Result<Option<Keepfile>, Error> {
        log::trace!(
            "Running fetch wants={:?}, haves={:?}",
            wants_haves.wants,
            wants_haves.haves
        );
        let out = {
            let (read, write) = self.stream.open();
            fetch::run(
                wants_haves.clone(),
                fetch::PackWriter {
                    git_dir: self.git_dir.clone(),
                    interrupt,
                },
                handshake,
                Connection::new(read, write, self.repo.clone()),
                &mut progress::Discard,
            )?
        };
        let pack_path = out
            .pack
            .ok_or(Error::Empty)?
            .index_path
            .expect("written packfile must have a path");

        // Validate we got all requested tips in the pack
        //
        // N.b. the lookup is a binary search so is efficient for
        // searching any given oid.
        {
            use gix_pack::index::File;

            let idx = File::at(pack_path, gix_hash::Kind::Sha1)?;
            for oid in wants_haves.wants {
                if idx.lookup(oid).is_none() {
                    return Err(Error::NotFound(oid));
                }
            }
        }

        Ok(out.keepfile)
    }

    /// Signal to the server side that we are done sending ls-refs and
    /// fetch commands.
    pub(crate) fn done(&mut self) -> io::Result<()> {
        let (_, w) = self.stream.open();
        w.eof()
    }
}

/// Wrapper around the blocking Git connection of `gix_transport`,
/// built from a reader/writer pair. The transport traits implemented
/// for this type forward to `inner`.
pub(crate) struct Connection<R, W> {
    /// The wrapped `gix_transport` connection.
    inner: client::git::blocking_io::Connection<R, W>,
}

impl<R, W> Connection<R, W>
where
    R: io::Read,
    W: io::Write,
{
    /// Wrap `read` and `write` in a Git protocol V2 connection for the
    /// repository path `repo`.
    ///
    /// The connection is created in daemon mode, without a virtual
    /// host, and with tracing turned off.
    pub fn new(read: R, write: W, repo: BString) -> Self {
        let virtual_host: Option<(String, Option<u16>)> = None;
        let trace = false;

        let inner = client::git::blocking_io::Connection::new(
            read,
            write,
            Protocol::V2,
            repo,
            virtual_host,
            client::git::ConnectMode::Daemon,
            trace,
        );

        Self { inner }
    }
}

/// Blocking transport implementation for [`Connection`]; every method
/// forwards to the wrapped connection.
impl<R, W> client::blocking_io::Transport for Connection<R, W>
where
    R: std::io::Read,
    W: std::io::Write,
{
    /// Start a request on the wrapped connection, passing all
    /// arguments through unchanged.
    fn request(
        &mut self,
        write_mode: client::WriteMode,
        on_into_read: client::MessageKind,
        trace: bool,
    ) -> Result<client::blocking_io::RequestWriter<'_>, client::Error> {
        self.inner.request(write_mode, on_into_read, trace)
    }

    /// Perform the handshake for `service` on the wrapped connection,
    /// passing `extra_parameters` through unchanged.
    fn handshake<'b>(
        &mut self,
        service: Service,
        extra_parameters: &'b [(&'b str, Option<&'b str>)],
    ) -> Result<client::blocking_io::SetServiceResponse<'_>, client::Error> {
        self.inner.handshake(service, extra_parameters)
    }
}

/// I/O-independent transport properties of [`Connection`].
impl<R, W> client::TransportWithoutIO for Connection<R, W>
where
    R: std::io::Read,
    W: std::io::Write,
{
    /// The URL as reported by the wrapped connection.
    fn to_url(&self) -> std::borrow::Cow<'_, bstr::BStr> {
        self.inner.to_url()
    }

    /// Always reports `false`.
    ///
    /// NOTE(review): presumably because `Transport` builds a fresh
    /// `Connection` for every operation; confirm.
    fn connection_persists_across_multiple_requests(&self) -> bool {
        false
    }

    /// Forward `config` to the wrapped connection.
    fn configure(
        &mut self,
        config: &dyn std::any::Any,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        self.inner.configure(config)
    }

    /// Only protocol V2 is supported, matching the version that
    /// `Connection::new` passes to the wrapped connection.
    fn supported_protocol_versions(&self) -> &[Protocol] {
        &[Protocol::V2]
    }
}

/// Errors that can occur while building up a `WantsHaves`, e.g. in
/// `WantsHaves::add`. Every variant transparently wraps the underlying
/// repository error.
#[derive(Debug, Error)]
pub enum WantsHavesError {
    /// An ancestry error reported by the repository helpers.
    #[error(transparent)]
    Ancestry(#[from] repository::error::Ancestry),
    /// An error reported while checking whether the repository
    /// contains an object.
    #[error(transparent)]
    Contains(#[from] repository::error::Contains),
    /// An error reported while resolving a reference name.
    #[error(transparent)]
    Resolve(#[from] repository::error::Resolve),
}

/// The object ids used for a fetch: the ones we want to receive and the
/// ones we already have.
///
/// The `want` and `have` methods never leave the id they are given in
/// both sets at once.
#[derive(Clone, Default)]
pub(crate) struct WantsHaves {
    /// Object ids that should be fetched.
    pub wants: BTreeSet<Oid>,
    /// Object ids that are already present locally.
    pub haves: BTreeSet<Oid>,
}

impl WantsHaves {
    /// Mark `oid` as wanted, unless it is already recorded as a `have`.
    pub fn want(&mut self, oid: Oid) {
        // N.b. if we have it, then we don't want it.
        if self.haves.contains(&oid) {
            return;
        }
        self.wants.insert(oid);
    }

    /// Mark `oid` as a `have`, dropping it from the `wants` if it was
    /// recorded there.
    pub fn have(&mut self, oid: Oid) {
        // N.b. ensure that oid is not in wants
        self.wants.remove(&oid);
        self.haves.insert(oid);
    }

    /// Add a set of references to the `wants` and `haves`.
    ///
    /// Each entry of `refs` pairs a reference name with the advertised
    /// tip `Oid`. For every entry, the range between the reference's
    /// current `Oid` and the tip is built, so that the server can send
    /// all objects in that range:
    ///
    /// * If the reference exists, its current `Oid` becomes a `have`.
    ///   The tip becomes a `want` only if it differs from the current
    ///   `Oid` and is not already in the Odb.
    /// * If the reference does not exist, the tip becomes a `want`,
    ///   iff it is not already in the Odb.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error encountered while
    /// resolving a reference or checking the Odb. Entries processed
    /// before the failure remain recorded.
    pub fn add<'a, N>(
        &mut self,
        repo: &Repository,
        refs: impl IntoIterator<Item = (N, Oid)>,
    ) -> Result<&mut Self, WantsHavesError>
    where
        N: Into<Qualified<'a>>,
    {
        for (refname, tip) in refs {
            let current = repository::refname_to_id(repo, refname)?;

            // The Odb is only consulted when the reference is missing
            // or points somewhere other than the tip. The check runs
            // before anything is recorded, so a failure leaves this
            // entry untouched.
            let wanted = current != Some(tip) && !repository::contains(repo, tip)?;

            if let Some(oid) = current {
                self.have(oid);
            }
            if wanted {
                self.want(tip);
            }
        }

        Ok(self)
    }
}

/// Build the Git agent string, `git/<version>`.
///
/// The version is the one reported by `radicle::git::version`. If it
/// cannot be determined, the failure is logged at debug level and
/// `radicle::git::VERSION_REQUIRED` is used instead.
fn agent_name() -> String {
    let version = radicle::git::version().unwrap_or_else(|err| {
        use radicle::git::VERSION_REQUIRED;
        log::debug!("The git version could not be determined: {err}");
        log::debug!("Pretending that we are on git version {VERSION_REQUIRED}.");
        VERSION_REQUIRED
    });

    format!("git/{version}")
}