Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Use variable subscribe backlog delta
cloudhead committed 2 years ago
commit 947f1bfc018a641aded13e7c0b4ab526cbd9b117
parent f0754330c9e7e89b278a21626288e9f428b879d5
3 files changed +47 -38
modified radicle-node/src/service.rs
@@ -73,8 +73,9 @@ pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
-
/// How far back from the present time should we request gossip messages when connecting to a peer.
-
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60);
+
/// How far back from the present time should we request gossip messages when connecting to a peer,
+
/// when we initially come online.
+
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
@@ -1232,17 +1233,40 @@ where
    /// Set of initial messages to send to a peer.
    fn initial(&self, _link: Link) -> Vec<Message> {
        let filter = self.filter();
+
        let now = self.clock();
+
        let inventory = match self.storage.inventory() {
+
            Ok(i) => i,
+
            Err(e) => {
+
                // Other than crashing the node completely, there's nothing we can do
+
                // here besides returning an empty inventory and logging an error.
+
                error!(target: "service", "Error getting local inventory for initial messages: {e}");
+
                vec![]
+
            }
+
        };

        // TODO: Only subscribe to outbound connections, otherwise we will consume too
        // much bandwidth.

-
        gossip::handshake(
-
            self.node.clone(),
-
            self.clock.as_millis(),
-
            &self.storage,
-
            &self.signer,
-
            filter,
-
        )
+
        // If we've been previously connected to the network, we'll have received gossip messages.
+
        // Instead of simply taking the last timestamp we try to ensure we don't miss any
+
        // messages due un-synchronized clocks.
+
        //
+
        // If this is our first connection to the network, we just ask for a fixed backlog
+
        // of messages to get us started.
+
        let since = match self.gossip.last() {
+
            Ok(Some(last)) => last - MAX_TIME_DELTA.as_millis() as Timestamp,
+
            Ok(None) => (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).as_millis() as Timestamp,
+
            Err(e) => {
+
                error!(target: "service", "Error getting the lastest gossip message from storage: {e}");
+
                return vec![];
+
            }
+
        };
+

+
        vec![
+
            Message::node(self.node.clone(), &self.signer),
+
            Message::inventory(gossip::inventory(now.as_millis(), inventory), &self.signer),
+
            Message::subscribe(filter, since, Timestamp::MAX),
+
        ]
    }

    /// Update our routing table with our local node's inventory.
modified radicle-node/src/service/gossip.rs
@@ -1,39 +1,10 @@
pub mod store;

use super::*;
-
use crate::service::filter::Filter;

pub use store::Error;
pub use store::GossipStore as Store;

-
pub fn handshake<G: Signer, S: ReadStorage>(
-
    node: NodeAnnouncement,
-
    now: Timestamp,
-
    storage: &S,
-
    signer: &G,
-
    filter: Filter,
-
) -> Vec<Message> {
-
    let inventory = match storage.inventory() {
-
        Ok(i) => i,
-
        Err(e) => {
-
            error!("Error getting local inventory for handshake: {}", e);
-
            // Other than crashing the node completely, there's nothing we can do
-
            // here besides returning an empty inventory and logging an error.
-
            vec![]
-
        }
-
    };
-

-
    vec![
-
        Message::node(node, signer),
-
        Message::inventory(gossip::inventory(now, inventory), signer),
-
        Message::subscribe(
-
            filter,
-
            now - SUBSCRIBE_BACKLOG_DELTA.as_millis() as u64,
-
            Timestamp::MAX,
-
        ),
-
    ]
-
}
-

pub fn node(config: &Config, timestamp: Timestamp) -> NodeAnnouncement {
    let features = config.features();
    let alias = config.alias.clone();
modified radicle-node/src/service/gossip/store.rs
@@ -57,6 +57,20 @@ impl GossipStore {
        Ok(self.db.change_count())
    }

+
    /// Get the last announcement in the store, by timestamp.
+
    pub fn last(&self) -> Result<Option<Timestamp>, Error> {
+
        let stmt = self
+
            .db
+
            .prepare("SELECT MAX(timestamp) AS latest FROM `announcements`")?;
+

+
        if let Some(Ok(row)) = stmt.into_iter().next() {
+
            let latest = row.try_read::<Option<i64>, _>(0)?;
+

+
            return Ok(latest.map(|l| l as Timestamp));
+
        }
+
        Ok(None)
+
    }
+

    /// Process an announcement for the given node.
    /// Returns `true` if the timestamp was updated or the announcement wasn't there before.
    pub fn announced(&mut self, nid: &NodeId, ann: &Announcement) -> Result<bool, Error> {