Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Don't relay redundant gossip messages
Alexis Sellier committed 3 years ago
commit ed1120f3eb4b25aa416afbaa4f14b90ff10f396e
parent 2c0cdcd1c5466cf48ea6c73cdd6f8d1e14fa4b5e
3 files changed +161 -49
modified radicle-node/src/service.rs
@@ -180,8 +180,6 @@ pub struct Service<R, A, S, G> {
    gossip: Gossip,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
-
    /// Keeps track of node states.
-
    nodes: BTreeMap<NodeId, Node>,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// Interface to the I/O reactor.
@@ -253,8 +251,6 @@ where
            clock,
            routing,
            gossip: Gossip::default(),
-
            // FIXME: This should be loaded from the address store.
-
            nodes: BTreeMap::new(),
            reactor: Reactor::default(),
            sessions,
            out_of_sync: false,
@@ -797,7 +793,11 @@ where
        let now = self.clock;
        let timestamp = message.timestamp();
        let relay = self.config.relay;
-
        let peer = self.nodes.entry(*announcer).or_insert_with(Node::default);
+
        let peer = self
+
            .gossip
+
            .nodes
+
            .entry(*announcer)
+
            .or_insert_with(Node::default);

        // Don't allow messages from too far in the future.
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
@@ -808,7 +808,7 @@ where
            AnnouncementMessage::Inventory(message) => {
                // Discard inventory messages we've already seen, otherwise update
                // out last seen time.
-
                if !peer.inventory_announced(timestamp) {
+
                if !peer.inventory_announced(announcement.clone()) {
                    debug!(target: "service", "Ignoring stale inventory announcement from {announcer} (t={})", self.time());
                    return Ok(false);
                }
@@ -875,19 +875,19 @@ where
                        info!(target: "service", "Routing table updated for {} with seed {relayer}", message.rid);
                    }
                }
+
                // Discard announcement messages we've already seen, otherwise update
+
                // our last seen time.
+
                if !peer.refs_announced(message.rid, announcement.clone()) {
+
                    debug!(target: "service", "Ignoring stale refs announcement from {announcer} (time={timestamp})");
+
                    return Ok(false);
+
                }
+

                // TODO: Buffer/throttle fetches.
                if self
                    .tracking
                    .is_repo_tracked(&message.rid)
                    .expect("Service::handle_announcement: error accessing tracking configuration")
                {
-
                    // Discard announcement messages we've already seen, otherwise update
-
                    // our last seen time.
-
                    if !peer.refs_announced(message.rid, timestamp) {
-
                        debug!(target: "service", "Ignoring stale refs announcement from {announcer} (time={timestamp})");
-
                        return Ok(false);
-
                    }
-

                    // Refs can be relayed by peers who don't have the data in storage,
                    // therefore we only check whether we are connected to the *announcer*,
                    // which is required by the protocol to only announce refs it has.
@@ -923,7 +923,7 @@ where
            ) => {
                // Discard node messages we've already seen, otherwise update
                // our last seen time.
-
                if !peer.node_announced(timestamp) {
+
                if !peer.node_announced(announcement.clone()) {
                    debug!(target: "service", "Ignoring stale node announcement from {announcer}");
                    return Ok(false);
                }
@@ -1004,20 +1004,20 @@ where
            // Process a peer announcement.
            (session::State::Connected { .. }, Message::Announcement(ann)) => {
                let relayer = peer.id;
+
                let announcer = ann.node;

                // Returning true here means that the message should be relayed.
                if self.handle_announcement(&relayer, &ann)? {
-
                    self.gossip.received(ann.clone(), ann.message.timestamp());
-

                    // Choose peers we should relay this message to.
                    // 1. Don't relay to the peer who sent us this message.
                    // 2. Don't relay to the peer who signed this announcement.
                    let relay_to = self
                        .sessions
                        .connected()
-
                        .filter(|(id, _)| *id != remote && *id != &ann.node);
+
                        .filter(|(id, _)| *id != remote && *id != &announcer)
+
                        .map(|(_, p)| p);

-
                    self.reactor.relay(ann.clone(), relay_to.map(|(_, p)| p));
+
                    self.reactor.relay(ann, relay_to);

                    return Ok(());
                }
@@ -1621,31 +1621,31 @@ pub enum LookupError {
    Identity(#[from] IdentityError),
}

-
/// Information on a peer, that we may or may not be connected to.
+
/// Keeps track of the most recent announcements of a node.
#[derive(Default, Debug)]
pub struct Node {
    /// Last ref announcements (per project).
-
    pub last_refs: HashMap<Id, Timestamp>,
+
    pub last_refs: HashMap<Id, Announcement>,
    /// Last inventory announcement.
-
    pub last_inventory: Timestamp,
+
    pub last_inventory: Option<Announcement>,
    /// Last node announcement.
-
    pub last_node: Timestamp,
+
    pub last_node: Option<Announcement>,
}

impl Node {
    /// Process a refs announcement for the given node.
    /// Returns `true` if the timestamp was updated.
-
    pub fn refs_announced(&mut self, id: Id, t: Timestamp) -> bool {
+
    pub fn refs_announced(&mut self, id: Id, ann: Announcement) -> bool {
        match self.last_refs.entry(id) {
            Entry::Vacant(e) => {
-
                e.insert(t);
+
                e.insert(ann);
                return true;
            }
            Entry::Occupied(mut e) => {
                let last = e.get_mut();

-
                if t > *last {
-
                    *last = t;
+
                if ann.timestamp() > last.timestamp() {
+
                    *last = ann;
                    return true;
                }
            }
@@ -1655,20 +1655,36 @@ impl Node {

    /// Process an inventory announcement for the given node.
    /// Returns `true` if the timestamp was updated.
-
    pub fn inventory_announced(&mut self, t: Timestamp) -> bool {
-
        if t > self.last_inventory {
-
            self.last_inventory = t;
-
            return true;
+
    pub fn inventory_announced(&mut self, ann: Announcement) -> bool {
+
        match &mut self.last_inventory {
+
            Some(last) => {
+
                if ann.timestamp() > last.timestamp() {
+
                    *last = ann;
+
                    return true;
+
                }
+
            }
+
            None => {
+
                self.last_inventory = Some(ann);
+
                return true;
+
            }
        }
        false
    }

    /// Process a node announcement for the given node.
    /// Returns `true` if the timestamp was updated.
-
    pub fn node_announced(&mut self, t: Timestamp) -> bool {
-
        if t > self.last_node {
-
            self.last_node = t;
-
            return true;
+
    pub fn node_announced(&mut self, ann: Announcement) -> bool {
+
        match &mut self.last_node {
+
            Some(last) => {
+
                if ann.timestamp() > last.timestamp() {
+
                    *last = ann;
+
                    return true;
+
                }
+
            }
+
            None => {
+
                self.last_node = Some(ann);
+
                return true;
+
            }
        }
        false
    }
@@ -1738,28 +1754,30 @@ mod gossip {

    #[derive(Default, Debug)]
    pub struct Gossip {
-
        received: Vec<(Timestamp, Announcement)>,
+
        // FIXME: This should be loaded from the address store.
+
        /// Keeps track of node announcements.
+
        pub nodes: BTreeMap<NodeId, Node>,
    }

    impl Gossip {
-
        // TODO: Overwrite old messages from the same node or project.
-
        // TODO: Should "time" be this node's time, or the time inside the message?
-
        pub fn received(&mut self, ann: Announcement, time: Timestamp) {
-
            self.received.push((time, ann));
-
        }
-

        pub fn filtered<'a>(
            &'a self,
            filter: &'a Filter,
            start: Timestamp,
            end: Timestamp,
        ) -> impl Iterator<Item = Announcement> + '_ {
-
            self.received
-
                .iter()
-
                .filter(move |(t, _)| *t >= start && *t < end)
-
                .filter(move |(_, a)| a.matches(filter))
-
                .cloned()
-
                .map(|(_, ann)| ann)
+
            self.nodes
+
                .values()
+
                .flat_map(|n| {
+
                    [&n.last_node, &n.last_inventory]
+
                        .into_iter()
+
                        .flatten()
+
                        .chain(n.last_refs.values())
+
                        .cloned()
+
                        .collect::<Vec<_>>()
+
                })
+
                .filter(move |ann| ann.timestamp() >= start && ann.timestamp() < end)
+
                .filter(move |ann| ann.matches(filter))
        }
    }

modified radicle-node/src/service/message.rs
@@ -307,6 +307,16 @@ impl Announcement {
            AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) => filter.contains(rid),
        }
    }
+

+
    /// Check whether this announcement is of the same variant as another.
+
    pub fn variant_eq(&self, other: &Self) -> bool {
+
        std::mem::discriminant(&self.message) == std::mem::discriminant(&other.message)
+
    }
+

+
    /// Get the announcement timestamp.
+
    pub fn timestamp(&self) -> Timestamp {
+
        self.message.timestamp()
+
    }
}

/// Message payload.
@@ -339,6 +349,21 @@ pub enum Message {
    FetchOk { rid: Id },
}

+
impl PartialOrd for Message {
+
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+
        Some(self.cmp(other))
+
    }
+
}
+

+
impl Ord for Message {
+
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+
        let this = wire::serialize(self);
+
        let other = wire::serialize(other);
+

+
        this.cmp(&other)
+
    }
+
}
+

impl Message {
    pub fn announcement(
        node: NodeId,
modified radicle-node/src/tests.rs
@@ -1,5 +1,6 @@
mod e2e;

+
use std::collections::BTreeSet;
use std::default::*;
use std::io;
use std::sync::Arc;
@@ -426,11 +427,77 @@ fn test_announcement_rebroadcast() {
        }),
    );

-
    let relayed = alice.messages(eve.id()).collect::<Vec<_>>();
+
    let relayed = alice.messages(eve.id()).collect::<BTreeSet<_>>();
+
    let received = received.into_iter().collect::<BTreeSet<_>>();
+

    assert_eq!(relayed, received);
}

#[test]
+
fn test_announcement_rebroadcast_duplicates() {
+
    let mut carol = Peer::new("carol", [4, 4, 4, 4]);
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
+
    let bob = Peer::new("bob", [8, 8, 8, 8]);
+
    let eve = Peer::new("eve", [9, 9, 9, 9]);
+
    let rids = arbitrary::set::<Id>(3..=3);
+

+
    alice.connect_to(&bob);
+

+
    // These are not expected to be relayed.
+
    let stale = {
+
        let mut anns = BTreeSet::new();
+

+
        for _ in 0..5 {
+
            carol.elapse(LocalDuration::from_mins(1));
+

+
            anns.insert(carol.inventory_announcement());
+
            anns.insert(carol.node_announcement());
+
        }
+
        anns
+
    };
+

+
    // These are expected to be relayed.
+
    let expected = {
+
        let mut anns = BTreeSet::new();
+

+
        carol.elapse(LocalDuration::from_mins(1));
+
        anns.insert(carol.inventory_announcement());
+
        anns.insert(carol.node_announcement());
+

+
        for rid in rids {
+
            alice.track_repo(&rid, tracking::Scope::All).unwrap();
+
            anns.insert(carol.refs_announcement(rid));
+
            anns.insert(bob.refs_announcement(rid));
+
        }
+
        anns
+
    };
+

+
    let mut all = stale.iter().chain(expected.iter()).collect::<Vec<_>>();
+
    fastrand::shuffle(&mut all);
+

+
    // Alice receives all messages out of order.
+
    for ann in all {
+
        alice.receive(bob.id, ann.clone());
+
    }
+

+
    // Alice relays just the expected ones back to Eve.
+
    alice.connect_from(&eve);
+
    alice.receive(
+
        eve.id(),
+
        Message::Subscribe(Subscribe {
+
            filter: Filter::default(),
+
            since: Timestamp::MIN,
+
            until: Timestamp::MAX,
+
        }),
+
    );
+

+
    let relayed = alice.messages(eve.id()).collect::<BTreeSet<_>>();
+

+
    assert_eq!(relayed.len(), 8);
+
    assert_eq!(relayed, expected);
+
}
+

+
#[test]
fn test_announcement_rebroadcast_timestamp_filtered() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
@@ -464,7 +531,9 @@ fn test_announcement_rebroadcast_timestamp_filtered() {
        }),
    );

-
    let relayed = alice.messages(eve.id()).collect::<Vec<_>>();
+
    let relayed = alice.messages(eve.id()).collect::<BTreeSet<_>>();
+
    let second = second.into_iter().collect::<BTreeSet<_>>();
+

    assert_eq!(relayed.len(), second.len());
    assert_eq!(relayed, second);
}