Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Respect relay config on subscribe
cloudhead committed 2 years ago
commit 0d880e12e151979ab676083f779599b8f14b69fb
parent a54180199eaad468ee321c56a2f7bd44836b779e
2 files changed +36 -30
modified radicle-node/src/service.rs
@@ -25,7 +25,7 @@ use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
use radicle::node::address::{AddressBook, KnownAddress};
-
use radicle::node::config::{PeerConfig, Relay};
+
use radicle::node::config::PeerConfig;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
@@ -88,6 +88,9 @@ pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time should we request gossip messages when connecting to a peer,
/// when we come online for the first time.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
+
/// When subscribing, what margin of error do we give ourselves. A gigher delta means we ask for
+
/// messages further back than strictly necessary, to account for missed messages.
+
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
@@ -1334,7 +1337,7 @@ where
        &mut self,
        relayer_addr: &Address,
        announcement: &Announcement,
-
    ) -> Result<Relay, session::Error> {
+
    ) -> Result<bool, session::Error> {
        if !announcement.verify() {
            return Err(session::Error::Misbehavior);
        }
@@ -1346,20 +1349,15 @@ where

        // Ignore our own announcements, in case the relayer sent one by mistake.
        if announcer == self.nid() {
-
            return Ok(Relay::Never);
+
            return Ok(false);
        }
        let now = self.clock;
        let timestamp = message.timestamp();
        // To avoid spamming peers on startup with historical gossip messages,
        // don't relay messages that are too old. We make an exception for node announcements,
        // since they are cached, and will hence often carry old timestamps.
-
        let relay = if message.is_node_announcement()
-
            || now - timestamp.to_local_time() <= MAX_TIME_DELTA
-
        {
-
            self.config.relay
-
        } else {
-
            Relay::Never
-
        };
+
        let relay =
+
            message.is_node_announcement() || now - timestamp.to_local_time() <= MAX_TIME_DELTA;

        // Don't allow messages from too far in the future.
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
@@ -1379,12 +1377,12 @@ where
                Ok(node) => {
                    if node.is_none() {
                        debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
-
                        return Ok(Relay::Never);
+
                        return Ok(false);
                    }
                }
                Err(e) => {
                    error!(target: "service", "Error looking up node in address book: {e}");
-
                    return Ok(Relay::Never);
+
                    return Ok(false);
                }
            }
        }
@@ -1394,12 +1392,12 @@ where
            Ok(fresh) => {
                if !fresh {
                    debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
-
                    return Ok(Relay::Never);
+
                    return Ok(false);
                }
            }
            Err(e) => {
                error!(target: "service", "Error updating gossip entry from {announcer}: {e}");
-
                return Ok(Relay::Never);
+
                return Ok(false);
            }
        }

@@ -1419,12 +1417,12 @@ where
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
-
                            return Ok(Relay::Never);
+
                            return Ok(false);
                        }
                    }
                    Err(e) => {
                        error!(target: "service", "Error processing inventory from {announcer}: {e}");
-
                        return Ok(Relay::Never);
+
                        return Ok(false);
                    }
                }

@@ -1473,7 +1471,7 @@ where
                // Empty announcements can be safely ignored.
                let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
                    debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
-
                    return Ok(Relay::Never);
+
                    return Ok(false);
                };
                // We update inventories when receiving ref announcements, as these could come
                // from a new repository being initialized.
@@ -1526,7 +1524,7 @@ where
                        "Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
                        message.rid
                    );
-
                    return Ok(Relay::Never);
+
                    return Ok(false);
                }
                // Refs can be relayed by peers who don't have the data in storage,
                // therefore we only check whether we are connected to the *announcer*,
@@ -1600,7 +1598,7 @@ where
                }
            }
        }
-
        Ok(Relay::Never)
+
        Ok(false)
    }

    pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
@@ -1624,6 +1622,8 @@ where
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
+
        let local = self.node_id();
+
        let relay = self.config.is_relay();
        let Some(peer) = self.sessions.get_mut(remote) else {
            warn!(target: "service", "Session not found for {remote}");
            return Ok(());
@@ -1652,15 +1652,7 @@ where
                let relayer_addr = peer.addr.clone();
                let announcer = ann.node;

-
                let relay = match self.handle_announcement(&relayer_addr, &ann)? {
-
                    // In "auto" mode, we only relay if we're a public seed node.
-
                    // This reduces traffic for private nodes, as well as message redundancy.
-
                    Relay::Auto => !self.config.external_addresses.is_empty(),
-
                    Relay::Never => false,
-
                    Relay::Always => true,
-
                };
-

-
                if relay {
+
                if self.handle_announcement(&relayer_addr, &ann)? && self.config.is_relay() {
                    // Choose peers we should relay this message to.
                    // 1. Don't relay to the peer who sent us this message.
                    // 2. Don't relay to the peer who signed this announcement.
@@ -1695,7 +1687,10 @@ where
                            if ann.node == *remote {
                                continue;
                            }
-
                            self.outbox.write(peer, ann.into());
+
                            // Only send messages if we're a relay, or it's our own messages.
+
                            if relay || ann.node == local {
+
                                self.outbox.write(peer, ann.into());
+
                            }
                        }
                    }
                    Err(e) => {
@@ -1816,7 +1811,7 @@ where
        // If this is our first connection to the network, we just ask for a fixed backlog
        // of messages to get us started.
        let since = match self.db.gossip().last() {
-
            Ok(Some(last)) => Timestamp::from(last.to_local_time() - MAX_TIME_DELTA),
+
            Ok(Some(last)) => Timestamp::from(last.to_local_time() - SUBSCRIBE_BACKLOG_DELTA),
            Ok(None) => (*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into(),
            Err(e) => {
                error!(target: "service", "Error getting the lastest gossip message from storage: {e}");
modified radicle/src/node/config.rs
@@ -347,6 +347,17 @@ impl Config {
        self.peer(id).is_some()
    }

+
    /// Are we a relay node? This determines what we do with gossip messages from other peers.
+
    pub fn is_relay(&self) -> bool {
+
        match self.relay {
+
            // In "auto" mode, we only relay if we're a public seed node.
+
            // This reduces traffic for private nodes, as well as message redundancy.
+
            Relay::Auto => !self.external_addresses.is_empty(),
+
            Relay::Never => false,
+
            Relay::Always => true,
+
        }
+
    }
+

    pub fn features(&self) -> node::Features {
        node::Features::SEED
    }