Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Revise git transport to use I/O more directly
Alexis Sellier committed 3 years ago
commit 48a5c75ae3257a830d43e2b845076557b2b07df1
parent f85a0426d7c41a312e005a9ce4024d7760613c8e
2 files changed +26 -106
modified radicle/src/storage/git.rs
@@ -865,48 +865,24 @@ mod tests {
            });
            opts.remote_callbacks(callbacks);

-
            let target = git2::Repository::init_bare(target_path).unwrap();
-
            let refs: &[&str] = &["refs/*:refs/*"];
+
            // Register the `rad://` transport.
+
            transport::register().unwrap();

+
            let target = git2::Repository::init_bare(target_path).unwrap();
            let stream = net::TcpStream::connect(addr).unwrap();
-
            let mut stream_r = stream.try_clone().unwrap();
-
            let mut stream_w = stream.try_clone().unwrap();
-
            let (stream_r_send, stream_r_recv) = crossbeam_channel::unbounded::<Vec<u8>>();
-
            let (stream_w_send, stream_w_recv) = crossbeam_channel::unbounded::<Vec<u8>>();
+
            let smart = transport::Smart::singleton();

-
            let smart = transport::smart();
-
            smart.insert(proj, transport::Stream::new(stream_w_send, stream_r_recv));
+
            smart.insert(proj, Box::new(stream.try_clone().unwrap()));

-
            let rt = thread::spawn(move || {
-
                let mut buf = vec![0u8; 1024];
-

-
                while let Ok(n) = stream_r.read(&mut buf) {
-
                    if n == 0 {
-
                        break;
-
                    }
-
                    stream_r_send.send(buf[..n].to_vec()).unwrap();
-
                }
-
            });
-
            let wt = thread::spawn(move || {
-
                for buf in stream_w_recv.iter() {
-
                    stream_w.write_all(&buf).unwrap();
-
                }
-
            });
-

-
            // Register the `rad://` transport.
-
            transport::register().unwrap();
            // Fetch with the `rad://` transport.
            target
                .remote_anonymous(&format!("rad://{}", proj))
                .unwrap()
-
                .fetch(refs, Some(&mut opts), None)
+
                .fetch(&["refs/*:refs/*"], Some(&mut opts), None)
                .unwrap();

-
            smart.remove(&proj);
            stream.shutdown(net::Shutdown::Both).unwrap();

-
            wt.join().unwrap();
-
            rt.join().unwrap();
            t.join().unwrap();
        }

modified radicle/src/storage/git/transport.rs
@@ -13,14 +13,13 @@
//! out the git smart protocol.
//!
//! This module is meant to be used by first registering our transport with [`register`] and then
-
//! adding or removing streams through [`Smart`], which can be obtained by calling [`smart`].
+
//! adding or removing streams through [`Smart`], which can be obtained via [`Smart::singleton`].
use std::collections::HashMap;
-
use std::io;
use std::str::FromStr;
use std::sync::atomic;
use std::sync::{Arc, Mutex};

-
use crossbeam_channel as chan;
+
use git2::transport::SmartSubtransportStream;
use once_cell::sync::Lazy;

use crate::git;
@@ -31,6 +30,9 @@ use crate::identity::Id;
/// or its underlying streams.
static STREAMS: Lazy<Arc<Mutex<HashMap<Id, Stream>>>> = Lazy::new(Default::default);

+
/// The stream associated with a repository.
+
type Stream = Box<dyn SmartSubtransportStream>;
+

