Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement exponential back-off for reconnect
Alexis Sellier committed 3 years ago
commit 413238a1e0ca08f504c5db2289fdf1f32fa182d2
parent 94427ee22ba796cde7b065975545e9439b13d3d4
3 files changed +77 -34
modified radicle-node/src/service.rs
@@ -70,6 +70,10 @@ pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time should we request gossip messages when connecting to a peer.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60);
+
/// Minimum amount of time to wait before reconnecting to a peer.
+
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
+
/// Maximum amount of time to wait before reconnecting to a peer.
+
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
@@ -436,6 +440,9 @@ where
            self.reactor.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }
+

+
        // Always check whether there are persistent peers that need reconnecting.
+
        self.maintain_persistent();
    }

    pub fn command(&mut self, cmd: Command) {
@@ -680,7 +687,7 @@ where
        if let Some(session) = self.sessions.get_mut(&remote) {
            // If the peer disconnected while we were waiting for a [`Message::FetchOk`],
            // return a failure to any potential fetcher.
-
            if let Some(requested) = session.to_disconnected(since) {
+
            if let Some(requested) = session.requesting() {
                if let Some(resp) = self.fetch_reqs.remove(&requested) {
                    resp.send(FetchResult::Failed {
                        reason: format!("disconnected: {reason}"),
@@ -690,26 +697,22 @@ where
            }

            // Attempt to re-connect to persistent peers.
-
            if let Some(address) = self.config.peer(&remote) {
-
                if session.attempts() < MAX_CONNECTION_ATTEMPTS {
-
                    if reason.is_dial_err() {
-
                        return;
-
                    }
-
                    if !reason.is_transient() {
-
                        return;
-
                    }
-
                    // TODO: Eventually we want a delay before attempting a reconnection,
-
                    // with exponential back-off.
-
                    debug!(target: "service",
-
                        "Reconnecting to {} (attempts={})...",
-
                        remote,
-
                        session.attempts()
-
                    );
+
            if self.config.peer(&remote).is_some() {
+
                if reason.is_transient() {
+
                    let delay =
+
                        LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
+
                            .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

-
                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
-
                    // even a successful attempt means that we're unlikely to be able to reconnect.
+
                    session.to_disconnected(since, since + delay);

-
                    self.connect(remote, address.clone());
+
                    debug!(target: "service", "Reconnecting to {remote} in {delay}..");
+

+
                    self.reactor.wakeup(delay);
+
                } else {
+
                    // TODO: Only handle error transience for non-persistent peers.
+
                    warn!(target: "service", "Permanently dropping persistent peer {remote} session due to non-transient error: {reason}");
+

+
                    self.sessions.remove(&remote);
                }
            } else {
                self.sessions.remove(&remote);
@@ -1433,6 +1436,33 @@ where
            self.connect(id, addr.clone());
        }
    }
+

+
    /// Maintain persistent peer connections.
+
    fn maintain_persistent(&mut self) {
+
        let now = self.local_time();
+
        let mut reconnect = Vec::new();
+

+
        for (nid, session) in self.sessions.disconnected_mut() {
+
            if let Some(addr) = self.config.peer(nid) {
+
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
+
                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
+
                    // even a successful attempt means that we're unlikely to be able to reconnect.
+

+
                    if now >= *retry_at {
+
                        // FIXME: Make sure we don't attempt two concurrent outgoing connections
+
                        // to the same peer.
+
                        reconnect.push((*nid, addr.clone(), session.attempts()));
+
                    }
+
                }
+
            }
+
        }
+

+
        for (nid, addr, attempts) in reconnect {
+
            if self.connect(nid, addr) {
+
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
+
            }
+
        }
+
    }
}

/// Gives read access to the service state.
@@ -1524,6 +1554,8 @@ impl DisconnectReason {
        matches!(self, Self::Connection(_))
    }

+
    // TODO: These aren't quite correct, since dial errors *can* be transient, eg.
+
    // temporary DNS issue.
    pub fn is_transient(&self) -> bool {
        match self {
            Self::Dial(_) => false,
@@ -1638,7 +1670,12 @@ impl Sessions {

    /// Iterator over mutable fully connected peers.
    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
-
        self.0.iter_mut().filter(move |(_, p)| p.is_connected())
+
        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
+
    }
+

+
    /// Iterator over disconnected peers.
+
    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
+
        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
    }

    /// Return whether this node has a fully established session.
modified radicle-node/src/service/session.rs
@@ -49,7 +49,12 @@ pub enum State {
        protocol: Protocol,
    },
    /// When a peer is disconnected.
-
    Disconnected { since: LocalTime },
+
    Disconnected {
+
        /// Since when has this peer been disconnected.
+
        since: LocalTime,
+
        /// When to retry the connection.
+
        retry_at: LocalTime,
+
    },
}

impl fmt::Display for State {
@@ -294,8 +299,12 @@ impl Session {

    /// Move the session state to "disconnected". Returns any pending RID
    /// that was requested.
-
    pub fn to_disconnected(&mut self, since: LocalTime) -> Option<Id> {
-
        let request = if let State::Connected {
+
    pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
+
        self.state = State::Disconnected { since, retry_at };
+
    }
+

+
    pub fn requesting(&self) -> Option<Id> {
+
        if let State::Connected {
            protocol: Protocol::Gossip { requested },
            ..
        } = self.state
@@ -303,10 +312,7 @@ impl Session {
            requested
        } else {
            None
-
        };
-
        self.state = State::Disconnected { since };
-

-
        request
+
        }
    }

    pub fn ping(&mut self, reactor: &mut Reactor) -> Result<(), Error> {
modified radicle-node/src/tests.rs
@@ -822,17 +822,16 @@ fn test_persistent_peer_reconnect_attempt() {
    alice.disconnected(eve.id(), &DisconnectReason::Dial(error.clone()));
    assert_matches!(alice.outbox().next(), None);

-
    for _ in 0..MAX_CONNECTION_ATTEMPTS {
+
    for _ in 0..3 {
        alice.disconnected(bob.id(), &DisconnectReason::Connection(error.clone()));
-
        assert_matches!(alice.outbox().next(), Some(Io::Connect(a, _)) if a == bob.id());
-
        assert_matches!(alice.outbox().next(), None);
+
        alice.elapse(service::MAX_RECONNECTION_DELTA);
+
        alice
+
            .outbox()
+
            .find(|io| matches!(io, Io::Connect(a, _) if a == &bob.id()))
+
            .unwrap();

        alice.attempted(bob.id(), &bob.address());
    }
-

-
    // After the max connection attempts, a disconnect doesn't trigger a reconnect.
-
    alice.disconnected(bob.id(), &DisconnectReason::Connection(error));
-
    assert_matches!(alice.outbox().next(), None);
}

#[test]
@@ -862,6 +861,7 @@ fn test_persistent_peer_reconnect_success() {
    // A transient error such as this will cause Alice to attempt a reconnection.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
    alice.disconnected(bob.id(), &DisconnectReason::Connection(error));
+
    alice.elapse(service::MIN_RECONNECTION_DELTA);

    alice
        .outbox()