Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: use configurable network intervals
Quaylyn Rimer committed 3 months ago
commit b241f0cbdce8f1f86dedfe05044eec6aec8f289f
parent b332c4bfc4c41a4a2d8d7e7dcf1be537342949cc
1 file changed +68 -22
modified crates/radicle-protocol/src/service.rs
@@ -487,6 +487,50 @@ impl<D, S, G> Service<D, S, G> {
    pub fn emitter(&self) -> Emitter<Event> {
        self.emitter.clone()
    }
+

+
    fn idle_interval(&self) -> LocalDuration {
+
        self.config.limits.network.idle_interval.into()
+
    }
+

+
    fn gossip_interval(&self) -> LocalDuration {
+
        self.config.limits.network.gossip_interval.into()
+
    }
+

+
    fn sync_interval(&self) -> LocalDuration {
+
        self.config.limits.network.sync_interval.into()
+
    }
+

+
    fn announce_interval(&self) -> LocalDuration {
+
        self.config.limits.network.announce_interval.into()
+
    }
+

+
    fn prune_interval(&self) -> LocalDuration {
+
        self.config.limits.network.prune_interval.into()
+
    }
+

+
    fn keep_alive_delta(&self) -> LocalDuration {
+
        self.config.limits.network.keep_alive_delta.into()
+
    }
+

+
    pub fn fetch_timeout(&self) -> time::Duration {
+
        self.config.limits.network.fetch_timeout_secs.into()
+
    }
+

+
    fn target_outbound_peers(&self) -> usize {
+
        self.config.limits.network.target_outbound_peers.into()
+
    }
+

+
    fn min_reconnection_delta(&self) -> LocalDuration {
+
        self.config.limits.network.min_reconnection_delta.into()
+
    }
+

+
    fn max_reconnection_delta(&self) -> LocalDuration {
+
        self.config.limits.network.max_reconnection_delta.into()
+
    }
+

+
    fn connection_retry_delta(&self) -> LocalDuration {
+
        self.config.limits.network.connection_retry_delta.into()
+
    }
}

impl<D, S, G> Service<D, S, G>
@@ -699,8 +743,8 @@ where
        // Try to establish some connections.
        self.maintain_connections();
        // Start periodic tasks.
-
        self.outbox.wakeup(IDLE_INTERVAL);
-
        self.outbox.wakeup(GOSSIP_INTERVAL);
+
        self.outbox.wakeup(self.idle_interval());
+
        self.outbox.wakeup(self.gossip_interval());

        Ok(())
    }
@@ -734,7 +778,7 @@ where
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

