Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Add worker channel timeouts
Alexis Sellier committed 3 years ago
commit 10fe2ae299ee384d572290554e3dff9bf74471d6
parent 5428420f2761695c94769c2c2dd26f496cccd0ff
2 files changed +38 -16
modified radicle-node/src/wire/protocol.rs
@@ -7,7 +7,7 @@ use std::collections::VecDeque;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::sync::Arc;
-
use std::{io, net};
+
use std::{io, net, time};

use amplify::Wrapper as _;
use crossbeam_channel as chan;
@@ -43,6 +43,10 @@ pub const NOISE_XK: HandshakePattern = HandshakePattern {
    responder: cyphernet::encrypt::noise::OneWayPattern::Known,
};

+
/// Default time to wait to receive something from a worker channel. Applies to
+
/// workers waiting for data from remotes as well.
+
pub const DEFAULT_CHANNEL_TIMEOUT: time::Duration = time::Duration::from_secs(9);
+

/// Control message used internally between workers, users, and the service.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
@@ -108,8 +112,8 @@ impl Streams {

    /// Register an open stream.
    fn register(&mut self, stream: StreamId) -> Option<worker::Channels> {
-
        let (wire, worker) =
-
            worker::Channels::pair().expect("Streams::register: fatal: unable to create channels");
+
        let (wire, worker) = worker::Channels::pair(DEFAULT_CHANNEL_TIMEOUT)
+
            .expect("Streams::register: fatal: unable to create channels");

        match self.streams.entry(stream) {
            Entry::Vacant(e) => {
modified radicle-node/src/worker/channels.rs
@@ -1,6 +1,6 @@
use std::io::Read;
use std::ops::Deref;
-
use std::{fmt, io};
+
use std::{fmt, io, time};

use crossbeam_channel as chan;

@@ -41,19 +41,20 @@ impl<T: AsRef<[u8]>> Channels<T> {
    pub fn new(
        sender: chan::Sender<ChannelEvent<T>>,
        receiver: chan::Receiver<ChannelEvent<T>>,
+
        timeout: time::Duration,
    ) -> Self {
-
        let sender = ChannelWriter { sender };
-
        let receiver = ChannelReader::new(receiver);
+
        let sender = ChannelWriter { sender, timeout };
+
        let receiver = ChannelReader::new(receiver, timeout);

        Self { sender, receiver }
    }

-
    pub fn pair() -> io::Result<(Channels<T>, Channels<T>)> {
+
    pub fn pair(timeout: time::Duration) -> io::Result<(Channels<T>, Channels<T>)> {
        let (l_send, r_recv) = chan::unbounded::<ChannelEvent<T>>();
        let (r_send, l_recv) = chan::unbounded::<ChannelEvent<T>>();

-
        let l = Channels::new(l_send, l_recv);
-
        let r = Channels::new(r_send, r_recv);
+
        let l = Channels::new(l_send, l_recv, timeout);
+
        let r = Channels::new(r_send, r_recv, timeout);

        Ok((l, r))
    }
@@ -76,6 +77,7 @@ impl<T: AsRef<[u8]>> Channels<T> {
pub struct ChannelReader<T = Vec<u8>> {
    buffer: io::Cursor<Vec<u8>>,
    receiver: chan::Receiver<ChannelEvent<T>>,
+
    timeout: time::Duration,
}

impl<T> Deref for ChannelReader<T> {
@@ -87,24 +89,31 @@ impl<T> Deref for ChannelReader<T> {
}

impl<T: AsRef<[u8]>> ChannelReader<T> {
-
    pub fn new(receiver: chan::Receiver<ChannelEvent<T>>) -> Self {
+
    pub fn new(receiver: chan::Receiver<ChannelEvent<T>>, timeout: time::Duration) -> Self {
        Self {
            buffer: io::Cursor::new(Vec::new()),
            receiver,
+
            timeout,
        }
    }

    pub fn pipe<W: io::Write>(&mut self, mut writer: W) -> io::Result<()> {
        loop {
-
            match self.receiver.recv() {
+
            match self.receiver.recv_timeout(self.timeout) {
                Ok(ChannelEvent::Data(data)) => writer.write_all(data.as_ref())?,
                Ok(ChannelEvent::Eof) => return Ok(()),
                Ok(ChannelEvent::Close) => return Err(io::ErrorKind::ConnectionReset.into()),
-
                Err(_) => {
+
                Err(chan::RecvTimeoutError::Timeout) => {
+
                    return Err(io::Error::new(
+
                        io::ErrorKind::TimedOut,
+
                        "error reading from stream: channel timed out",
+
                    ));
+
                }
+
                Err(chan::RecvTimeoutError::Disconnected) => {
                    return Err(io::Error::new(
                        io::ErrorKind::BrokenPipe,
                        "error reading from stream: channel is disconnected",
-
                    ))
+
                    ));
                }
            }
        }
@@ -138,13 +147,22 @@ impl Read for ChannelReader<Vec<u8>> {
#[derive(Clone)]
pub struct ChannelWriter<T = Vec<u8>> {
    sender: chan::Sender<ChannelEvent<T>>,
+
    timeout: time::Duration,
}

impl<T: AsRef<[u8]>> ChannelWriter<T> {
    pub fn send(&self, event: impl Into<ChannelEvent<T>>) -> io::Result<()> {
-
        self.sender
-
            .send(event.into())
-
            .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))
+
        match self.sender.send_timeout(event.into(), self.timeout) {
+
            Ok(()) => Ok(()),
+
            Err(chan::SendTimeoutError::Timeout(_)) => Err(io::Error::new(
+
                io::ErrorKind::TimedOut,
+
                "error writing to stream: channel timed out",
+
            )),
+
            Err(chan::SendTimeoutError::Disconnected(_)) => Err(io::Error::new(
+
                io::ErrorKind::BrokenPipe,
+
                "error writing to stream: channel is disconnected",
+
            )),
+
        }
    }

    /// Since the git protocol is tunneled over an existing connection, we can't signal the end of