Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Tunnel git data without parsing packet-lines
Alexis Sellier committed 3 years ago
commit bda5d6f0803e1c78c78d84460ce899f28a7b2176
parent a05de1fbf3902aa895b95cc8020ba0ab0efabc7e
3 files changed +120 -140
modified radicle-node/src/wire/protocol.rs
@@ -411,11 +411,6 @@ where
            return;
        };

-
        #[cfg(test)]
-
        if c.receiver.is_empty() {
-
            panic!("Wire::flush: redundant flush");
-
        }
-

        for data in c.receiver.try_iter() {
            let frame = match data {
                ChannelEvent::Data(data) => Frame::git(stream, data),
modified radicle-node/src/worker.rs
@@ -211,12 +211,11 @@ impl Worker {
            Fetch::Responder { .. } => {
                log::debug!(target: "worker", "Worker processing incoming fetch..");

-
                let (stream_w, mut stream_r) = channels.split();
-
                let mut pktline_r = pktline::Reader::new(&mut stream_r);
+
                let (stream_w, stream_r) = channels.split();
                // Nb. two fetches are usually expected: one for the *special* refs,
                // followed by another for the signed refs.
                loop {
-
                    match self.upload_pack(fetch, stream, &mut pktline_r, stream_w) {
+
                    match self.upload_pack(fetch, stream, stream_r, stream_w) {
                        Ok(ControlFlow::Continue(())) => continue,
                        Ok(ControlFlow::Break(())) => break,
                        Err(e) => return Err(e.into()),
@@ -251,7 +250,6 @@ impl Worker {
                return Err(e);
            }
        }
-
        Self::eof(remote, stream, &mut channels.sender, &mut self.handle)?;

        let staging = staging.into_final()?;
        match self._fetch(
@@ -267,7 +265,6 @@ impl Worker {
                return Err(e);
            }
        }
-
        Self::eof(remote, stream, &mut channels.sender, &mut self.handle)?;

        staging.transfer().map_err(FetchError::from)
    }
@@ -276,14 +273,14 @@ impl Worker {
        &mut self,
        fetch: &Fetch,
        stream: StreamId,
-
        pktline_r: &mut pktline::Reader<&mut ChannelReader>,
+
        stream_r: &mut ChannelReader,
        stream_w: &mut ChannelWriter,
    ) -> Result<ControlFlow<()>, UploadError> {
        log::debug!(target: "worker", "Waiting for Git request pktline from {}..", fetch.remote());

        // Read the request packet line to make sure the repository being requested matches what
        // we expect, and that the service requested is valid.
-
        let (rid, request) = match pktline_r.read_request_pktline() {
+
        let (rid, request) = match pktline::Reader::new(stream_r).read_request_pktline() {
            Ok((req, pktline)) => (req.repo, pktline),
            Err(err) if err.kind() == io::ErrorKind::ConnectionReset => {
                log::debug!(
@@ -298,7 +295,7 @@ impl Worker {
        };
        log::debug!(target: "worker", "Received Git request pktline for {rid}..");

-
        match self._upload_pack(rid, fetch.remote(), request, stream, pktline_r, stream_w) {
+
        match self._upload_pack(rid, fetch.remote(), request, stream, stream_r, stream_w) {
            Ok(()) => {
                log::debug!(target: "worker", "Upload of {rid} to {} exited successfully", fetch.remote());

@@ -314,7 +311,7 @@ impl Worker {
        remote: NodeId,
        request: Vec<u8>,
        stream: StreamId,
-
        stream_r: &mut pktline::Reader<&mut ChannelReader>,
+
        stream_r: &mut ChannelReader,
        stream_w: &mut ChannelWriter,
    ) -> Result<(), UploadError> {
        log::debug!(target: "worker", "Connecting to daemon..");
@@ -323,47 +320,59 @@ impl Worker {
        let daemon = net::TcpStream::connect_timeout(&self.daemon, self.timeout)
            .map_err(UploadError::DaemonConnectionFailed)?;
        let (mut daemon_r, mut daemon_w) = (daemon.try_clone()?, daemon);
-
        let mut daemon_r = pktline::Reader::new(&mut daemon_r);

        // Write the raw request to the daemon, once we've parsed it.
        daemon_w.write_all(&request)?;

        log::debug!(target: "worker", "Entering Git protocol loop for {rid}..");
-
        // We now loop, alternating between reading requests from the client, and writing responses
-
        // back from the daemon.. Requests are delimited with a flush packet (`flush-pkt`).
-
        let mut buffer = [0; u16::MAX as usize + 1];
-
        loop {
-
            // Read from the daemon and write to the stream.
-
            if let Err(e) = daemon_r.pipe(stream_w, &mut buffer) {
-
                // This is the expected error when the daemon disconnects.
-
                if e.kind() == io::ErrorKind::UnexpectedEof {
-
                    log::debug!(target: "worker", "Daemon closed the git connection for {rid}");
-
                    log::debug!(target: "worker", "Waiting for end-of-file from remote..");

-
                    stream_r.wait_for_eof()?;
+
        thread::scope(|s| {
+
            let daemon_to_stream = s.spawn(|| -> Result<(), UploadError> {
+
                let mut buffer = [0; u16::MAX as usize + 1];

-
                    return Ok(());
+
                loop {
+
                    match daemon_r.read(&mut buffer) {
+
                        Ok(0) => break,
+
                        Ok(n) => {
+
                            stream_w.write_all(&buffer[..n]).unwrap();
+

+
                            if let Err(e) = self.handle.flush(remote, stream) {
+
                                log::error!(target: "worker", "Worker channel disconnected; aborting");
+
                                return Err(e.into());
+
                            }
+
                        }
+
                        Err(e) => {
+
                            if e.kind() == io::ErrorKind::UnexpectedEof {
+
                                log::debug!(target: "worker", "Daemon closed the git connection for {rid}");
+
                                break;
+
                            }
+
                            return Err(e.into());
+
                        }
+
                    }
                }
-
                return Err(e.into());
-
            }
+
                Ok(())
+
            });

-
            if let Err(e) = self.handle.flush(remote, stream) {
-
                log::error!(target: "worker", "Worker channel disconnected; aborting");
-
                return Err(e.into());
-
            }
+
            let stream_to_daemon = s.spawn(move || -> Result<(), io::Error> {
+
                let mut buffer = [0; u16::MAX as usize + 1];

-
            // Read from the stream and write to the daemon.
-
            match stream_r.pipe(&mut daemon_w, &mut buffer) {
-
                Ok(()) => continue,
-
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()),
-
                Err(e) => {
-
                    if e.kind() == io::ErrorKind::ConnectionReset {
-
                        log::debug!(target: "worker", "Remote closed the git connection for {rid}");
+
                loop {
+
                    match stream_r.read(&mut buffer) {
+
                        Ok(n) => daemon_w.write_all(&buffer[..n])?,
+
                        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
+
                        Err(e) => return Err(e),
                    }
-
                    return Err(e.into());
                }
-
            }
-
        }
+
                daemon_w.shutdown(net::Shutdown::Both)
+
            });
+

+
            stream_to_daemon.join().unwrap()?;
+
            daemon_to_stream.join().unwrap()?;
+

+
            Ok::<(), UploadError>(())
+
        })?;
+

+
        Self::eof(remote, stream, stream_w, &mut self.handle).map_err(UploadError::from)
    }

    fn _fetch<S>(
@@ -377,7 +386,7 @@ impl Worker {
    where
        S: fetch::AsRefspecs,
    {
-
        let mut tunnel = Tunnel::with(channels, stream, remote, self.handle.clone())?;
+
        let mut tunnel = Tunnel::with(channels, stream, self.nid, remote, self.handle.clone())?;
        let tunnel_addr = tunnel.local_addr();
        let mut cmd = process::Command::new("git");
        cmd.current_dir(repo.path())
@@ -442,12 +451,12 @@ impl Worker {
        stream: StreamId,
        sender: &mut ChannelWriter,
        handle: &mut Handle,
-
    ) -> Result<(), FetchError> {
+
    ) -> Result<(), io::Error> {
        log::debug!(target: "worker", "Sending end-of-file to remote {remote}..");

-
        if let Err(e) = sender.eof() {
-
            log::error!(target: "worker", "Fetch error: error sending end-of-file message: {e}");
-
            return Err(e.into());
+
        if sender.eof().is_err() {
+
            log::error!(target: "worker", "Fetch error: error sending end-of-file message: channel disconnected");
+
            return Err(io::ErrorKind::BrokenPipe.into());
        }
        if let Err(e) = handle.flush(remote, stream) {
            log::error!(target: "worker", "Error flushing worker stream: {e}");
@@ -504,37 +513,16 @@ impl Pool {
pub mod pktline {
    use std::io;
    use std::io::Read;
-
    use std::net::TcpStream;
    use std::str;

    use super::Id;

    pub const HEADER_LEN: usize = 4;
-
    pub const FLUSH_PKT: &[u8; HEADER_LEN] = b"0000";
-
    pub const DELIM_PKT: &[u8; HEADER_LEN] = b"0001";
-
    pub const RESPONSE_END_PKT: &[u8; HEADER_LEN] = b"0002";

    pub struct Reader<'a, R> {
        stream: &'a mut R,
    }

-
    impl<'a> Reader<'a, TcpStream> {
-
        /// Check whether the stream ended.
-
        pub fn is_eof(&self) -> io::Result<bool> {
-
            // Use non-blocking mode instead of timeouts, as we don't want to mess
-
            // with existing timeouts.
-
            self.stream.set_nonblocking(true)?;
-
            let eof = match self.stream.peek(&mut []) {
-
                Ok(0) => true,
-
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => true,
-
                _ => false,
-
            };
-
            self.stream.set_nonblocking(false)?;
-

-
            Ok(eof)
-
        }
-
    }
-

    impl<'a, R: io::Read> Reader<'a, R> {
        /// Create a new packet-line reader.
        pub fn new(stream: &'a mut R) -> Self {
@@ -546,15 +534,6 @@ pub mod pktline {
            self.stream
        }

-
        /// Wait for EOF.
-
        pub fn wait_for_eof(&mut self) -> io::Result<()> {
-
            match self.stream.read_to_end(&mut Vec::new()) {
-
                Ok(_) => Ok(()),
-
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(()),
-
                Err(e) => Err(e),
-
            }
-
        }
-

        /// Parse a Git request packet-line.
        ///
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
@@ -569,14 +548,9 @@ pub mod pktline {
        }

        /// Parse a Git packet-line.
-
        pub fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
        fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.read_exact(&mut buf[..HEADER_LEN])?;
-
            if &buf[..HEADER_LEN] == FLUSH_PKT
-
                || &buf[..HEADER_LEN] == DELIM_PKT
-
                || &buf[..HEADER_LEN] == RESPONSE_END_PKT
-
            {
-
                return Ok(HEADER_LEN);
-
            }
+

            let length = str::from_utf8(&buf[..HEADER_LEN])
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
            let length = usize::from_str_radix(length, 16)
@@ -586,23 +560,6 @@ pub mod pktline {

            Ok(length)
        }
-

-
        /// Read packet-lines from the internal reader into `buf`,
-
        /// and write them to the given writer. Exits when a [`FLUSH_PKT`] packet is received.
-
        pub fn pipe<W: io::Write>(&mut self, w: &mut W, buf: &mut [u8]) -> io::Result<()> {
-
            loop {
-
                let n = self.read_pktline(buf)?;
-
                if n == 0 {
-
                    break;
-
                }
-
                w.write_all(&buf[..n])?;
-

-
                if &buf[..n] == FLUSH_PKT {
-
                    break;
-
                }
-
            }
-
            Ok(())
-
        }
    }

    impl<'a, R: io::Read> io::Read for Reader<'a, R> {
modified radicle-node/src/worker/tunnel.rs
@@ -1,25 +1,27 @@
use std::{
-
    io::{self, Write},
-
    net, time,
+
    io::{self, Read, Write},
+
    net, thread, time,
};

use super::channels::Channels;
-
use super::{pktline, Handle, NodeId, StreamId};
+
use super::{Handle, NodeId, StreamId, Worker};

/// Tunnels fetches to a remote peer.
pub struct Tunnel<'a> {
-
    stream: &'a mut Channels,
+
    channels: &'a mut Channels,
    listener: net::TcpListener,
    local_addr: net::SocketAddr,
-
    channel: StreamId,
+
    stream: StreamId,
+
    local: NodeId,
    remote: NodeId,
    handle: Handle,
}

impl<'a> Tunnel<'a> {
    pub(super) fn with(
-
        stream: &'a mut Channels,
-
        channel: StreamId,
+
        channels: &'a mut Channels,
+
        stream: StreamId,
+
        local: NodeId,
        remote: NodeId,
        handle: Handle,
    ) -> io::Result<Self> {
@@ -27,10 +29,11 @@ impl<'a> Tunnel<'a> {
        let local_addr = listener.local_addr()?;

        Ok(Self {
-
            stream,
+
            channels,
            listener,
            local_addr,
-
            channel,
+
            stream,
+
            local,
            remote,
            handle,
        })
@@ -42,39 +45,64 @@ impl<'a> Tunnel<'a> {

    /// Run the tunnel until the connection is closed.
    pub fn run(&mut self, timeout: time::Duration) -> io::Result<()> {
-
        // We now loop, alternating between reading requests from the client, and writing responses
-
        // back from the daemon.. Requests are delimited with a flush packet (`flush-pkt`).
-
        let mut buffer = [0; u16::MAX as usize + 1];
-
        let (mut remote_w, mut remote_r) = self.stream.split();
-
        let (mut stream, _) = self.listener.accept()?;
+
        let (remote_w, remote_r) = self.channels.split();
+
        let (local, _) = self.listener.accept()?;
+
        let (mut local_r, mut local_w) = (local.try_clone()?, local);
+

+
        local_r.set_read_timeout(Some(timeout))?;
+
        local_w.set_write_timeout(Some(timeout))?;
+

+
        let nid = self.remote;
+
        let stream_id = self.stream;

-
        let mut local = pktline::Reader::new(&mut stream);
-
        let mut remote_r = pktline::Reader::new(&mut remote_r);
+
        thread::scope(|s| {
+
            let remote_to_local = thread::Builder::new()
+
                .name(self.local.to_string())
+
                .spawn_scoped(s, || {
+
                    let mut buffer = [0; u16::MAX as usize + 1];

-
        local.stream().set_read_timeout(Some(timeout))?;
-
        local.stream().set_write_timeout(Some(timeout))?;
+
                    loop {
+
                        match remote_r.read(&mut buffer) {
+
                            Ok(0) => return Ok(()),
+
                            Ok(n) => local_w.write_all(&buffer[..n])?,
+
                            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
+
                                // This is the expected error when the git fetch closes the connection.
+
                                return Ok(());
+
                            }
+
                            Err(e) => return Err(e),
+
                        }
+
                    }
+
                })?;

-
        let (_, buf) = local.read_request_pktline()?;
-
        remote_w.write_all(&buf)?;
+
            let local_to_remote = thread::Builder::new()
+
                .name(self.local.to_string())
+
                .spawn_scoped(s, || {
+
                    let mut buffer = [0; u16::MAX as usize + 1];

-
        // Nb. Annoyingly, we have to always check if the fetch stream is closed on every
-
        // iteration, otherwise we may get stuck waiting for data from the remote while
-
        // we're actually done. After measurement, this checking for EOF only takes
-
        // between 1µs and 4µs, and is therefore an okay compromise.
-
        while !local.is_eof()? {
-
            if self.handle.flush(self.remote, self.channel).is_err() {
-
                return Err(io::ErrorKind::BrokenPipe.into());
-
            }
-
            remote_r.pipe(local.stream(), &mut buffer)?;
+
                    loop {
+
                        match local_r.read(&mut buffer) {
+
                            Ok(0) => break,
+
                            Ok(n) => {
+
                                remote_w.write_all(&buffer[..n])?;

-
            if let Err(e) = local.pipe(&mut remote_w, &mut buffer) {
-
                // This is the expected error when the git fetch closes the connection.
-
                if e.kind() == io::ErrorKind::UnexpectedEof {
-
                    break;
-
                }
-
                return Err(e);
-
            }
-
        }
-
        Ok(())
+
                                if let Err(e) = self.handle.flush(nid, stream_id) {
+
                                    log::error!(
+
                                        target: "worker", "Worker channel disconnected; aborting"
+
                                    );
+
                                    return Err(e);
+
                                }
+
                            }
+
                            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
+
                            Err(e) => return Err(e),
+
                        }
+
                    }
+
                    Worker::eof(nid, stream_id, remote_w, &mut self.handle)
+
                })?;
+

+
            remote_to_local.join().unwrap()?;
+
            local_to_remote.join().unwrap()?;
+

+
            Ok::<(), io::Error>(())
+
        })
    }
}