Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Constrain worker channel size
cloudhead committed 1 year ago
commit a79ca5e8fcc211b6183bac3d2c3d4f52b106d07f
parent 38592955b25c03f357460d71ad0d56e7188f03cf
2 files changed +5 -20
modified radicle-node/src/worker/channels.rs
@@ -10,7 +10,9 @@ use crate::runtime::Handle;
use crate::wire::StreamId;

/// Maximum size of channel used to communicate with a worker.
-
pub const MAX_WORKER_CHANNEL_SIZE: usize = 4096;
+
/// Note that as long as we're using [`std::io::copy`] to copy data from the
+
/// upload-pack's stdout, the data chunks are of a maximum size of 8192 bytes.
+
pub const MAX_WORKER_CHANNEL_SIZE: usize = 64;

/// A reader and writer pair that can be used in the fetch protocol.
///
@@ -184,7 +186,7 @@ struct ChannelWriter<T = Vec<u8>> {
/// Wraps a [`ChannelWriter`] alongside the associated [`Handle`] and [`NodeId`].
///
/// This allows the channel to [`Write::flush`] when calling
-
/// [`Write::write_all`], which is necessary to signal to the
+
/// [`Write::write`], which is necessary to signal to the
/// controller to send the wire data.
pub struct ChannelFlushWriter<T = Vec<u8>> {
    writer: ChannelWriter<T>,
@@ -206,29 +208,13 @@ impl Write for ChannelFlushWriter<Vec<u8>> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len();
        self.writer.send(buf.to_vec())?;
+
        self.flush()?;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.handle.flush(self.remote, self.stream)
    }
-

-
    fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> {
-
        while !buf.is_empty() {
-
            match self.write(buf) {
-
                Ok(0) => {
-
                    return Err(io::Error::new(
-
                        io::ErrorKind::WriteZero,
-
                        "failed to write whole buffer",
-
                    ));
-
                }
-
                Ok(n) => buf = &buf[n..],
-
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
-
                Err(e) => return Err(e),
-
            }
-
        }
-
        self.flush()
-
    }
}

impl<T: AsRef<[u8]>> ChannelWriter<T> {
modified radicle-node/src/worker/upload_pack.rs
@@ -201,7 +201,6 @@ where
{
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.send.write(buf)?;
-
        self.send.flush()?;
        self.emit(buf);
        Ok(n)
    }