Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix reconnection logic
Alexis Sellier committed 3 years ago
commit d3f4189324b11a48416cb00941fd3c84ab5bab27
parent 4438a1049844dffd140ef18eb389e872f1bfe7c4
5 files changed +45 -48
modified radicle-node/src/service.rs
@@ -452,8 +452,8 @@ where
        debug!(target: "service", "Received command {:?}", cmd);

        match cmd {
-
            Command::Connect(id, addr) => {
-
                self.connect(id, addr);
+
            Command::Connect(nid, addr) => {
+
                self.connect(nid, addr);
            }
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
                Ok(seeds) => {
@@ -712,6 +712,7 @@ where
                return;
            }
        };
+
        let link = session.link;

        // If the peer disconnected while we were waiting for a [`Message::FetchOk`],
        // return a failure to any potential fetcher.
@@ -726,33 +727,29 @@ where

        // Attempt to re-connect to persistent peers.
        if self.config.peer(&remote).is_some() {
-
            if reason.is_transient() {
-
                let delay =
-
                    LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
-
                        .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
+
            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
+
                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

-
                session.to_disconnected(since, since + delay);
+
            // Nb. We always try to reconnect to persistent peers, even when the error appears
+
            // to not be transient.
+
            session.to_disconnected(since, since + delay);

-
                debug!(target: "service", "Reconnecting to {remote} in {delay}..");
+
            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

-
                self.reactor.wakeup(delay);
-
            } else {
-
                // TODO: Only handle error transience for non-persistent peers.
-
                warn!(target: "service", "Permanently dropping persistent peer {remote} session due to non-transient error: {reason}");
-

-
                self.sessions.remove(&remote);
-
            }
+
            self.reactor.wakeup(delay);
        } else {
            self.sessions.remove(&remote);
-
            self.maintain_connections();
+
            // Only re-attempt outbound connections, since we don't care if an inbound connection
+
            // is dropped.
+
            if link.is_outbound() {
+
                self.maintain_connections();
+
            }
        }
    }

    pub fn received_message(&mut self, remote: NodeId, message: Message) {
        match self.handle_message(&remote, message) {
-
            Err(session::Error::NotFound(id)) => {
-
                error!("Session not found for {id}");
-
            }
+
            Ok(_) => {}
            Err(err) => {
                // If there's an error, stop processing messages from this peer.
                // However, we still relay messages returned up to this point.
@@ -762,7 +759,6 @@ where
                // FIXME: The peer should be set in a state such that we don't
                // process further messages.
            }
-
            Ok(()) => {}
        }
    }

