Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Share channel type between modules
Alexis Sellier committed 3 years ago
commit 51b372ecad512fe752bf11ebde11d2f5c373077b
parent d0e24bd0ce73629bbffd72467ce776143c53e631
4 files changed +105 -125
modified radicle-node/src/wire/protocol.rs
@@ -32,6 +32,7 @@ use crate::service::{session, DisconnectReason, Service};
use crate::wire::frame;
use crate::wire::frame::{Frame, FrameData, StreamId};
use crate::wire::Encode;
+
use crate::worker;
use crate::worker::{ChannelEvent, Fetch, Task, TaskResult};
use crate::Link;
use crate::{address, service};
@@ -64,20 +65,12 @@ pub type WireWriter<G> = NetWriter<NoiseState<G, Sha256>, Socks5Session<net::Tcp
/// Reactor action.
type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;

-
/// Worker channels used to send Git frames back and forth.
-
struct WorkerChannels {
-
    /// Send data to the git worker.
-
    sender: chan::Sender<ChannelEvent>,
-
    /// Receive data from the git worker.
-
    receiver: chan::Receiver<ChannelEvent>,
-
}
-

/// Streams associated with a connected peer.
struct Streams {
    /// Active streams and their associated worker channels.
    /// Note that the gossip and control streams are not included here as they are always
    /// implied to exist.
-
    streams: HashMap<StreamId, WorkerChannels>,
+
    streams: HashMap<StreamId, worker::Channels>,
    /// Connection direction.
    link: Link,
    /// Sequence number used to compute the next stream id.
@@ -95,12 +88,12 @@ impl Streams {
    }

