Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Buffer gossips until fetch is completed
Alexis Sellier committed 3 years ago
commit 011923f1dff08562b79b5d6ae902e5301fc08044
parent 0f81f8a7aabbee5a6bbd75d0a7004931fdd15dae
4 files changed +177 -32
modified radicle-node/src/service.rs
@@ -31,7 +31,6 @@ use crate::node::{Address, Features, FetchResult};
use crate::prelude::*;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
-
use crate::service::session::Protocol;
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteStorage};
use crate::storage::{Namespaces, ReadStorage};
@@ -519,7 +518,8 @@ where
            session::FetchResult::Ready(fetch) => {
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

-
                self.reactor.write(&session, fetch);
+
                self.reactor.write(session, fetch);
+
                session.to_requesting(rid);
            }
            session::FetchResult::AlreadyFetching(other) => {
                if other == rid {
@@ -613,6 +613,9 @@ where
                    );
                }
            }
+
            // Drain any messages in the session's outbox, which might have accumulated during the
+
            // fetch, and send them to the peer.
+
            self.reactor.drain(session);
        } else {
            log::debug!(target: "service", "Session not found for {remote}");
        }
@@ -641,6 +644,7 @@ where
            let filter = self.filter();

            if let Some(peer) = self.sessions.get_mut(&remote) {
+
                peer.to_connected(self.clock);
                self.reactor.write_all(
                    peer,
                    gossip::handshake(
@@ -651,7 +655,6 @@ where
                        &self.config,
                    ),
                );
-
                peer.to_connected(self.clock);
            }
        } else {
            match self.sessions.entry(remote) {
@@ -1026,7 +1029,7 @@ where
                    return Ok(());
                }
                self.reactor.write(
-
                    &peer,
+
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
@@ -1039,16 +1042,14 @@ where
                    }
                }
            }
