Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Properly parse replication data
Alexis Sellier committed 3 years ago
commit 3d04849b7b7b31d6cf7785cd6804a9f821f12a99
parent df9346fefbde0a2b10f21d6c0a0b2c9c4b890bc2
2 files changed +102 -59
modified radicle-node/src/tests/e2e.rs
@@ -294,23 +294,24 @@ fn test_replication() {
    assert_eq!(updated, vec![]);

    let inventory = alice.handle.inventory().unwrap();
+
    let alice_refs = alice
+
        .storage
+
        .repository(acme)
+
        .unwrap()
+
        .remotes()
+
        .unwrap()
+
        .map(|r| r.unwrap())
+
        .collect::<Vec<_>>();
+
    let bob_refs = bob
+
        .storage
+
        .repository(acme)
+
        .unwrap()
+
        .remotes()
+
        .unwrap()
+
        .map(|r| r.unwrap())
+
        .collect::<Vec<_>>();
+

    assert_eq!(inventory.try_iter().next(), Some(acme));
-
    assert_eq!(
-
        alice
-
            .storage
-
            .repository(acme)
-
            .unwrap()
-
            .remotes()
-
            .unwrap()
-
            .map(|r| r.unwrap())
-
            .collect::<Vec<_>>(),
-
        bob.storage
-
            .repository(acme)
-
            .unwrap()
-
            .remotes()
-
            .unwrap()
-
            .map(|r| r.unwrap())
-
            .collect::<Vec<_>>(),
-
    );
+
    assert_eq!(alice_refs, bob_refs);
    assert_matches!(alice.storage.repository(acme).unwrap().verify(), Ok(()));
}
modified radicle-node/src/worker.rs
@@ -1,6 +1,6 @@
use std::io::prelude::*;
use std::thread::JoinHandle;
-
use std::{env, io, net, process, thread, time};
+
use std::{env, io, net, process, str, thread, time};

use crossbeam_channel as chan;
use cyphernet::EcSign;
@@ -10,7 +10,7 @@ use netservices::{NetSession, SplitIo};
use radicle::crypto::Signer;
use radicle::identity::Id;
use radicle::storage::{ReadRepository, RefUpdate, WriteRepository, WriteStorage};
-
use radicle::Storage;
+
use radicle::{git, Storage};
use reactor::poller::popol;

use crate::service::reactor::Fetch;
@@ -185,39 +185,35 @@ impl<G: Signer + EcSign> Worker<G> {
        let mut stdin = child.stdin.take().unwrap();
        let mut stdout = child.stdout.take().unwrap();
        let mut stderr = child.stderr.take().unwrap();
+
        let mut reader = GitReader::new(drain, stream_r);
+

+
        match reader.read_command_pkt_line() {
+
            Ok(cmd) => {
+
                log::debug!(
+
                    target: "worker",
+
                    "Parsed git command packet-line for {}: {:?}", fetch.repo, cmd
+
                );
+
                if cmd.repo != fetch.repo {
+
                    return Err(FetchError::Git(git::raw::Error::from_str(
+
                        "git pkt-line command does not match fetch request",
+
                    )));
+
                }
+
            }
+
            Err(_) => {
+
                return Err(FetchError::Git(git::raw::Error::from_str(
+
                    "error parsing git command packet-line",
+
                )));
+
            }
+
        }

        thread::scope(|scope| {
-
            let t = scope.spawn(move || {
-
                let mut buf = [0u8; 65535];
-
                // First drain the buffer of incoming data that was waiting.
-
                if stdin.write_all(&drain[..]).is_err() {
-
                    return;
-
                }
-
                // Then process any new data coming into the socket, and write it
-
                // to the standard input of the `upload-pack` process.
-
                while let Ok(n) = stream_r.read(&mut buf) {
-
                    if let Ok(line) = std::str::from_utf8(&buf[..n]) {
-
                        // FIXME: The git command could come in the drain object.
-
                        // FIXME: We should only call this once, before looping.
-
                        if let Some(cmd) = GitCommand::parse(line) {
-
                            // FIXME: Convert this into an error.
-
                            debug_assert_eq!(cmd.repo, fetch.repo);
-
                            continue;
-
                        }
-
                    }
-
                    if n == 0 {
-
                        break;
-
                    }
-
                    if stdin.write_all(&buf[..n]).is_err() {
-
                        break;
-
                    }
-
                }
-
            });
+
            // Data coming from the remote peer is written to the standard input of the
+
            // `upload-pack` process.
+
            let t = scope.spawn(move || io::copy(&mut reader, &mut stdin));
            // Output of `upload-pack` is sent back to the remote peer.
            io::copy(&mut stdout, stream_w)?;
-
            // SAFETY: The thread does not panic, unless the implementations of read/write
-
            // internally panic.
-
            t.join().unwrap();
+
            // SAFETY: The thread should not panic, but if it does, we bubble up the panic.
+
            t.join().unwrap()?;

            Ok::<_, FetchError>(())
        })?;
@@ -287,6 +283,59 @@ impl WorkerPool {
    }
}

+
pub struct GitReader<'a, R> {
+
    drain: Vec<u8>,
+
    stream: &'a mut R,
+
}
+

+
impl<'a, R: io::Read> GitReader<'a, R> {
+
    fn new(drain: Vec<u8>, stream: &'a mut R) -> Self {
+
        Self { drain, stream }
+
    }
+

+
    /// Parse a Git command packet-line.
+
    ///
+
    /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
+
    ///
+
    fn read_command_pkt_line(&mut self) -> io::Result<GitCommand> {
+
        let mut pktline = [0u8; 1024];
+
        let length = self.read_pkt_line(&mut pktline)?;
+
        let Some(cmd) = GitCommand::parse(&pktline[..length]) else {
+
            return Err(io::ErrorKind::InvalidInput.into());
+
        };
+
        Ok(cmd)
+
    }
+

+
    /// Parse a Git packet-line.
+
    fn read_pkt_line(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
        let mut length = [0; 4];
+
        self.read_exact(&mut length)?;
+

+
        let length = str::from_utf8(&length)
+
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
+
        let length = usize::from_str_radix(length, 16)
+
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
+
        let remaining = length - 4;
+

+
        self.read_exact(&mut buf[..remaining])?;
+

+
        Ok(remaining)
+
    }
+
}
+

+
impl<'a, R: io::Read> io::Read for GitReader<'a, R> {
+
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
        if !self.drain.is_empty() {
+
            let count = buf.len().min(self.drain.len());
+
            buf[..count].copy_from_slice(&self.drain[..count]);
+
            self.drain.drain(..count);
+

+
            return Ok(count);
+
        }
+
        self.stream.read(buf)
+
    }
+
}
+

#[derive(Debug)]
pub struct GitCommand {
    pub repo: Id,
@@ -296,17 +345,10 @@ pub struct GitCommand {
}

impl GitCommand {
-
    /// Parse a Git command packet-line.
-
    ///
-
    /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
-
    ///
-
    fn parse(input: &str) -> Option<Self> {
-
        let (left, right) = input.split_at(4);
-
        let len = usize::from_str_radix(left, 16).ok()?;
-
        if len != input.len() {
-
            return None;
-
        }
-
        let mut parts = right
+
    /// Parse a Git command from a packet-line.
+
    fn parse(input: &[u8]) -> Option<Self> {
+
        let input = str::from_utf8(input).ok()?;
+
        let mut parts = input
            .strip_prefix("git-upload-pack ")?
            .split_terminator('\0');