Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: upload-pack inter-thread communication
Merged fintohaps opened 1 year ago

If an upload-pack is of a considerable size, the writer thread will be busy writing bytes to the receiving side. During this period, the receiving side will not be sending any bytes.

To ensure that the reader does not exit while the writing thread is performing writes, the reading side will check when the last time a write was performed. If it has not reached the timeout, then it can continue attempting to read. Otherwise, it will break out of the loop; killing the upload-pack process safely.

1 file changed +33 -12 ce8ac663 6dcd5627
modified radicle-node/src/worker/upload_pack.rs
@@ -1,5 +1,5 @@
use std::io;
-
use std::io::Write;
+
use std::io::{Read, Write};
use std::process::{Command, ExitStatus, Stdio};
use std::time::{Duration, Instant};

@@ -86,17 +86,28 @@ where

    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = io::BufReader::new(child.stdout.take().unwrap());
-
    let mut reporter = Reporter::new(header.repo, remote, emitter.clone(), send);
+
    let reporter = std::sync::Mutex::new(Reporter::new(header.repo, remote, emitter.clone(), send));

    thread::scope(|s| {
        thread::spawn_scoped(nid, "upload-pack", s, || {
-
            // N.b. we indefinitely copy stdout to the sender,
-
            // i.e. there's no need for a loop.
-
            match io::copy(&mut stdout, &mut reporter) {
-
                Ok(_) => {}
-
                Err(e) => {
-
                    log::error!(target: "worker", "Worker channel disconnected for {}; aborting: {e}", header.repo);
-
                    emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
+
            let mut buffer = [0; u16::MAX as usize + 1];
+
            loop {
+
                match stdout.read(&mut buffer) {
+
                    Ok(0) => break,
+
                    Ok(n) => {
+
                        let mut lock = reporter.lock().expect("FATAL: upload_pack poisoned lock");
+
                        if let Err(e) = lock.write_all(&buffer[..n]) {
+
                            log::warn!(target: "worker", "Error reading stdout to upload-pack reporter: {e}");
+
                            emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
+
                            break;
+
                        }
+
                        drop(lock);
+
                    }
+
                    Err(e) => {
+
                        log::debug!(target: "worker", "Exiting upload-pack writer thread for {}: {e}", header.repo);
+
                        emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
+
                        break;
+
                    }
                }
            }
        });
@@ -116,11 +127,14 @@ where
                        log::debug!(target: "worker", "Exiting upload-pack reader thread for {}", header.repo);
                        break;
                    }
-
                    // N.b. if the read timed out, ensure that the sender isn't
-
                    // still sending messages.
                    Err(e) if e.kind() == io::ErrorKind::TimedOut => {
                        log::warn!(target: "worker", "Read channel timed out for upload-pack {}", header.repo);
-
                        break;
+
                        // N.b. if the read timed out, ensure that the sender isn't
+
                        // still sending messages.
+
                        let lock = reporter.lock().expect("FATAL: upload_pack poisoned lock");
+
                        if lock.is_timeout(timeout) {
+
                            break;
+
                        }
                    }
                    Err(e) => {
                        log::error!(target: "worker", "Error on upload-pack channel read for {}: {e}", header.repo);
@@ -154,6 +168,7 @@ struct Reporter<W> {
    emitter: Emitter<Event>,
    send: W,
    total: usize,
+
    last_sent: Instant,
}

impl<W> Reporter<W> {
@@ -164,9 +179,14 @@ impl<W> Reporter<W> {
            emitter,
            send,
            total: 0,
+
            last_sent: Instant::now(),
        }
    }

+
    fn is_timeout(&self, timeout: Duration) -> bool {
+
        Instant::now().duration_since(self.last_sent) > timeout
+
    }
+

    fn emit(&mut self, buf: &[u8]) {
        let event = match Self::as_upload_pack_progress(buf) {
            Some(progress) => events::UploadPack::write(self.rid, self.remote, progress),
@@ -203,6 +223,7 @@ where
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.send.write(buf)?;
        self.emit(buf);
+
        self.last_sent = Instant::now();
        Ok(n)
    }