Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix bug in upload-pack
Alexis Sellier committed 3 years ago
commit be95222fb5d3294598270b93214252edd78e15a6
parent 026dfbde5d8f48394fa7648e8b3a3b361f37dd69
3 files changed +164 -117
modified radicle-node/src/service.rs
@@ -27,7 +27,7 @@ use crate::crypto;
use crate::crypto::{Signer, Verified};
use crate::identity::{Doc, Id};
use crate::node;
-
use crate::node::{Address, Features, FetchLookup, FetchResult};
+
use crate::node::{Address, Features, FetchError, FetchLookup, FetchResult};
use crate::prelude::*;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
@@ -499,22 +499,39 @@ where
    pub fn fetched(&mut self, result: FetchResult) {
        let remote = result.remote;
        let rid = result.rid;
-

-
        match &result.result {
+
        let namespaces = result.namespaces;
+
        let result = match result.result {
            Ok(updated) => {
                self.reactor.event(Event::RefsFetched {
                    remote,
                    rid,
                    updated: updated.clone(),
                });
+
                Ok(updated)
            }
            Err(err) => {
-
                error!("Fetch failed for {rid} from {remote}: {err}");
+
                error!(target: "service", "Fetch failed for {rid} from {remote}: {err}");
+

+
                if let FetchError::Io(_) = err {
+
                    self.reactor
+
                        .disconnect(result.remote, DisconnectReason::Fetch(err));
+
                    return;
+
                } else {
+
                    Err(err)
+
                }
            }
-
        }
+
        };

        if let Some(results) = self.fetch_reqs.get(&rid) {
-
            if results.send(result).is_err() {
+
            if results
+
                .send(FetchResult {
+
                    rid,
+
                    remote,
+
                    namespaces,
+
                    result,
+
                })
+
                .is_err()
+
            {
                self.fetch_reqs.remove(&rid);
            }
        }
@@ -1151,6 +1168,8 @@ pub enum DisconnectReason {
    /// Error with an underlying established connection. Sometimes, reconnecting
    /// after such an error is possible.
    Connection(Arc<dyn std::error::Error + Sync + Send>),
+
    /// Error with a fetch.
+
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
}
@@ -1169,6 +1188,7 @@ impl DisconnectReason {
            Self::Dial(_) => false,
            Self::Connection(_) => true,
            Self::Session(..) => false,
+
            Self::Fetch(_) => true,
        }
    }
}
@@ -1179,6 +1199,7 @@ impl fmt::Display for DisconnectReason {
            Self::Dial(err) => write!(f, "{}", err),
            Self::Connection(err) => write!(f, "{}", err),
            Self::Session(err) => write!(f, "error: {}", err),
+
            Self::Fetch(err) => write!(f, "fetch: {}", err),
        }
    }
}
modified radicle-node/src/service/reactor.rs
@@ -63,7 +63,7 @@ impl Reactor {
    }

    pub fn write(&mut self, remote: NodeId, msg: Message) {
-
        debug!("Write {:?} to {}", &msg, remote);
+
        debug!(target: "service", "Write {:?} to {}", &msg, remote);

        self.io.push_back(Io::Write(remote, vec![msg]));
    }