    /// Get a known stream.
-
    fn get(&self, stream: &StreamId) -> Option<&WorkerChannels> {
+
    fn get(&self, stream: &StreamId) -> Option<&worker::Channels> {
        self.streams.get(stream)
    }

    /// Open a new stream.
-
    fn open(&mut self) -> (StreamId, WorkerChannels) {
+
    fn open(&mut self) -> (StreamId, worker::Channels) {
        self.seq += 1;

        let id = StreamId::git(self.link)
@@ -114,27 +107,21 @@ impl Streams {
    }

    /// Register an open stream.
-
    fn register(&mut self, stream: StreamId) -> Option<WorkerChannels> {
-
        let (wire_send, wire_recv) = chan::unbounded::<ChannelEvent>();
-
        let (work_send, work_recv) = chan::unbounded::<ChannelEvent>();
+
    fn register(&mut self, stream: StreamId) -> Option<worker::Channels> {
+
        let (wire, worker) =
+
            worker::Channels::pair().expect("Streams::register: fatal: unable to create channels");

        match self.streams.entry(stream) {
            Entry::Vacant(e) => {
-
                e.insert(WorkerChannels {
-
                    sender: wire_send,
-
                    receiver: work_recv,
-
                });
-
                Some(WorkerChannels {
-
                    sender: work_send,
-
                    receiver: wire_recv,
-
                })
+
                e.insert(worker);
+
                Some(wire)
            }
            Entry::Occupied(_) => None,
        }
    }

    /// Unregister an open stream.
-
    fn unregister(&mut self, stream: &StreamId) -> Option<WorkerChannels> {
+
    fn unregister(&mut self, stream: &StreamId) -> Option<worker::Channels> {
        self.streams.remove(stream)
    }
}
@@ -412,7 +399,7 @@ where
            return;
        };

-
        for data in c.receiver.try_iter() {
+
        for data in c.try_iter() {
            let frame = match data {
                ChannelEvent::Data(data) => Frame::git(stream, data),
                ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
@@ -534,10 +521,7 @@ where
                            })) => {
                                log::debug!(target: "wire", "Received stream open for id={stream} from {nid}");

-
                                let Some(WorkerChannels {
-
                                    sender: work_send,
-
                                    receiver: wire_recv,
-
                                }) = streams.register(stream) else {
+
                                let Some(channels) = streams.register(stream) else {
                                    log::warn!(target: "wire", "Peer attempted to open already-open stream id={stream}");
                                    continue;
                                };
@@ -545,8 +529,7 @@ where
                                let task = Task {
                                    fetch: Fetch::Responder { remote: *nid },
                                    stream,
-
                                    send: work_send,
-
                                    recv: wire_recv,
+
                                    channels,
                                };
                                if self.worker.send(task).is_err() {
                                    log::error!(target: "wire", "Worker pool is disconnected; cannot send task");
@@ -557,7 +540,7 @@ where
                                ..
                            })) => {
                                if let Some(channels) = streams.get(&stream) {
-
                                    if channels.sender.send(ChannelEvent::Eof).is_err() {
+
                                    if channels.send(ChannelEvent::Eof).is_err() {
                                        log::error!(target: "wire", "Worker is disconnected; cannot send `EOF`");
                                    }
                                } else {
@@ -571,7 +554,7 @@ where
                                log::debug!(target: "wire", "Received stream close command for id={stream} from {nid}");

                                if let Some(chans) = streams.unregister(&stream) {
-
                                    chans.sender.send(ChannelEvent::Close).ok();
+
                                    chans.send(ChannelEvent::Close).ok();
                                }
                            }
                            Ok(Some(Frame {
@@ -586,7 +569,7 @@ where
                                ..
                            })) => {
                                if let Some(channels) = streams.get(&stream) {
-
                                    if channels.sender.send(ChannelEvent::Data(data)).is_err() {
+
                                    if channels.send(ChannelEvent::Data(data)).is_err() {
                                        log::error!(target: "wire", "Worker is disconnected; cannot send data");
                                    }
                                } else {
@@ -835,8 +818,7 @@ where
                            remote,
                        },
                        stream,
-
                        send: channels.sender,
-
                        recv: channels.receiver,
+
                        channels,
                    };

                    if self.worker.send(task).is_err() {
modified radicle-node/src/worker.rs
@@ -17,10 +17,10 @@ use radicle::{git, Storage};
use crate::runtime::Handle;
use crate::storage;
use crate::wire::StreamId;
-
use channels::{ChannelReader, ChannelWriter, Channels};
+
use channels::{ChannelReader, ChannelWriter};
use tunnel::Tunnel;

-
pub use channels::ChannelEvent;
+
pub use channels::{ChannelEvent, Channels};

/// Worker pool configuration.
pub struct Config {
@@ -133,8 +133,7 @@ impl Fetch {
pub struct Task {
    pub fetch: Fetch,
    pub stream: StreamId,
-
    pub send: chan::Sender<ChannelEvent>,
-
    pub recv: chan::Receiver<ChannelEvent>,
+
    pub channels: Channels,
}

/// Worker response.
@@ -170,11 +169,9 @@ impl Worker {
    fn process(&mut self, task: Task) {
        let Task {
            fetch,
-
            recv,
-
            send,
+
            channels,
            stream,
        } = task;
-
        let channels = Channels::new(send, recv);
        let result = self._process(&fetch, stream, channels);

        log::trace!(target: "worker", "Sending response back to service..");
@@ -339,18 +336,18 @@ impl Worker {
        log::debug!(target: "worker", "Entering Git protocol loop for {rid}..");

        thread::scope(|s| {
-
            let daemon_to_stream = s.spawn(|| -> Result<(), UploadError> {
+
            let daemon_to_stream = thread::Builder::new().name(self.name.clone()).spawn_scoped(s, || {
                let mut buffer = [0; u16::MAX as usize + 1];

                loop {
                    match daemon_r.read(&mut buffer) {
                        Ok(0) => break,
                        Ok(n) => {
-
                            stream_w.write_all(&buffer[..n])?;
+
                            stream_w.send(buffer[..n].to_vec())?;

                            if let Err(e) = self.handle.flush(remote, stream) {
                                log::error!(target: "worker", "Worker channel disconnected; aborting");
-
                                return Err(e.into());
+
                                return Err(e);
                            }
                        }
                        Err(e) => {
@@ -358,12 +355,12 @@ impl Worker {
                                log::debug!(target: "worker", "Daemon closed the git connection for {rid}");
                                break;
                            }
-
                            return Err(e.into());
+
                            return Err(e);
                        }
                    }
                }
-
                Self::eof(remote, stream, stream_w, &mut self.handle).map_err(UploadError::from)
-
            });
+
                Self::eof(remote, stream, stream_w, &mut self.handle)
+
            })?;

            let stream_to_daemon = s.spawn(move || {
                stream_r
@@ -531,11 +528,6 @@ pub mod pktline {
            Self { stream }
        }

-
        /// Get the underlying stream.
-
        pub fn stream(&mut self) -> &mut R {
-
            self.stream
-
        }
-

        /// Parse a Git request packet-line.
        ///
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
modified radicle-node/src/worker/channels.rs
@@ -1,4 +1,5 @@
-
use std::io::{Read, Write};
+
use std::io::Read;
+
use std::ops::Deref;
use std::{fmt, io};

use crossbeam_channel as chan;
@@ -14,6 +15,12 @@ pub enum ChannelEvent<T = Vec<u8>> {
    Eof,
}

+
impl<T> From<T> for ChannelEvent<T> {
+
    fn from(value: T) -> Self {
+
        Self::Data(value)
+
    }
+
}
+

impl<T> fmt::Debug for ChannelEvent<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
@@ -26,43 +33,42 @@ impl<T> fmt::Debug for ChannelEvent<T> {

/// Worker channels for communicating through the git stream with the remote.
pub struct Channels<T = Vec<u8>> {
-
    pub sender: ChannelWriter<T>,
-
    pub receiver: ChannelReader<T>,
+
    sender: ChannelWriter<T>,
+
    receiver: ChannelReader<T>,
}

-
impl Write for Channels {
-
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-
        self.sender.write(buf)
-
    }
+
impl<T: AsRef<[u8]>> Channels<T> {
+
    pub fn new(
+
        sender: chan::Sender<ChannelEvent<T>>,
+
        receiver: chan::Receiver<ChannelEvent<T>>,
+
    ) -> Self {
+
        let sender = ChannelWriter { sender };
+
        let receiver = ChannelReader::new(receiver);

-
    fn flush(&mut self) -> io::Result<()> {
-
        self.sender.flush()
+
        Self { sender, receiver }
    }
-
}

-
impl Read for Channels {
-
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
        self.receiver.read(buf)
+
    pub fn pair() -> io::Result<(Channels<T>, Channels<T>)> {
+
        let (l_send, r_recv) = chan::unbounded::<ChannelEvent<T>>();
+
        let (r_send, l_recv) = chan::unbounded::<ChannelEvent<T>>();
+

+
        let l = Channels::new(l_send, l_recv);
+
        let r = Channels::new(r_send, r_recv);
+

+
        Ok((l, r))
    }
-
}

-
impl<T> Channels<T> {
-
    pub fn new(
-
        sender: chan::Sender<ChannelEvent<T>>,
-
        receiver: chan::Receiver<ChannelEvent<T>>,
-
    ) -> Self {
-
        Channels {
-
            sender: ChannelWriter(sender),
-
            receiver: ChannelReader {
-
                receiver,
-
                buffer: io::Cursor::new(Vec::new()),
-
            },
-
        }
+
    pub fn try_iter(&self) -> impl Iterator<Item = ChannelEvent<T>> + '_ {
+
        self.receiver.try_iter()
    }

    pub fn split(&mut self) -> (&mut ChannelWriter<T>, &mut ChannelReader<T>) {
        (&mut self.sender, &mut self.receiver)
    }
+

+
    pub fn send(&self, event: ChannelEvent<T>) -> io::Result<()> {
+
        self.sender.send(event)
+
    }
}

/// Wraps a [`chan::Receiver`] and provides it with [`io::Read`].
@@ -72,11 +78,26 @@ pub struct ChannelReader<T = Vec<u8>> {
    receiver: chan::Receiver<ChannelEvent<T>>,
}

-
impl ChannelReader<Vec<u8>> {
+
impl<T> Deref for ChannelReader<T> {
+
    type Target = chan::Receiver<ChannelEvent<T>>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.receiver
+
    }
+
}
+

+
impl<T: AsRef<[u8]>> ChannelReader<T> {
+
    pub fn new(receiver: chan::Receiver<ChannelEvent<T>>) -> Self {
+
        Self {
+
            buffer: io::Cursor::new(Vec::new()),
+
            receiver,
+
        }
+
    }
+

    pub fn pipe<W: io::Write>(&mut self, mut writer: W) -> io::Result<()> {
        loop {
            match self.receiver.recv() {
-
                Ok(ChannelEvent::Data(data)) => writer.write_all(&data)?,
+
                Ok(ChannelEvent::Data(data)) => writer.write_all(data.as_ref())?,
                Ok(ChannelEvent::Eof) => return Ok(()),
                Ok(ChannelEvent::Close) => return Err(io::ErrorKind::ConnectionReset.into()),
                Err(_) => {
@@ -93,33 +114,39 @@ impl ChannelReader<Vec<u8>> {
impl Read for ChannelReader<Vec<u8>> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let read = self.buffer.read(buf)?;
-
        if read == 0 {
-
            let event = self.receiver.recv().map_err(|_| {
-
                io::Error::new(
-
                    io::ErrorKind::BrokenPipe,
-
                    "error reading from stream: channel is disconnected",
-
                )
-
            })?;
-

-
            match event {
-
                ChannelEvent::Data(data) => {
-
                    self.buffer = io::Cursor::new(data);
-
                    self.buffer.read(buf)
-
                }
-
                ChannelEvent::Eof => Err(io::ErrorKind::UnexpectedEof.into()),
-
                ChannelEvent::Close => Err(io::ErrorKind::ConnectionReset.into()),
+
        if read > 0 {
+
            return Ok(read);
+
        }
+

+
        match self.receiver.recv() {
+
            Ok(ChannelEvent::Data(data)) => {
+
                self.buffer = io::Cursor::new(data);
+
                self.buffer.read(buf)
            }
-
        } else {
-
            Ok(read)
+
            Ok(ChannelEvent::Eof) => Err(io::ErrorKind::UnexpectedEof.into()),
+
            Ok(ChannelEvent::Close) => Err(io::ErrorKind::ConnectionReset.into()),
+

+
            Err(_) => Err(io::Error::new(
+
                io::ErrorKind::BrokenPipe,
+
                "error reading from stream: channel is disconnected",
+
            )),
        }
    }
}

/// Wraps a [`chan::Sender`] and provides it with [`io::Write`].
#[derive(Clone)]
-
pub struct ChannelWriter<T = Vec<u8>>(chan::Sender<ChannelEvent<T>>);
+
pub struct ChannelWriter<T = Vec<u8>> {
+
    sender: chan::Sender<ChannelEvent<T>>,
+
}
+

+
impl<T: AsRef<[u8]>> ChannelWriter<T> {
+
    pub fn send(&self, event: impl Into<ChannelEvent<T>>) -> io::Result<()> {
+
        self.sender
+
            .send(event.into())
+
            .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))
+
    }

-
impl ChannelWriter {
    /// Since the git protocol is tunneled over an existing connection, we can't signal the end of
    /// the protocol via the usual means, which is to close the connection. Git also doesn't have
    /// any special message we can send to signal the end of the protocol.
@@ -127,28 +154,7 @@ impl ChannelWriter {
    /// Hence, we there's no other way for the server to know that we're done sending requests
    /// than to send a special message outside the git protocol. This message can then be processed
    /// by the remote worker to end the protocol. We use the special "eof" control message for this.
-
    pub fn eof(&self) -> Result<(), chan::SendError<ChannelEvent>> {
-
        self.0.send(ChannelEvent::Eof)
-
    }
-
}
-

-
impl Write for ChannelWriter {
-
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-
        let data = buf.to_vec();
-
        self.0.send(ChannelEvent::Data(data)).map_err(|m| {
-
            io::Error::new(
-
                io::ErrorKind::BrokenPipe,
-
                format!(
-
                    "error writing to stream: channel is disconnected: dropped {:?}",
-
                    m.into_inner()
-
                ),
-
            )
-
        })?;
-

-
        Ok(buf.len())
-
    }
-

-
    fn flush(&mut self) -> io::Result<()> {
-
        Ok(())
+
    pub fn eof(&self) -> Result<(), chan::SendError<ChannelEvent<T>>> {
+
        self.sender.send(ChannelEvent::Eof)
    }
}
modified radicle-node/src/worker/tunnel.rs
@@ -1,5 +1,5 @@
use std::{
-
    io::{self, Read, Write},
+
    io::{self, Read},
    net, thread, time,
};

@@ -69,7 +69,7 @@ impl<'a> Tunnel<'a> {
                        match local_r.read(&mut buffer) {
                            Ok(0) => break,
                            Ok(n) => {
-
                                remote_w.write_all(&buffer[..n])?;
+
                                remote_w.send(buffer[..n].to_vec())?;

                                if let Err(e) = self.handle.flush(nid, stream_id) {
                                    log::error!(