-
        if now - self.last_idle >= IDLE_INTERVAL {
+
        if now - self.last_idle >= self.idle_interval() {
            trace!(target: "service", "Running 'idle' task...");

            if self.refs_populate_pending {
@@ -761,35 +805,35 @@ where
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
-
            self.outbox.wakeup(IDLE_INTERVAL);
+
            self.outbox.wakeup(self.idle_interval());
            self.last_idle = now;
        }
-
        if now - self.last_gossip >= GOSSIP_INTERVAL {
+
        if now - self.last_gossip >= self.gossip_interval() {
            trace!(target: "service", "Running 'gossip' task...");

            if let Err(e) = self.relay_announcements() {
                warn!(target: "service", "Failed to relay stored announcements: {e}");
            }
-
            self.outbox.wakeup(GOSSIP_INTERVAL);
+
            self.outbox.wakeup(self.gossip_interval());
            self.last_gossip = now;
        }
-
        if now - self.last_sync >= SYNC_INTERVAL {
+
        if now - self.last_sync >= self.sync_interval() {
            trace!(target: "service", "Running 'sync' task...");

            if let Err(e) = self.fetch_missing_repositories() {
                warn!(target: "service", "Failed to fetch missing inventory: {e}");
            }
-
            self.outbox.wakeup(SYNC_INTERVAL);
+
            self.outbox.wakeup(self.sync_interval());
            self.last_sync = now;
        }
-
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
+
        if now - self.last_announce >= self.announce_interval() {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
-
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
+
            self.outbox.wakeup(self.announce_interval());
            self.last_announce = now;
        }
-
        if now - self.last_prune >= PRUNE_INTERVAL {
+
        if now - self.last_prune >= self.prune_interval() {
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
@@ -803,7 +847,7 @@ where
                warn!(target: "service", "Failed to prune gossip entries: {err}");
            }

-
            self.outbox.wakeup(PRUNE_INTERVAL);
+
            self.outbox.wakeup(self.prune_interval());
            self.last_prune = now;
        }

@@ -1401,7 +1445,7 @@ where
        // Attempt to re-connect to persistent peers.
        if self.config.peer(&remote).is_some() {
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
-
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
+
                .clamp(self.min_reconnection_delta(), self.max_reconnection_delta());

            // Nb. We always try to reconnect to persistent peers, even when the error appears
            // to not be transient.
@@ -1520,7 +1564,7 @@ where
                log::debug!(
                    target: "service",
                    "Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
-
                    (self.last_gossip + GOSSIP_INTERVAL) - self.clock
+
                    (self.last_gossip + self.gossip_interval()) - self.clock
                );
                // Keep track of who relayed the message for later.
                self.relayed_by.entry(id).or_default().push(*relayer);
@@ -1613,7 +1657,7 @@ where

                for rid in missing {
                    debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
-
                    self.fetch(rid, *announcer, FETCH_TIMEOUT, None);
+
                    self.fetch(rid, *announcer, self.fetch_timeout(), None);
                }
                return Ok(relay);
            }
@@ -1694,7 +1738,7 @@ where
                    return Ok(relay);
                };
                // Finally, start the fetch.
-
                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT, None);
+
                self.fetch_refs_at(message.rid, remote.id, refs, scope, self.fetch_timeout(), None);

                return Ok(relay);
            }
@@ -1985,7 +2029,9 @@ where
    fn is_online(&self) -> bool {
        self.sessions
            .connected()
-
            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
+
            .filter(|(_, s)| {
+
                s.addr.is_routable() && s.last_active >= self.clock - self.idle_interval()
+
            })
            .count()
            > 0
    }
@@ -2506,7 +2552,7 @@ where
        let inactive_sessions = self
            .sessions
            .connected_mut()
-
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
+
            .filter(|(_, session)| *now - session.last_active >= self.keep_alive_delta())
            .map(|(_, session)| session);
        for session in inactive_sessions {
            session.ping(self.clock, &mut self.outbox).ok();
@@ -2581,7 +2627,7 @@ where
                Ok(seeds) => {
                    if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
                        for seed in connected {
-
                            self.fetch(rid, seed.nid, FETCH_TIMEOUT, None);
+
                            self.fetch(rid, seed.nid, self.fetch_timeout(), None);
                        }
                    } else {
                        // TODO: We should make sure that this fetch is retried later, either
@@ -2629,7 +2675,7 @@ where
        };
        trace!(target: "service", "Maintaining connections..");

-
        let target = TARGET_OUTBOUND_PEERS;
+
        let target = self.target_outbound_peers();
        let now = self.clock;
        let outbound = self
            .sessions
@@ -2655,10 +2701,10 @@ where
                        // If we succeeded the last time we tried, this is a good address.
                        // If it's been long enough that we failed to connect, we also try again.
                        (Some(success), Some(attempt)) => {
-
                            success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
+
                            success >= attempt || now - attempt >= self.connection_retry_delta()
                        }
                        // If we haven't succeeded yet, and we waited long enough, we can try this address.
-
                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
+
                        (None, Some(attempt)) => now - attempt >= self.connection_retry_delta(),
                        // If we have no failed attempts for this address, it's worth a try.
                        (_, None) => true,
                    })