Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix subscription 'since' timestamp
cloudhead committed 1 year ago
commit 1a8b2f5e2cfebf7ea9eb02ec681190eaa7a9ee25
parent 34d213ad23eeef6b2803f4f3f88a83cad4423234
1 file changed +18 -11
modified radicle-node/src/service.rs
@@ -89,7 +89,7 @@ pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time should we request gossip messages when connecting to a peer,
/// when we come online for the first time.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
-
/// When subscribing, what margin of error do we give ourselves. A gigher delta means we ask for
+
/// When subscribing, what margin of error do we give ourselves. A igher delta means we ask for
/// messages further back than strictly necessary, to account for missed messages.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum amount of time to wait before reconnecting to a peer.
@@ -437,6 +437,8 @@ pub struct Service<D, S, G> {
    last_timestamp: Timestamp,
    /// Time when the service was initialized, or `None` if it wasn't initialized.
    started_at: Option<LocalTime>,
+
    /// Time when the service was last online, or `None` if this is the first time.
+
    last_online_at: Option<LocalTime>,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
    /// Local listening addresses.
@@ -507,7 +509,8 @@ where
            last_prune: LocalTime::default(),
            last_timestamp,
            last_announce: LocalTime::default(),
-
            started_at: None, // Updated on initialize.
+
            started_at: None,     // Updated on initialize.
+
            last_online_at: None, // Updated on initialize.
            emitter,
            listening: vec![],
            metrics: Metrics::default(),
@@ -623,8 +626,16 @@ where

        let nid = self.node_id();

-
        self.started_at = Some(time);
        self.clock = time;
+
        self.started_at = Some(time);
+
        self.last_online_at = match self.db.gossip().last() {
+
            Ok(Some(last)) => Some(last.to_local_time()),
+
            Ok(None) => None,
+
            Err(e) => {
+
                error!(target: "service", "Error getting the lastest gossip message from db: {e}");
+
                None
+
            }
+
        };

        // Populate refs database. This is only useful as part of the upgrade process for nodes
        // that have been online since before the refs database was created.
@@ -1863,15 +1874,11 @@ where
        //
        // If this is our first connection to the network, we just ask for a fixed backlog
        // of messages to get us started.
-
        let since = match self.db.gossip().last() {
-
            Ok(Some(last)) => Timestamp::from(last.to_local_time() - SUBSCRIBE_BACKLOG_DELTA),
-
            Ok(None) => (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into(),
-
            Err(e) => {
-
                error!(target: "service", "Error getting the lastest gossip message from storage: {e}");
-
                return vec![];
-
            }
+
        let since = if let Some(last) = self.last_online_at {
+
            Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
+
        } else {
+
            (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
        };
-

        debug!(target: "service", "Subscribing to messages since timestamp {since}..");

        vec![