Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Rework peer map to not panic
Alexis Sellier committed 3 years ago
commit 5428420f2761695c94769c2c2dd26f496cccd0ff
parent 7a08a1d45906b81cfd70c23ea3024d989d1d768c
2 files changed +91 -78
modified radicle-node/src/service.rs
@@ -724,8 +724,8 @@ where
        };
        let link = session.link;

-
        // If the peer disconnected while we were waiting for a [`Message::FetchOk`],
-
        // return a failure to any potential fetcher.
+
        // If the peer disconnected while we were fetching, return a failure to any
+
        // potential fetcher.
        for rid in session.fetching() {
            if let Some(resp) = self.fetch_reqs.remove(&(rid, remote)) {
                resp.send(FetchResult::Failed {
modified radicle-node/src/wire/protocol.rs
@@ -229,6 +229,61 @@ impl Peer {
    }
}

+
struct Peers(HashMap<RawFd, Peer>);
+

+
impl Peers {
+
    fn get_mut(&mut self, fd: &RawFd) -> Option<&mut Peer> {
+
        self.0.get_mut(fd)
+
    }
+

+
    fn entry(&mut self, fd: RawFd) -> Entry<RawFd, Peer> {
+
        self.0.entry(fd)
+
    }
+

+
    fn insert(&mut self, fd: RawFd, peer: Peer) {
+
        if self.0.insert(fd, peer).is_some() {
+
            log::warn!(target: "wire", "Replacing existing peer fd={fd}");
+
        }
+
    }
+

+
    fn remove(&mut self, fd: &RawFd) -> Option<Peer> {
+
        self.0.remove(fd)
+
    }
+

+
    fn lookup(&self, node_id: &NodeId) -> Option<(RawFd, &Peer)> {
+
        self.0
+
            .iter()
+
            .find(|(_, peer)| peer.id() == Some(node_id))
+
            .map(|(fd, peer)| (*fd, peer))
+
    }
+

+
    fn lookup_mut(&mut self, node_id: &NodeId) -> Option<(RawFd, &mut Peer)> {
+
        self.0
+
            .iter_mut()
+
            .find(|(_, peer)| peer.id() == Some(node_id))
+
            .map(|(fd, peer)| (*fd, peer))
+
    }
+

+
    fn active(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
+
        self.0.iter().filter_map(|(fd, peer)| match peer {
+
            Peer::Inbound {} => None,
+
            Peer::Outbound { id } => Some((*fd, id)),
+
            Peer::Connected { nid: id, .. } => Some((*fd, id)),
+
            Peer::Disconnecting { .. } => None,
+
        })
+
    }
+

+
    fn connected(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
+
        self.0.iter().filter_map(|(fd, peer)| {
+
            if let Peer::Connected { nid: id, .. } = peer {
+
                Some((*fd, id))
+
            } else {
+
                None
+
            }
+
        })
+
    }
+
}
+

/// Wire protocol implementation for a set of peers.
pub struct Wire<R, S, W, G: Signer + Ecdh> {
    /// Backing service instance.
@@ -240,7 +295,7 @@ pub struct Wire<R, S, W, G: Signer + Ecdh> {
    /// Internal queue of actions to send to the reactor.
    actions: VecDeque<Action<G>>,
    /// Peer sessions.
-
    peers: HashMap<RawFd, Peer>,
+
    peers: Peers,
    /// SOCKS5 proxy address.
    proxy: net::SocketAddr,
}
@@ -269,7 +324,7 @@ where
            signer,
            proxy,
            actions: VecDeque::new(),
-
            peers: HashMap::default(),
+
            peers: Peers(HashMap::default()),
        }
    }

@@ -277,52 +332,6 @@ where
        self.actions.push_back(Action::RegisterListener(socket));
    }

-
    fn fd_by_id(&self, node_id: &NodeId) -> (RawFd, &Peer) {
-
        self.peers
-
            .iter()
-
            .find(|(_, peer)| peer.id() == Some(node_id))
-
            .map(|(fd, peer)| (*fd, peer))
-
            .unwrap_or_else(|| panic!("Peer {node_id} was expected to be known to the transport"))
-
    }
-

-
    fn fd_by_id_mut(&mut self, node_id: &NodeId) -> (RawFd, &mut Peer) {
-
        self.peers
-
            .iter_mut()
-
            .find(|(_, peer)| peer.id() == Some(node_id))
-
            .map(|(fd, peer)| (*fd, peer))
-
            .unwrap_or_else(|| panic!("Peer {node_id} was expected to be known to the transport"))
-
    }
-

-
    fn connected_fd_by_id(&self, node_id: &NodeId) -> RawFd {
-
        match self.fd_by_id(node_id) {
-
            (fd, Peer::Connected { .. }) => fd,
-
            (fd, peer) => {
-
                panic!(
-
                    "Peer {node_id} (fd={fd}) was expected to be in a connected state ({peer:?})"
-
                )
-
            }
-
        }
-
    }
-

-
    fn active(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
-
        self.peers.iter().filter_map(|(fd, peer)| match peer {
-
            Peer::Inbound {} => None,
-
            Peer::Outbound { id } => Some((*fd, id)),
-
            Peer::Connected { nid: id, .. } => Some((*fd, id)),
-
            Peer::Disconnecting { .. } => None,
-
        })
-
    }
-

-
    fn connected(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
-
        self.peers.iter().filter_map(|(fd, peer)| {
-
            if let Peer::Connected { nid: id, .. } = peer {
-
                Some((*fd, id))
-
            } else {
-
                None
-
            }
-
        })
-
    }
-

    fn disconnect(&mut self, fd: RawFd, reason: DisconnectReason) {
        match self.peers.get_mut(&fd) {
            Some(Peer::Disconnecting { .. }) => {
@@ -348,25 +357,20 @@ where
        );

        let nid = task.remote;
-
        let Some((fd, peer)) = self
-
            .peers
-
            .iter_mut()
-
            .find(|(_, peer)| peer.id() == Some(&nid))
-
            .map(|(fd, peer)| (*fd, peer)) else {
-
                log::warn!(target: "wire", "Peer {nid} not found; ignoring fetch result");
-
                return;
-
            };
+
        let Some((fd, peer)) = self.peers.lookup_mut(&nid) else {
+
            log::warn!(target: "wire", "Peer {nid} not found; ignoring fetch result");
+
            return;
+
        };

        let Peer::Connected { nid, link, streams, .. } = peer else {
            log::warn!(target: "wire", "Peer {nid} is not connected; ignoring fetch result");
            return;
        };
-
        let remote = *nid;

        // Only call into the service if we initiated this fetch.
        match task.result {
            FetchResult::Initiator { rid, result } => {
-
                self.service.fetched(rid, remote, result);
+
                self.service.fetched(rid, *nid, result);
            }
            FetchResult::Responder { .. } => {
                // We don't do anything with upload results for now.
@@ -387,14 +391,10 @@ where
    }

    fn flush(&mut self, remote: NodeId, stream: StreamId) {
-
        let Some((fd, peer)) = self
-
            .peers
-
            .iter()
-
            .find(|(_, peer)| peer.id() == Some(&remote))
-
            .map(|(fd, peer)| (*fd, peer)) else {
-
                log::warn!(target: "wire", "Peer {remote} is not known; ignoring flush");
-
                return;
-
            };
+
        let Some((fd, peer)) = self.peers.lookup(&remote) else {
+
            log::warn!(target: "wire", "Peer {remote} is not known; ignoring flush");
+
            return;
+
        };
        let Peer::Connected { streams, link, .. } = peer else {
            log::warn!(target: "wire", "Peer {remote} is not connected; ignoring flush");
            return;
@@ -483,6 +483,7 @@ where
                log::debug!(target: "wire", "Session established with {id} (fd={fd})");

                let conflicting = self
+
                    .peers
                    .active()
                    .filter(|(other, d)| **d == id && *other != fd)
                    .map(|(fd, _)| fd)
@@ -734,14 +735,18 @@ where
        while let Some(ev) = self.service.next() {
            match ev {
                Io::Write(node_id, msgs) => {
-
                    let (fd, link) = match self.fd_by_id(&node_id) {
-
                        (fd, Peer::Connected { link, .. }) => (fd, *link),
-
                        (_, peer) => {
+
                    let (fd, link) = match self.peers.lookup(&node_id) {
+
                        Some((fd, Peer::Connected { link, .. })) => (fd, *link),
+
                        Some((_, peer)) => {
                            // If the peer is disconnected by the wire protocol, the service may
                            // not be aware of this yet, and may continue to write messages to it.
                            log::debug!(target: "wire", "Dropping {} message(s) to {node_id} ({peer:?})", msgs.len());
                            continue;
                        }
+
                        None => {
+
                            log::error!(target: "wire", "Dropping {} message(s) to {node_id}: unknown peer", msgs.len());
+
                            continue;
+
                        }
                    };
                    log::trace!(
                        target: "wire", "Writing {} message(s) to {}", msgs.len(), node_id
@@ -756,7 +761,7 @@ where
                    self.actions.push_back(reactor::Action::Send(fd, data));
                }
                Io::Connect(node_id, addr) => {
-
                    if self.connected().any(|(_, id)| id == &node_id) {
+
                    if self.peers.connected().any(|(_, id)| id == &node_id) {
                        log::error!(
                            target: "wire",
                            "Attempt to connect to already connected peer {node_id}"
@@ -793,9 +798,12 @@ where
                        }
                    }
                }
-
                Io::Disconnect(node_id, reason) => {
-
                    let fd = self.connected_fd_by_id(&node_id);
-
                    self.disconnect(fd, reason);
+
                Io::Disconnect(nid, reason) => {
+
                    if let Some((fd, Peer::Connected { .. })) = self.peers.lookup(&nid) {
+
                        self.disconnect(fd, reason);
+
                    } else {
+
                        log::warn!(target: "wire", "Peer {nid} is not connected: ignoring disconnect");
+
                    }
                }
                Io::Wakeup(d) => {
                    self.actions.push_back(reactor::Action::SetTimer(d.into()));
@@ -807,9 +815,14 @@ where
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");

-
                    let (fd, Peer::Connected { link, streams,  .. }) =
-
                        self.fd_by_id_mut(&remote) else {
-
                            panic!("Wire::next: peer {remote} is not connected");
+
                    let Some((fd, Peer::Connected { link, streams,  .. })) =
+
                        self.peers.lookup_mut(&remote) else {
+
                            // Nb. It's possible that a peer is disconnected while an `Io::Fetch`
+
                            // is in the service's i/o buffer. Since the service may not purge the
+
                            // buffer on disconnect, we should just ignore i/o actions that don't
+
                            // have a connected peer.
+
                            log::error!(target: "wire", "Peer {remote} is not connected: dropping fetch");
+
                            continue;
                        };
                    let (stream, channels) = streams.open();