Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node/upload_pack: Refactor
Archived lorenz opened 1 month ago
  • Do not check timeout again
  • Handle EOF
  • Remove unnecessary Mutex
  • Simplify to std::io::copy
2 files changed +8 -59 edde15d9 9614a2fe
modified crates/radicle-node/src/worker/channels.rs
@@ -223,9 +223,8 @@ impl Read for ChannelReader<Vec<u8>> {
                self.buffer = io::Cursor::new(data);
                self.buffer.read(buf)
            }
-
            Ok(ChannelEvent::Eof) => Err(io::ErrorKind::UnexpectedEof.into()),
+
            Ok(ChannelEvent::Eof) => Ok(0),
            Ok(ChannelEvent::Close) => Err(io::ErrorKind::ConnectionReset.into()),
-

            Err(chan::RecvTimeoutError::Timeout) => Err(io::Error::new(
                io::ErrorKind::TimedOut,
                "error reading from stream: channel timed out",
modified crates/radicle-node/src/worker/upload_pack.rs
@@ -1,5 +1,4 @@
use std::io;
-
use std::io::{Read, Write};
use std::process::{Command, ExitStatus, Stdio};
use std::time::{Duration, Instant};

@@ -89,62 +88,20 @@ where

    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = io::BufReader::new(child.stdout.take().unwrap());
-
    let reporter = std::sync::Mutex::new(Reporter::new(header.repo, remote, emitter.clone(), send));
+
    let mut reporter = Reporter::new(header.repo, remote, emitter.clone(), send);

    thread::scope(|s| {
        thread::spawn_scoped(nid, "upload-pack", s, || {
-
            let mut buffer = [0; u16::MAX as usize + 1];
-
            loop {
-
                match stdout.read(&mut buffer) {
-
                    Ok(0) => break,
-
                    Ok(n) => {
-
                        let mut lock = reporter.lock().expect("FATAL: upload_pack poisoned lock");
-
                        if let Err(e) = lock.write_all(&buffer[..n]) {
-
                            log::debug!(target: "worker", "Failed to write buffer to upload-pack reporter: {e}");
-
                            emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
-
                            break;
-
                        }
-
                        drop(lock);
-
                    }
-
                    Err(e) => {
-
                        log::debug!(target: "worker", "Exiting upload-pack writer thread for {}: {e}", header.repo);
-
                        emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
-
                        break;
-
                    }
-
                }
+
            if let Err(e) = io::copy(&mut stdout, &mut reporter) {
+
                log::debug!(target: "worker", "Failure on upload-pack writer for {}: {e}", header.repo);
+
                emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
            }
        });

        let reader = thread::spawn_scoped(nid, "upload-pack", s, || {
-
            let mut buffer = [0; u16::MAX as usize + 1];
-
            loop {
-
                match recv.read(&mut buffer) {
-
                    Ok(0) => break,
-
                    Ok(n) => {
-
                        if let Err(e) = stdin.write_all(&buffer[..n]) {
-
                            log::debug!(target: "worker", "Failed to write to upload-pack stdin: {e}");
-
                            break;
-
                        }
-
                    }
-
                    Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
-
                        log::debug!(target: "worker", "Exiting upload-pack reader thread for {}", header.repo);
-
                        break;
-
                    }
-
                    Err(e) if e.kind() == io::ErrorKind::TimedOut => {
-
                        log::debug!(target: "worker", "Read channel timed out for upload-pack {}", header.repo);
-
                        // N.b. if the read timed out, ensure that the sender isn't
-
                        // still sending messages.
-
                        let lock = reporter.lock().expect("FATAL: upload_pack poisoned lock");
-
                        if lock.is_timeout(timeout) {
-
                            break;
-
                        }
-
                    }
-
                    Err(e) => {
-
                        log::debug!(target: "worker", "Failure on upload-pack channel read for {}: {e}", header.repo);
-
                        emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
-
                        break;
-
                    }
-
                }
+
            if let Err(e) = io::copy(&mut recv, &mut stdin) {
+
                log::debug!(target: "worker", "Failure on upload-pack reader for {}: {e}", header.repo);
+
                emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
            }
        });

@@ -175,7 +132,6 @@ struct Reporter<W> {
    emitter: Emitter<Event>,
    send: W,
    total: usize,
-
    last_sent: Instant,
}

impl<W> Reporter<W> {
@@ -186,14 +142,9 @@ impl<W> Reporter<W> {
            emitter,
            send,
            total: 0,
-
            last_sent: Instant::now(),
        }
    }

-
    fn is_timeout(&self, timeout: Duration) -> bool {
-
        Instant::now().duration_since(self.last_sent) > timeout
-
    }
-

    fn emit(&mut self, buf: &[u8]) {
        let event = match Self::as_upload_pack_progress(buf) {
            Some(progress) => events::UploadPack::write(self.rid, self.remote, progress),
@@ -230,7 +181,6 @@ where
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.send.write(buf)?;
        self.emit(buf);
-
        self.last_sent = Instant::now();
        Ok(n)
    }