Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement staggered broadcast for gossip
cloudhead committed 1 year ago
commit 0834e0fc7d3c2bca164c33a8e11818dc19a892a6
parent 3075a399e9fba16700421171d5bdc4960561b432
8 files changed +387 -132
modified radicle-cli/tests/commands.rs
@@ -1067,6 +1067,7 @@ fn rad_patch_delete() {
    bob.handle.seed(acme, Scope::All).unwrap();
    seed.handle.seed(acme, Scope::All).unwrap();
    alice.connect(&bob).connect(&seed).converge([&bob, &seed]);
+
    bob.routes_to(&[(acme, seed.id)]);

    test(
        "examples/rad-clone.md",
modified radicle-node/src/service.rs
@@ -70,6 +70,8 @@ use self::policy::NamespacesError;

/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
+
/// How often to run the "gossip" task.
+
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
@@ -409,6 +411,9 @@ pub struct Service<D, S, G> {
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
+
    /// Who relayed what announcement to us. We keep track of this to ensure that
+
    /// we don't relay messages to nodes that already know about these messages.
+
    relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
    /// I/O outbox.
    outbox: Outbox,
    /// Cached local node announcement.
@@ -427,6 +432,8 @@ pub struct Service<D, S, G> {
    filter: Filter,
    /// Last time the service was idle.
    last_idle: LocalTime,
+
    /// Last time the gossip messages were relayed.
+
    last_gossip: LocalTime,
    /// Last time the service synced.
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
@@ -504,7 +511,9 @@ where
            fetching: HashMap::new(),
            queue: VecDeque::new(),
            filter: Filter::empty(),
+
            relayed_by: HashMap::default(),
            last_idle: LocalTime::default(),
+
            last_gossip: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_timestamp,
@@ -737,6 +746,7 @@ where
        self.maintain_connections();
        // Start periodic tasks.
        self.outbox.wakeup(IDLE_INTERVAL);
+
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }
@@ -780,6 +790,15 @@ where
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
+
        if now - self.last_gossip >= GOSSIP_INTERVAL {
+
            trace!(target: "service", "Running 'gossip' task...");
+

+
            if let Err(e) = self.relay_announcements() {
+
                error!(target: "service", "Error relaying stored announcements: {e}");
+
            }
+
            self.outbox.wakeup(GOSSIP_INTERVAL);
+
            self.last_gossip = now;
+
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            trace!(target: "service", "Running 'sync' task...");

@@ -1410,9 +1429,10 @@ where
    /// and `false` if it should not.
    pub fn handle_announcement(
        &mut self,
+
        relayer: &NodeId,
        relayer_addr: &Address,
        announcement: &Announcement,
-
    ) -> Result<bool, session::Error> {
+
    ) -> Result<Option<gossip::AnnouncementId>, session::Error> {
        if !announcement.verify() {
            return Err(session::Error::Misbehavior);
        }
@@ -1424,15 +1444,10 @@ where

        // Ignore our own announcements, in case the relayer sent one by mistake.
        if announcer == self.nid() {
-
            return Ok(false);
+
            return Ok(None);
        }
        let now = self.clock;
        let timestamp = message.timestamp();
-
        // To avoid spamming peers on startup with historical gossip messages,
-
        // don't relay messages that are too old. We make an exception for node announcements,
-
        // since they are cached, and will hence often carry old timestamps.
-
        let relay =
-
            message.is_node_announcement() || now - timestamp.to_local_time() <= MAX_TIME_DELTA;

        // Don't allow messages from too far in the future.
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
@@ -1452,29 +1467,47 @@ where
                Ok(node) => {
                    if node.is_none() {
                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
-
                        return Ok(false);
+
                        return Ok(None);
                    }
                }
                Err(e) => {
                    error!(target: "service", "Error looking up node in address book: {e}");
-
                    return Ok(false);
+
                    return Ok(None);
                }
            }
        }

        // Discard announcement messages we've already seen, otherwise update our last seen time.
-
        match self.db.gossip_mut().announced(announcer, announcement) {
-
            Ok(fresh) => {
-
                if !fresh {
-
                    debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
-
                    return Ok(false);
-
                }
+
        let relay = match self.db.gossip_mut().announced(announcer, announcement) {
+
            Ok(Some(id)) => {
+
                log::debug!(
+
                    target: "service",
+
                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
+
                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
+
                );
+
                // Keep track of who relayed the message for later.
+
                self.relayed_by.entry(id).or_default().push(*relayer);
+

+
                // Decide whether or not to relay this message, if it's fresh.
+
                // To avoid spamming peers on startup with historical gossip messages,
+
                // don't relay messages that are too old. We make an exception for node announcements,
+
                // since they are cached, and will hence often carry old timestamps.
+
                let relay = message.is_node_announcement()
+
                    || now - timestamp.to_local_time() <= MAX_TIME_DELTA;
+
                relay.then_some(id)
+
            }
+
            Ok(None) => {
+
                // FIXME: Still mark as relayed by this peer.
+
                // FIXME: Refs announcements should not be delayed, since they are only sent
+
                // to subscribers.
+
                debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
+
                return Ok(None);
            }
            Err(e) => {
                error!(target: "service", "Error updating gossip entry from {announcer}: {e}");
-
                return Ok(false);
+
                return Ok(None);
            }
-
        }
+
        };

        match message {
            // Process a peer inventory update announcement by (maybe) fetching.
@@ -1492,12 +1525,12 @@ where
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
-
                            return Ok(false);
+
                            return Ok(None);
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error processing inventory from {announcer}: {e}");
-
                        return Ok(false);
+
                        return Ok(None);
                    }
                }

@@ -1545,7 +1578,7 @@ where
                // Empty announcements can be safely ignored.
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
-
                    return Ok(false);
+
                    return Ok(None);
                };
                // We update inventories when receiving ref announcements, as these could come
                // from a new repository being initialized.
@@ -1598,7 +1631,7 @@ where
                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
                        message.rid
                    );
-
                    return Ok(false);
+
                    return Ok(None);
                }
                // Refs can be relayed by peers who don't have the data in storage,
                // therefore we only check whether we are connected to the *announcer*,
