Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Flesh out custom git transport
Alexis Sellier committed 3 years ago
commit 1da38708fc6f07ae9ae2f773b03b35c458afe371
parent b4dcc00fe3fd82d7b6cd074f06124506f9c7a825
4 files changed +199 -47
modified Cargo.lock
@@ -768,6 +768,7 @@ dependencies = [
name = "radicle"
version = "0.2.0"
dependencies = [
+
 "crossbeam-channel",
 "ed25519-compact",
 "fastrand",
 "git-ref-format",
modified radicle/Cargo.toml
@@ -10,6 +10,7 @@ default = []
test = ["quickcheck"]

[dependencies]
+
crossbeam-channel = { version = "0.5.6" }
ed25519-compact = { version = "1.0.12", features = ["pem"] }
fastrand = { version = "1.8.0" }
git-ref-format = { version = "0", features = ["serde", "macro"] }
modified radicle/src/storage/git.rs
@@ -652,9 +652,13 @@ pub mod paths {

#[cfg(test)]
mod tests {
+
    use std::io::{Read, Write};
+
    use std::{io, net, process, thread};
+

    use super::*;
    use crate::assert_matches;
    use crate::git;
+
    use crate::rad;
    use crate::storage::refs::SIGNATURE_REF;
    use crate::storage::{ReadRepository, ReadStorage, RefUpdate, WriteRepository};
    use crate::test::arbitrary;
@@ -793,23 +797,33 @@ mod tests {

    #[test]
    fn test_upload_pack() {
-
        use std::io::{Read, Write};
-
        use std::{io, net, process, thread};
-

+
        let tmp = tempfile::tempdir().unwrap();
+
        let signer = MockSigner::default();
+
        let remote = *signer.public_key();
+
        let storage = Storage::open(tmp.path().join("storage")).unwrap();
        let socket = net::TcpListener::bind(net::SocketAddr::from(([0, 0, 0, 0], 0))).unwrap();
        let addr = socket.local_addr().unwrap();
-
        let tmp = tempfile::tempdir().unwrap();
        let source_path = tmp.path().join("source");
        let target_path = tmp.path().join("target");
-
        let (_source, _) = fixtures::repository(&source_path);
+
        let (source, _) = fixtures::repository(&source_path);
+
        let (proj, _) = rad::init(
+
            &source,
+
            "radicle",
+
            "radicle",
+
            BranchName::from("master"),
+
            signer,
+
            &storage,
+
        )
+
        .unwrap();

        let t = thread::spawn(move || {
            let (stream, _) = socket.accept().unwrap();
-
            // NOTE: `--stateless-rpc` doesn't work, neither does `GIT_PROTOCOL=version=2`.
+
            let repo = storage.repository(proj).unwrap();
+
            // NOTE: `GIT_PROTOCOL=version=2` doesn't work.
            let mut child = process::Command::new("git")
-
                .current_dir(source_path.join(".git"))
+
                .current_dir(repo.path())
                .arg("upload-pack")
-
                .arg("--strict")
+
                .arg("--strict") // The path to the git repo must be exact.
                .arg(".")
                .stdout(process::Stdio::piped())
                .stdin(process::Stdio::piped())
@@ -854,18 +868,56 @@ mod tests {
            let target = git2::Repository::init_bare(target_path).unwrap();
            let refs: &[&str] = &["refs/*:refs/*"];

+
            let stream = net::TcpStream::connect(addr).unwrap();
+
            let mut stream_r = stream.try_clone().unwrap();
+
            let mut stream_w = stream.try_clone().unwrap();
+
            let (stream_r_send, stream_r_recv) = crossbeam_channel::unbounded::<Vec<u8>>();
+
            let (stream_w_send, stream_w_recv) = crossbeam_channel::unbounded::<Vec<u8>>();
+

+
            let smart = transport::smart();
+
            smart.insert(proj, transport::Stream::new(stream_w_send, stream_r_recv));
+

+
            let rt = thread::spawn(move || {
+
                let mut buf = vec![0u8; 1024];
+

+
                while let Ok(n) = stream_r.read(&mut buf) {
+
                    if n == 0 {
+
                        break;
+
                    }
+
                    stream_r_send.send(buf[..n].to_vec()).unwrap();
+
                }
+
            });
+
            let wt = thread::spawn(move || {
+
                for buf in stream_w_recv.iter() {
+
                    stream_w.write_all(&buf).unwrap();
+
                }
+
            });
+

            // Register the `rad://` transport.
            transport::register("rad").unwrap();
            // Fetch with the `rad://` transport.
            target
-
                .remote_anonymous(&format!("rad://{}", addr))
+
                .remote_anonymous(&format!("rad://{}", proj))
                .unwrap()
                .fetch(refs, Some(&mut opts), None)
                .unwrap();

+
            smart.remove(&proj);
+
            stream.shutdown(net::Shutdown::Both).unwrap();
+

+
            wt.join().unwrap();
+
            rt.join().unwrap();
            t.join().unwrap();
        }
-
        assert_eq!(updates, vec![String::from("refs/heads/master")]);
+

+
        assert_eq!(
+
            updates,
+
            vec![
+
                format!("refs/remotes/{remote}/heads/master"),
+
                format!("refs/remotes/{remote}/heads/radicle/id"),
+
                format!("refs/remotes/{remote}/radicle/signature")
+
            ]
+
        );
    }

    #[test]
modified radicle/src/storage/git/transport.rs
@@ -1,13 +1,65 @@
+
//! Git sub-transport used for fetching radicle data.
+
//!
+
//! To have control over the communication, and to allow git streams to be multiplexed over
+
//! existing TCP connections, we implement the [`git2::transport::SmartSubtransport`] trait.
+
//!
+
//! We choose `rad` as the URL scheme for this custom transport, and include only the identity
+
//! of the repository we're looking to fetch, eg. `rad://zP1GztjSdYNHK7jpdrXbaJ6Ki2Ke`, since
+
//! we expect a connection to a host to already be established.
+
//!
+
//! We then maintain a map from identifier to stream, for all active streams, ie. streams that
+
//! are associated with an underlying TCP connection. When a URL is requested, we lookup
+
//! the stream and return it to the [`git2`] smart-protocol implementation, so that it can carry
+
//! out the git smart protocol.
+
//!
+
//! This module is meant to be used by first registering our transport with [`register`] and then
+
//! adding or removing streams through [`Smart`], which can be obtained by calling [`smart`].
+
use std::collections::HashMap;
+
use std::io;
use std::str::FromStr;
use std::sync::atomic;
-
use std::{io, net};
+
use std::sync::{Arc, Mutex};
+

+
use crossbeam_channel as chan;
+
use once_cell::sync::Lazy;

use crate::git;
+
use crate::identity::Id;
+

+
/// The map of git smart sub-transport streams. We keep a global map because we have
+
/// no control over how [`git2::transport::register`] instantiates our [`Smart`] transport
+
/// or its underlying streams.
+
static STREAMS: Lazy<Arc<Mutex<HashMap<Id, Stream>>>> = Lazy::new(Default::default);
+

+
/// Git transport protocol over an I/O stream.
+
#[derive(Clone)]
+
pub struct Smart {
+
    /// The underlying active streams, keyed by repository identifier.
+
    streams: Arc<Mutex<HashMap<Id, Stream>>>,
+
}
+

+
impl Smart {
+
    pub fn get(&self, id: &Id) -> Option<Stream> {
+
        self.streams.lock().unwrap().get(id).cloned()
+
    }
+

+
    pub fn insert(&self, id: Id, stream: Stream) {
+
        self.streams.lock().unwrap().insert(id, stream);
+
    }

-
/// Git smart protocol over a TCP stream.
-
pub struct Smart;
+
    pub fn remove(&self, id: &Id) {
+
        self.streams.lock().unwrap().remove(id);
+
    }
+
}

impl git2::transport::SmartSubtransport for Smart {
+
    /// Run a git service on this transport.
+
    ///
+
    /// Based on the URL, which must be of the form `rad://zP1GztjSdYNHK7jpdrXbaJ6Ki2Ke`,
+
    /// we retrieve an underlying stream and return it.
+
    ///
+
    /// We only support the upload-pack service, since only fetches are authorized by the
+
    /// remote.
    fn action(
        &self,
        url: &str,
@@ -15,36 +67,34 @@ impl git2::transport::SmartSubtransport for Smart {
    ) -> Result<Box<dyn git2::transport::SmartSubtransportStream>, git2::Error> {
        let url = git::Url::from_bytes(url.as_bytes())
            .map_err(|e| git2::Error::from_str(e.to_string().as_str()))?;
+
        let id = Id::from_str(url.host.unwrap_or_default().as_str())
+
            .map_err(|_| git2::Error::from_str("Git URL does not contain a valid project id"))?;

-
        let addr = if let (Some(host), Some(port)) = (url.host, url.port) {
-
            // TODO: Support hostnames.
-
            net::SocketAddr::new(
-
                net::IpAddr::from_str(&host)
-
                    .map_err(|e| git2::Error::from_str(e.to_string().as_str()))?,
-
                port,
-
            )
-
        } else {
-
            return Err(git2::Error::from_str("Git URL must have a host and port"));
-
        };
-

-
        let stream = std::net::TcpStream::connect(addr)
-
            .map_err(|e| git2::Error::from_str(e.to_string().as_str()))?;
+
        if url.scheme != git::url::Scheme::Radicle {
+
            return Err(git2::Error::from_str("Git URL scheme must be `rad`"));
+
        }

-
        match action {
-
            git2::transport::Service::UploadPackLs => {}
-
            git2::transport::Service::UploadPack => {}
-
            git2::transport::Service::ReceivePack => {
-
                return Err(git2::Error::from_str(
-
                    "git-receive-pack is not supported with the custom transport",
-
                ));
-
            }
-
            git2::transport::Service::ReceivePackLs => {
-
                return Err(git2::Error::from_str(
-
                    "git-receive-pack is not supported with the custom transport",
-
                ));
+
        if let Some(stream) = self.get(&id) {
+
            match action {
+
                git2::transport::Service::UploadPackLs => {}
+
                git2::transport::Service::UploadPack => {}
+
                git2::transport::Service::ReceivePack => {
+
                    return Err(git2::Error::from_str(
+
                        "git-receive-pack is not supported with the custom transport",
+
                    ));
+
                }
+
                git2::transport::Service::ReceivePackLs => {
+
                    return Err(git2::Error::from_str(
+
                        "git-receive-pack is not supported with the custom transport",
+
                    ));
+
                }
            }
+
            Ok(Box::new(stream))
+
        } else {
+
            Err(git2::Error::from_str(&format!(
+
                "repository {id} does not have an associated stream"
+
            )))
        }
-
        Ok(Box::new(Stream { stream }))
    }

    fn close(&self) -> Result<(), git2::Error> {
@@ -52,34 +102,72 @@ impl git2::transport::SmartSubtransport for Smart {
    }
}

-
struct Stream {
-
    stream: std::net::TcpStream,
+
/// A byte stream connected to some I/O source.
+
/// One of these is created for every git operation, eg. `fetch`.
+
#[derive(Clone)]
+
pub struct Stream {
+
    /// Send bytes to the network.
+
    send: chan::Sender<Vec<u8>>,
+
    /// Receive bytes from the network.
+
    recv: chan::Receiver<Vec<u8>>,
+
    /// Bytes read from the receive channel that didn't fit in the read buffer.
+
    pending: Vec<u8>,
+
}
+

+
impl Stream {
+
    /// Create a new stream from a sender and receiver.
+
    pub fn new(send: chan::Sender<Vec<u8>>, recv: chan::Receiver<Vec<u8>>) -> Self {
+
        Self {
+
            send,
+
            recv,
+
            pending: Vec::new(),
+
        }
+
    }
}

impl io::Write for Stream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-
        self.stream.write(buf)
+
        self.send
+
            .send(buf.to_owned())
+
            .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e))?;
+

+
        Ok(buf.len())
    }

-
    fn flush(&mut self) -> std::io::Result<()> {
-
        self.stream.flush()
+
    fn flush(&mut self) -> io::Result<()> {
+
        Ok(())
    }
}

impl io::Read for Stream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
        self.stream.read(buf)
+
        let bytes = self
+
            .recv
+
            .recv()
+
            .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e))?;
+
        self.pending.extend(&bytes);
+

+
        // There must be a nicer way to do this...
+
        let count = buf.len().min(self.pending.len());
+
        buf[..count].copy_from_slice(&self.pending[..count]);
+
        self.pending.drain(..count);
+

+
        Ok(count)
    }
}

-
/// Register the "smart" transport with `git`.
+
/// Register the radicle transport with `git`.
+
///
+
/// Returns an error if called more than once.
+
///
pub fn register(prefix: &str) -> Result<(), git2::Error> {
    static REGISTERED: atomic::AtomicBool = atomic::AtomicBool::new(false);

+
    // Registration is not thread-safe, so make sure we prevent re-entrancy.
    if !REGISTERED.swap(true, atomic::Ordering::SeqCst) {
        unsafe {
            git2::transport::register(prefix, move |remote| {
-
                git2::transport::Transport::smart(remote, false, Smart)
+
                git2::transport::Transport::smart(remote, false, self::smart())
            })
        }
    } else {
@@ -88,3 +176,13 @@ pub fn register(prefix: &str) -> Result<(), git2::Error> {
        ))
    }
}
+

+
/// Get access to the radicle smart transport protocol.
+
///
+
/// The returned object has mutable access to the underlying stream map, and is safe to clone.
+
///
+
pub fn smart() -> Smart {
+
    Smart {
+
        streams: STREAMS.clone(),
+
    }
+
}