Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Improve worker error handling
Alexis Sellier committed 3 years ago
commit dd97ac8384dae651bb7689943c5fb36b6b481991
parent a9d1a71f7acfd6772357d4be533c6a3ce8e400e4
1 file changed +92 -31
modified radicle-node/src/worker.rs
@@ -6,7 +6,7 @@ use std::{env, io, net, process, thread, time};
use crossbeam_channel as chan;
use cyphernet::Ecdh;
use netservices::tunnel::Tunnel;
-
use netservices::{NetSession, SplitIo};
+
use netservices::{AsConnection, NetSession, SplitIo};

use radicle::crypto::{PublicKey, Signer};
use radicle::identity::{Id, IdentityError};
@@ -50,8 +50,10 @@ pub enum FetchError {
    Io(#[from] io::Error),
    #[error(transparent)]
    Identity(#[from] IdentityError),
-
    #[error("remote failed unexpectedly")]
-
    RemoteAborted,
+
    #[error("upload failed: {0}")]
+
    Upload(#[from] UploadError),
+
    #[error("remote aborted fetch")]
+
    RemoteAbortedFetch,
}

impl FetchError {
@@ -61,6 +63,26 @@ impl FetchError {
    }
}

+
/// Error returned by fetch responder.
///
/// Covers failures on the uploading side of a fetch. It converts into
/// `FetchError::Upload` through that variant's `#[from]` attribute.
+
#[derive(thiserror::Error, Debug)]
+
pub enum UploadError {
    /// The TCP connection to the local git daemon could not be established;
    /// wraps the underlying connect error.
+
    #[error("worker failed to connect to git daemon: {0}")]
+
    DaemonConnectionFailed(io::Error),
    /// The repository named in the git request packet-line differs from the
    /// repository of the fetch being served.
+
    #[error("git pkt-line command does not match fetch request")]
+
    CommandMismatch,
    /// The git request packet-line could not be read or parsed; wraps the
    /// error returned by the packet-line reader.
+
    #[error("error parsing git command packet-line: {0}")]
+
    InvalidPacketLine(io::Error),
    /// Any other I/O error. This is the only variant built implicitly from an
    /// `io::Error` (via `?` / `.into()`), and the only one `is_eof` inspects.
+
    #[error(transparent)]
+
    Io(#[from] io::Error),
+
}
+

+
impl UploadError {
+
    /// Check if it's an end-of-file error.
    ///
    /// Returns `true` only for the `Io` variant wrapping an `io::Error` whose
    /// kind is `UnexpectedEof`. I/O errors carried by the other variants
    /// (`DaemonConnectionFailed`, `InvalidPacketLine`) are never reported as
    /// end-of-file, whatever their kind.
+
    pub fn is_eof(&self) -> bool {
+
        matches!(self, UploadError::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof)
+
    }
+
}
+

/// Task to be accomplished on a worker thread.
/// This is either going to be an outgoing or incoming fetch.
pub struct Task<G: Signer + Ecdh> {
@@ -105,7 +127,11 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
            drain,
        } = task;

+
        let timeout = session.as_connection().read_timeout().unwrap_or_default();
        let (session, result) = self._process(&fetch, drain, session);
+
        // In case the timeout is changed during the fetch, we reset it here.
+
        session.as_connection().set_read_timeout(timeout).ok();
+

        log::debug!(target: "worker", "Sending response back to service..");

        if self
@@ -143,6 +169,8 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                if let Err(err) = &result {
                    log::error!(target: "worker", "Fetch error: {err}");
                }
+
                log::debug!(target: "worker", "Sending `done` packet to remote..");
+

                if let Err(err) = pktline::done(&mut session) {
                    log::error!(target: "worker", "Fetch error: error sending `done` packet: {err}");
                }
@@ -154,6 +182,7 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                if let Err(err) = session.as_connection_mut().set_nonblocking(false) {
                    return (session, Err(err.into()));
                }
+

                let (mut stream_r, mut stream_w) = match session.split_io() {
                    Ok((r, w)) => (r, w),
                    Err(err) => {
@@ -169,24 +198,59 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                        (WireSession::from_split_io(stream_r, stream_w), Ok(vec![]))
                    }
                    Err(err) => {
+
                        log::error!(target: "worker", "Upload error for {rid}: {err}");
+

                        // If we exited without receiving a `done` packet, wait for it here.
                        // It's possible that the daemon exited first, or the remote crashed.
                        log::debug!(target: "worker", "Waiting for `done` packet from remote..");
-

                        let mut header = [0; pktline::HEADER_LEN];
-
                        if let Ok(()) = pktline_r.read_done_pktline(&mut header) {
-
                            (WireSession::from_split_io(stream_r, stream_w), Err(err))
-
                        } else {
-
                            log::error!(
-
                                target: "worker",
-
                                "Upload of {} to {} aborted: missing `done` packet from remote",
-
                                fetch.rid,
-
                                fetch.remote
-
                            );
-
                            (
-
                                WireSession::from_split_io(stream_r, stream_w),
-
                                Err(FetchError::RemoteAborted),
-
                            )
+

+
                        // Set the read timeout for the `done` packet to twice the configured
+
                        // value that is used for the fetching (initiator) side.
+
                        //
+
                        // This is because the uploader always waits for the `done` packet;
+
                        // so in case the fetch is aborted by the uploader, e.g. if
+
                        // it can't connect with the daemon, it will wait long enough for the
+
                        // fetcher to timeout before timing out itself, and will thus receive
+
                        // the `done` packet.
+
                        pktline_r
+
                            .stream()
+
                            .as_connection()
+
                            .set_read_timeout(Some(self.timeout * 2))
+
                            .ok();
+

+
                        loop {
+
                            match pktline_r.read_done_pktline(&mut header) {
+
                                Ok(()) => {
+
                                    log::debug!(target: "worker", "Received `done` packet from remote");
+

+
                                    // If we get the `done` packet, we exit with the original
+
                                    // error.
+
                                    return (
+
                                        WireSession::from_split_io(stream_r, stream_w),
+
                                        Err(err.into()),
+
                                    );
+
                                }
+
                                Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
+
                                    // If we get some other packet, because the fetch request
+
                                    // is still sending stuff, we simply keep reading until we
+
                                    // get a `done` packet.
+
                                    continue;
+
                                }
+
                                Err(_) => {
+
                                    // If we get any other error, e.g. a timeout, we abort.
+
                                    log::error!(
+
                                        target: "worker",
+
                                        "Upload of {} to {} aborted: missing `done` packet from remote",
+
                                        fetch.rid,
+
                                        fetch.remote
+
                                    );
+
                                    return (
+
                                        WireSession::from_split_io(stream_r, stream_w),
+
                                        Err(FetchError::RemoteAbortedFetch),
+
                                    );
+
                                }
+
                            }
                        }
                    }
                }
@@ -278,7 +342,7 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
        fetch: &Fetch,
        stream_r: &mut pktline::Reader<WireReader>,
        stream_w: &mut WireWriter<G>,
-
    ) -> Result<(), FetchError> {
+
    ) -> Result<(), UploadError> {
        // Read the request packet line to make sure the repository being requested matches what
        // we expect, and that the service requested is valid.
        let request = match stream_r.read_request_pktline() {
@@ -288,21 +352,18 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                    "Parsed git command packet-line for {}: {:?}", fetch.rid, req
                );
                if req.repo != fetch.rid {
-
                    return Err(FetchError::Git(git::raw::Error::from_str(
-
                        "git pkt-line command does not match fetch request",
-
                    )));
+
                    return Err(UploadError::CommandMismatch);
                }
                pktline
            }
            Err(err) => {
-
                return Err(FetchError::Git(git::raw::Error::from_str(&format!(
-
                    "error parsing git command packet-line: {err}"
-
                ))));
+
                return Err(UploadError::InvalidPacketLine(err));
            }
        };

        // Connect to our local git daemon, running as a child process.
-
        let daemon = net::TcpStream::connect_timeout(&self.daemon, self.timeout)?;
+
        let daemon = net::TcpStream::connect_timeout(&self.daemon, self.timeout)
+
            .map_err(UploadError::DaemonConnectionFailed)?;
        let (mut daemon_r, mut daemon_w) = (daemon.try_clone()?, daemon);
        let mut daemon_r = pktline::Reader::new(vec![], &mut daemon_r);

@@ -318,10 +379,7 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                // This is the expected error when the daemon disconnects.
                if e.kind() == io::ErrorKind::UnexpectedEof {
                    log::debug!(target: "worker", "Daemon closed the git connection for {}", fetch.rid);
-
                    return Err(e.into());
                }
-
                log::debug!(target: "worker", "Upload of {} to {} returned error: {e}", fetch.rid, fetch.remote);
-

                return Err(e.into());
            }
            // Read from the stream and write to the daemon.
@@ -337,10 +395,7 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
                Err(e) => {
                    if e.kind() == io::ErrorKind::UnexpectedEof {
                        log::debug!(target: "worker", "Remote closed the git connection for {}", fetch.rid);
-
                        return Err(e.into());
                    }
-
                    log::error!(target: "worker", "Remote returned error for {}: {e}", fetch.rid);
-

                    return Err(e.into());
                }
            }
@@ -441,10 +496,16 @@ pub mod pktline {
    }

    impl<'a, R: io::Read> Reader<'a, R> {
+
        /// Create a new packet-line reader.
        ///
        /// Stores the given `drain` buffer and the mutable borrow of `stream`
        /// as-is; no I/O is performed here.
        // NOTE(review): `drain` presumably holds bytes already read off the
        // stream that should be consumed before reading from `stream` again
        // -- confirm against the `Read` impl, which is not visible here.
        pub fn new(drain: Vec<u8>, stream: &'a mut R) -> Self {
            Self { drain, stream }
        }

+
        /// Return the underlying stream.
        ///
        /// Gives shared (read-only) access to the stream this reader borrows,
        /// without consuming the reader. The worker uses it to reach the
        /// connection and adjust its read timeout while waiting for `done`.
+
        pub fn stream(&self) -> &R {
+
            self.stream
+
        }
+

        /// Parse a Git request packet-line.
        ///
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`