Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Reset connection attempts when stable
cloudhead committed 2 years ago
commit 9a82aae993c8f927222833d5a8b47983af293720
parent ef4efe32009a6d9fa748a23287fa2a93b5475530
3 files changed +51 -11
modified radicle-node/src/service.rs
@@ -719,6 +719,7 @@ where

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
+
            self.idle_connections();
            self.maintain_connections();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
@@ -1202,14 +1203,6 @@ where
            if let Some(peer) = self.sessions.get_mut(&remote) {
                peer.to_connected(self.clock);
                self.outbox.write_all(peer, msgs);
-

-
                if let Err(e) =
-
                    self.db
-
                        .addresses_mut()
-
                        .connected(&remote, &peer.addr, self.clock.into())
-
                {
-
                    error!(target: "service", "Error updating address book with connection: {e}");
-
                }
            }
        } else {
            match self.sessions.entry(remote) {
@@ -2255,6 +2248,25 @@ where
        Ok(())
    }

+
    /// Run idle task for all connections.
+
    fn idle_connections(&mut self) {
+
        for (_, sess) in self.sessions.iter_mut() {
+
            sess.idle(self.clock);
+

+
            if sess.is_stable() {
+
                // Mark as connected once connection is stable.
+
                if let Err(e) =
+
                    self.db
+
                        .addresses_mut()
+
                        .connected(&sess.id, &sess.addr, self.clock.into())
+
                {
+
                    error!(target: "service", "Error updating address book with connection: {e}");
+
                }
+
            }
+
        }
+
    }
+

+
    /// Try to maintain a target number of connections.
    fn maintain_connections(&mut self) {
        let PeerConfig::Dynamic { target } = self.config.peers else {
            return;
modified radicle-node/src/service/session.rs
@@ -5,11 +5,14 @@ use crate::node::config::Limits;
use crate::node::Severity;
use crate::service::message;
use crate::service::message::Message;
-
use crate::service::{Address, LocalTime, NodeId, Outbox, RepoId, Rng};
+
use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
use crate::{Link, Timestamp};

pub use crate::node::{PingState, State};

+
/// Time after which a connection is considered stable.
+
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
+

#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
    /// The remote peer sent an invalid announcement timestamp,
@@ -61,7 +64,7 @@ pub struct Session {

    /// Connection attempts. For persistent peers, Tracks
    /// how many times we've attempted to connect. We reset this to zero
-
    /// upon successful connection.
+
    /// upon successful connection, once the connection is stable.
    attempts: usize,
    /// Source of entropy.
    rng: Rng,
@@ -120,6 +123,7 @@ impl Session {
                ping: PingState::default(),
                fetching: HashSet::default(),
                latencies: VecDeque::default(),
+
                stable: false,
            },
            link: Link::Inbound,
            subscribe: None,
@@ -135,6 +139,10 @@ impl Session {
        matches!(self.state, State::Attempted { .. })
    }

+
    pub fn is_stable(&self) -> bool {
+
        matches!(self.state, State::Connected { stable: true, .. })
+
    }
+

    pub fn is_connected(&self) -> bool {
        self.state.is_connected()
    }
@@ -167,6 +175,22 @@ impl Session {
        self.attempts
    }

+
    /// Run 'idle' task for session.
+
    pub fn idle(&mut self, now: LocalTime) {
+
        if let State::Connected {
+
            since,
+
            ref mut stable,
+
            ..
+
        } = self.state
+
        {
+
            if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
+
                *stable = true;
+
                // Reset number of attempts for stable connections.
+
                self.attempts = 0;
+
            }
+
        }
+
    }
+

    /// Mark this session as fetching the given RID.
    ///
    /// # Panics
@@ -204,7 +228,6 @@ impl Session {
    }

    pub fn to_connected(&mut self, since: LocalTime) {
-
        self.attempts = 0;
        self.last_active = since;

        let State::Attempted = &self.state else {
@@ -215,6 +238,7 @@ impl Session {
            ping: PingState::default(),
            fetching: HashSet::default(),
            latencies: VecDeque::default(),
+
            stable: false,
        };
    }

modified radicle/src/node.rs
@@ -105,6 +105,9 @@ pub enum State {
        /// Measured latencies for this peer.
        #[serde(skip)]
        latencies: VecDeque<LocalDuration>,
+
        /// Whether the connection is stable.
+
        #[serde(skip)]
+
        stable: bool,
    },
    /// When a peer is disconnected.
    #[serde(rename_all = "camelCase")]
@@ -1260,6 +1263,7 @@ mod test {
                ping: Default::default(),
                fetching: Default::default(),
                latencies: VecDeque::default(),
+
                stable: false,
            }))
            .unwrap(),
        )