Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Give each peer its own inbox
Alexis Sellier committed 3 years ago
commit c138cd062c8251a67c0ca784251ae5d7503501b4
parent 054655d79b822ebeb5ab78ec506c59cb82a5271d
1 file changed +41 -19
modified radicle-node/src/wire/protocol.rs
@@ -66,7 +66,11 @@ enum Peer {
    Connecting { link: Link, id: Option<NodeId> },
    /// The state after handshake is completed.
    /// Peers in this state are handled by the underlying service.
-
    Connected { link: Link, id: NodeId },
+
    Connected {
+
        link: Link,
+
        id: NodeId,
+
        inbox: VecDeque<u8>,
+
    },
    /// The state after a peer was disconnected, either during handshake,
    /// or once connected.
    Disconnected {
@@ -79,6 +83,7 @@ enum Peer {
        fetch: Fetch,
        link: Link,
        id: NodeId,
+
        inbox: VecDeque<u8>,
    },
    /// The peer is now upgraded and we are in control of the socket.
    Upgraded { link: Link, id: NodeId },
@@ -89,9 +94,11 @@ impl std::fmt::Debug for Peer {
        match self {
            Self::Connecting { link, id: Some(id) } => write!(f, "Connecting({link:?}, {id})"),
            Self::Connecting { link, id: None } => write!(f, "Connecting({link:?})"),
-
            Self::Connected { link, id } => write!(f, "Connected({link:?}, {id})"),
+
            Self::Connected { link, id, .. } => write!(f, "Connected({link:?}, {id})"),
            Self::Disconnected { .. } => write!(f, "Disconnected"),
-
            Self::Upgrading { fetch, link, id } => write!(
+
            Self::Upgrading {
+
                fetch, link, id, ..
+
            } => write!(
                f,
                "Upgrading(initiated={}, {link:?}, {id})",
                fetch.initiated
@@ -121,7 +128,11 @@ impl Peer {
    /// Switch to connected state.
    fn connected(&mut self, id: NodeId) {
        if let Self::Connecting { link, .. } = self {
-
            *self = Self::Connected { link: *link, id };
+
            *self = Self::Connected {
+
                link: *link,
+
                id,
+
                inbox: VecDeque::new(),
+
            };
        } else {
            panic!("Peer::connected: session for {id} is already established");
        }
@@ -143,28 +154,37 @@ impl Peer {

    /// Switch to upgrading state.
    fn upgrading(&mut self, fetch: Fetch) {
-
        if let Self::Connected { id, link } = self {
+
        if let Self::Connected { id, link, inbox } = self {
            *self = Self::Upgrading {
                fetch,
                id: *id,
                link: *link,
+
                inbox: inbox.clone(),
            };
        } else {
            panic!("Peer::upgrading: session is not connected");
        }
    }

-
    /// Switch to upgraded state.
-
    fn upgraded(&mut self) -> Fetch {
-
        if let Self::Upgrading { fetch, id, link } = self {
+
    /// Switch to upgraded state. Returns the unread bytes from the peer.
+
    #[must_use]
+
    fn upgraded(&mut self) -> (Fetch, Vec<u8>) {
+
        if let Self::Upgrading {
+
            fetch,
+
            id,
+
            link,
+
            inbox,
+
        } = self
+
        {
            let fetch = fetch.clone();
+
            let inbox = inbox.drain(..).collect();
            log::debug!(target: "wire", "Peer {id} upgraded for fetch {}", fetch.rid);

            *self = Self::Upgraded {
                id: *id,
                link: *link,
            };
-
            fetch
+
            (fetch, inbox)
        } else {
            panic!("Peer::upgraded: can't upgrade before handover");
        }
@@ -176,6 +196,7 @@ impl Peer {
            *self = Self::Connected {
                id: *id,
                link: *link,
+
                inbox: VecDeque::new(),
            };
        } else {
            panic!("Peer::downgrade: can't downgrade if not in upgraded state");
@@ -199,8 +220,6 @@ pub struct Wire<R, S, W, G: Signer + EcSign> {
    peers: HashMap<RawFd, Peer>,
    /// SOCKS5 proxy address.
    proxy: net::SocketAddr,
-
    /// Buffer for incoming peer data.
-
    read_queue: VecDeque<u8>,
}

impl<R, S, W, G> Wire<R, S, W, G>
@@ -230,7 +249,6 @@ where
            proxy,
            actions: VecDeque::new(),
            peers: HashMap::default(),
-
            read_queue: VecDeque::new(),
        }
    }

@@ -312,7 +330,7 @@ where
    fn upgraded(&mut self, transport: NetTransport<WireSession<G>>) {
        let fd = transport.as_raw_fd();
        let peer = self.peer_mut_by_fd(fd);
-
        let fetch = peer.upgraded();
+
        let (fetch, drain) = peer.upgraded();
        let session = match transport.into_session() {
            Ok(session) => session,
            Err(_) => panic!("Transport::upgraded: peer write buffer not empty on upgrade"),
@@ -323,7 +341,7 @@ where
            .send(Task {
                fetch,
                session,
-
                drain: self.read_queue.drain(..).collect(),
+
                drain,
            })
            .is_err()
        {
@@ -470,11 +488,11 @@ where
                self.service.connected(node_id, link);
            }
            SessionEvent::Data(data) => {
-
                if let Some(Peer::Connected { id, .. }) = self.peers.get(&fd) {
-
                    self.read_queue.extend(data);
+
                if let Some(Peer::Connected { id, inbox, .. }) = self.peers.get_mut(&fd) {
+
                    inbox.extend(data);

                    loop {
-
                        match Message::decode(&mut self.read_queue) {
+
                        match Message::decode(inbox) {
                            Ok(msg) => self.service.received_message(*id, msg),
                            Err(err) if err.is_eof() => {
                                // Buffer is empty, or message isn't complete.
@@ -488,7 +506,7 @@ where
                                } else {
                                    vec![]
                                };
-
                                leftover.extend(self.read_queue.drain(..));
+
                                leftover.extend(inbox.drain(..));

                                if !leftover.is_empty() {
                                    log::debug!(target: "wire", "Dropping read buffer with `{:?}`", &leftover);
@@ -502,8 +520,12 @@ where
                            }
                        }
                    }
+
                } else if let Some(Peer::Upgrading { inbox, .. }) = self.peers.get_mut(&fd) {
+
                    // If somehow the remote peer managed to send git data before the reactor
+
                    // unregistered our session, we'll hit this branch.
+
                    inbox.extend(data);
                } else {
-
                    log::warn!(target: "wire", "Dropping message from unconnected peer with fd {fd}");
+
                    log::warn!(target: "wire", "Dropping message from unconnected peer (fd={fd})");
                }
            }
            SessionEvent::Terminated(err) => {