Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Make inventory announcements more robust
cloudhead committed 2 years ago
commit c94df2d5cd5f65d785c1eeb4acbc0e1352e083da
parent 47a440a384c8a32a2de65bba7dfec3739ab5ed08
3 files changed +19 -15
modified radicle-cli/tests/commands.rs
@@ -1025,11 +1025,6 @@ fn rad_init_sync_preferred() {

    fixtures::repository(working.join("bob"));

-
    // Necessary for now, if we don't want the new inventry announcement to be considered stale
-
    // for Alice.
-
    // TODO: Find a way to advance internal clocks instead.
-
    thread::sleep(time::Duration::from_millis(3));
-

    // Bob initializes a repo after her node has started, and after bob has connected to it.
    test(
        "examples/rad-init-sync-preferred.md",
@@ -1086,11 +1081,6 @@ fn rad_init_sync_and_clone() {

    fixtures::repository(working.join("alice"));

-
    // Necessary for now, if we don't want the new inventry announcement to be considered stale
-
    // for Bob.
-
    // TODO: Find a way to advance internal clocks instead.
-
    thread::sleep(time::Duration::from_millis(3));
-

    // Alice initializes a repo after her node has started, and after bob has connected to it.
    test(
        "examples/rad-init-sync.md",
modified radicle-node/src/service.rs
@@ -646,7 +646,6 @@ where
                error!(target: "service", "Error announcing inventory: {err}");
            }
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
-
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");
@@ -1905,7 +1904,23 @@ where

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self, inventory: Vec<Id>) -> Result<(), storage::Error> {
-
        let time = self.time();
+
        let time = if self.clock > self.last_announce {
+
            self.clock.as_millis()
+
        } else if self.last_announce - self.clock < LocalDuration::from_secs(1) {
+
            // In rare cases where the inventory is updated very quickly, we want to make sure all
+
            // announcements carry an increasing timestamp. We allow our timestamps to be up to
+
            // one second in the future.
+
            self.last_announce.as_millis() + 1
+
        } else {
+
            // Announcement is considered redundant, ignore. Nb. This should not happen unless
+
            // you are trying to spam inventories.
+
            log::warn!(
+
                target: "service",
+
                "Ignored outgoing inventory announcement with {} items",
+
                inventory.len()
+
            );
+
            return Ok(());
+
        };
        let msg = AnnouncementMessage::from(gossip::inventory(time, inventory));

        self.outbox.announce(
@@ -1913,6 +1928,8 @@ where
            self.sessions.connected().map(|(_, p)| p),
            self.db.gossip_mut(),
        );
+
        self.last_announce = LocalTime::from_millis(time as u128);
+

        Ok(())
    }

modified radicle-node/src/tests/e2e.rs
@@ -809,9 +809,6 @@ fn test_connection_crossing() {
    assert!(s1 ^ s2, "Exactly one session should be established");
}

-
// TODO(finto): I witnessed a flaky error, but can't replicate.
-
// We log some values until it's clearer where this flake is coming
-
// from.
#[test]
/// Alice is going to try to fetch outdated refs of Bob, from Eve. This is a non-fastfoward fetch
/// on the sigrefs branch.