Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Constrain worker channel size
Merged did:key:z6MksFqX...wzpT opened 1 year ago

To avoid buffering large amounts of data in the process, we set the worker channel size to 1. Keep in mind that this is one ChannelEvent, not one byte. ChannelEvent::Data can already contain an arbitrary amount of data via its Vec<u8>.

This forces the worker to block as long as no one is reading data on the other side.

We also remove an unused function, and move the flushing to the ChannelFlushWriter.

2 files changed +5 -20 38592955 a79ca5e8
modified radicle-node/src/worker/channels.rs
@@ -10,7 +10,9 @@ use crate::runtime::Handle;
use crate::wire::StreamId;

/// Maximum size of channel used to communicate with a worker.
-
pub const MAX_WORKER_CHANNEL_SIZE: usize = 4096;
+
/// Note that as long as we're using [`std::io::copy`] to copy data from the
+
/// upload-pack's stdout, the data chunks are of a maximum size of 8192 bytes.
+
pub const MAX_WORKER_CHANNEL_SIZE: usize = 64;

/// A reader and writer pair that can be used in the fetch protocol.
///
@@ -184,7 +186,7 @@ struct ChannelWriter<T = Vec<u8>> {
/// Wraps a [`ChannelWriter`] alongside the associated [`Handle`] and [`NodeId`].
///
/// This allows the channel to [`Write::flush`] when calling
-
/// [`Write::write_all`], which is necessary to signal to the
+
/// [`Write::write`], which is necessary to signal to the
/// controller to send the wire data.
pub struct ChannelFlushWriter<T = Vec<u8>> {
    writer: ChannelWriter<T>,
@@ -206,29 +208,13 @@ impl Write for ChannelFlushWriter<Vec<u8>> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len();
        self.writer.send(buf.to_vec())?;
+
        self.flush()?;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.handle.flush(self.remote, self.stream)
    }
-

-
    fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> {
-
        while !buf.is_empty() {
-
            match self.write(buf) {
-
                Ok(0) => {
-
                    return Err(io::Error::new(
-
                        io::ErrorKind::WriteZero,
-
                        "failed to write whole buffer",
-
                    ));
-
                }
-
                Ok(n) => buf = &buf[n..],
-
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
-
                Err(e) => return Err(e),
-
            }
-
        }
-
        self.flush()
-
    }
}

impl<T: AsRef<[u8]>> ChannelWriter<T> {
modified radicle-node/src/worker/upload_pack.rs
@@ -201,7 +201,6 @@ where
{
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.send.write(buf)?;
-
        self.send.flush()?;
        self.emit(buf);
        Ok(n)
    }