@@ -979,7 +975,8 @@ where
        message: Message,
    ) -> Result<(), session::Error> {
        let Some(peer) = self.sessions.get_mut(remote) else {
-
            return Err(session::Error::NotFound(*remote));
+
            warn!(target: "service", "Session not found for {remote}");
+
            return Ok(());
        };
        peer.last_active = self.clock;

@@ -1495,7 +1492,7 @@ where
        let now = self.local_time();
        let mut reconnect = Vec::new();

-
        for (nid, session) in self.sessions.disconnected_mut() {
+
        for (nid, session) in self.sessions.iter_mut() {
            if let Some(addr) = self.config.peer(nid) {
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
@@ -1593,8 +1590,8 @@ impl DisconnectReason {
        match self {
            Self::Dial(_) => false,
            Self::Connection(_) => true,
-
            Self::Session(..) => false,
            Self::Fetch(_) => true,
+
            Self::Session(err) => err.is_transient(),
        }
    }
}
modified radicle-node/src/service/session.rs
@@ -5,7 +5,6 @@ use radicle::storage::Namespaces;

use crate::service::message;
use crate::service::message::Message;
-
use crate::service::storage;
use crate::service::{Id, LocalTime, NodeId, Reactor, Rng};
use crate::Link;

@@ -120,22 +119,23 @@ pub enum FetchResult {

#[derive(thiserror::Error, Debug)]
pub enum Error {
-
    #[error("wrong protocol version in message: {0}")]
-
    WrongVersion(u32),
    #[error("invalid announcement timestamp: {0}")]
    InvalidTimestamp(u64),
-
    #[error("session not found for node `{0}`")]
-
    NotFound(NodeId),
-
    #[error("verification failed on fetch: {0}")]
-
    VerificationFailed(#[from] storage::VerifyError),
    #[error("peer misbehaved")]
    Misbehavior,
    #[error("peer timed out")]
    Timeout,
-
    #[error("handshake error")]
-
    Handshake(String),
-
    #[error("failed to inspect remotes for fetch: {0}")]
-
    Remotes(#[from] storage::refs::Error),
+
}
+

+
impl Error {
+
    /// Check whether this error is transient.
+
    pub fn is_transient(&self) -> bool {
+
        match self {
+
            Self::InvalidTimestamp(_) => false,
+
            Self::Misbehavior => false,
+
            Self::Timeout => true,
+
        }
+
    }
}

/// A peer session. Each connected peer will have one session.
modified radicle-node/src/test/peer.rs
@@ -126,12 +126,17 @@ where
        name: &'static str,
        ip: impl Into<net::IpAddr>,
        storage: S,
-
        config: Config<G>,
+
        mut config: Config<G>,
    ) -> Self {
        let routing = routing::Table::memory().unwrap();
        let tracking = tracking::Store::memory().unwrap();
        let tracking = tracking::Config::new(config.policy, config.scope, tracking);
        let id = *config.signer.public_key();
+
        let ip = ip.into();
+
        let local_addr = net::SocketAddr::new(ip, config.rng.u16(..));
+

+
        // Make sure the peer address is advertized.
+
        config.config.external_addresses.push(local_addr.into());

        let emitter: Emitter<Event> = Default::default();
        let service = Service::new(
@@ -145,8 +150,6 @@ where
            config.rng.clone(),
            emitter,
        );
-
        let ip = ip.into();
-
        let local_addr = net::SocketAddr::new(ip, config.rng.u16(..));

        Self {
            name,
modified radicle-node/src/test/simulator.rs
@@ -491,8 +491,6 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
            Io::Connect(remote, addr) => {
                assert!(remote != node, "self-connections are not allowed");

-
                let latency = self.latency(node, remote);
-

                self.inbox.insert(
                    self.time + MIN_LATENCY,
                    Scheduled {
@@ -525,6 +523,8 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    return;
                }

+
                let latency = MIN_LATENCY + self.latency(node, remote);
+

                self.inbox.insert(
                    // The remote will get the connection attempt with some latency.
                    self.time + latency,
modified radicle-node/src/tests.rs
@@ -900,16 +900,12 @@ fn test_persistent_peer_reconnect_attempt() {
    //
    // Now let's disconnect a peer.

-
    // A transient error such as this will cause Alice to attempt a reconnection.
-
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
-

-
    // A non-transient disconnect, such as one requested by the user will not trigger
-
    // a reconnection.
-
    alice.disconnected(eve.id(), &DisconnectReason::Dial(error.clone()));
-
    assert_matches!(alice.outbox().next(), None);
+
    // A non-transient disconnect, such as one due to peer misbehavior will still trigger a
+
    // a reconnection, since this is a persistent peer.
+
    let reason = DisconnectReason::Session(session::Error::Misbehavior);

    for _ in 0..3 {
-
        alice.disconnected(bob.id(), &DisconnectReason::Connection(error.clone()));
+
        alice.disconnected(bob.id(), &reason);
        alice.elapse(service::MAX_RECONNECTION_DELTA);
        alice
            .outbox()
@@ -1259,7 +1255,8 @@ fn prop_inventory_exchange_dense() {
        }
    }
    qcheck::QuickCheck::new()
-
        .gen(qcheck::Gen::new(8))
+
        .gen(qcheck::Gen::new(5))
+
        .tests(20)
        .quickcheck(property as fn(MockStorage, MockStorage, MockStorage));
}