Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Simplify worker data piping
Alexis Sellier committed 3 years ago
commit d0e24bd0ce73629bbffd72467ce776143c53e631
parent c99497eb2151ef41bde39b72c7287808f80f63de
3 files changed +24 -27
modified radicle-node/src/worker.rs
@@ -365,17 +365,10 @@ impl Worker {
                Self::eof(remote, stream, stream_w, &mut self.handle).map_err(UploadError::from)
            });

-
            let stream_to_daemon = s.spawn(move || -> Result<(), io::Error> {
-
                let mut buffer = [0; u16::MAX as usize + 1];
-

-
                loop {
-
                    match stream_r.read(&mut buffer) {
-
                        Ok(n) => daemon_w.write_all(&buffer[..n])?,
-
                        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
-
                        Err(e) => return Err(e),
-
                    }
-
                }
-
                daemon_w.shutdown(net::Shutdown::Both)
+
            let stream_to_daemon = s.spawn(move || {
+
                stream_r
+
                    .pipe(&mut daemon_w)
+
                    .and_then(|()| daemon_w.shutdown(net::Shutdown::Both))
            });

            stream_to_daemon.join().unwrap()?;
modified radicle-node/src/worker/channels.rs
@@ -72,6 +72,24 @@ pub struct ChannelReader<T = Vec<u8>> {
    receiver: chan::Receiver<ChannelEvent<T>>,
}

+
impl ChannelReader<Vec<u8>> {
+
    pub fn pipe<W: io::Write>(&mut self, mut writer: W) -> io::Result<()> {
+
        loop {
+
            match self.receiver.recv() {
+
                Ok(ChannelEvent::Data(data)) => writer.write_all(&data)?,
+
                Ok(ChannelEvent::Eof) => return Ok(()),
+
                Ok(ChannelEvent::Close) => return Err(io::ErrorKind::ConnectionReset.into()),
+
                Err(_) => {
+
                    return Err(io::Error::new(
+
                        io::ErrorKind::BrokenPipe,
+
                        "error reading from stream: channel is disconnected",
+
                    ))
+
                }
+
            }
+
        }
+
    }
+
}
+

impl Read for ChannelReader<Vec<u8>> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let read = self.buffer.read(buf)?;
modified radicle-node/src/worker/tunnel.rs
@@ -47,7 +47,7 @@ impl<'a> Tunnel<'a> {
    pub fn run(&mut self, timeout: time::Duration) -> io::Result<()> {
        let (remote_w, remote_r) = self.channels.split();
        let (local, _) = self.listener.accept()?;
-
        let (mut local_r, mut local_w) = (local.try_clone()?, local);
+
        let (mut local_r, local_w) = (local.try_clone()?, local);

        local_r.set_read_timeout(Some(timeout))?;
        local_w.set_write_timeout(Some(timeout))?;
@@ -58,21 +58,7 @@ impl<'a> Tunnel<'a> {
        thread::scope(|s| {
            let remote_to_local = thread::Builder::new()
                .name(self.local.to_string())
-
                .spawn_scoped(s, || {
-
                    let mut buffer = [0; u16::MAX as usize + 1];
-

-
                    loop {
-
                        match remote_r.read(&mut buffer) {
-
                            Ok(0) => return Ok(()),
-
                            Ok(n) => local_w.write_all(&buffer[..n])?,
-
                            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
-
                                // This is the expected error when the git fetch closes the connection.
-
                                return Ok(());
-
                            }
-
                            Err(e) => return Err(e),
-
                        }
-
                    }
-
                })?;
+
                .spawn_scoped(s, || remote_r.pipe(local_w))?;

            let local_to_remote = thread::Builder::new()
                .name(self.local.to_string())