Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Update io-reactor
Dr Maxim Orlovsky committed 3 years ago
commit cf0d55fb119f619ad081934ac5238536d111c202
parent 947c89a398f5c06c0544072bbd23618fa7b63a8e
3 files changed +61 -59
modified Cargo.lock
@@ -1370,7 +1370,7 @@ dependencies = [
[[package]]
name = "io-reactor"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#2a09c1714bea9baf8a96b9bd09d2dac4181a5019"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices#32b3d308566228ebeb04b6aea4d6f17a9c4dfe6c"
dependencies = [
 "amplify",
 "crossbeam-channel",
@@ -1618,7 +1618,7 @@ dependencies = [
[[package]]
name = "netservices"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#2a09c1714bea9baf8a96b9bd09d2dac4181a5019"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices#32b3d308566228ebeb04b6aea4d6f17a9c4dfe6c"
dependencies = [
 "amplify",
 "cyphernet",
modified radicle-node/src/wire/transport.rs
@@ -24,11 +24,12 @@ use radicle::storage::WriteStorage;
use crate::crypto::Signer;
use crate::service::reactor::{Fetch, Io};
use crate::service::{routing, session, Message, Service};
+
use crate::wire::{Decode, Encode};
use crate::worker::{WorkerReq, WorkerResp};
use crate::{address, service};

/// Reactor action.
-
type Action<G> = reactor::Action<NetAccept<NoiseXk<G>>, NetTransport<NoiseXk<G>, Message>>;
+
type Action<G> = reactor::Action<NetAccept<NoiseXk<G>>, NetTransport<NoiseXk<G>>>;

/// Peer connection state machine.
#[derive(Debug)]
@@ -113,12 +114,12 @@ impl<G: Negotiator> Peer<G> {
    }

    /// Switch back from upgraded to connected state.
-
    fn downgrade(&mut self) -> Link {
+
    fn downgrade(&mut self) {
        if let Self::Upgraded { id, link, .. } = self {
-
            let link = *link;
-
            *self = Self::Connected { id: *id, link };
-

-
            link
+
            *self = Self::Connected {
+
                id: *id,
+
                link: *link,
+
            };
        } else {
            panic!("Peer::downgrade: can't downgrade if not in upgraded state");
        }
@@ -139,6 +140,8 @@ pub struct Transport<R, S, W, G: Negotiator> {
    peers: HashMap<RawFd, Peer<G>>,
    /// SOCKS5 proxy address.
    proxy: net::SocketAddr,
+
    /// Buffer for incoming peer data.
+
    read_queue: VecDeque<u8>,
}

impl<R, S, W, G> Transport<R, S, W, G>
@@ -164,6 +167,7 @@ where
            proxy,
            actions: VecDeque::new(),
            peers: HashMap::default(),
+
            read_queue: VecDeque::new(),
        }
    }

@@ -214,28 +218,21 @@ where
        self.actions.push_back(Action::UnregisterTransport(fd));
    }

-
    fn upgraded(&mut self, transport: NetTransport<NoiseXk<G>, Message>) {
-
        let fd = transport.as_raw_fd();
+
    fn upgraded(&mut self, session: NetTransport<NoiseXk<G>>) {
+
        let fd = session.as_raw_fd();
        let Some(peer) = self.peers.get_mut(&fd) else {
            log::error!(target: "transport", "Peer with fd {fd} was not found");
            return;
        };
        let (send, recv) = chan::bounded::<WorkerResp<G>>(1);
        let fetch = peer.upgraded(recv);
-
        // Downgrade the transport to a simple session and the buffer of incoming data that is
-
        // unprocessed. This buffer is provided as initial input to the worker.
-
        let Ok((session, drain)) = transport.downgrade() else {
-
            // This can happen in case the service attempts to send data to a peer after it has
-
            // initiated an upgrade protocol.
-
            panic!("Transport::upgraded: outgoing messages buffer is not empty");
-
        };

        if self
            .worker
            .send(WorkerReq {
                fetch,
                session,
-
                drain,
+
                drain: self.read_queue.drain(..).collect(),
                channel: send,
            })
            .is_err()
@@ -255,11 +252,9 @@ where
            log::error!(target: "transport", "Peer with fd {fd} is already disconnected");
            return;
        };
-
        let link = peer.downgrade();
-
        let transport = NetTransport::upgrade(session, link == Link::Inbound)
-
            .expect("unable to set socket into non-blocking mode");
+
        peer.downgrade();

-
        self.actions.push_back(Action::RegisterTransport(transport));
+
        self.actions.push_back(Action::RegisterTransport(session));
        self.service.fetch_complete(resp.result);
    }
}
@@ -272,7 +267,7 @@ where
    G: Signer + Negotiator + Send,
{
    type Listener = NetAccept<NoiseXk<G>>;
-
    type Transport = NetTransport<NoiseXk<G>, Message>;
+
    type Transport = NetTransport<NoiseXk<G>>;
    type Command = service::Command;

    fn tick(&mut self, time: Instant) {
@@ -313,7 +308,7 @@ where
                self.peers
                    .insert(session.as_raw_fd(), Peer::connecting(Link::Inbound));

-
                let transport = match NetTransport::<NoiseXk<G>, Message>::upgrade(session, true) {
+
                let transport = match NetTransport::<NoiseXk<G>>::accept(session) {
                    Ok(transport) => transport,
                    Err(err) => {
                        log::error!(target: "transport", "Failed to upgrade accepted peer socket: {err}");
@@ -324,20 +319,15 @@ where
                self.actions
                    .push_back(reactor::Action::RegisterTransport(transport))
            }
-
            ListenerEvent::Error(err) => {
+
            ListenerEvent::Failure(err) => {
                log::error!(target: "transport", "Error listening for inbound connections: {err}");
            }
        }
    }

-
    fn handle_transport_event(
-
        &mut self,
-
        fd: RawFd,
-
        event: SessionEvent<NoiseXk<G>, Message>,
-
        _: Instant,
-
    ) {
+
    fn handle_transport_event(&mut self, fd: RawFd, event: SessionEvent<NoiseXk<G>>, _: Instant) {
        match event {
-
            SessionEvent::SessionEstablished(node_id) => {
+
            SessionEvent::Established(node_id) => {
                log::debug!(target: "transport", "Session established with {node_id}");

                let conflicting = self
@@ -374,34 +364,42 @@ where
                peer.connected(node_id);
                self.service.connected(node_id, link);
            }
-
            SessionEvent::Message(msg) => {
+
            SessionEvent::Data(data) => {
                if let Some(Peer::Connected { link, id }) = self.peers.get(&fd) {
-
                    log::debug!(
-
                        target: "transport", "Received message {:?} from {} ({:?})", msg, id, link
-
                    );
-
                    self.service.received_message(*id, msg);
+
                    self.read_queue.extend(data);
+

+
                    loop {
+
                        match Message::decode(&mut self.read_queue) {
+
                            Ok(msg) => {
+
                                log::debug!(
+
                                    target: "transport", "Received message {:?} from {} ({:?})", msg, id, link
+
                                );
+
                                self.service.received_message(*id, msg)
+
                            }
+
                            Err(err) if err.is_eof() => {
+
                                // Buffer is empty, or message isn't complete.
+
                                break;
+
                            }
+
                            Err(err) => {
+
                                // TODO(cloudhead): Include error in reason.
+
                                log::error!(target: "transport", "Invalid message from {}: {err}", id);
+
                                self.disconnect(
+
                                    fd,
+
                                    DisconnectReason::Protocol(service::DisconnectReason::Error(
+
                                        session::Error::Misbehavior,
+
                                    )),
+
                                );
+
                                break;
+
                            }
+
                        }
+
                    }
                } else {
                    log::warn!(target: "transport", "Dropping message from unconnected peer with fd {fd}");
                }
            }
-
            SessionEvent::FrameFailure(_err) => {
-
                // TODO(cloudhead): Include error in reason.
-
                self.disconnect(
-
                    fd,
-
                    DisconnectReason::Protocol(service::DisconnectReason::Error(
-
                        session::Error::Misbehavior,
-
                    )),
-
                );
-
            }
-
            SessionEvent::ConnectionFailure(err) => {
+
            SessionEvent::Terminated(err) => {
                self.disconnect(fd, DisconnectReason::ConnectionError(Arc::new(err)));
            }
-
            SessionEvent::Disconnected => {
-
                self.disconnect(
-
                    fd,
-
                    DisconnectReason::Protocol(service::DisconnectReason::Peer),
-
                );
-
            }
        }
    }

@@ -462,7 +460,7 @@ where
    W: WriteStorage + 'static,
    G: Signer + Negotiator,
{
-
    type Item = reactor::Action<NetAccept<NoiseXk<G>>, NetTransport<NoiseXk<G>, Message>>;
+
    type Item = reactor::Action<NetAccept<NoiseXk<G>>, NetTransport<NoiseXk<G>>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(event) = self.actions.pop_front() {
@@ -476,8 +474,11 @@ where
                        target: "transport", "Sending {} message(s) to {}", msgs.len(), node_id
                    );
                    let fd = self.by_id(&node_id);
-

-
                    return Some(reactor::Action::Send(fd, msgs));
+
                    let mut data = Vec::new();
+
                    for msg in msgs {
+
                        msg.encode(&mut data).expect("in-memory writes never fail");
+
                    }
+
                    return Some(reactor::Action::Send(fd, data));
                }
                Io::Event(_e) => {
                    log::warn!(
@@ -499,7 +500,7 @@ where
                        break;
                    }

-
                    match NetTransport::<NoiseXk<G>, Message>::connect(
+
                    match NetTransport::<NoiseXk<G>>::connect(
                        PeerAddr::new(node_id, socket_addr),
                        &self.keypair,
                    ) {
modified radicle-node/src/worker.rs
@@ -1,5 +1,6 @@
use crossbeam_channel as chan;
use netservices::noise::NoiseXk;
+
use netservices::wire::NetTransport;
use std::thread;
use std::thread::JoinHandle;

@@ -13,7 +14,7 @@ use crate::service::FetchResult;
/// Worker request.
pub struct WorkerReq<G: Negotiator> {
    pub fetch: Fetch,
-
    pub session: NoiseXk<G>,
+
    pub session: NetTransport<NoiseXk<G>>,
    pub drain: Vec<u8>,
    pub channel: chan::Sender<WorkerResp<G>>,
}
@@ -21,7 +22,7 @@ pub struct WorkerReq<G: Negotiator> {
/// Worker response.
pub struct WorkerResp<G: Negotiator> {
    pub result: FetchResult,
-
    pub session: NoiseXk<G>,
+
    pub session: NetTransport<NoiseXk<G>>,
}

pub struct Worker<G: Negotiator> {