Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Document worker a little more
Alexis Sellier committed 3 years ago
commit 7d5347b74f283566246741cd2ec0d917ad27da98
parent b92dc59204cd87d10c9f019621d8e91934b98c6b
1 file changed +23 -18
modified radicle-node/src/worker.rs
@@ -109,6 +109,9 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
            let result = self.fetch(fetch, &mut tunnel);
            let mut session = tunnel.into_session();

+
            // A flush after all commands have been sent is optional; we do it because we're not
+
            // closing the connection and therefore there's no other way for the server to know
+
            // we're done sending commands.
            if let Err(err) = pktline::flush(&mut session) {
                log::error!(target: "worker", "Fetch error: {err}");
            }
@@ -197,13 +200,16 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        stream_r: &mut WireReader,
        stream_w: &mut WireWriter<G>,
    ) -> Result<Vec<RefUpdate>, FetchError> {
+
        // Connect to our local git daemon, running as a child process.
        let daemon = net::TcpStream::connect_timeout(&self.daemon, self.timeout)?;
        let (mut daemon_r, mut daemon_w) = (daemon.try_clone().unwrap(), daemon);
-
        let mut stream_reader = pktline::GitReader::new(drain, stream_r);
-
        let mut daemon_reader = pktline::GitReader::new(vec![], &mut daemon_r);
+
        let mut stream_r = pktline::Reader::new(drain, stream_r);
+
        let mut daemon_r = pktline::Reader::new(vec![], &mut daemon_r);
        let mut buffer = [0; u16::MAX as usize + 1];

-
        let request = match stream_reader.read_request_pkt_line() {
+
        // Read the request packet line to make sure the repository being requested matches what
+
        // we expect, and that the service requested is valid.
+
        let request = match stream_r.read_request_pktline() {
            Ok((req, pktline)) => {
                log::debug!(
                    target: "worker",
@@ -222,10 +228,13 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
                ))));
            }
        };
+
        // Write the raw request to the daemon, once we've verified it.
        daemon_w.write_all(&request)?;

+
        // We now loop, alternating between reading requests from the client, and writing responses
+
        // back from the daemon.. Requests are delimited with a flush packet (`flush-pkt`).
        loop {
-
            if let Err(e) = daemon_reader.read_pkt_lines(stream_w, &mut buffer) {
+
            if let Err(e) = daemon_r.read_pktlines(stream_w, &mut buffer) {
                // This is the expected error when the remote disconnects.
                if e.kind() == io::ErrorKind::UnexpectedEof {
                    break;
@@ -234,14 +243,14 @@ impl<G: Signer + EcSign + 'static> Worker<G> {

                return Err(e.into());
            }
-
            if let Err(e) = stream_reader.read_pkt_lines(&mut daemon_w, &mut buffer) {
+
            if let Err(e) = stream_r.read_pktlines(&mut daemon_w, &mut buffer) {
                log::debug!(target: "worker", "Remote returned error: {e}");
                break;
            }
        }
        log::debug!(target: "worker", "Upload of {} to {} exited successfully", fetch.repo, fetch.remote);

-
        // TODO: Don't return anything when uploading.
+
        // When we aren't the one fetching, no refs are updated.
        Ok(vec![])
    }
}
@@ -310,12 +319,12 @@ mod pktline {
        write!(w, "0000")
    }

-
    pub struct GitReader<'a, R> {
+
    pub struct Reader<'a, R> {
        drain: Vec<u8>,
        stream: &'a mut R,
    }

-
    impl<'a, R: io::Read> GitReader<'a, R> {
+
    impl<'a, R: io::Read> Reader<'a, R> {
        pub fn new(drain: Vec<u8>, stream: &'a mut R) -> Self {
            Self { drain, stream }
        }
@@ -324,9 +333,9 @@ mod pktline {
        ///
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
        ///
-
        pub fn read_request_pkt_line(&mut self) -> io::Result<(GitRequest, Vec<u8>)> {
+
        pub fn read_request_pktline(&mut self) -> io::Result<(GitRequest, Vec<u8>)> {
            let mut pktline = [0u8; 1024];
-
            let length = self.read_pkt_line(&mut pktline)?;
+
            let length = self.read_pktline(&mut pktline)?;
            let Some(cmd) = GitRequest::parse(&pktline[4..length]) else {
                return Err(io::ErrorKind::InvalidInput.into());
            };
@@ -334,7 +343,7 @@ mod pktline {
        }

        /// Parse a Git packet-line.
-
        pub fn read_pkt_line(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
        pub fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.read_exact(&mut buf[..HEADER_LEN])?;

            if &buf[..HEADER_LEN] == FLUSH_PKT
@@ -353,13 +362,9 @@ mod pktline {
            Ok(length)
        }

-
        pub fn read_pkt_lines<W: io::Write>(
-
            &mut self,
-
            w: &mut W,
-
            buf: &mut [u8],
-
        ) -> io::Result<()> {
+
        pub fn read_pktlines<W: io::Write>(&mut self, w: &mut W, buf: &mut [u8]) -> io::Result<()> {
            loop {
-
                let n = self.read_pkt_line(buf)?;
+
                let n = self.read_pktline(buf)?;
                if n == 0 {
                    break;
                }
@@ -373,7 +378,7 @@ mod pktline {
        }
    }

-
    impl<'a, R: io::Read> io::Read for GitReader<'a, R> {
+
    impl<'a, R: io::Read> io::Read for Reader<'a, R> {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            if !self.drain.is_empty() {
                let count = buf.len().min(self.drain.len());