Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Handle case of crossing fetch requests
Alexis Sellier committed 3 years ago
commit c3a5aced07d230f3ec909533a9e6d68b88a1246c
parent fb635d80c391e12c1de80842da92c53e97b5c01e
4 files changed +55 -17
modified radicle-cli/tests/commands.rs
@@ -412,7 +412,7 @@ fn test_cob_replication() {

    // Make sure that Bob's issue refs announcement has a different timestamp than his fork's
    // announcement, otherwise Alice will consider it stale.
-
    thread::sleep(time::Duration::from_secs(1));
+
    thread::sleep(time::Duration::from_millis(3));

    bob.handle.announce_refs(rid).unwrap();

modified radicle-node/src/service.rs
@@ -1057,11 +1057,39 @@ where
                    }
                }
            }
-
            (session::State::Connected { .. }, Message::Fetch { rid }) => {
+
            (
+
                session::State::Connected {
+
                    protocol: session::Protocol::Gossip { requested },
+
                    ..
+
                },
+
                Message::Fetch { rid },
+
            ) => {
                debug!(target: "service", "Fetch requested for {rid} from {remote}..");

                // TODO: Check that we have the repo first?

+
                // We got a fetch request right after sending our own. We have to decide on which
+
                // fetch to run: our own, or the remote's.
+
                if let Some(req) = requested {
+
                    debug!(target: "service", "Received fetch request from {remote} while attempting to fetch {req}..");
+

+
                    // When fetch requests cross, the inbound peer takes precedence.
+
                    if peer.link.is_inbound() {
+
                        debug!(target: "service", "Cancelling fetch request to {remote}..");
+

+
                        // Cancel our own fetch request. This doesn't send anything to the remote,
+
                        // it simply updates the local session's state machine.
+
                        *requested = None;
+

+
                        // TODO: Queue the fetch request as if we tried to request twice from
+
                        // the same node.
+
                    } else {
+
                        // In this case, the remote node will cancel its request, so we don't
+
                        // want to handover the session to the worker here, we will do it when
+
                        // we get the `FetchOk` from the remote.
+
                        return Ok(());
+
                    }
+
                }
                // Accept the request and instruct the transport to handover the socket to the worker.
                self.reactor.write(peer, Message::FetchOk { rid });
                self.reactor.fetch(peer, rid, Namespaces::default(), false);
@@ -1086,7 +1114,7 @@ where
                self.reactor.fetch(peer, rid, Namespaces::default(), true);
            }
            (session::State::Connecting { .. }, msg) => {
-
                error!("Received {:?} from connecting peer {}", msg, peer.id);
+
                error!(target: "service", "Received {:?} from connecting peer {}", msg, peer.id);
            }
            (session::State::Disconnected { .. }, msg) => {
                debug!(target: "service", "Ignoring {:?} from disconnected peer {}", msg, peer.id);
modified radicle-node/src/service/reactor.rs
@@ -67,24 +67,26 @@ impl Reactor {
    }

    pub fn write(&mut self, remote: &Session, msg: Message) {
-
        if remote.is_gossip_allowed() {
-
            debug!(target: "service", "Write {:?} to {}", &msg, remote);
-
            self.io.push_back(Io::Write(remote.id, vec![msg]));
-
        } else {
+
        // If we've requested a fetch or are currently fetching, any message to be written
+
        // to the remote peer should be queued.
+
        if remote.is_requesting() || remote.is_fetching() {
            debug!(target: "service", "Queue {:?} for {}", &msg, remote);
            self.outbox.entry(remote.id).or_default().push(msg);
+
        } else {
+
            debug!(target: "service", "Write {:?} to {}", &msg, remote);
+
            self.io.push_back(Io::Write(remote.id, vec![msg]));
        }
    }

    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
        let msgs = msgs.into_iter().collect::<Vec<_>>();
-
        let is_gossip_allowed = remote.is_gossip_allowed();
+
        let queue = remote.is_fetching() || remote.is_requesting();

        for (ix, msg) in msgs.iter().enumerate() {
-
            if is_gossip_allowed {
+
            if queue {
                debug!(
                    target: "service",
-
                    "Write {:?} to {} ({}/{})",
+
                    "Queue {:?} for {} ({}/{})",
                    msg,
                    remote,
                    ix + 1,
@@ -93,7 +95,7 @@ impl Reactor {
            } else {
                debug!(
                    target: "service",
-
                    "Queue {:?} for {} ({}/{})",
+
                    "Write {:?} to {} ({}/{})",
                    msg,
                    remote,
                    ix + 1,
@@ -101,10 +103,10 @@ impl Reactor {
                );
            }
        }
-
        if is_gossip_allowed {
-
            self.io.push_back(Io::Write(remote.id, msgs));
-
        } else {
+
        if queue {
            self.outbox.entry(remote.id).or_default().extend(msgs);
+
        } else {
+
            self.io.push_back(Io::Write(remote.id, msgs));
        }
    }

modified radicle-node/src/service/session.rs
@@ -59,9 +59,17 @@ impl fmt::Display for State {
                write!(f, "connecting")
            }
            Self::Connected { protocol, .. } => match protocol {
-
                Protocol::Gossip { .. } => {
+
                Protocol::Gossip {
+
                    requested: None, ..
+
                } => {
                    write!(f, "connected <gossip>")
                }
+
                Protocol::Gossip {
+
                    requested: Some(rid),
+
                    ..
+
                } => {
+
                    write!(f, "connected <gossip> requested={rid}")
+
                }
                Protocol::Fetch { .. } => {
                    write!(f, "connected <fetch>")
                }
@@ -200,11 +208,11 @@ impl Session {
        matches!(self.state, State::Disconnected { .. })
    }

-
    pub fn is_gossip_allowed(&self) -> bool {
+
    pub fn is_requesting(&self) -> bool {
        matches!(
            self.state,
            State::Connected {
-
                protocol: Protocol::Gossip { requested: None },
+
                protocol: Protocol::Gossip { requested: Some(_) },
                ..
            }
        )