/// Git transport protocol over an I/O stream.
#[derive(Clone)]
pub struct Smart {
@@ -39,16 +41,22 @@ pub struct Smart {
}

impl Smart {
-
    pub fn get(&self, id: &Id) -> Option<Stream> {
-
        self.streams.lock().unwrap().get(id).cloned()
+
    /// Get access to the radicle smart transport protocol.
+
    /// The returned object has mutable access to the underlying stream map, and is safe to clone.
+
    pub fn singleton() -> Self {
+
        Self {
+
            streams: STREAMS.clone(),
+
        }
    }

-
    pub fn insert(&self, id: Id, stream: Stream) {
-
        self.streams.lock().unwrap().insert(id, stream);
+
    /// Take a stream from the map.
+
    /// This makes the stream unavailable until it is re-inserted.
+
    pub fn take(&self, id: &Id) -> Option<Stream> {
+
        self.streams.lock().unwrap().remove(id)
    }

-
    pub fn remove(&self, id: &Id) {
-
        self.streams.lock().unwrap().remove(id);
+
    pub fn insert(&self, id: Id, stream: Stream) {
+
        self.streams.lock().unwrap().insert(id, stream);
    }
}

@@ -74,7 +82,7 @@ impl git2::transport::SmartSubtransport for Smart {
            return Err(git2::Error::from_str("Git URL scheme must be `rad`"));
        }

-
        if let Some(stream) = self.get(&id) {
+
        if let Some(stream) = self.take(&id) {
            match action {
                git2::transport::Service::UploadPackLs => {}
                git2::transport::Service::UploadPack => {}
@@ -89,7 +97,7 @@ impl git2::transport::SmartSubtransport for Smart {
                    ));
                }
            }
-
            Ok(Box::new(stream))
+
            Ok(stream)
        } else {
            Err(git2::Error::from_str(&format!(
                "repository {id} does not have an associated stream"
@@ -102,60 +110,6 @@ impl git2::transport::SmartSubtransport for Smart {
    }
}

-
/// A byte stream connected to some I/O source.
-
/// One of these is created for every git operation, eg. `fetch`.
-
#[derive(Clone)]
-
pub struct Stream {
-
    /// Send bytes to the network.
-
    send: chan::Sender<Vec<u8>>,
-
    /// Receive bytes from the network.
-
    recv: chan::Receiver<Vec<u8>>,
-
    /// Bytes read from the receive channel that didn't fit in the read buffer.
-
    pending: Vec<u8>,
-
}
-

-
impl Stream {
-
    /// Create a new stream from a sender and receiver.
-
    pub fn new(send: chan::Sender<Vec<u8>>, recv: chan::Receiver<Vec<u8>>) -> Self {
-
        Self {
-
            send,
-
            recv,
-
            pending: Vec::new(),
-
        }
-
    }
-
}
-

-
impl io::Write for Stream {
-
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-
        self.send
-
            .send(buf.to_owned())
-
            .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e))?;
-

-
        Ok(buf.len())
-
    }
-

-
    fn flush(&mut self) -> io::Result<()> {
-
        Ok(())
-
    }
-
}
-

-
impl io::Read for Stream {
-
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
        let bytes = self
-
            .recv
-
            .recv()
-
            .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e))?;
-
        self.pending.extend(&bytes);
-

-
        // There must be a nicer way to do this...
-
        let count = buf.len().min(self.pending.len());
-
        buf[..count].copy_from_slice(&self.pending[..count]);
-
        self.pending.drain(..count);
-

-
        Ok(count)
-
    }
-
}
-

/// Register the radicle transport with `git`.
///
/// Returns an error if called more than once.
@@ -168,7 +122,7 @@ pub fn register() -> Result<(), git2::Error> {
        unsafe {
            let prefix = git::url::Scheme::Radicle.to_string();
            git2::transport::register(&prefix, move |remote| {
-
                git2::transport::Transport::smart(remote, false, self::smart())
+
                git2::transport::Transport::smart(remote, false, Smart::singleton())
            })
        }
    } else {
@@ -177,13 +131,3 @@ pub fn register() -> Result<(), git2::Error> {
        ))
    }
}
-

-
/// Get access to the radicle smart transport protocol.
-
///
-
/// The returned object has mutable access to the underlying stream map, and is safe to clone.
-
///
-
pub fn smart() -> Smart {
-
    Smart {
-
        streams: STREAMS.clone(),
-
    }
-
}