@@ -72,6 +72,7 @@ impl Reactor {
        let msgs = msgs.into_iter().collect::<Vec<_>>();
        for (ix, msg) in msgs.iter().enumerate() {
            debug!(
+
                target: "service",
                "Write {:?} message to {} ({}/{})",
                msg,
                remote,
modified radicle-node/src/worker.rs
@@ -1,6 +1,6 @@
use std::io::prelude::*;
use std::thread::JoinHandle;
-
use std::{env, io, net, process, str, thread, time};
+
use std::{env, io, net, process, thread, time};

use crossbeam_channel as chan;
use cyphernet::EcSign;
@@ -125,9 +125,9 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        let tunnel_addr = tunnel.local_addr()?;
        let mut cmd = process::Command::new("git");
        cmd.current_dir(repo.path())
-
            .env("GIT_PROTOCOL", "2")
            .env_clear()
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
+
            .args(["-c", "protocol.version=2"])
            .arg("fetch")
            .arg("--atomic")
            .arg("--verbose")
@@ -178,7 +178,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
            .current_dir(repo.path())
            .env_clear()
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
-
            .env("GIT_PROTOCOL", "2")
+
            .args(["-c", "protocol.version=2"])
            .arg("upload-pack")
            .arg("--strict") // The path to the git repo must be exact.
            .arg(".")
@@ -190,10 +190,10 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        let mut stdin = child.stdin.take().unwrap();
        let mut stdout = child.stdout.take().unwrap();
        let mut stderr = child.stderr.take().unwrap();
-
        let mut reader = GitReader::new(drain, stream_r);
+
        let mut reader = pktline::GitReader::new(drain, stream_r);

        match reader.read_command_pkt_line() {
-
            Ok(cmd) => {
+
            Ok((cmd, _pktline)) => {
                log::debug!(
                    target: "worker",
                    "Parsed git command packet-line for {}: {:?}", fetch.repo, cmd
@@ -204,42 +204,49 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
                    )));
                }
            }
-
            Err(_) => {
-
                return Err(FetchError::Git(git::raw::Error::from_str(
-
                    "error parsing git command packet-line",
-
                )));
+
            Err(err) => {
+
                return Err(FetchError::Git(git::raw::Error::from_str(&format!(
+
                    "error parsing git command packet-line: {err}"
+
                ))));
            }
        }

-
        thread::scope(|scope| {
+
        thread::scope::<_, Result<Vec<RefUpdate>, FetchError>>(|scope| {
+
            // Output of `upload-pack` is sent back to the remote peer.
+
            let outgoing = scope.spawn(move || io::copy(&mut stdout, stream_w));
+

+
            let mut buf = [0; 65536];
            // Data coming from the remote peer is written to the standard input of the
            // `upload-pack` process.
-
            // FIXME: This sometimes returns a `WouldBlock`.
-
            let t = scope.spawn(move || io::copy(&mut reader, &mut stdin));
-
            // Output of `upload-pack` is sent back to the remote peer.
-
            io::copy(&mut stdout, stream_w)?;
-
            // SAFETY: The thread should not panic, but if it does, we bubble up the panic.
-
            t.join().unwrap()?;
+
            while !outgoing.is_finished() {
+
                let n = reader.read_pkt_line(&mut buf)?;

-
            Ok::<_, FetchError>(())
-
        })?;
-
        let status = child.wait()?;
+
                stdin.write_all(&buf[..n]).unwrap();
+
                log::trace!(target: "worker", "Received {:?}", String::from_utf8_lossy(&buf[..n]));

-
        if let Some(status) = status.code() {
-
            log::debug!(target: "worker", "Upload-pack for {} exited with status {:?}", fetch.repo, status);
-
        } else {
-
            log::debug!(target: "worker", "Upload-pack for {} exited with unknown status", fetch.repo);
-
        }
+
                if &buf[..n] == pktline::DONE {
+
                    break;
+
                }
+
            }
+
            // SAFETY: The thread should not panic, but if it does, we bubble up the panic.
+
            outgoing.join().unwrap()?;

-
        if !status.success() {
-
            let mut err = Vec::new();
-
            stderr.read_to_end(&mut err)?;
+
            let status = child.wait()?;
+
            if let Some(status) = status.code() {
+
                log::debug!(target: "worker", "Upload-pack for {} exited with status {:?}", fetch.repo, status);
+
            } else {
+
                log::debug!(target: "worker", "Upload-pack for {} exited with unknown status", fetch.repo);
+
            }

-
            let err = String::from_utf8_lossy(&err);
-
            log::debug!(target: "worker", "Upload-pack for {}: stderr: {}", fetch.repo, err);
-
        }
+
            if !status.success() {
+
                let mut err = Vec::new();
+
                stderr.read_to_end(&mut err)?;

-
        Ok(vec![])
+
                let err = String::from_utf8_lossy(&err);
+
                log::debug!(target: "worker", "Upload-pack for {}: stderr: {}", fetch.repo, err);
+
            }
+
            Ok(vec![])
+
        })
    }
}

@@ -291,103 +298,121 @@ impl WorkerPool {
    }
}

-
pub struct GitReader<'a, R> {
-
    drain: Vec<u8>,
-
    stream: &'a mut R,
