Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: handle announcement relay correctly
Alexis Sellier committed 3 years ago
commit 7c459719981b6b57c7eb6efd173f2d19da3e682d
parent 45ea9cc97d1d5bfa3912a2cf04500e892b681aa1
3 files changed +133 -22
modified radicle-node/src/service.rs
@@ -568,19 +568,8 @@ where

    pub fn received_message(&mut self, addr: &net::SocketAddr, envelope: Envelope) {
        match self.handle_message(addr, envelope) {
-
            Ok(relay) => {
-
                if let Some(ann) = relay {
-
                    let negotiated = self
-
                        .sessions
-
                        .negotiated()
-
                        .filter(|(ip, _)| **ip != addr.ip())
-
                        .map(|(_, p)| p);
-

-
                    self.reactor.relay(ann, negotiated.clone());
-
                }
-
            }
            Err(SessionError::NotFound(ip)) => {
-
                error!("Session not found for {}", ip);
+
                error!("Session not found for {ip}");
            }
            Err(err) => {
                // If there's an error, stop processing messages from this peer.
@@ -590,6 +579,7 @@ where
                // FIXME: The peer should be set in a state such that we don'that
                // process further messages.
            }
+
            Ok(()) => {}
        }
    }

@@ -726,7 +716,7 @@ where
        &mut self,
        remote: &net::SocketAddr,
        envelope: Envelope,
-
    ) -> Result<Option<Announcement>, peer::SessionError> {
+
    ) -> Result<(), peer::SessionError> {
        let peer_ip = remote.ip();
        let peer = if let Some(peer) = self.sessions.get_mut(&peer_ip) {
            peer
@@ -790,7 +780,19 @@ where
                // Returning true here means that the message should be relayed.
                if self.handle_announcement(&id, &git, &ann)? {
                    self.gossip.received(ann.clone(), ann.message.timestamp());
-
                    return Ok(Some(ann));
+

+
                    // Choose peers we should relay this message to.
+
                    // 1. Don't relay to the peer who sent us this message.
+
                    // 2. Don't relay to the peer who signed this announcement.
+
                    let relay_to = self
+
                        .sessions
+
                        .negotiated()
+
                        .filter(|(ip, _, _)| **ip != remote.ip())
+
                        .filter(|(_, id, _)| **id != ann.node);
+

+
                    self.reactor.relay(ann.clone(), relay_to.map(|(_, _, p)| p));
+

+
                    return Ok(());
                }
            }
            (SessionState::Negotiated { .. }, Message::Subscribe(subscribe)) => {
@@ -813,7 +815,7 @@ where
                debug!("Ignoring {:?} from disconnected peer {}", msg, peer.ip());
            }
        }
-
        Ok(None)
+
        Ok(())
    }

    /// Process a peer inventory announcement by updating our routing table.
@@ -842,7 +844,7 @@ where
        let node = self.node_id();
        let repo = self.storage.repository(id)?;
        let remote = repo.remote(&node)?;
-
        let peers = self.sessions.negotiated().map(|(_, p)| p);
+
        let peers = self.sessions.negotiated().map(|(_, _, p)| p);
        let refs = remote.refs.into();
        let timestamp = self.clock.timestamp();
        let msg = AnnouncementMessage::from(RefsAnnouncement {
@@ -869,7 +871,7 @@ where
            &self.signer,
        );

-
        for addr in self.sessions.negotiated().map(|(_, p)| p.addr) {
+
        for addr in self.sessions.negotiated().map(|(_, _, p)| p.addr) {
            self.reactor.write(addr, inv.clone());
        }
        Ok(())
@@ -1069,8 +1071,13 @@ impl Sessions {
    }

    /// Iterator over fully negotiated peers.
-
    pub fn negotiated(&self) -> impl Iterator<Item = (&IpAddr, &Session)> + Clone {
-
        self.0.iter().filter(move |(_, p)| p.is_negotiated())
+
    pub fn negotiated(&self) -> impl Iterator<Item = (&IpAddr, &NodeId, &Session)> + Clone {
+
        self.0
+
            .iter()
+
            .filter_map(move |(ip, sess)| match &sess.state {
+
                SessionState::Negotiated { id, .. } => Some((ip, id, sess)),
+
                _ => None,
+
            })
    }
}

modified radicle-node/src/test/peer.rs
@@ -1,3 +1,4 @@
+
use std::collections::BTreeMap;
use std::iter;
use std::net;
use std::ops::{Deref, DerefMut};
@@ -8,6 +9,7 @@ use crate::address;
use crate::clock::{RefClock, Timestamp};
use crate::git;
use crate::git::Url;
+
use crate::node;
use crate::prelude::NodeId;
use crate::service;
use crate::service::config::*;
@@ -15,6 +17,7 @@ use crate::service::message::*;
use crate::service::reactor::Io;
use crate::service::*;
use crate::storage::WriteStorage;
+
use crate::test::arbitrary;
use crate::test::signer::MockSigner;
use crate::test::simulator;
use crate::{Link, LocalTime};
@@ -142,6 +145,45 @@ where
            .received_message(peer, self.config().network.envelope(msg));
    }

+
    pub fn inventory_announcement(&self) -> Message {
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: arbitrary::gen(3),
+
                timestamp: self.timestamp(),
+
            },
+
            self.signer(),
+
        )