-
            (session::State::Connected { protocol, .. }, Message::Fetch { rid }) => {
+
            (session::State::Connected { .. }, Message::Fetch { rid }) => {
                debug!(target: "service", "Fetch requested for {rid} from {remote}..");

                // TODO: Check that we have the repo first?

-
                *protocol = Protocol::Fetch { rid };
                // Accept the request and instruct the transport to handover the socket to the worker.
                self.reactor.write(peer, Message::FetchOk { rid });
-
                self.reactor
-
                    .fetch(*remote, rid, Namespaces::default(), false);
+
                self.reactor.fetch(peer, rid, Namespaces::default(), false);
            }
            (session::State::Connected { protocol, .. }, Message::FetchOk { rid }) => {
                if *protocol
@@ -1066,10 +1067,8 @@ where
                }
                debug!(target: "service", "Fetch accepted for {rid} from {remote}..");

-
                *protocol = Protocol::Fetch { rid };
                // Instruct the transport to handover the socket to the worker.
-
                self.reactor
-
                    .fetch(*remote, rid, Namespaces::default(), true);
+
                self.reactor.fetch(peer, rid, Namespaces::default(), true);
            }
            (session::State::Connecting { .. }, msg) => {
                error!("Received {:?} from connecting peer {}", msg, peer.id);
@@ -1241,7 +1240,7 @@ where
        let inactive_sessions = self
            .sessions
            .negotiated_mut()
-
            .filter(|(_, session)| session.last_active < *now - KEEP_ALIVE_DELTA)
+
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
            .map(|(_, session)| session);
        for session in inactive_sessions {
            session.ping(&mut self.reactor).ok();
modified radicle-node/src/service/reactor.rs
@@ -1,4 +1,5 @@
-
use std::collections::VecDeque;
+
use std::collections::{HashMap, VecDeque};
+
use std::mem;

use log::*;

@@ -43,6 +44,10 @@ pub struct Fetch {
pub struct Reactor {
    /// Outgoing I/O queue.
    io: VecDeque<Io>,
+
    /// Message outbox for each node.
+
    /// If messages can't be sent to a node immediately, they are stored in the outbox.
+
    /// This can happen if for eg. a fetch is ongoing with that node.
+
    outbox: HashMap<NodeId, Vec<Message>>,
}

impl Reactor {
@@ -62,35 +67,74 @@ impl Reactor {
    }

    pub fn write(&mut self, remote: &Session, msg: Message) {
-
        debug!(target: "service", "Write {:?} to {}", &msg, remote);
-

-
        self.io.push_back(Io::Write(remote.id, vec![msg]));
+
        if remote.is_gossip_allowed() {
+
            debug!(target: "service", "Write {:?} to {}", &msg, remote);
+
            self.io.push_back(Io::Write(remote.id, vec![msg]));
+
        } else {
+
            debug!(target: "service", "Queue {:?} for {}", &msg, remote);
+
            self.outbox.entry(remote.id).or_default().push(msg);
+
        }
    }

    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
        let msgs = msgs.into_iter().collect::<Vec<_>>();
+
        let is_gossip_allowed = remote.is_gossip_allowed();
+

        for (ix, msg) in msgs.iter().enumerate() {
-
            debug!(
-
                target: "service",
-
                "Write {:?} message to {} ({}/{})",
-
                msg,
-
                remote,
-
                ix + 1,
-
                msgs.len()
-
            );
+
            if is_gossip_allowed {
+
                debug!(
+
                    target: "service",
+
                    "Write {:?} to {} ({}/{})",
+
                    msg,
+
                    remote,
+
                    ix + 1,
+
                    msgs.len()
+
                );
+
            } else {
+
                debug!(
+
                    target: "service",
+
                    "Queue {:?} for {} ({}/{})",
+
                    msg,
+
                    remote,
+
                    ix + 1,
+
                    msgs.len()
+
                );
+
            }
+
        }
+
        if is_gossip_allowed {
+
            self.io.push_back(Io::Write(remote.id, msgs));
+
        } else {
+
            self.outbox.entry(remote.id).or_default().extend(msgs);
+
        }
+
    }
+

+
    pub fn drain(&mut self, remote: &Session) {
+
        if let Some(outbox) = self.outbox.get_mut(&remote.id) {
+
            debug!(target: "service", "Draining outbox for session {} ({} message(s))", remote.id, outbox.len());
+

+
            let msgs = mem::take(outbox);
+
            self.write_all(remote, msgs);
        }
-
        self.io.push_back(Io::Write(remote.id, msgs));
    }

    pub fn wakeup(&mut self, after: LocalDuration) {
        self.io.push_back(Io::Wakeup(after));
    }

-
    pub fn fetch(&mut self, remote: NodeId, rid: Id, namespaces: Namespaces, initiated: bool) {
+
    pub fn fetch(
+
        &mut self,
+
        remote: &mut Session,
+
        rid: Id,
+
        namespaces: Namespaces,
+
        initiated: bool,
+
    ) {
+
        // Transition the session state machine to "fetching".
+
        remote.to_fetching(rid);
+

        self.io.push_back(Io::Fetch(Fetch {
            rid,
            namespaces,
-
            remote,
+
            remote: remote.id,
            initiated,
        }));
    }
modified radicle-node/src/service/session.rs
@@ -145,7 +145,7 @@ impl fmt::Display for Session {
        }
        attrs.push(state.as_str());

-
        write!(f, "{}", attrs.join(" "))
+
        write!(f, "{} [{}]", self.id, attrs.join(" "))
    }
}

@@ -203,6 +203,16 @@ impl Session {
        )
    }

+
    pub fn is_gossip_allowed(&self) -> bool {
+
        matches!(
+
            self.state,
+
            State::Connected {
+
                protocol: Protocol::Gossip { requested: None },
+
                ..
+
            }
+
        )
+
    }
+

    pub fn attempts(&self) -> usize {
        self.attempts
    }
@@ -214,9 +224,6 @@ impl Session {
                    if let Some(requested) = requested {
                        FetchResult::AlreadyFetching(*requested)
                    } else {
-
                        *protocol = Protocol::Gossip {
-
                            requested: Some(rid),
-
                        };
                        FetchResult::Ready(Message::Fetch { rid })
                    }
                }
@@ -227,6 +234,22 @@ impl Session {
        }
    }

