Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Remove redundant handshake
Alexis Sellier committed 3 years ago
commit a7d2fa95cd50bb102cb264981010d6bae82d5a47
parent 71561a8e12951001410a942178c9aede81457ccd
7 files changed +42 -106
modified radicle-node/src/service.rs
@@ -433,7 +433,7 @@ where
                    Ok(seeds) => seeds
                        .into_iter()
                        .filter(|node| *node != self.node_id())
-
                        .partition::<Vec<_>, _>(|node| self.sessions.is_negotiated(node)),
+
                        .partition::<Vec<_>, _>(|node| self.sessions.is_connected(node)),
                    Err(err) => {
                        error!(target: "service", "Error reading routing table for {rid}: {err}");
                        drop(resp);
@@ -465,7 +465,7 @@ where
                // Let all our peers know that we're interested in this repo from now on.
                self.reactor.broadcast(
                    Message::subscribe(self.filter(), self.time(), Timestamp::MAX),
-
                    self.sessions.negotiated().map(|(_, s)| s),
+
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
            Command::UntrackRepo(id, resp) => {
@@ -510,7 +510,7 @@ where
            error!(target: "service", "Session {from} does not exist; cannot initiate fetch");
            return;
        };
-
        debug_assert!(session.is_negotiated());
+
        debug_assert!(session.is_connected());

        let seed = session.id;

@@ -630,23 +630,12 @@ where
    pub fn connected(&mut self, remote: NodeId, link: Link) {
        info!(target: "service", "Connected to {} ({:?})", remote, link);

-
        // For outbound connections, we are the first to say "Hello".
-
        // For inbound connections, we wait for the remote to say "Hello" first.
-
        if link.is_outbound() {
-
            let filter = self.filter();
+
        let msgs = self.initial(link);

+
        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
-
                self.reactor.write_all(
-
                    peer,
-
                    gossip::handshake(
-
                        self.clock.as_millis(),
-
                        &self.storage,
-
                        &self.signer,
-
                        filter,
-
                        &self.config,
-
                    ),
-
                );
+
                self.reactor.write_all(peer, msgs);
            }
        } else {
            match self.sessions.entry(remote) {
@@ -657,13 +646,14 @@ where
                    );
                }
                Entry::Vacant(e) => {
-
                    e.insert(Session::connected(
+
                    let peer = e.insert(Session::connected(
                        remote,
                        Link::Inbound,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
                    ));
+
                    self.reactor.write_all(peer, msgs);
                }
            }
        }
@@ -843,7 +833,7 @@ where
                    // Refs can be relayed by peers who don't have the data in storage,
                    // therefore we only check whether we are connected to the *announcer*,
                    // which is required by the protocol to only announce refs it has.
-
                    if self.sessions.is_negotiated(announcer) {
+
                    if self.sessions.is_connected(announcer) {
                        match message.is_fresh(&self.storage) {
                            Ok(is_fresh) => {
                                if is_fresh {
@@ -932,7 +922,6 @@ where
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
-
        let filter = self.filter(); // TODO: Don't call this if it's not used.
        let Some(peer) = self.sessions.get_mut(remote) else {
            return Err(session::Error::NotFound(*remote));
        };
@@ -954,34 +943,6 @@ where

                return Err(session::Error::Misbehavior);
            }
-
            (session::State::Connected { initialized, .. }, Message::Initialize { .. }) => {
-
                // Already initialized!
-
                if *initialized {
-
                    debug!(
-
                        target: "service",
-
                        "Disconnecting peer {} for initializing already initialized session",
-
                        peer.id
-
                    );
-
                    return Err(session::Error::Misbehavior);
-
                }
-
                *initialized = true;
-

-
                if peer.link.is_inbound() {
-
                    self.reactor.write_all(
-
                        peer,
-
                        gossip::handshake(
-
                            self.clock.as_millis(),
-
                            &self.storage,
-
                            &self.signer,
-
                            filter,
-
                            &self.config,
-
                        ),
-
                    );
-
                }
-
                // Nb. we don't set the peer timestamp here, since it is going to be
-
                // set after the first message is received only. Setting it here would
-
                // mean that messages received right after the handshake could be ignored.
-
            }
            // Process a peer announcement.
            (session::State::Connected { .. }, Message::Announcement(ann)) => {
                let relayer = peer.id;
@@ -995,7 +956,7 @@ where
                    // 2. Don't relay to the peer who signed this announcement.
                    let relay_to = self
                        .sessions
-
                        .negotiated()
+
                        .connected()
                        .filter(|(id, _)| *id != remote && *id != &ann.node);

                    self.reactor.relay(ann.clone(), relay_to.map(|(_, p)| p));
@@ -1072,6 +1033,22 @@ where
        Ok(())
    }

+
    /// Set of initial messages to send to a peer.
+
    fn initial(&self, _link: Link) -> Vec<Message> {
+
        let filter = self.filter();
+

+
        // TODO: Only subscribe to outbound connections, otherwise we will consume too
+
        // much bandwidth.
+

+
        gossip::handshake(
+
            self.clock.as_millis(),
+
            &self.storage,
+
            &self.signer,
+
            filter,
+
            &self.config,
+
        )
+
    }
+

    /// Sync, and if needed, announce our local inventory.
    fn sync_and_announce_inventory(&mut self) -> Result<Vec<Id>, Error> {
        let inventory = self.storage.inventory()?;
@@ -1124,7 +1101,7 @@ where
    /// Announce local refs for given id.
    fn announce_refs(&mut self, rid: Id, namespaces: Namespaces) -> Result<(), storage::Error> {
        let repo = self.storage.repository(rid)?;
-
        let peers = self.sessions.negotiated().map(|(_, p)| p);
+
        let peers = self.sessions.connected().map(|(_, p)| p);
        let timestamp = self.time();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

@@ -1193,7 +1170,7 @@ where
    fn announce_inventory(&mut self, inventory: Vec<Id>) -> Result<(), storage::Error> {
        let time = self.time();
        let inv = Message::inventory(gossip::inventory(time, inventory), &self.signer);
-
        for (_, sess) in self.sessions.negotiated() {
+
        for (_, sess) in self.sessions.connected() {
            self.reactor.write(sess, inv.clone());
        }
        Ok(())
@@ -1216,7 +1193,7 @@ where
    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
        let stale = self
            .sessions
-
            .negotiated()
+
            .connected()
            .filter(|(_, session)| session.last_active < *now - STALE_CONNECTION_TIMEOUT);

        for (_, session) in stale {
@@ -1231,7 +1208,7 @@ where
    fn keep_alive(&mut self, now: &LocalTime) {
        let inactive_sessions = self
            .sessions
-
            .negotiated_mut()
+
            .connected_mut()
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
            .map(|(_, session)| session);
        for session in inactive_sessions {
@@ -1451,8 +1428,8 @@ impl Sessions {
        Self(AddressBook::new(rng))
    }

-
    /// Iterator over fully negotiated peers.
-
    pub fn negotiated(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
+
    /// Iterator over fully connected peers.
+
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.0
            .iter()
            .filter_map(move |(id, sess)| match &sess.state {
@@ -1461,13 +1438,13 @@ impl Sessions {
            })
    }

-
    /// Iterator over mutable fully negotiated peers.
-
    pub fn negotiated_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
+
    /// Iterator over mutable fully connected peers.
+
    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, p)| p.is_connected())
    }

    /// Return whether this node has a fully established session.
-
    pub fn is_negotiated(&self, id: &NodeId) -> bool {
+
    pub fn is_connected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }

@@ -1540,7 +1517,6 @@ mod gossip {
        };

        let mut msgs = vec![
-
            Message::init(*signer.public_key()),
            Message::inventory(gossip::inventory(now, inventory), signer),
            Message::subscribe(
                filter,
modified radicle-node/src/service/message.rs
@@ -309,9 +309,6 @@ impl Announcement {
/// These are the messages peers send to each other.
#[derive(Clone, PartialEq, Eq)]
pub enum Message {
-
    /// The first message sent to a peer after connection.
-
    Initialize { node_id: NodeId },
-

    /// Subscribe to gossip messages matching the filter and time range.
    Subscribe(Subscribe),

@@ -339,10 +336,6 @@ pub enum Message {
}

impl Message {
-
    pub fn init(node_id: NodeId) -> Self {
-
        Self::Initialize { node_id }
-
    }
-

    pub fn announcement(
        node: NodeId,
        message: impl Into<AnnouncementMessage>,
@@ -412,7 +405,6 @@ impl From<Announcement> for Message {
impl fmt::Debug for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
-
            Self::Initialize { .. } => write!(f, "Initialize(..)"),
            Self::Subscribe(Subscribe { since, until, .. }) => {
                write!(f, "Subscribe({since}..{until})")
            }
modified radicle-node/src/service/session.rs
@@ -41,8 +41,6 @@ pub enum State {
    Connecting,
    /// Initial state after handshake protocol hand-off.
    Connected {
-
        /// Whether this session was initialized with a [`Message::Initialize`].
-
        initialized: bool,
        /// Connected since this time.
        since: LocalTime,
        /// Ping state.
@@ -167,7 +165,6 @@ impl Session {
        Self {
            id,
            state: State::Connected {
-
                initialized: false,
                since: time,
                ping: PingState::default(),
                protocol: Protocol::default(),
@@ -193,16 +190,6 @@ impl Session {
        matches!(self.state, State::Disconnected { .. })
    }

-
    pub fn is_negotiated(&self) -> bool {
-
        matches!(
-
            self.state,
-
            State::Connected {
-
                initialized: true,
-
                ..
-
            }
-
        )
-
    }
-

    pub fn is_gossip_allowed(&self) -> bool {
        matches!(
            self.state,
@@ -279,7 +266,6 @@ impl Session {
        );
        self.attempts = 0;
        self.state = State::Connected {
-
            initialized: false,
            since,
            ping: PingState::default(),
            protocol: Protocol::default(),
modified radicle-node/src/test/environment.rs
@@ -138,11 +138,11 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
            let remote_sessions = remote.handle.sessions().unwrap();

            let local_sessions = local_sessions
-
                .negotiated()
+
                .connected()
                .map(|(id, _)| id)
                .collect::<BTreeSet<_>>();
            let remote_sessions = remote_sessions
-
                .negotiated()
+
                .connected()
                .map(|(id, _)| id)
                .collect::<BTreeSet<_>>();

modified radicle-node/src/test/peer.rs
@@ -243,11 +243,8 @@ where

        self.initialize();
        self.service.connected(remote_id, Link::Inbound);
-
        self.receive(remote_id, Message::init(remote_id));

        let mut msgs = self.messages(remote_id);
-
        msgs.find(|m| matches!(m, Message::Initialize { .. }))
-
            .expect("`initialize` must be sent");
        msgs.find(|m| {
            matches!(
                m,
@@ -269,8 +266,6 @@ where
        self.service.connected(remote_id, Link::Outbound);

        let mut msgs = self.messages(remote_id);
-
        msgs.find(|m| matches!(m, Message::Initialize { .. }))
-
            .expect("`initialize` must be sent");
        msgs.find(|m| {
            matches!(
                m,
@@ -281,8 +276,6 @@ where
            )
        })
        .expect("`inventory-announcement` must be sent");
-

-
        self.receive(remote_id, Message::init(remote_id));
    }

    pub fn elapse(&mut self, duration: LocalDuration) {
modified radicle-node/src/tests.rs
@@ -100,7 +100,7 @@ fn test_disconnecting_unresponsive_peer() {
    let bob = Peer::new("bob", [9, 9, 9, 9]);

    alice.connect_to(&bob);
-
    assert_eq!(1, alice.sessions().negotiated().count(), "bob connects");
+
    assert_eq!(1, alice.sessions().connected().count(), "bob connects");
    alice.elapse(STALE_CONNECTION_TIMEOUT + LocalDuration::from_secs(1));
    alice
        .outbox()
@@ -122,7 +122,7 @@ fn test_connection_kept_alive() {

    alice.command(service::Command::Connect(bob.id(), bob.address()));
    sim.run_while([&mut alice, &mut bob], |s| !s.is_settled());
-
    assert_eq!(1, alice.sessions().negotiated().count(), "bob connects");
+
    assert_eq!(1, alice.sessions().connected().count(), "bob connects");

    let mut elapsed: LocalDuration = LocalDuration::from_secs(0);
    let step: LocalDuration = STALE_CONNECTION_TIMEOUT / 10;
@@ -150,7 +150,7 @@ fn test_outbound_connection() {
    let peers = alice
        .service
        .sessions()
-
        .negotiated()
+
        .connected()
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();

@@ -170,7 +170,7 @@ fn test_inbound_connection() {
    let peers = alice
        .service
        .sessions()
-
        .negotiated()
+
        .connected()
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();

@@ -795,7 +795,7 @@ fn test_persistent_peer_reconnect_attempt() {

    let ips = alice
        .sessions()
-
        .negotiated()
+
        .connected()
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();
    assert!(ips.contains(&bob.id()));
modified radicle-node/src/wire/message.rs
@@ -13,7 +13,6 @@ use crate::wire::{Decode, Encode};
#[repr(u16)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
-
    Initialize = 0,
    NodeAnnouncement = 2,
    InventoryAnnouncement = 4,
    RefsAnnouncement = 6,
@@ -35,7 +34,6 @@ impl TryFrom<u16> for MessageType {

    fn try_from(other: u16) -> Result<Self, Self::Error> {
        match other {
-
            0 => Ok(MessageType::Initialize),
            2 => Ok(MessageType::NodeAnnouncement),
            4 => Ok(MessageType::InventoryAnnouncement),
            6 => Ok(MessageType::RefsAnnouncement),
@@ -56,7 +54,6 @@ impl Message {

    pub fn type_id(&self) -> u16 {
        match self {
-
            Self::Initialize { .. } => MessageType::Initialize,
            Self::Subscribe { .. } => MessageType::Subscribe,
            Self::Announcement(Announcement { message, .. }) => match message {
                AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
@@ -194,9 +191,6 @@ impl wire::Encode for Message {
        let mut n = self.type_id().encode(writer)?;

        match self {
-
            Self::Initialize { node_id } => {
-
                n += node_id.encode(writer)?;
-
            }
            Self::Subscribe(Subscribe {
                filter,
                since,
@@ -245,11 +239,6 @@ impl wire::Decode for Message {
        let type_id = reader.read_u16::<NetworkEndian>()?;

        match MessageType::try_from(type_id) {
-
            Ok(MessageType::Initialize) => {
-
                let node_id = NodeId::decode(reader)?;
-

-
                Ok(Self::Initialize { node_id })
-
            }
            Ok(MessageType::Subscribe) => {
                let filter = Filter::decode(reader)?;
                let since = Timestamp::decode(reader)?;