Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Don't re-use timestamps
cloudhead committed 2 years ago
commit 0fcfbb3ecfa34f53d71437d27c791de333e9c42e
parent 7fc3b73e8865af613f7bb4723107c381287b791c
1 file changed +47 -48
modified radicle-node/src/service.rs
@@ -393,7 +393,9 @@ pub struct Service<D, S, G> {
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
    last_prune: LocalTime,
-
    /// Last time the service announced its inventory.
+
    /// Last time the service announced something.
+
    last_timestamp: LocalTime,
+
    /// Last time the inventory was announced.
    last_announce: LocalTime,
    /// Time when the service was initialized, or `None` if it wasn't initialized.
    started_at: Option<LocalTime>,
@@ -455,6 +457,7 @@ where
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
+
            last_timestamp: LocalTime::default(),
            last_announce: LocalTime::default(),
            started_at: None,
            emitter,
@@ -779,7 +782,7 @@ where

                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
-
                    Message::subscribe(self.filter(), self.time(), Timestamp::MAX),
+
                    Message::subscribe(self.filter(), self.clock.as_millis(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
            }
@@ -1044,7 +1047,7 @@ where
                info!(target: "service", "Fetched {rid} from {remote} successfully");
                // Update our routing table in case this fetch was user-initiated and doesn't
                // come from an announcement.
-
                self.seed_discovered(rid, remote, self.time());
+
                self.seed_discovered(rid, remote, self.clock.as_millis());

                for update in &updated {
                    if update.is_skipped() {
@@ -1169,14 +1172,17 @@ where
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);
-
        let now = self.time();

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);

-
                if let Err(e) = self.db.addresses_mut().connected(&remote, &peer.addr, now) {
+
                if let Err(e) =
+
                    self.db
+
                        .addresses_mut()
+
                        .connected(&remote, &peer.addr, self.clock.as_millis())
+
                {
                    error!(target: "service", "Error updating address book with connection: {e}");
                }
            }
@@ -1360,7 +1366,7 @@ where
        match self.db.gossip_mut().announced(announcer, announcement) {
            Ok(fresh) => {
                if !fresh {
-
                    debug!(target: "service", "Ignoring stale announcement from {announcer} (t={})", self.time());
+
                    debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
                    return Ok(false);
                }
            }
@@ -1752,9 +1758,10 @@ where
    }

    /// Set of initial messages to send to a peer.
-
    fn initial(&self, _link: Link) -> Vec<Message> {
-
        let filter = self.filter();
+
    fn initial(&mut self, _link: Link) -> Vec<Message> {
+
        let timestamp = self.timestamp();
        let now = self.clock();
+
        let filter = self.filter();
        let inventory = match self.storage.inventory() {
            Ok(i) => i,
            Err(e) => {
@@ -1787,7 +1794,7 @@ where

        vec![
            Message::node(self.node.clone(), &self.signer),
-
            Message::inventory(gossip::inventory(now.as_millis(), inventory), &self.signer),
+
            Message::inventory(gossip::inventory(timestamp, inventory), &self.signer),
            Message::subscribe(filter, since, Timestamp::MAX),
        ]
    }
@@ -1804,7 +1811,7 @@ where
    /// Update our routing table with our local node's inventory.
    fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
        let inventory = self.storage.inventory()?;
-
        let result = self.sync_routing(inventory, self.node_id(), self.time())?;
+
        let result = self.sync_routing(inventory, self.node_id(), self.clock.as_millis())?;

        Ok(result)
    }
@@ -1860,12 +1867,12 @@ where

    /// Return a refs announcement including the given remotes.
    fn refs_announcement_for(
-
        &self,
+
        &mut self,
        rid: RepoId,
        remotes: impl IntoIterator<Item = NodeId>,
    ) -> Result<(Announcement, Vec<RefsAt>), Error> {
        let repo = self.storage.repository(rid)?;
-
        let timestamp = self.time();
+
        let timestamp = self.timestamp();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        for remote_id in remotes.into_iter() {
@@ -1891,8 +1898,7 @@ where

    /// Announce our own refs for the given repo.
    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc<Verified>) -> Result<Vec<RefsAt>, Error> {
-
        let refs = self.announce_refs(rid, doc, [self.node_id()])?;
-
        let now = self.local_time();
+
        let (refs, timestamp) = self.announce_refs(rid, doc, [self.node_id()])?;

        // Update refs database with our signed refs branches.
        // This isn't strictly necessary for now, as we only use the database for fetches, and
@@ -1901,13 +1907,15 @@ where
            self.emitter.emit(Event::LocalRefsAnnounced {
                rid,
                refs: r,
-
                timestamp: now.as_millis(),
+
                timestamp,
            });
-
            if let Err(e) =
-
                self.database_mut()
-
                    .refs_mut()
-
                    .set(&rid, &r.remote, &SIGREFS_BRANCH, r.at, now)
-
            {
+
            if let Err(e) = self.database_mut().refs_mut().set(
+
                &rid,
+
                &r.remote,
+
                &SIGREFS_BRANCH,
+
                r.at,
+
                LocalTime::from_millis(timestamp as u128),
+
            ) {
                error!(
                    target: "service",
                    "Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
@@ -1924,9 +1932,10 @@ where
        rid: RepoId,
        doc: Doc<Verified>,
        remotes: impl IntoIterator<Item = NodeId>,
-
    ) -> Result<Vec<RefsAt>, Error> {
-
        let peers = self.sessions.connected().map(|(_, p)| p);
+
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
+
        let timestamp = ann.timestamp();
+
        let peers = self.sessions.connected().map(|(_, p)| p);

        // Update our sync status for our own refs. This is useful for determining if refs were
        // updated while the node was stopped.
@@ -1934,14 +1943,14 @@ where
        if let Some(refs) = refs.iter().find(|r| r.remote == ann.node) {
            info!(
                target: "service",
-
                "Announcing own refs for {rid} to peers ({}) (t={})..",
-
                refs.at, ann.timestamp()
+
                "Announcing own refs for {rid} to peers ({}) (t={timestamp})..",
+
                refs.at
            );

            if let Err(e) = self
                .db
                .seeds_mut()
-
                .synced(&rid, &ann.node, refs.at, ann.timestamp())
+
                .synced(&rid, &ann.node, refs.at, timestamp)
            {
                error!(target: "service", "Error updating sync status for local node: {e}");
            }
@@ -1955,7 +1964,7 @@ where
            }),
            self.db.gossip_mut(),
        );
-
        Ok(refs)
+
        Ok((refs, timestamp))
    }

    fn sync_and_announce_inventory(&mut self) {
@@ -2004,9 +2013,9 @@ where
            return false;
        }
        let persistent = self.config.is_persistent(&nid);
-
        let time = self.time();
+
        let timestamp = self.clock().as_millis();

-
        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, time) {
+
        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
            error!(target: "service", "Error updating address book with connection attempt: {e}");
        }
        self.sessions.insert(
@@ -2079,9 +2088,15 @@ where
        }
    }

-
    /// Get the current time.
-
    fn time(&self) -> Timestamp {
-
        self.clock.as_millis()
+
    /// Get a timestamp for using in announcements.
+
    /// Never returns the same timestamp twice.
+
    fn timestamp(&mut self) -> Timestamp {
+
        if self.clock > self.last_timestamp {
+
            self.last_timestamp = self.clock;
+
        } else {
+
            self.last_timestamp = self.last_timestamp + LocalDuration::from_millis(1);
+
        }
+
        self.last_timestamp.as_millis()
    }

    ////////////////////////////////////////////////////////////////////////////
@@ -2090,23 +2105,7 @@ where

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self, inventory: Inventory) -> Result<(), storage::Error> {
-
        let time = if self.clock > self.last_announce {
-
            self.clock.as_millis()
-
        } else if self.last_announce - self.clock < LocalDuration::from_secs(1) {
-
            // In rare cases where the inventory is updated very quickly, we want to make sure all
-
            // announcements carry an increasing timestamp. We allow our timestamps to be up to
-
            // one second in the future.
-
            self.last_announce.as_millis() + 1
-
        } else {
-
            // Announcement is considered redundant, ignore. Nb. This should not happen unless
-
            // you are trying to spam inventories.
-
            log::warn!(
-
                target: "service",
-
                "Ignored outgoing inventory announcement with {} items",
-
                inventory.len()
-
            );
-
            return Ok(());
-
        };
+
        let time = self.timestamp();
        let msg = AnnouncementMessage::from(gossip::inventory(time, inventory));

        self.outbox.announce(