Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix potential protocol mismatch on fetch
Alexis Sellier committed 3 years ago
commit c4a66c20d0b73cfd61e8e25da730e524ee8ae796
parent cf6890f6f1d7b6c3905670a71a24ff72bfa5c7a9
1 file changed +90 -35
modified radicle-node/src/worker.rs
@@ -1,4 +1,5 @@
use std::io::{prelude::*, BufReader};
+
use std::ops::ControlFlow;
use std::thread::JoinHandle;
use std::{env, io, net, process, thread, time};

@@ -49,6 +50,8 @@ pub enum FetchError {
    Io(#[from] io::Error),
    #[error(transparent)]
    Identity(#[from] IdentityError),
+
    #[error("remote failed unexpectedly")]
+
    RemoteAborted,
}

impl FetchError {
@@ -137,12 +140,11 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                let result = self.fetch(rid, namespaces, &mut tunnel);
                let mut session = tunnel.into_session();

-
                // If there are no errors, send a `done` special packet. We don't send this on error,
-
                // as the remote will not be expecting it.
                if let Err(err) = &result {
                    log::error!(target: "worker", "Fetch error: {err}");
-
                } else if let Err(err) = pktline::done(&mut session) {
-
                    log::error!(target: "worker", "Fetch error: {err}");
+
                }
+
                if let Err(err) = pktline::done(&mut session) {
+
                    log::error!(target: "worker", "Fetch error: error sending `done` packet: {err}");
                }
                (session, result)
            }
@@ -158,13 +160,36 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                        return (err.original, Err(err.error.into()));
                    }
                };
-
                let result = self.upload_pack(fetch, drain, &mut stream_r, &mut stream_w);
-
                let session = WireSession::from_split_io(stream_r, stream_w);
+
                let mut pktline_r = pktline::Reader::new(drain, &mut stream_r);

-
                if let Err(err) = &result {
-
                    log::error!(target: "worker", "Upload-pack error: {err}");
+
                match self.upload_pack(fetch, &mut pktline_r, &mut stream_w) {
+
                    Ok(()) => {
+
                        log::debug!(target: "worker", "Upload of {} to {} exited successfully", fetch.rid, fetch.remote);
+

+
                        (WireSession::from_split_io(stream_r, stream_w), Ok(vec![]))
+
                    }
+
                    Err(err) => {
+
                        // If we exited without receiving a `done` packet, wait for it here.
+
                        // It's possible that the daemon exited first, or the remote crashed.
+
                        log::debug!(target: "worker", "Waiting for `done` packet from remote..");
+

+
                        let mut header = [0; pktline::HEADER_LEN];
+
                        if let Ok(pktline::Packetline::Done) = pktline_r.read_pktline(&mut header) {
+
                            (WireSession::from_split_io(stream_r, stream_w), Err(err))
+
                        } else {
+
                            log::error!(
+
                                target: "worker",
+
                                "Upload of {} to {} aborted: missing `done` packet from remote",
+
                                fetch.rid,
+
                                fetch.remote
+
                            );
+
                            (
+
                                WireSession::from_split_io(stream_r, stream_w),
+
                                Err(FetchError::RemoteAborted),
+
                            )
+
                        }
+
                    }
                }
-
                (session, result)
            }
        }
    }
