Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Implement staggered broadcast for gossip
Merged did:key:z6MksFqX...wzpT opened 1 year ago

We use the staggered broadcast technique when relaying gossip messages to reduce message amplification. In our basic simulation, it brings amplification down from 3.5 to 1.5.

The idea is simply to accumulate gossip messages and relay them after a fixed interval of time. This has two effects:

  1. Old messages that haven’t been sent yet are replaced by newer ones, so only one message is sent after the delay.
  2. Because nodes have their own intervals, messages are not all sent at the same time, which reduces the chance of messages crossing paths.

For now we only turn this on for inventory messages, since these are the least likely to be noticed by users and are currently the most problematic.

8 files changed +556 -143 34d213ad 0834e0fc
modified radicle-cli/tests/commands.rs
@@ -1067,6 +1067,7 @@ fn rad_patch_delete() {
    bob.handle.seed(acme, Scope::All).unwrap();
    seed.handle.seed(acme, Scope::All).unwrap();
    alice.connect(&bob).connect(&seed).converge([&bob, &seed]);
+
    bob.routes_to(&[(acme, seed.id)]);

    test(
        "examples/rad-clone.md",
modified radicle-node/src/service.rs
@@ -70,6 +70,8 @@ use self::policy::NamespacesError;

/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
+
/// How often to run the "gossip" task.
+
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
@@ -89,7 +91,7 @@ pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time should we request gossip messages when connecting to a peer,
/// when we come online for the first time.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
-
/// When subscribing, what margin of error do we give ourselves. A gigher delta means we ask for
+
/// When subscribing, what margin of error do we give ourselves. A higher delta means we ask for
/// messages further back than strictly necessary, to account for missed messages.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum amount of time to wait before reconnecting to a peer.
@@ -409,6 +411,9 @@ pub struct Service<D, S, G> {
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
+
    /// Who relayed what announcement to us. We keep track of this to ensure that
+
    /// we don't relay messages to nodes that already know about these messages.
+
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    /// I/O outbox.
    outbox: Outbox,
    /// Cached local node announcement.
@@ -427,6 +432,8 @@ pub struct Service<D, S, G> {
    filter: Filter,
    /// Last time the service was idle.
    last_idle: LocalTime,
+
    /// Last time the gossip messages were relayed.
+
    last_gossip: LocalTime,
    /// Last time the service synced.
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
@@ -437,6 +444,8 @@ pub struct Service<D, S, G> {
    last_timestamp: Timestamp,
    /// Time when the service was initialized, or `None` if it wasn't initialized.
    started_at: Option<LocalTime>,
+
    /// Time when the service was last online, or `None` if this is the first time.
+
    last_online_at: Option<LocalTime>,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
    /// Local listening addresses.
@@ -502,12 +511,15 @@ where
            fetching: HashMap::new(),
            queue: VecDeque::new(),
            filter: Filter::empty(),
+
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
+
            last_gossip: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_timestamp,
            last_announce: LocalTime::default(),
-
            started_at: None, // Updated on initialize.
+
            started_at: None,     // Updated on initialize.
+
            last_online_at: None, // Updated on initialize.
            emitter,
            listening: vec![],
            metrics: Metrics::default(),
@@ -623,8 +635,16 @@ where

        let nid = self.node_id();

-
        self.started_at = Some(time);
        self.clock = time;
+
        self.started_at = Some(time);
+
        self.last_online_at = match self.db.gossip().last() {
+
            Ok(Some(last)) => Some(last.to_local_time()),
+
            Ok(None) => None,
+
            Err(e) => {
+
                error!(target: "service", "Error getting the lastest gossip message from db: {e}");
+
                None
+
            }
+
        };

        // Populate refs database. This is only useful as part of the upgrade process for nodes
        // that have been online since before the refs database was created.
@@ -726,6 +746,7 @@ where
        self.maintain_connections();
        // Start periodic tasks.
        self.outbox.wakeup(IDLE_INTERVAL);
+
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }
@@ -769,6 +790,15 @@ where
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
+
        if now - self.last_gossip >= GOSSIP_INTERVAL {
+
            trace!(target: "service", "Running 'gossip' task...");
+

+
            if let Err(e) = self.relay_announcements() {
+
                error!(target: "service", "Error relaying stored announcements: {e}");
+
            }
+
            self.outbox.wakeup(GOSSIP_INTERVAL);
+
            self.last_gossip = now;
+
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

@@ -1399,9 +1429,10 @@ where
    /// and `false` if it should not.
    pub fn handle_announcement(
        &mut self,
+
        relayer: &NodeId,
        relayer_addr: &Address,
        announcement: &Announcement,
-
    ) -> Result<bool, session::Error> {
+
    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
        if !announcement.verify() {
            return Err(session::Error::Misbehavior);
        }
@@ -1413,15 +1444,10 @@ where

        // Ignore our own announcements, in case the relayer sent one by mistake.
        if announcer == self.nid() {
-
            return Ok(false);
+
            return Ok(None);
        }
        let now = self.clock;
        let timestamp = message.timestamp();
-
        // To avoid spamming peers on startup with historical gossip messages,
-
        // don't relay messages that are too old. We make an exception for node announcements,
-
        // since they are cached, and will hence often carry old timestamps.
-
        let relay =
-
            message.is_node_announcement() || now - timestamp.to_local_time() <= MAX_TIME_DELTA;

        // Don't allow messages from too far in the future.
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
@@ -1441,29 +1467,47 @@ where
                Ok(node) => {
                    if node.is_none() {
                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
-
                        return Ok(false);
+
                        return Ok(None);
                    }
                }
                Err(e) => {
                    error!(target: "service", "Error looking up node in address book: {e}");
-
                    return Ok(false);
+
                    return Ok(None);
                }
            }
        }

        // Discard announcement messages we've already seen, otherwise update our last seen time.
-
        match self.db.gossip_mut().announced(announcer, announcement) {
-
            Ok(fresh) => {
-
                if !fresh {
-
                    debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
-
                    return Ok(false);
-
                }
+
        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
+
            Ok(Some(id)) => {
+
                log::debug!(
+
                    target: "service",
+
                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
+
                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
+
                );
+
                // Keep track of who relayed the message for later.
+
                self.relayed_by.entry(id).or_default().push(*relayer);
+

+
                // Decide whether or not to relay this message, if it's fresh.
+
                // To avoid spamming peers on startup with historical gossip messages,
+
                // don't relay messages that are too old. We make an exception for node announcements,
+
                // since they are cached, and will hence often carry old timestamps.
+
                let relay = message.is_node_announcement()
+
                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
+
                relay.then_some(id)
+
            }
+
            Ok(None) => {
+
                // FIXME: Still mark as relayed by this peer.
+
                // FIXME: Refs announcements should not be delayed, since they are only sent
+
                // to subscribers.
+
                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
+
                return Ok(None);
            }
            Err(e) => {
                error!(target: "service", "Error updating gossip entry from {announcer}: {e}");
-
                return Ok(false);
+
                return Ok(None);
            }
-
        }
+
        };

        match message {
            // Process a peer inventory update announcement by (maybe) fetching.
@@ -1481,12 +1525,12 @@ where
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
-
                            return Ok(false);
+
                            return Ok(None);
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error processing inventory from {announcer}: {e}");
-
                        return Ok(false);
+
                        return Ok(None);
                    }
                }

@@ -1534,7 +1578,7 @@ where
                // Empty announcements can be safely ignored.
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
-
                    return Ok(false);
+
                    return Ok(None);
                };
                // We update inventories when receiving ref announcements, as these could come
                // from a new repository being initialized.
@@ -1587,7 +1631,7 @@ where
                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
                        message.rid
                    );
-
                    return Ok(false);
+
                    return Ok(None);
                }
                // Refs can be relayed by peers who don't have the data in storage,
                // therefore we only check whether we are connected to the *announcer*,