+
    }
+

+
    pub fn node_announcement(&self) -> Message {
+
        let mut alias = [0u8; 32];
+
        alias[..self.name.len()].copy_from_slice(self.name.as_bytes());
+

+
        Message::node(
+
            NodeAnnouncement {
+
                features: node::Features::SEED,
+
                timestamp: self.timestamp(),
+
                alias,
+
                addresses: vec![net::SocketAddr::from((self.ip, service::DEFAULT_PORT)).into()],
+
            },
+
            self.signer(),
+
        )
+
    }
+

+
    pub fn refs_announcement(&self) -> Message {
+
        let inv = self.inventory().unwrap();
+
        let id = inv[self.rng.usize(..inv.len())];
+
        let refs = BTreeMap::new().into();
+
        let ann = AnnouncementMessage::from(RefsAnnouncement {
+
            id,
+
            refs,
+
            timestamp: self.timestamp(),
+
        });
+
        let msg = ann.signed(self.signer());
+

+
        msg.into()
+
    }
+

    pub fn connect_from(&mut self, peer: &Self) {
        let remote = simulator::Peer::<S>::addr(peer);
        let local = net::SocketAddr::new(self.ip, self.rng.u16(..));
modified radicle-node/src/test/tests.rs
@@ -48,7 +48,7 @@ fn test_outbound_connection() {
        .service
        .sessions()
        .negotiated()
-
        .map(|(ip, _)| *ip)
+
        .map(|(ip, _, _)| *ip)
        .collect::<Vec<_>>();

    assert!(peers.contains(&eve.ip));
@@ -68,7 +68,7 @@ fn test_inbound_connection() {
        .service
        .sessions()
        .negotiated()
-
        .map(|(ip, _)| *ip)
+
        .map(|(ip, _, _)| *ip)
        .collect::<Vec<_>>();

    assert!(peers.contains(&eve.ip));
@@ -273,6 +273,68 @@ fn test_gossip_rebroadcast_timestamp_filtered() {
}

#[test]
+
fn test_announcement_relay() {
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
+
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
+
    let eve = Peer::new("eve", [9, 9, 9, 9], MockStorage::empty());
+

+
    alice.connect_to(&bob);
+
    alice.connect_to(&eve);
+
    alice.receive(&bob.addr(), bob.inventory_announcement());
+

+
    assert_matches!(
+
        alice.messages(&eve.addr()).next(),
+
        Some(Message::Announcement(_))
+
    );
+

+
    alice.receive(&bob.addr(), bob.inventory_announcement());
+
    assert!(
+
        alice.messages(&eve.addr()).next().is_none(),
+
        "Another inventory with the same timestamp is ignored"
+
    );
+

+
    bob.clock().elapse(LocalDuration::from_mins(1));
+
    alice.receive(&bob.addr(), bob.inventory_announcement());
+
    assert_matches!(
+
        alice.messages(&eve.addr()).next(),
+
        Some(Message::Announcement(_)),
+
        "Another inventory with a fresher timestamp is relayed"
+
    );
+

+
    alice.receive(&bob.addr(), bob.node_announcement());
+
    assert_matches!(
+
        alice.messages(&eve.addr()).next(),
+
        Some(Message::Announcement(_)),
+
        "A node announcement with the same timestamp as the inventory is relayed"
+
    );
+

+
    alice.receive(&bob.addr(), bob.node_announcement());
+
    assert!(alice.messages(&eve.addr()).next().is_none(), "Only once");
+

+
    alice.receive(&eve.addr(), eve.node_announcement());
+
    assert_matches!(
+
        alice.messages(&bob.addr()).next(),
+
        Some(Message::Announcement(_)),
+
        "A node announcement from Eve is relayed to Bob"
+
    );
+
    assert!(
+
        alice.messages(&eve.addr()).next().is_none(),
+
        "But not back to Eve"
+
    );
+

+
    eve.clock().elapse(LocalDuration::from_mins(1));
+
    alice.receive(&bob.addr(), eve.node_announcement());
+
    assert!(
+
        alice.messages(&bob.addr()).next().is_none(),
+
        "Bob already know about this message, since he sent it"
+
    );
+
    assert!(
+
        alice.messages(&eve.addr()).next().is_none(),
+
        "Eve already know about this message, since she signed it"
+
    );
+
}
+

+
#[test]
fn test_inventory_relay() {
    // Topology is eve <-> alice <-> bob
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
@@ -395,7 +457,7 @@ fn test_persistent_peer_reconnect() {
    let ips = alice
        .sessions()
        .negotiated()
-
        .map(|(ip, _)| *ip)
+
        .map(|(ip, _, _)| *ip)
        .collect::<Vec<_>>();
    assert!(ips.contains(&bob.ip));
    assert!(ips.contains(&eve.ip));