@@ -1672,7 +1705,7 @@ where
                }
            }
        }
-
        Ok(false)
+
        Ok(None)
    }

    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
@@ -1724,21 +1757,22 @@ where
            (session::State::Connected { .. }, Message::Announcement(ann)) => {
                let relayer = peer.id;
                let relayer_addr = peer.addr.clone();
-
                let announcer = ann.node;

-
                if self.handle_announcement(&relayer_addr, &ann)? && self.config.is_relay() {
-
                    // Choose peers we should relay this message to.
-
                    // 1. Don't relay to the peer who sent us this message.
-
                    // 2. Don't relay to the peer who signed this announcement.
-
                    let relay_to = self
-
                        .sessions
-
                        .connected()
-
                        .filter(|(id, _)| *id != &relayer && *id != &announcer)
-
                        .map(|(_, p)| p);
-

-
                    self.outbox.relay(ann, relay_to);
-

-
                    return Ok(());
+
                if let Some(id) = self.handle_announcement(&relayer, &relayer_addr, &ann)? {
+
                    if self.config.is_relay() {
+
                        if let AnnouncementMessage::Inventory(_) = ann.message {
+
                            if let Err(e) = self
+
                                .database_mut()
+
                                .gossip_mut()
+
                                .set_relay(id, gossip::RelayStatus::Relay)
+
                            {
+
                                error!(target: "service", "Error setting relay flag for message: {e}");
+
                                return Ok(());
+
                            }
+
                        } else {
+
                            self.relay(id, ann);
+
                        }
+
                    }
                }
            }
            (session::State::Connected { .. }, Message::Subscribe(subscribe)) => {
@@ -2188,10 +2222,46 @@ where
        self.last_timestamp
    }

+
    fn relay(&mut self, id: gossip::AnnouncementId, msg: Announcement) {
+
        let announcer = msg.node;
+
        let relayed_by = self.relayed_by.get(&id);
+
        // Choose peers we should relay this message to.
+
        // 1. Don't relay to a peer who sent us this message.
+
        // 2. Don't relay to the peer who signed this announcement.
+
        let relay_to = self
+
            .sessions
+
            .connected()
+
            .filter(|(id, _)| {
+
                relayed_by
+
                    .map(|relayers| !relayers.contains(id))
+
                    .unwrap_or(true) // If there are no relayers we let it through.
+
            })
+
            .filter(|(id, _)| **id != announcer)
+
            .map(|(_, p)| p);
+

+
        self.outbox.relay(msg, relay_to);
+
    }
+

    ////////////////////////////////////////////////////////////////////////////
    // Periodic tasks
    ////////////////////////////////////////////////////////////////////////////

+
    fn relay_announcements(&mut self) -> Result<(), Error> {
+
        let now = self.clock.into();
+
        let rows = self.database_mut().gossip_mut().relays(now)?;
+
        let local = self.node_id();
+

+
        for (id, msg) in rows {
+
            let announcer = msg.node;
+
            if announcer == local {
+
                // Don't relay our own stored gossip messages.
+
                continue;
+
            }
+
            self.relay(id, msg);
+
        }
+
        Ok(())
+
    }
+

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
        let msg = AnnouncementMessage::from(self.inventory.clone());
modified radicle-node/src/service/gossip.rs
@@ -2,8 +2,7 @@ pub mod store;

use super::*;

-
pub use store::Error;
-
pub use store::Store;
+
pub use store::{AnnouncementId, Error, RelayStatus, Store};

pub fn node(config: &Config, timestamp: Timestamp) -> NodeAnnouncement {
    let features = config.features();
modified radicle-node/src/service/gossip/store.rs
@@ -23,6 +23,9 @@ pub enum Error {
    UnitOverflow(#[from] TryFromIntError),
}

+
/// Unique announcement identifier.
+
pub type AnnouncementId = u64;
+

/// A database that has access to historical gossip messages.
/// Keeps track of the latest received gossip messages for each node.
/// Grows linearly with the number of nodes on the network.
@@ -35,7 +38,17 @@ pub trait Store {

    /// Process an announcement for the given node.
    /// Returns `true` if the timestamp was updated or the announcement wasn't there before.
-
    fn announced(&mut self, nid: &NodeId, ann: &Announcement) -> Result<bool, Error>;
+
    fn announced(
+
        &mut self,
+
        nid: &NodeId,
+
        ann: &Announcement,
+
    ) -> Result<Option<AnnouncementId>, Error>;
+

+
    /// Set whether a message should be relayed or not.
+
    fn set_relay(&mut self, id: AnnouncementId, relay: RelayStatus) -> Result<(), Error>;
+

+
    /// Return messages that should be relayed.
+
    fn relays(&mut self, now: Timestamp) -> Result<Vec<(AnnouncementId, Announcement)>, Error>;

    /// Get all the latest gossip messages of all nodes, filtered by inventory filter and
    /// announcement timestamps.
@@ -78,7 +91,11 @@ impl Store for Database {
        Ok(None)
    }

-
    fn announced(&mut self, nid: &NodeId, ann: &Announcement) -> Result<bool, Error> {
+
    fn announced(
+
        &mut self,
+
        nid: &NodeId,
+
        ann: &Announcement,
+
    ) -> Result<Option<AnnouncementId>, Error> {
        assert_ne!(
            ann.timestamp(),
            Timestamp::MIN,
@@ -89,7 +106,8 @@ impl Store for Database {
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
             ON CONFLICT DO UPDATE
             SET message = ?4, signature = ?5, timestamp = ?6
-
             WHERE timestamp < ?6",
+
             WHERE timestamp < ?6
+
             RETURNING rowid",
        )?;
        stmt.bind((1, nid))?;

@@ -112,9 +130,53 @@ impl Store for Database {
        }
        stmt.bind((5, &ann.signature))?;
        stmt.bind((6, &ann.message.timestamp()))?;
+

+
        if let Some(row) = stmt.into_iter().next() {
+
            let row = row?;
+
            let id = row.read::<i64, _>("rowid");
+

+
            Ok(Some(id as AnnouncementId))
+
        } else {
+
            Ok(None)
+
        }
+
    }
+

+
    fn set_relay(&mut self, id: AnnouncementId, relay: RelayStatus) -> Result<(), Error> {
+
        let mut stmt = self.db.prepare(
+
            "UPDATE announcements
+
             SET relay = ?1
+
             WHERE rowid = ?2",
+
        )?;
+
        stmt.bind((1, relay))?;
+
        stmt.bind((2, id as i64))?;
        stmt.next()?;

-
        Ok(self.db.change_count() > 0)
+
        Ok(())
+
    }
+

+
    fn relays(&mut self, now: Timestamp) -> Result<Vec<(AnnouncementId, Announcement)>, Error> {
+
        let mut stmt = self.db.prepare(
+
            "UPDATE announcements
+
             SET relay = ?1
+
             WHERE relay IS ?2
+
             RETURNING rowid, node, type, message, signature, timestamp",
+
        )?;
+
        stmt.bind((1, RelayStatus::RelayedAt(now)))?;
+
        stmt.bind((2, RelayStatus::Relay))?;
+

+
        let mut rows = stmt
+
            .into_iter()
+
            .map(|row| {
+
                let row = row?;
+
                parse::announcement(row)
+
            })
+
            .collect::<Result<Vec<_>, _>>()?;
+

+
        // Nb. Manually sort by insertion order, because we can't use `ORDER BY` with `RETURNING`
+
        // as of SQLite 3.45.
+
        rows.sort_by_key(|(id, _)| *id);
+

+
        Ok(rows)
    }

    fn filtered<'a>(
@@ -124,7 +186,7 @@ impl Store for Database {
        to: Timestamp,
    ) -> Result<Box<dyn Iterator<Item = Result<Announcement, Error>> + 'a>, Error> {
        let mut stmt = self.db.prepare(
-
            "SELECT node, type, message, signature, timestamp
+
            "SELECT rowid, node, type, message, signature, timestamp
             FROM announcements
             WHERE timestamp >= ?1 and timestamp < ?2
             ORDER BY timestamp, node, type",
@@ -138,32 +200,9 @@ impl Store for Database {
            stmt.into_iter()
                .map(|row| {
                    let row = row?;
-
                    let node = row.read::<NodeId, _>("node");
-
                    let gt = row.read::<GossipType, _>("type");
-
                    let message = match gt {
-
                        GossipType::Refs => {
-
                            let ann = row.read::<RefsAnnouncement, _>("message");
-
                            AnnouncementMessage::Refs(ann)
-
                        }
-
                        GossipType::Inventory => {
-
                            let ann = row.read::<InventoryAnnouncement, _>("message");
-
                            AnnouncementMessage::Inventory(ann)
-
                        }
-
                        GossipType::Node => {
-
                            let ann = row.read::<NodeAnnouncement, _>("message");
-
                            AnnouncementMessage::Node(ann)
-
                        }
-
                    };
-
                    let signature = row.read::<Signature, _>("signature");
-
                    let timestamp = row.read::<Timestamp, _>("timestamp");
-

-
                    debug_assert_eq!(timestamp, message.timestamp());
-

-
                    Ok(Announcement {
-
                        node,
-
                        message,
-
                        signature,
-
                    })
+
                    let (_, ann) = parse::announcement(row)?;
+

+
                    Ok(ann)
                })
                .filter(|ann| match ann {
                    Ok(a) => a.matches(filter),
@@ -251,6 +290,24 @@ impl From<wire::Error> for sql::Error {
    }
}

+
/// Message relay status.
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
pub enum RelayStatus {
+
    Relay,
+
    DontRelay,
+
    RelayedAt(Timestamp),
+
}
+

+
impl sql::BindableWithIndex for RelayStatus {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        match self {
+
            Self::Relay => sql::Value::Null.bind(stmt, i),
+
            Self::DontRelay => sql::Value::Integer(-1).bind(stmt, i),
+
            Self::RelayedAt(t) => t.bind(stmt, i),
+
        }
+
    }
+
}
+

/// Type of gossip message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GossipType {
@@ -297,6 +354,43 @@ impl TryFrom<&sql::Value> for GossipType {
    }
}

+
mod parse {
+
    use super::*;
+

+
    pub fn announcement(row: sql::Row) -> Result<(AnnouncementId, Announcement), Error> {
+
        let id = row.read::<i64, _>("rowid") as AnnouncementId;
+
        let node = row.read::<NodeId, _>("node");
+
        let gt = row.read::<GossipType, _>("type");
+
        let message = match gt {
+
            GossipType::Refs => {
+
                let ann = row.read::<RefsAnnouncement, _>("message");
+
                AnnouncementMessage::Refs(ann)
+
            }
+
            GossipType::Inventory => {
+
                let ann = row.read::<InventoryAnnouncement, _>("message");
+
                AnnouncementMessage::Inventory(ann)
+
            }
+
            GossipType::Node => {
+
                let ann = row.read::<NodeAnnouncement, _>("message");
+
                AnnouncementMessage::Node(ann)
+
            }
+
        };
+
        let signature = row.read::<Signature, _>("signature");
+
        let timestamp = row.read::<Timestamp, _>("timestamp");
+

+
        debug_assert_eq!(timestamp, message.timestamp());
+

+
        Ok((
+
            id,
+
            Announcement {
+
                node,
+
                message,
+
                signature,
+
            },
+
        ))
+
    }
+
}
+

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
@@ -304,6 +398,7 @@ mod test {
    use crate::prelude::{BoundedVec, RepoId};
    use crate::test::arbitrary;
    use localtime::LocalTime;
+
    use radicle::assert_matches;
    use radicle_crypto::test::signer::MockSigner;

    #[test]
@@ -326,10 +421,26 @@ mod test {
        .signed(&signer);

        // Only the first announcement of each type is recognized as new.
-
        assert!(db.announced(&nid, &refs).unwrap());
-
        assert!(!db.announced(&nid, &refs).unwrap());
+
        let id1 = db.announced(&nid, &refs).unwrap().unwrap();
+
        assert!(db.announced(&nid, &refs).unwrap().is_none());

-
        assert!(db.announced(&nid, &inv).unwrap());
-
        assert!(!db.announced(&nid, &inv).unwrap());
+
        let id2 = db.announced(&nid, &inv).unwrap().unwrap();
+
        assert!(db.announced(&nid, &inv).unwrap().is_none());
+

+
        // Nothing was set to be relayed.
+
        assert_eq!(db.relays(LocalTime::now().into()).unwrap().len(), 0);
+

+
        // Set the messages to be relayed.
+
        db.set_relay(id1, RelayStatus::Relay).unwrap();
+
        db.set_relay(id2, RelayStatus::Relay).unwrap();
+

+
        // Now they are returned.
+
        assert_matches!(
+
            db.relays(LocalTime::now().into()).unwrap().as_slice(),
+
            &[(id1_, _), (id2_, _)]
+
            if id1_ == id1 && id2_ == id2
+
        );
+
        // But only once.
+
        assert_matches!(db.relays(LocalTime::now().into()).unwrap().as_slice(), &[]);
    }
}
modified radicle-node/src/test/peer.rs
@@ -201,11 +201,13 @@ where
    }

    pub fn initialize(&mut self) -> &mut Self {
-
        info!(
-
            target: "test",
-
            "{}: Initializing: id = {}, address = {}",
-
            self.name, self.id, self.ip
-
        );
+
        if !self.initialized {
+
            info!(
+
                target: "test",
+
                "{}: Initializing: id = {}, address = {}",
+
                self.name, self.id, self.ip
+
            );
+
        }
        assert_ne!(self.local_time, LocalTime::default());

        self.initialized = true;
@@ -271,8 +273,9 @@ where
        self.service.node_id()
    }

-
    pub fn receive(&mut self, peer: NodeId, msg: Message) {
+
    pub fn receive(&mut self, peer: NodeId, msg: Message) -> &mut Self {
        self.service.received_message(peer, msg);
+
        self
    }

    pub fn inventory_announcement(&self) -> Message {
modified radicle-node/src/tests.rs
@@ -605,38 +605,47 @@ fn test_announcement_relay() {

    alice.connect_to(&bob);
    alice.connect_to(&eve);
-
    alice.receive(bob.id(), bob.inventory_announcement());
-

+
    alice
+
        .receive(bob.id(), bob.inventory_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_))
    );

-
    alice.receive(bob.id(), dbg!(bob.inventory_announcement()));
+
    alice.receive(bob.id(), bob.inventory_announcement());
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "Another inventory with the same timestamp is ignored"
    );

    bob.elapse(LocalDuration::from_mins(1));
-
    alice.receive(bob.id(), dbg!(bob.inventory_announcement()));
+
    alice
+
        .receive(bob.id(), bob.inventory_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "Another inventory with a fresher timestamp is relayed"
    );

-
    alice.receive(bob.id(), bob.node_announcement());
+
    alice
+
        .receive(bob.id(), bob.node_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "A node announcement with the same timestamp as the inventory is relayed"
    );

-
    alice.receive(bob.id(), bob.node_announcement());
+
    alice
+
        .receive(bob.id(), bob.node_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert!(alice.messages(eve.id()).next().is_none(), "Only once");

-
    alice.receive(eve.id(), eve.node_announcement());
+
    alice
+
        .receive(eve.id(), eve.node_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(bob.id()).next(),
        Some(Message::Announcement(_)),
@@ -648,7 +657,9 @@ fn test_announcement_relay() {
    );

    eve.elapse(LocalDuration::from_mins(1));
-
    alice.receive(bob.id(), eve.node_announcement());
+
    alice
+
        .receive(bob.id(), eve.node_announcement())
+
        .elapse(service::GOSSIP_INTERVAL);
    assert!(
        alice.messages(bob.id()).next().is_none(),
        "Bob already know about this message, since he sent it"
@@ -703,7 +714,9 @@ fn test_refs_announcement_relay() {
    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.receive(eve.id(), Message::Subscribe(Subscribe::all()));
-
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[0]));
+
    alice
+
        .receive(bob.id(), bob.refs_announcement(bob_inv[0]))
+
        .elapse(service::GOSSIP_INTERVAL);

    assert_matches!(
        alice.messages(eve.id()).next(),
@@ -711,20 +724,26 @@ fn test_refs_announcement_relay() {
        "A refs announcement from Bob is relayed to Eve"
    );

-
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[0]));
+
    alice
+
        .receive(bob.id(), bob.refs_announcement(bob_inv[0]))
+
        .elapse(service::GOSSIP_INTERVAL);
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "The same ref announement is not relayed"
    );

-
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[1]));
+
    alice
+
        .receive(bob.id(), bob.refs_announcement(bob_inv[1]))
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "But a different one is"
    );

-
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[2]));
+
    alice
+
        .receive(bob.id(), bob.refs_announcement(bob_inv[2]))
+
        .elapse(service::GOSSIP_INTERVAL);
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
@@ -953,6 +972,8 @@ fn test_refs_announcement_offline() {

#[test]
fn test_inventory_relay() {
+
    logger::init(log::Level::Debug);
+

    // Topology is eve <-> alice <-> bob
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
@@ -965,16 +986,19 @@ fn test_inventory_relay() {
    alice.wake(); // Run all periodic tasks now so they don't trigger later.
    alice.connect_to(&bob);
    alice.connect_from(&eve);
-
    alice.receive(
-
        bob.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inv.clone(),
-
                timestamp: now,
-
            },
-
            bob.signer(),
-
        ),
-
    );
+
    alice
+
        .receive(
+
            bob.id(),
+
            Message::inventory(
+
                InventoryAnnouncement {
+
                    inventory: inv.clone(),
+
                    timestamp: now,
+
                },
+
                bob.signer(),
+
            ),
+
        )
+
        .elapse(service::GOSSIP_INTERVAL);
+

    assert_matches!(
        alice.inventory_announcements(eve.id()).next(),
        Some(Message::Announcement(Announcement {
@@ -990,32 +1014,38 @@ fn test_inventory_relay() {
        "The inventory is not sent back to Bob"
    );

-
    alice.receive(
-
        bob.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inv.clone(),
-
                timestamp: now,
-
            },
-
            bob.signer(),
-
        ),
-
    );
+
    alice
+
        .receive(
+
            bob.id(),
+
            Message::inventory(
+
                InventoryAnnouncement {
+
                    inventory: inv.clone(),
+
                    timestamp: now,
+
                },
+
                bob.signer(),
+
            ),
+
        )
+
        .elapse(service::GOSSIP_INTERVAL);
+

    assert_matches!(
        alice.inventory_announcements(eve.id()).next(),
        None,
        "Sending the same inventory again doesn't trigger a relay"
    );

-
    alice.receive(
-
        bob.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inv.clone(),
-
                timestamp: now + 1,
-
            },
-
            bob.signer(),
-
        ),
-
    );
+
    alice
+
        .receive(
+
            bob.id(),
+
            Message::inventory(
+
                InventoryAnnouncement {
+
                    inventory: inv.clone(),
+
                    timestamp: now + 1,
+
                },
+
                bob.signer(),
+
            ),
+
        )
+
        .elapse(service::GOSSIP_INTERVAL);
+

    assert_matches!(
        alice.inventory_announcements(eve.id()).next(),
        Some(Message::Announcement(Announcement {
@@ -1028,16 +1058,19 @@ fn test_inventory_relay() {
    );

    // Inventory from Eve relayed to Bob.
-
    alice.receive(
-
        eve.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inv,
-
                timestamp: now,
-
            },
-
            eve.signer(),
-
        ),
-
    );
+
    alice
+
        .receive(
+
            eve.id(),
+
            Message::inventory(
+
                InventoryAnnouncement {
+
                    inventory: inv,
+
                    timestamp: now,
+
                },
+
                eve.signer(),
+
            ),
+
        )
+
        .elapse(service::GOSSIP_INTERVAL);
+

    assert_matches!(
        alice.inventory_announcements(bob.id()).next(),
        Some(Message::Announcement(Announcement {
@@ -1764,6 +1797,7 @@ fn prop_inventory_exchange_dense() {
#[test]
fn test_announcement_message_amplification() {
    let mut results = Vec::new();
+
    let mut rng = fastrand::Rng::new();

    while results.len() < *TEST_CASES {
        let mut alice = Peer::new("alice", [7, 7, 7, 7]);
@@ -1781,6 +1815,23 @@ fn test_announcement_message_amplification() {
        );
        let rid = gen::<RepoId>(1);

+
        // Make sure the node gossip intervals are not accidentally synchronized.
+
        alice.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+
        bob.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+
        eve.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+
        zod.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+
        tom.elapse(LocalDuration::from_millis(
+
            rng.u128(0..=service::GOSSIP_INTERVAL.as_millis()),
+
        ));
+

        // Fully-connected network.
        alice.command(Command::Connect(
            bob.id,
@@ -1868,6 +1919,17 @@ fn test_announcement_message_amplification() {
            s.elapsed() < LocalDuration::from_mins(3)
        });

+
        // Make sure they have the routing table entry.
+
        for node in [&bob, &eve, &zod, &tom] {
+
            assert!(node
+
                .service
+
                .database()
+
                .routing()
+
                .get(&rid)
+
                .unwrap()
+
                .contains(&alice.id));
+
        }
+

        // Count how many copies of Alice's inventory message have been received by peers.
        let received = sim.messages().iter().filter(|m| {
            matches!(
@@ -1894,5 +1956,6 @@ fn test_announcement_message_amplification() {
    // By using delayed message propagation though, we can bring this down closer to the minimum.
    log::debug!(target: "test", "Average message amplification: {amp}");

-
    assert!(amp < 4., "Amplification factor of {amp} is too high");
+
    assert!(amp < 2., "Amplification factor of {amp} is too high");
+
    assert!(amp >= 1., "Amplification can't be lower than 1");
}
modified radicle/src/node/db.rs
@@ -29,6 +29,7 @@ const MIGRATIONS: &[&str] = &[
    include_str!("db/migrations/2.sql"),
    include_str!("db/migrations/3.sql"),
    include_str!("db/migrations/4.sql"),
+
    include_str!("db/migrations/5.sql"),
];

#[derive(Error, Debug)]
added radicle/src/node/db/migrations/5.sql
@@ -0,0 +1,7 @@
+
-- Add the "relay" column.
+
-- The "relay" column can be set in several different ways:
+
--
+
-- * If set to `-1`, it means this announcement should *not* be relayed.
+
-- * If set to `NULL`, it means it *should* be relayed.
+
-- * If set to a positive integer, it means it has been relayed at that UNIX timestamp.
+
alter table "announcements" add column "relay" integer default -1;