@@ -251,12 +276,9 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
    fn upload_pack(
        &self,
        fetch: &Fetch,
-
        drain: Vec<u8>,
-
        stream_r: &mut WireReader,
+
        stream_r: &mut pktline::Reader<WireReader>,
        stream_w: &mut WireWriter<G>,
-
    ) -> Result<Vec<RefUpdate>, FetchError> {
-
        let mut stream_r = pktline::Reader::new(drain, stream_r);
-

+
    ) -> Result<(), FetchError> {
        // Read the request packet line to make sure the repository being requested matches what
        // we expect, and that the service requested is valid.
        let request = match stream_r.read_request_pktline() {
@@ -290,31 +312,39 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
        // We now loop, alternating between reading requests from the client, and writing responses
        // back from the daemon.. Requests are delimited with a flush packet (`flush-pkt`).
        let mut buffer = [0; u16::MAX as usize + 1];
-

        loop {
-
            if let Err(e) = daemon_r.read_pktlines(stream_w, &mut buffer) {
+
            // Read from the daemon and write to the stream.
+
            if let Err(e) = daemon_r.pipe(stream_w, &mut buffer) {
                // This is the expected error when the remote disconnects.
                if e.kind() == io::ErrorKind::UnexpectedEof {
-
                    break;
+
                    log::debug!(target: "worker", "Daemon closed the git connection for {}", fetch.rid);
+
                    return Err(e.into());
                }
                log::debug!(target: "worker", "Upload of {} to {} returned error: {e}", fetch.rid, fetch.remote);

                return Err(e.into());
            }
-
            if let Err(e) = stream_r.read_pktlines(&mut daemon_w, &mut buffer) {
+
            // Read from the stream and write to the daemon.
+
            match stream_r.pipe(&mut daemon_w, &mut buffer) {
                // Triggered by a [`pktline::DONE_PKT`] packet.
-
                if e.kind() == io::ErrorKind::UnexpectedEof {
-
                    break;
+
                Ok(ControlFlow::Break(())) => {
+
                    log::debug!(target: "worker", "Received `done` packet from remote for {}", fetch.rid);
+
                    return Ok(());
                }
-
                log::error!(target: "worker", "Remote returned error for {}: {e}", fetch.rid);
+
                Ok(ControlFlow::Continue(())) => {
+
                    continue;
+
                }
+
                Err(e) => {
+
                    if e.kind() == io::ErrorKind::UnexpectedEof {
+
                        log::debug!(target: "worker", "Remote closed the git connection for {}", fetch.rid);
+
                        return Err(e.into());
+
                    }
+
                    log::error!(target: "worker", "Remote returned error for {}: {e}", fetch.rid);

-
                return Err(e.into());
+
                    return Err(e.into());
+
                }
            }
        }
-
        log::debug!(target: "worker", "Upload of {} to {} exited successfully", fetch.rid, fetch.remote);
-

-
        // When we aren't the one fetching, no refs are updated.
-
        Ok(vec![])
    }
}

@@ -371,6 +401,7 @@ impl Pool {
mod pktline {
    use std::io;
    use std::io::Read;
+
    use std::ops::ControlFlow;
    use std::str;

    use super::Id;
@@ -384,6 +415,15 @@ mod pktline {
    /// not sent to the deamon.
    pub const DONE_PKT: &[u8; HEADER_LEN] = b"done";

+
    /// Packetline read result.
+
    #[derive(Debug, PartialEq, Eq)]
+
    pub enum Packetline {
+
        /// Received a `done` control packet.
+
        Done,
+
        /// Received a git packet with the given length.
+
        Git(usize),
+
    }
+

    /// Send a special `done` packet. Since the git protocol is tunneled over an existing
    /// connection, we can't signal the end of the protocol via the usual means, which is
    /// to close the connection and trigger an EOF on the other side. Git also doesn't have
@@ -411,7 +451,9 @@ mod pktline {
        ///
        pub fn read_request_pktline(&mut self) -> io::Result<(GitRequest, Vec<u8>)> {
            let mut pktline = [0u8; 1024];
-
            let length = self.read_pktline(&mut pktline)?;
+
            let Packetline::Git(length) = self.read_pktline(&mut pktline)? else {
+
                return Err(io::ErrorKind::InvalidInput.into());
+
            };
            let Some(cmd) = GitRequest::parse(&pktline[4..length]) else {
                return Err(io::ErrorKind::InvalidInput.into());
            };
@@ -419,17 +461,17 @@ mod pktline {
        }

        /// Parse a Git packet-line.
-
        pub fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
        pub fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<Packetline> {
            self.read_exact(&mut buf[..HEADER_LEN])?;

            if &buf[..HEADER_LEN] == DONE_PKT {
-
                return Err(io::ErrorKind::UnexpectedEof.into());
+
                return Ok(Packetline::Done);
            }
            if &buf[..HEADER_LEN] == FLUSH_PKT
                || &buf[..HEADER_LEN] == DELIM_PKT
                || &buf[..HEADER_LEN] == RESPONSE_END_PKT
            {
-
                return Ok(HEADER_LEN);
+
                return Ok(Packetline::Git(HEADER_LEN));
            }
            let length = str::from_utf8(&buf[..HEADER_LEN])
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
@@ -438,22 +480,35 @@ mod pktline {

            self.read_exact(&mut buf[HEADER_LEN..length])?;

-
            Ok(length)
+
            Ok(Packetline::Git(length))
        }

-
        pub fn read_pktlines<W: io::Write>(&mut self, w: &mut W, buf: &mut [u8]) -> io::Result<()> {
+
        /// Read packet-lines from the internal reader into `buf`,
+
        /// and write them to the given writer.
+
        ///
+
        /// Returns [`ControlFlow::Break`] if the fetch should be terminated.
+
        /// Otherwise, returns [`ControlFlow::Continue`] to mean that we're
+
        /// expecting a response from the remote.
+
        pub fn pipe<W: io::Write>(
+
            &mut self,
+
            w: &mut W,
+
            buf: &mut [u8],
+
        ) -> io::Result<ControlFlow<()>> {
            loop {
-
                let n = self.read_pktline(buf)?;
+
                let n = match self.read_pktline(buf)? {
+
                    Packetline::Done => return Ok(ControlFlow::Break(())),
+
                    Packetline::Git(n) => n,
+
                };
                if n == 0 {
                    break;
                }
                w.write_all(&buf[..n])?;

                if &buf[..n] == FLUSH_PKT {
-
                    return Ok(());
+
                    break;
                }
            }
-
            Ok(())
+
            Ok(ControlFlow::Continue(()))
        }
    }