Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: upload-pack inter-thread communication
Fintan Halpenny committed 1 year ago
commit 6dcd56275e606396ec82f4e95fab1d736e8152a4
parent ce8ac663f7fbfa3d28b4677ea28e8b2d47d7a032
1 file changed +33 -12
modified radicle-node/src/worker/upload_pack.rs
@@ -1,5 +1,5 @@
use std::io;
-
use std::io::Write;
+
use std::io::{Read, Write};
use std::process::{Command, ExitStatus, Stdio};
use std::time::{Duration, Instant};

@@ -86,17 +86,28 @@ where

    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = io::BufReader::new(child.stdout.take().unwrap());
-
    let mut reporter = Reporter::new(header.repo, remote, emitter.clone(), send);
+
    let reporter = std::sync::Mutex::new(Reporter::new(header.repo, remote, emitter.clone(), send));

    thread::scope(|s| {
        thread::spawn_scoped(nid, "upload-pack", s, || {
-
            // N.b. we indefinitely copy stdout to the sender,
-
            // i.e. there's no need for a loop.
-
            match io::copy(&mut stdout, &mut reporter) {
-
                Ok(_) => {}
-
                Err(e) => {
-
                    log::error!(target: "worker", "Worker channel disconnected for {}; aborting: {e}", header.repo);
-
                    emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
+
            let mut buffer = [0; u16::MAX as usize + 1];
+
            loop {
+
                match stdout.read(&mut buffer) {
+
                    Ok(0) => break,
+
                    Ok(n) => {
+
                        let mut lock = reporter.lock().expect("FATAL: upload_pack poisoned lock");
+
                        if let Err(e) = lock.write_all(&buffer[..n]) {
+
                            log::warn!(target: "worker", "Error reading stdout to upload-pack reporter: {e}");
+
                            emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
+
                            break;
+
                        }
+
                        drop(lock);
+
                    }
+
                    Err(e) => {
+
                        log::debug!(target: "worker", "Exiting upload-pack writer thread for {}: {e}", header.repo);
+
                        emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
+
                        break;
+
                    }
                }
            }
        });
@@ -116,11 +127,14 @@ where
                        log::debug!(target: "worker", "Exiting upload-pack reader thread for {}", header.repo);
                        break;
                    }
-
                    // N.b. if the read timed out, ensure that the sender isn't
-
                    // still sending messages.
                    Err(e) if e.kind() == io::ErrorKind::TimedOut => {
                        log::warn!(target: "worker", "Read channel timed out for upload-pack {}", header.repo);
-
                        break;
+
                        // N.b. if the read timed out, ensure that the sender isn't
+
                        // still sending messages.
+
                        let lock = reporter.lock().expect("FATAL: upload_pack poisoned lock");
+
                        if lock.is_timeout(timeout) {
+
                            break;
+
                        }
                    }
                    Err(e) => {
                        log::error!(target: "worker", "Error on upload-pack channel read for {}: {e}", header.repo);
@@ -154,6 +168,7 @@ struct Reporter<W> {
    emitter: Emitter<Event>,
    send: W,
    total: usize,
+
    last_sent: Instant,
}

impl<W> Reporter<W> {
@@ -164,9 +179,14 @@ impl<W> Reporter<W> {
            emitter,
            send,
            total: 0,
+
            last_sent: Instant::now(),
        }
    }

+
    fn is_timeout(&self, timeout: Duration) -> bool {
+
        Instant::now().duration_since(self.last_sent) > timeout
+
    }
+

    fn emit(&mut self, buf: &[u8]) {
        let event = match Self::as_upload_pack_progress(buf) {
            Some(progress) => events::UploadPack::write(self.rid, self.remote, progress),
@@ -203,6 +223,7 @@ where
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.send.write(buf)?;
        self.emit(buf);
+
        self.last_sent = Instant::now();
        Ok(n)
    }