@@ -1661,7 +1705,7 @@ where
                }
            }
        }
-
        Ok(false)
+
        Ok(None)
    }

    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
@@ -1713,21 +1757,22 @@ where
            (session::State::Connected { .. }, Message::Announcement(ann)) => {
                let relayer = peer.id;
                let relayer_addr = peer.addr.clone();
-
                let announcer = ann.node;
-

-
                if self.handle_announcement(&relayer_addr, &ann)? && self.config.is_relay() {
-
                    // Choose peers we should relay this message to.
-
                    // 1. Don't relay to the peer who sent us this message.
-
                    // 2. Don't relay to the peer who signed this announcement.
-
                    let relay_to = self
-
                        .sessions
-
                        .connected()
-
                        .filter(|(id, _)| *id != &relayer && *id != &announcer)
-
                        .map(|(_, p)| p);
-

-
                    self.outbox.relay(ann, relay_to);

-
                    return Ok(());
+
                if let Some(id) = self.handle_announcement(&relayer, &relayer_addr, &ann)? {
+
                    if self.config.is_relay() {
+
                        if let AnnouncementMessage::Inventory(_) = ann.message {
+
                            if let Err(e) = self
+
                                .database_mut()
+
                                .gossip_mut()
+
                                .set_relay(id, gossip::RelayStatus::Relay)
+
                            {
+
                                error!(target: "service", "Error setting relay flag for message: {e}");
+
                                return Ok(());
+
                            }
+
                        } else {
+
                            self.relay(id, ann);
+
                        }
+
                    }
                }
            }
            (session::State::Connected { .. }, Message::Subscribe(subscribe)) => {
@@ -1863,15 +1908,11 @@ where
        //
        // If this is our first connection to the network, we just ask for a fixed backlog
        // of messages to get us started.
-
        let since = match self.db.gossip().last() {
-
            Ok(Some(last)) => Timestamp::from(last.to_local_time() - SUBSCRIBE_BACKLOG_DELTA),
-
            Ok(None) => (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into(),
-
            Err(e) => {
-
                error!(target: "service", "Error getting the lastest gossip message from storage: {e}");
-
                return vec![];
-
            }
+
        let since = if let Some(last) = self.last_online_at {
+
            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
+
        } else {
+
            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
        };
-

        debug!(target: "service", "Subscribing to messages since timestamp {since}..");

        vec![
@@ -2181,10 +2222,46 @@ where
        self.last_timestamp
    }

+
    fn relay(&mut self, id: gossip::AnnouncementId, msg: Announcement) {
+
        let announcer = msg.node;
+
        let relayed_by = self.relayed_by.get(&id);
+
        // Choose peers we should relay this message to.
+
        // 1. Don't relay to a peer who sent us this message.
+
        // 2. Don't relay to the peer who signed this announcement.
+
        let relay_to = self
+
            .sessions
+
            .connected()
+
            .filter(|(id, _)| {
+
                relayed_by
+
                    .map(|relayers| !relayers.contains(id))
+
                    .unwrap_or(true) // If there are no relayers we let it through.
+
            })
+
            .filter(|(id, _)| **id != announcer)
+
            .map(|(_, p)| p);
+

+
        self.outbox.relay(msg, relay_to);
+
    }
+

    ////////////////////////////////////////////////////////////////////////////
    // Periodic tasks
    ////////////////////////////////////////////////////////////////////////////

+
    fn relay_announcements(&mut self) -> Result<(), Error> {
+
        let now = self.clock.into();
+
        let rows = self.database_mut().gossip_mut().relays(now)?;
+
        let local = self.node_id();
+

+
        for (id, msg) in rows {
+
            let announcer = msg.node;
+
            if announcer == local {
+
                // Don't relay our own stored gossip messages.
+
                continue;
+
            }
+
            self.relay(id, msg);
+
        }
+
        Ok(())
+
    }
+

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
        let msg = AnnouncementMessage::from(self.inventory.clone());
modified radicle-node/src/service/gossip.rs
@@ -2,8 +2,7 @@ pub mod store;

use super::*;

-
pub use store::Error;
-
pub use store::Store;
+
pub use store::{AnnouncementId, Error, RelayStatus, Store};

pub fn node(config: &Config, timestamp: Timestamp) -> NodeAnnouncement {
    let features = config.features();
modified radicle-node/src/service/gossip/store.rs
@@ -23,6 +23,9 @@ pub enum Error {
    UnitOverflow(#[from] TryFromIntError),
}

+
/// Unique announcement identifier.
+
pub type AnnouncementId = u64;
+

/// A database that has access to historical gossip messages.
/// Keeps track of the latest received gossip messages for each node.
/// Grows linearly with the number of nodes on the network.
@@ -35,7 +38,17 @@ pub trait Store {

    /// Process an announcement for the given node.
    /// Returns the announcement's id if the timestamp was updated or the announcement wasn't
    /// there before, and `None` otherwise.
-
    fn announced(&mut self, nid: &NodeId, ann: &Announcement) -> Result<bool, Error>;
+
    fn announced(
+
        &mut self,
+
        nid: &NodeId,
+
        ann: &Announcement,
+
    ) -> Result<Option<AnnouncementId>, Error>;
+

+
    /// Set whether a message should be relayed or not.
+
    fn set_relay(&mut self, id: AnnouncementId, relay: RelayStatus) -> Result<(), Error>;
+

+
    /// Return messages that should be relayed.
+
    fn relays(&mut self, now: Timestamp) -> Result<Vec<(AnnouncementId, Announcement)>, Error>;

    /// Get all the latest gossip messages of all nodes, filtered by inventory filter and
    /// announcement timestamps.
@@ -78,7 +91,11 @@ impl Store for Database {
        Ok(None)
    }

-
    fn announced(&mut self, nid: &NodeId, ann: &Announcement) -> Result<bool, Error> {
+
    fn announced(
+
        &mut self,
+
        nid: &NodeId,
+
        ann: &Announcement,
+
    ) -> Result<Option<AnnouncementId>, Error> {
        assert_ne!(
            ann.timestamp(),
            Timestamp::MIN,
@@ -89,7 +106,8 @@ impl Store for Database {
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
             ON CONFLICT DO UPDATE
             SET message = ?4, signature = ?5, timestamp = ?6
-
             WHERE timestamp < ?6",
+
             WHERE timestamp < ?6
+
             RETURNING rowid",
        )?;
        stmt.bind((1, nid))?;

@@ -112,9 +130,53 @@ impl Store for Database {
        }
        stmt.bind((5, &ann.signature))?;
        stmt.bind((6, &ann.message.timestamp()))?;
+

+
        if let Some(row) = stmt.into_iter().next() {
+
            let row = row?;
+
            let id = row.read::<i64, _>("rowid");
+

+
            Ok(Some(id as AnnouncementId))
+
        } else {
+
            Ok(None)
+
        }
+
    }
+

+
    fn set_relay(&mut self, id: AnnouncementId, relay: RelayStatus) -> Result<(), Error> {
+
        let mut stmt = self.db.prepare(
+
            "UPDATE announcements
+
             SET relay = ?1
+
             WHERE rowid = ?2",
+
        )?;
+
        stmt.bind((1, relay))?;
+
        stmt.bind((2, id as i64))?;
        stmt.next()?;

-
        Ok(self.db.change_count() > 0)
+
        Ok(())
+
    }
+

+
    fn relays(&mut self, now: Timestamp) -> Result<Vec<(AnnouncementId, Announcement)>, Error> {
+
        let mut stmt = self.db.prepare(
+
            "UPDATE announcements
+
             SET relay = ?1
+
             WHERE relay IS ?2
+
             RETURNING rowid, node, type, message, signature, timestamp",
+
        )?;
+
        stmt.bind((1, RelayStatus::RelayedAt(now)))?;
+
        stmt.bind((2, RelayStatus::Relay))?;
+

+
        let mut rows = stmt
+
            .into_iter()
+
            .map(|row| {
+
                let row = row?;
+
                parse::announcement(row)
+
            })
+
            .collect::<Result<Vec<_>, _>>()?;
+

+
        // Nb. Manually sort by insertion order, because we can't use `ORDER BY` with `RETURNING`
+
        // as of SQLite 3.45.
+
        rows.sort_by_key(|(id, _)| *id);
+

+
        Ok(rows)
    }

    fn filtered<'a>(
@@ -124,7 +186,7 @@ impl Store for Database {
        to: Timestamp,
    ) -> Result<Box<dyn Iterator<Item = Result<Announcement, Error>> + 'a>, Error> {
        let mut stmt = self.db.prepare(
-
            "SELECT node, type, message, signature, timestamp
+
            "SELECT rowid, node, type, message, signature, timestamp
             FROM announcements
             WHERE timestamp >= ?1 and timestamp < ?2
             ORDER BY timestamp, node, type",
@@ -138,32 +200,9 @@ impl Store for Database {
            stmt.into_iter()
                .map(|row| {
                    let row = row?;
-
                    let node = row.read::<NodeId, _>("node");
-
                    let gt = row.read::<GossipType, _>("type");
-
                    let message = match gt {
-
                        GossipType::Refs => {
-
                            let ann = row.read::<RefsAnnouncement, _>("message");
-
                            AnnouncementMessage::Refs(ann)
-
                        }
-
                        GossipType::Inventory => {
-
                            let ann = row.read::<InventoryAnnouncement, _>("message");
-
                            AnnouncementMessage::Inventory(ann)
-
                        }
-
                        GossipType::Node => {
-
                            let ann = row.read::<NodeAnnouncement, _>("message");
-
                            AnnouncementMessage::Node(ann)
-
                        }
-
                    };
-
                    let signature = row.read::<Signature, _>("signature");
-
                    let timestamp = row.read::<Timestamp, _>("timestamp");
-

-
                    debug_assert_eq!(timestamp, message.timestamp());
-

-
                    Ok(Announcement {
-
                        node,
-
                        message,
-
                        signature,
-
                    })
+
                    let (_, ann) = parse::announcement(row)?;
+

+
                    Ok(ann)
                })
                .filter(|ann| match ann {
                    Ok(a) => a.matches(filter),
@@ -251,6 +290,24 @@ impl From<wire::Error> for sql::Error {
    }
}

+
/// Message relay status.
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
pub enum RelayStatus {
+
    Relay,
+
    DontRelay,
+
    RelayedAt(Timestamp),
+
}
+

+
impl sql::BindableWithIndex for RelayStatus {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        match self {
+
            Self::Relay => sql::Value::Null.bind(stmt, i),
+
            Self::DontRelay => sql::Value::Integer(-1).bind(stmt, i),
+
            Self::RelayedAt(t) => t.bind(stmt, i),
+
        }
+
    }
+
}
+

/// Type of gossip message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GossipType {
@@ -297,6 +354,43 @@ impl TryFrom<&sql::Value> for GossipType {
    }
}

+
mod parse {
+
    use super::*;
+

+
    pub fn announcement(row: sql::Row) -> Result<(AnnouncementId, Announcement), Error> {
+
        let id = row.read::<i64, _>("rowid") as AnnouncementId;
+
        let node = row.read::<NodeId, _>("node");
+
        let gt = row.read::<GossipType, _>("type");
+
        let message = match gt {
+
            GossipType::Refs => {
+
                let ann = row.read::<RefsAnnouncement, _>("message");
+
                AnnouncementMessage::Refs(ann)
+
            }
+
            GossipType::Inventory => {
+
                let ann = row.read::<InventoryAnnouncement, _>("message");
+
                AnnouncementMessage::Inventory(ann)
+
            }
+
            GossipType::Node => {
+
                let ann = row.read::<NodeAnnouncement, _>("message");
+
                AnnouncementMessage::Node(ann)
+
            }
+
        };
+
        let signature = row.read::<Signature, _>("signature");
+
        let timestamp = row.read::<Timestamp, _>("timestamp");
+

+
        debug_assert_eq!(timestamp, message.timestamp());
+

+
        Ok((
+
            id,
+
            Announcement {
+
                node,
+
                message,
+
                signature,
+
            },
+
        ))
+
    }
+
}
+

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
@@ -304,6 +398,7 @@ mod test {
    use crate::prelude::{BoundedVec, RepoId};
    use crate::test::arbitrary;
    use localtime::LocalTime;
+
    use radicle::assert_matches;
    use radicle_crypto::test::signer::MockSigner;

    #[test]
@@ -326,10 +421,26 @@ mod test {
        .signed(&signer);

        // Only the first announcement of each type is recognized as new.
-
        assert!(db.announced(&nid, &refs).unwrap());
-
        assert!(!db.announced(&nid, &refs).unwrap());
+
        let id1 = db.announced(&nid, &refs).unwrap().unwrap();
+
        assert!(db.announced(&nid, &refs).unwrap().is_none());

-
        assert!(db.announced(&nid, &inv).unwrap());
-
        assert!(!db.announced(&nid, &inv).unwrap());
+
        let id2 = db.announced(&nid, &inv).unwrap().unwrap();
+
        assert!(db.announced(&nid, &inv).unwrap().is_none());
+

+
        // Nothing was set to be relayed.
+
        assert_eq!(db.relays(LocalTime::now().into()).unwrap().len(), 0);
+

+
        // Set the messages to be relayed.
+
        db.set_relay(id1, RelayStatus::Relay).unwrap();
+
        db.set_relay(id2, RelayStatus::Relay).unwrap();
+

+
        // Now they are returned.
+
        assert_matches!(
+
            db.relays(LocalTime::now().into()).unwrap().as_slice(),
+
            &[(id1_, _), (id2_, _)]
+
            if id1_ == id1 && id2_ == id2
+
        );
+
        // But only once.
+
        assert_matches!(db.relays(LocalTime::now().into()).unwrap().as_slice(), &[]);
    }
}
modified radicle-node/src/test/peer.rs
@@ -201,11 +201,13 @@ where
    }

    pub fn initialize(&mut self) -> &mut Self {
-
        info!(
-
            target: "test",
-
            "{}: Initializing: id = {}, address = {}",
-
            self.name, self.id, self.ip
-
        );
+
        if !self.initialized {
+
            info!(
+
                target: "test",
+
                "{}: Initializing: id = {}, address = {}",
+
                self.name, self.id, self.ip
+
            );
+
        }
        assert_ne!(self.local_time, LocalTime::default());

        self.initialized = true;
@@ -271,8 +273,9 @@ where
        self.service.node_id()
    }

-
    pub fn receive(&mut self, peer: NodeId, msg: Message) {
+
    pub fn receive(&mut self, peer: NodeId, msg: Message) -> &mut Self {
        self.service.received_message(peer, msg);
+
        self
    }

    pub fn inventory_announcement(&self) -> Message {
modified radicle-node/src/tests.rs
@@ -2,19 +2,23 @@ mod e2e;

use std::collections::BTreeSet;
use std::default::*;
+
use std::env;
use std::io;
use std::sync::Arc;
use std::time;

use crossbeam_channel as chan;
use netservices::Direction as Link;
+
use once_cell::sync::Lazy;
use radicle::identity::Visibility;
-
use radicle::node::address::Store;
+
use radicle::node::address::Store as _;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
use radicle::storage::refs::RefsAt;
use radicle::storage::RefUpdate;
+
use radicle::test::arbitrary::gen;
+
use radicle::test::storage::MockRepository;

use crate::collections::{RandomMap, RandomSet};
use crate::crypto::test::signer::MockSigner;
@@ -41,6 +45,7 @@ use crate::test::peer;
use crate::test::peer::Peer;
use crate::test::simulator;
use crate::test::simulator::{Peer as _, Simulation};
+

use crate::test::storage::MockStorage;
use crate::wire::Decode;
use crate::wire::Encode;
@@ -49,6 +54,16 @@ use crate::worker::fetch;
use crate::LocalTime;
use crate::{git, identity, rad, runtime, service, test};

+
/// Default number of tests to run when testing things with high variance.
+
pub const DEFAULT_TEST_CASES: usize = 10;
+
/// Test cases to run when testing things with high variance.
+
pub static TEST_CASES: Lazy<usize> = Lazy::new(|| {
+
    env::var("RAD_TEST_CASES")
+
        .ok()
+
        .and_then(|s| s.parse::<usize>().ok())
+
        .unwrap_or(DEFAULT_TEST_CASES)
+
});
+

// NOTE
//
// If you wish to see the logs for a running test, simply add the following line to your test:
@@ -590,38 +605,47 @@ fn test_announcement_relay() {

    alice.connect_to(&bob);
    alice.connect_to(&eve);
-
    alice.receive(bob.id(), bob.inventory_announcement());
-

+
    alice
+
        .receive(bob.id(), bob.inventory_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_))
    );

-
    alice.receive(bob.id(), dbg!(bob.inventory_announcement()));
+
    alice.receive(bob.id(), bob.inventory_announcement());
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "Another inventory with the same timestamp is ignored"
    );

    bob.elapse(LocalDuration::from_mins(1));
-
    alice.receive(bob.id(), dbg!(bob.inventory_announcement()));
+
    alice
+
        .receive(bob.id(), bob.inventory_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "Another inventory with a fresher timestamp is relayed"
    );

-
    alice.receive(bob.id(), bob.node_announcement());
+
    alice
+
        .receive(bob.id(), bob.node_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "A node announcement with the same timestamp as the inventory is relayed"
    );

-
    alice.receive(bob.id(), bob.node_announcement());
+
    alice
+
        .receive(bob.id(), bob.node_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert!(alice.messages(eve.id()).next().is_none(), "Only once");

-
    alice.receive(eve.id(), eve.node_announcement());
+
    alice
+
        .receive(eve.id(), eve.node_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(bob.id()).next(),
        Some(Message::Announcement(_)),
@@ -633,7 +657,9 @@ fn test_announcement_relay() {
    );

    eve.elapse(LocalDuration::from_mins(1));
-
    alice.receive(bob.id(), eve.node_announcement());
+
    alice
+
        .receive(bob.id(), eve.node_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert!(
        alice.messages(bob.id()).next().is_none(),
        "Bob already know about this message, since he sent it"
@@ -688,7 +714,9 @@ fn test_refs_announcement_relay() {
    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.receive(eve.id(), Message::Subscribe(Subscribe::all()));
-
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[0]));
+
    alice
+
        .receive(bob.id(), bob.refs_announcement(bob_inv[0]))
+
        .elapse(service::GOSSIP_INTERVAL);

    assert_matches!(
        alice.messages(eve.id()).next(),
@@ -696,20 +724,26 @@ fn test_refs_announcement_relay() {
        "A refs announcement from Bob is relayed to Eve"
    );

-
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[0]));
+
    alice
+
        .receive(bob.id(), bob.refs_announcement(bob_inv[0]))
+
        .elapse(service::GOSSIP_INTERVAL);
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "The same ref announement is not relayed"
    );

-
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[1]));
+
    alice
+
        .receive(bob.id(), bob.refs_announcement(bob_inv[1]))
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "But a different one is"
    );

-
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[2]));
+
    alice
+
        .receive(bob.id(), bob.refs_announcement(bob_inv[2]))
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
@@ -938,6 +972,8 @@ fn test_refs_announcement_offline() {

#[test]
fn test_inventory_relay() {
+
    logger::init(log::Level::Debug);
+

    // Topology is eve <-> alice <-> bob
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
@@ -950,16 +986,19 @@ fn test_inventory_relay() {
    alice.wake(); // Run all periodic tasks now so they don't trigger later.
    alice.connect_to(&bob);
    alice.connect_from(&eve);
-
    alice.receive(
-
        bob.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inv.clone(),
-
                timestamp: now,
-
            },
-
            bob.signer(),
-
        ),
-
    );
+
    alice
+
        .receive(
+
            bob.id(),
+
            Message::inventory(
+
                InventoryAnnouncement {
+
                    inventory: inv.clone(),
+
                    timestamp: now,
+
                },
+
                bob.signer(),
+
            ),
+
        )
+
        .elapse(service::GOSSIP_INTERVAL);
+

    assert_matches!(
        alice.inventory_announcements(eve.id()).next(),
        Some(Message::Announcement(Announcement {
@@ -975,32 +1014,38 @@ fn test_inventory_relay() {
        "The inventory is not sent back to Bob"
    );

-
    alice.receive(
-
        bob.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inv.clone(),
-
                timestamp: now,
-
            },
-
            bob.signer(),
-
        ),
-
    );
+
    alice
+
        .receive(
+
            bob.id(),
+
            Message::inventory(
+
                InventoryAnnouncement {
+
                    inventory: inv.clone(),
+
                    timestamp: now,
+
                },
+
                bob.signer(),
+
            ),
+
        )
+
        .elapse(service::GOSSIP_INTERVAL);
+

    assert_matches!(
        alice.inventory_announcements(eve.id()).next(),
        None,
        "Sending the same inventory again doesn't trigger a relay"
    );

-
    alice.receive(
-
        bob.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inv.clone(),
-
                timestamp: now + 1,
-
            },
-
            bob.signer(),
-
        ),
-
    );
+
    alice
+
        .receive(
+
            bob.id(),
+
            Message::inventory(
+
                InventoryAnnouncement {
+
                    inventory: inv.clone(),
+
                    timestamp: now + 1,
+
                },
+
                bob.signer(),
+
            ),
+
        )
+
        .elapse(service::GOSSIP_INTERVAL);
+

    assert_matches!(
        alice.inventory_announcements(eve.id()).next(),
        Some(Message::Announcement(Announcement {
@@ -1013,16 +1058,19 @@ fn test_inventory_relay() {
    );

    // Inventory from Eve relayed to Bob.
-
    alice.receive(
-
        eve.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inv,
-
                timestamp: now,
-
            },
-
            eve.signer(),
-
        ),
-
    );
+
    alice
+
        .receive(
+
            eve.id(),
+
            Message::inventory(
+
                InventoryAnnouncement {
+
                    inventory: inv,
+
                    timestamp: now,
+
                },
+
                eve.signer(),
+
            ),
+
        )
+
        .elapse(service::GOSSIP_INTERVAL);
+

    assert_matches!(
        alice.inventory_announcements(bob.id()).next(),
        Some(Message::Announcement(Announcement {
@@ -1745,3 +1793,169 @@ fn prop_inventory_exchange_dense() {
        .tests(20)
        .quickcheck(property as fn(MockStorage, MockStorage, MockStorage));
}
+

+
#[test]
+
fn test_announcement_message_amplification() {
+
    let mut results = Vec::new();
+
    let mut rng = fastrand::Rng::new();
+

+
    while results.len() < *TEST_CASES {
+
        let mut alice = Peer::new("alice", [7, 7, 7, 7]);
+
        let mut bob = Peer::new("bob", [8, 8, 8, 8]);
+
        let mut eve = Peer::new("eve", [9, 9, 9, 9]);
+
        let mut zod = Peer::new("zod", [5, 5, 5, 5]);
+
        let mut tom = Peer::new("tom", [4, 4, 4, 4]);
+
        let mut sim = Simulation::new(
+
            LocalTime::now(),
+
            alice.rng.clone(),
+
            simulator::Options {
+
                latency: 0..1, // 0 - 1s
+
                failure_rate: 0.,
+
            },
+
        );
+
        let rid = gen::<RepoId>(1);
+

+
        // Make sure the node gossip intervals are not accidentally synchronized.
+
        alice.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+
        bob.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+
        eve.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+
        zod.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+
        tom.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+

+
        // Fully-connected network.
+
        alice.command(Command::Connect(
+
            bob.id,
+
            bob.address(),
+
            ConnectOptions::default(),
+
        ));
+
        alice.command(Command::Connect(
+
            eve.id,
+
            eve.address(),
+
            ConnectOptions::default(),
+
        ));
+
        alice.command(Command::Connect(
+
            zod.id,
+
            zod.address(),
+
            ConnectOptions::default(),
+
        ));
+
        alice.command(Command::Connect(
+
            tom.id,
+
            tom.address(),
+
            ConnectOptions::default(),
+
        ));
+
        bob.command(Command::Connect(
+
            eve.id,
+
            eve.address(),
+
            ConnectOptions::default(),
+
        ));
+
        bob.command(Command::Connect(
+
            zod.id,
+
            zod.address(),
+
            ConnectOptions::default(),
+
        ));
+
        bob.command(Command::Connect(
+
            tom.id,
+
            tom.address(),
+
            ConnectOptions::default(),
+
        ));
+
        eve.command(Command::Connect(
+
            zod.id,
+
            zod.address(),
+
            ConnectOptions::default(),
+
        ));
+
        eve.command(Command::Connect(
+
            tom.id,
+
            tom.address(),
+
            ConnectOptions::default(),
+
        ));
+
        zod.command(Command::Connect(
+
            tom.id,
+
            tom.address(),
+
            ConnectOptions::default(),
+
        ));
+

+
        // Let the nodes connect to each other.
+
        sim.run_while([&mut alice, &mut bob, &mut eve, &mut zod, &mut tom], |s| {
+
            s.elapsed() < LocalDuration::from_mins(3)
+
        });
+

+
        // Ensure nodes are all connected, otherwise skip this test run.
+
        if alice.sessions().connected().count() != 4 {
+
            continue;
+
        }
+
        if bob.sessions().connected().count() != 4 {
+
            continue;
+
        }
+
        if eve.sessions().connected().count() != 4 {
+
            continue;
+
        }
+
        if zod.sessions().connected().count() != 4 {
+
            continue;
+
        }
+
        if tom.sessions().connected().count() != 4 {
+
            continue;
+
        }
+

+
        let (tx, _) = chan::bounded(1);
+
        let timestamp = (*alice.clock()).into();
+
        alice
+
            .storage_mut()
+
            .repos
+
            .insert(rid, gen::<MockRepository>(1));
+
        alice.command(Command::UpdateInventory(rid, tx));
+
        alice.command(Command::AnnounceInventory);
+

+
        sim.run_while([&mut alice, &mut bob, &mut eve, &mut zod, &mut tom], |s| {
+
            s.elapsed() < LocalDuration::from_mins(3)
+
        });
+

+
        // Make sure they have the routing table entry.
+
        for node in [&bob, &eve, &zod, &tom] {
+
            assert!(node
+
                .service
+
                .database()
+
                .routing()
+
                .get(&rid)
+
                .unwrap()
+
                .contains(&alice.id));
+
        }
+

+
        // Count how many copies of Alice's inventory message have been received by peers.
+
        let received = sim.messages().iter().filter(|m| {
+
            matches!(
+
                m,
+
                (_, _, Message::Announcement(Announcement {
+
                    node,
+
                    message: AnnouncementMessage::Inventory(i),
+
                    ..
+
                }))
+
                if node == &alice.id && i.inventory.to_vec() == vec![rid] && i.timestamp == timestamp
+
            )
+
        });
+
        results.push(received.count());
+
    }
+
    // Calculate the average amplification factor based on all simulation runs.
+
    let avg = results.iter().sum::<usize>() as f64 / results.len() as f64;
+
    // Amplification is total divided by minimum, ie. it's a relative metric.
+
    let amp = avg / 4.;
+

+
    // The worst-case scenario is (n - 1)^2 messages received for one message announced.
+
    // In the above case of 5 nodes, this is 4 * 4 = 16 messages. This is an amplification of 4.0.
+
    // The best case is an amplification of 1.0, ie. each node receives the message once only.
+
    //
+
    // By using delayed message propagation though, we can bring this down closer to the minimum.
+
    log::debug!(target: "test", "Average message amplification: {amp}");
+

+
    assert!(amp < 2., "Amplification factor of {amp} is too high");
+
    assert!(amp >= 1., "Amplification can't be lower than 1");
+
}
modified radicle/src/node/db.rs
@@ -29,6 +29,7 @@ const MIGRATIONS: &[&str] = &[
    include_str!("db/migrations/2.sql"),
    include_str!("db/migrations/3.sql"),
    include_str!("db/migrations/4.sql"),
+
    include_str!("db/migrations/5.sql"),
];

#[derive(Error, Debug)]
added radicle/src/node/db/migrations/5.sql
@@ -0,0 +1,7 @@
+
-- Add the "relay" column.
+
-- The "relay" column can be set in several different ways:
+
--
+
-- * If set to `-1`, it means this announcement should *not* be relayed.
+
-- * If set to `NULL`, it means it *should* be relayed.
+
-- * If set to a positive integer, it means it has been relayed at that UNIX timestamp.
+
alter table "announcements" add column "relay" integer default -1;