+
    pub fn to_requesting(&mut self, rid: Id) {
+
        let State::Connected { protocol, .. } = &mut self.state else {
+
            panic!("Session::to_requesting: cannot transition to 'requesting': session is not connected");
+
        };
+
        *protocol = Protocol::Gossip {
+
            requested: Some(rid),
+
        };
+
    }
+

+
    pub fn to_fetching(&mut self, rid: Id) {
+
        let State::Connected { protocol, .. } = &mut self.state else {
+
            panic!("Session::to_fetching: cannot transition to 'fetching': session is not connected");
+
        };
+
        *protocol = Protocol::Fetch { rid };
+
    }
+

    pub fn to_connecting(&mut self) {
        assert!(
            self.is_disconnected(),
modified radicle-node/src/tests.rs
@@ -10,6 +10,7 @@ use netservices::LinkDirection as Link;
use crate::collections::{HashMap, HashSet};
use crate::crypto::test::signer::MockSigner;
use crate::identity::Id;
+
use crate::node;
use crate::prelude::*;
use crate::prelude::{LocalDuration, Timestamp};
use crate::service::config::*;
@@ -20,7 +21,7 @@ use crate::service::ServiceState as _;
use crate::service::*;
use crate::storage::git::transport::{local, remote};
use crate::storage::git::Storage;
-
use crate::storage::ReadStorage;
+
use crate::storage::{Namespaces, ReadStorage};
use crate::test::arbitrary;
use crate::test::assert_matches;
use crate::test::fixtures;
@@ -593,6 +594,84 @@ fn test_refs_announcement_no_subscribe() {
}

#[test]
+
fn test_gossip_during_fetch() {
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
+
    let bob = Peer::new("bob", [8, 8, 8, 8]);
+
    let eve = Peer::new("eve", [9, 9, 9, 9]);
+
    let now = LocalTime::now().as_millis();
+
    let rid = arbitrary::gen::<Id>(1);
+
    let (send, _recv) = chan::bounded::<node::FetchResult>(1);
+
    let inventory1 = BoundedVec::try_from(arbitrary::vec(1)).unwrap();
+
    let inventory2 = BoundedVec::try_from(arbitrary::vec(1)).unwrap();
+

+
    alice.connect_to(&bob);
+
    alice.connect_to(&eve);
+
    alice.command(Command::Fetch(rid, bob.id, send));
+

+
    assert_matches!(alice.messages(bob.id).next(), Some(Message::Fetch { .. }));
+

+
    logger::init(log::Level::Debug);
+

+
    alice.receive(
+
        eve.id(),
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: inventory1.clone(),
+
                timestamp: now + 1,
+
            },
+
            eve.signer(),
+
        ),
+
    );
+
    // We shouldn't relay to Bob while we're fetching from him.
+
    assert_matches!(alice.messages(bob.id).next(), None);
+

+
    alice.receive(bob.id(), Message::FetchOk { rid });
+
    alice.receive(
+
        eve.id(),
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: inventory2.clone(),
+
                timestamp: now + 2,
+
            },
+
            eve.signer(),
+
        ),
+
    );
+
    // We shouldn't relay to Bob while we're fetching from him.
+
    assert_matches!(alice.messages(bob.id).next(), None);
+

+
    // Have enough time pass that Alice sends a "ping" to Bob.
+
    alice.elapse(KEEP_ALIVE_DELTA);
+

+
    // Now that the fetch is done, the messages Bob missed should be relayed to him.
+
    alice.fetched(
+
        Fetch {
+
            rid,
+
            namespaces: Namespaces::All,
+
            remote: bob.id,
+
            initiated: true,
+
        },
+
        Ok(vec![]),
+
    );
+
    let mut messages = alice.messages(bob.id);
+

+
    assert_matches!(
+
        messages.next(),
+
        Some(Message::Announcement(Announcement {
+
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { inventory, .. }),
+
            ..
+
        })) if inventory == inventory1
+
    );
+
    assert_matches!(
+
        messages.next(),
+
        Some(Message::Announcement(Announcement {
+
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { inventory, .. }),
+
            ..
+
        })) if inventory == inventory2
+
    );
+
    assert_matches!(messages.next(), Some(Message::Ping { .. }));
+
}
+

+
#[test]
fn test_inventory_relay() {
    // Topology is eve <-> alice <-> bob
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);