-
}
+
mod pktline {
+
    use std::io;
+
    use std::io::Read;
+
    use std::str;
+

+
    use super::Id;

-
impl<'a, R: io::Read> GitReader<'a, R> {
-
    fn new(drain: Vec<u8>, stream: &'a mut R) -> Self {
-
        Self { drain, stream }
+
    pub const HEADER_LEN: usize = 4;
+
    pub const FLUSH_PKT: &[u8; HEADER_LEN] = b"0000";
+
    pub const DELIM_PKT: &[u8; HEADER_LEN] = b"0001";
+
    pub const RESPONSE_END_PKT: &[u8; HEADER_LEN] = b"0002";
+
    pub const DONE: &[u8] = b"0009done\n";
+

+
    pub struct GitReader<'a, R> {
+
        drain: Vec<u8>,
+
        stream: &'a mut R,
    }

-
    /// Parse a Git command packet-line.
-
    ///
-
    /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
-
    ///
-
    fn read_command_pkt_line(&mut self) -> io::Result<GitCommand> {
-
        let mut pktline = [0u8; 1024];
-
        let length = self.read_pkt_line(&mut pktline)?;
-
        let Some(cmd) = GitCommand::parse(&pktline[..length]) else {
+
    impl<'a, R: io::Read> GitReader<'a, R> {
+
        pub fn new(drain: Vec<u8>, stream: &'a mut R) -> Self {
+
            Self { drain, stream }
+
        }
+

+
        /// Parse a Git command packet-line.
+
        ///
+
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
+
        ///
+
        pub fn read_command_pkt_line(&mut self) -> io::Result<(GitCommand, Vec<u8>)> {
+
            let mut pktline = [0u8; 1024];
+
            let length = self.read_pkt_line(&mut pktline)?;
+
            let Some(cmd) = GitCommand::parse(&pktline[4..length]) else {
            return Err(io::ErrorKind::InvalidInput.into());
        };
-
        Ok(cmd)
-
    }
+
            Ok((cmd, Vec::from(&pktline[..length])))
+
        }

-
    /// Parse a Git packet-line.
-
    fn read_pkt_line(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
        let mut length = [0; 4];
-
        self.read_exact(&mut length)?;
+
        /// Parse a Git packet-line.
+
        pub fn read_pkt_line(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
            self.read_exact(&mut buf[..HEADER_LEN])?;

-
        let length = str::from_utf8(&length)
-
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
-
        let length = usize::from_str_radix(length, 16)
-
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
-
        let remaining = length - 4;
+
            if &buf[..HEADER_LEN] == FLUSH_PKT
+
                || &buf[..HEADER_LEN] == DELIM_PKT
+
                || &buf[..HEADER_LEN] == RESPONSE_END_PKT
+
            {
+
                return Ok(HEADER_LEN);
+
            }
+
            let length = str::from_utf8(&buf[..HEADER_LEN])
+
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
+
            let length = usize::from_str_radix(length, 16)
+
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;

-
        self.read_exact(&mut buf[..remaining])?;
+
            self.read_exact(&mut buf[HEADER_LEN..length])?;

-
        Ok(remaining)
+
            Ok(length)
+
        }
    }
-
}

-
impl<'a, R: io::Read> io::Read for GitReader<'a, R> {
-
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
        if !self.drain.is_empty() {
-
            let count = buf.len().min(self.drain.len());
-
            buf[..count].copy_from_slice(&self.drain[..count]);
-
            self.drain.drain(..count);
+
    impl<'a, R: io::Read> io::Read for GitReader<'a, R> {
+
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
            if !self.drain.is_empty() {
+
                let count = buf.len().min(self.drain.len());
+
                buf[..count].copy_from_slice(&self.drain[..count]);
+
                self.drain.drain(..count);

-
            return Ok(count);
+
                return Ok(count);
+
            }
+
            self.stream.read(buf)
        }
-
        self.stream.read(buf)
    }
-
}

-
#[derive(Debug)]
-
pub struct GitCommand {
-
    pub repo: Id,
-
    pub path: String,
-
    pub host: Option<(String, Option<u16>)>,
-
    pub extra: Vec<(String, Option<String>)>,
-
}
+
    #[derive(Debug)]
+
    pub struct GitCommand {
+
        pub repo: Id,
+
        pub path: String,
+
        pub host: Option<(String, Option<u16>)>,
+
        pub extra: Vec<(String, Option<String>)>,
+
    }

-
impl GitCommand {
-
    /// Parse a Git command from a packet-line.
-
    fn parse(input: &[u8]) -> Option<Self> {
-
        let input = str::from_utf8(input).ok()?;
-
        let mut parts = input
-
            .strip_prefix("git-upload-pack ")?
-
            .split_terminator('\0');
-

-
        let path = parts.next()?.to_owned();
-
        let repo = path.strip_prefix('/')?.parse().ok()?;
-
        let host = match parts.next() {
-
            None | Some("") => None,
-
            Some(host) => {
-
                let host = host.strip_prefix("host=")?;
-
                match host.split_once(':') {
-
                    None => Some((host.to_owned(), None)),
-
                    Some((host, port)) => {
-
                        let port = port.parse::<u16>().ok()?;
-
                        Some((host.to_owned(), Some(port)))
+
    impl GitCommand {
+
        /// Parse a Git command from a packet-line.
+
        fn parse(input: &[u8]) -> Option<Self> {
+
            let input = str::from_utf8(input).ok()?;
+
            let mut parts = input
+
                .strip_prefix("git-upload-pack ")?
+
                .split_terminator('\0');
+

+
            let path = parts.next()?.to_owned();
+
            let repo = path.strip_prefix('/')?.parse().ok()?;
+
            let host = match parts.next() {
+
                None | Some("") => None,
+
                Some(host) => {
+
                    let host = host.strip_prefix("host=")?;
+
                    match host.split_once(':') {
+
                        None => Some((host.to_owned(), None)),
+
                        Some((host, port)) => {
+
                            let port = port.parse::<u16>().ok()?;
+
                            Some((host.to_owned(), Some(port)))
+
                        }
                    }
                }
-
            }
-
        };
-
        let extra = parts
-
            .skip_while(|part| part.is_empty())
-
            .map(|part| match part.split_once('=') {
-
                None => (part.to_owned(), None),
-
                Some((k, v)) => (k.to_owned(), Some(v.to_owned())),
+
            };
+
            let extra = parts
+
                .skip_while(|part| part.is_empty())
+
                .map(|part| match part.split_once('=') {
+
                    None => (part.to_owned(), None),
+
                    Some((k, v)) => (k.to_owned(), Some(v.to_owned())),
+
                })
+
                .collect();
+

+
            Some(Self {
+
                repo,
+
                path,
+
                host,
+
                extra,
            })
-
            .collect();
-

-
        Some(Self {
-
            repo,
-
            path,
-
            host,
-
            extra,
-
        })
+
        }
    }
}