Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Prevent redundant [re-]connections
Alexis Sellier committed 3 years ago
commit cd12741bf205fc69d7ccb96b5ff2fe66a1f947ca
parent 413238a1e0ca08f504c5db2289fdf1f32fa182d2
4 files changed +119 -58
modified radicle-node/src/service.rs
@@ -637,14 +637,15 @@ where
        // Inbound connection attempt.
    }

-
    pub fn attempted(&mut self, id: NodeId, addr: &Address) {
-
        debug!(target: "service", "Attempted connection to {id} ({addr})");
+
    pub fn attempted(&mut self, nid: NodeId, addr: &Address) {
+
        debug!(target: "service", "Attempted connection to {nid} ({addr})");

-
        let persistent = self.config.is_persistent(&id);
-
        self.sessions
-
            .entry(id)
-
            .and_modify(|sess| sess.to_connecting())
-
            .or_insert_with(|| Session::connecting(id, persistent, self.rng.clone()));
+
        if let Some(sess) = self.sessions.get_mut(&nid) {
+
            sess.to_attempted();
+
        } else {
+
            #[cfg(debug_assertions)]
+
            panic!("Service::attempted: unknown session {nid}@{addr}");
+
        }
    }

    pub fn connected(&mut self, remote: NodeId, link: Link) {
@@ -666,9 +667,8 @@ where
                    );
                }
                Entry::Vacant(e) => {
-
                    let peer = e.insert(Session::connected(
+
                    let peer = e.insert(Session::inbound(
                        remote,
-
                        Link::Inbound,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
@@ -684,40 +684,46 @@ where

        debug!(target: "service", "Disconnected from {} ({})", remote, reason);

-
        if let Some(session) = self.sessions.get_mut(&remote) {
-
            // If the peer disconnected while we were waiting for a [`Message::FetchOk`],
-
            // return a failure to any potential fetcher.
-
            if let Some(requested) = session.requesting() {
-
                if let Some(resp) = self.fetch_reqs.remove(&requested) {
-
                    resp.send(FetchResult::Failed {
-
                        reason: format!("disconnected: {reason}"),
-
                    })
-
                    .ok();
-
                }
+
        let Some(session) = self.sessions.get_mut(&remote) else {
+
            if cfg!(debug_assertions) {
+
                panic!("Service::disconnected: unknown session {remote}");
+
            } else {
+
                return;
            }
+
        };

-
            // Attempt to re-connect to persistent peers.
-
            if self.config.peer(&remote).is_some() {
-
                if reason.is_transient() {
-
                    let delay =
-
                        LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
-
                            .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
+
        // If the peer disconnected while we were waiting for a [`Message::FetchOk`],
+
        // return a failure to any potential fetcher.
+
        if let Some(requested) = session.requesting() {
+
            if let Some(resp) = self.fetch_reqs.remove(&requested) {
+
                resp.send(FetchResult::Failed {
+
                    reason: format!("disconnected: {reason}"),
+
                })
+
                .ok();
+
            }
+
        }

-
                    session.to_disconnected(since, since + delay);
+
        // Attempt to re-connect to persistent peers.
+
        if self.config.peer(&remote).is_some() {
+
            if reason.is_transient() {
+
                let delay =
+
                    LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
+
                        .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);

-
                    debug!(target: "service", "Reconnecting to {remote} in {delay}..");
+
                session.to_disconnected(since, since + delay);

-
                    self.reactor.wakeup(delay);
-
                } else {
-
                    // TODO: Only handle error transience for non-persistent peers.
-
                    warn!(target: "service", "Permanently dropping persistent peer {remote} session due to non-transient error: {reason}");
+
                debug!(target: "service", "Reconnecting to {remote} in {delay}..");

-
                    self.sessions.remove(&remote);
-
                }
+
                self.reactor.wakeup(delay);
            } else {
+
                // TODO: Only handle error transience for non-persistent peers.
+
                warn!(target: "service", "Permanently dropping persistent peer {remote} session due to non-transient error: {reason}");
+

                self.sessions.remove(&remote);
-
                self.maintain_connections();
            }
+
        } else {
+
            self.sessions.remove(&remote);
+
            self.maintain_connections();
        }
    }

@@ -1090,7 +1096,7 @@ where
                self.reactor
                    .fetch(peer, rid, FetchDirection::Initiator { namespaces });
            }
-
            (session::State::Connecting { .. }, msg) => {
+
            (session::State::Attempted { .. } | session::State::Initial, msg) => {
                error!(target: "service", "Received {:?} from connecting peer {}", msg, peer.id);
            }
            (session::State::Disconnected { .. }, msg) => {
@@ -1248,16 +1254,30 @@ where
        }
    }

-
    fn connect(&mut self, node: NodeId, addr: Address) -> bool {
-
        if self.sessions.is_disconnected(&node) {
-
            self.reactor.connect(node, addr);
+
    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
+
        if let Some(sess) = self.sessions.get_mut(&nid) {
+
            sess.to_initial();
+
            self.reactor.connect(nid, addr);
+

            return true;
        }
-
        log::warn!(target: "service", "Attempted connection to peer {node} which already has a session");
-

        false
    }

+
    fn connect(&mut self, nid: NodeId, addr: Address) -> bool {
+
        if self.sessions.contains_key(&nid) {
+
            log::warn!(target: "service", "Attempted connection to peer {nid} which already has a session");
+
            return false;
+
        }
+
        let persistent = self.config.is_persistent(&nid);
+

+
        self.sessions
+
            .insert(nid, Session::outbound(nid, persistent, self.rng.clone()));
+
        self.reactor.connect(nid, addr);
+

+
        true
+
    }
+

    fn seeds(&self, rid: &Id) -> Result<Seeds, Error> {
        #[derive(Default)]
        pub struct Stats {
@@ -1439,6 +1459,8 @@ where

    /// Maintain persistent peer connections.
    fn maintain_persistent(&mut self) {
+
        debug!(target: "service", "Maintaining persistent peers..");
+

        let now = self.local_time();
        let mut reconnect = Vec::new();

@@ -1449,8 +1471,6 @@ where
                    // even a successful attempt means that we're unlikely to be able to reconnect.

                    if now >= *retry_at {
-
                        // FIXME: Make sure we don't attempt two concurrent outgoing connections
-
                        // to the same peer.
                        reconnect.push((*nid, addr.clone(), session.attempts()));
                    }
                }
@@ -1458,7 +1478,7 @@ where
        }

        for (nid, addr, attempts) in reconnect {
-
            if self.connect(nid, addr) {
+
            if self.reconnect(nid, addr) {
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
            }
        }
modified radicle-node/src/service/session.rs
@@ -38,7 +38,9 @@ impl Default for Protocol {
#[allow(clippy::large_enum_variant)]
pub enum State {
    /// Initial state for outgoing connections.
-
    Connecting,
+
    Initial,
+
    /// Connection attempted successfully.
+
    Attempted,
    /// Initial state after handshake protocol hand-off.
    Connected {
        /// Connected since this time.
@@ -60,8 +62,11 @@ pub enum State {
impl fmt::Display for State {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
-
            Self::Connecting => {
-
                write!(f, "connecting")
+
            Self::Initial => {
+
                write!(f, "initial")
+
            }
+
            Self::Attempted => {
+
                write!(f, "attempted")
            }
            Self::Connected { protocol, .. } => match protocol {
                Protocol::Gossip {
@@ -163,10 +168,10 @@ impl fmt::Display for Session {
}

impl Session {
-
    pub fn connecting(id: NodeId, persistent: bool, rng: Rng) -> Self {
+
    pub fn outbound(id: NodeId, persistent: bool, rng: Rng) -> Self {
        Self {
            id,
-
            state: State::Connecting,
+
            state: State::Initial,
            link: Link::Outbound,
            subscribe: None,
            persistent,
@@ -176,7 +181,7 @@ impl Session {
        }
    }

-
    pub fn connected(id: NodeId, link: Link, persistent: bool, rng: Rng, time: LocalTime) -> Self {
+
    pub fn inbound(id: NodeId, persistent: bool, rng: Rng, time: LocalTime) -> Self {
        Self {
            id,
            state: State::Connected {
@@ -184,7 +189,7 @@ impl Session {
                ping: PingState::default(),
                protocol: Protocol::default(),
            },
-
            link,
+
            link: Link::Inbound,
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
@@ -194,7 +199,7 @@ impl Session {
    }

    pub fn is_connecting(&self) -> bool {
-
        matches!(self.state, State::Connecting { .. })
+
        matches!(self.state, State::Attempted { .. })
    }

    pub fn is_connected(&self) -> bool {
@@ -215,6 +220,10 @@ impl Session {
        matches!(self.state, State::Disconnected { .. })
    }

+
    pub fn is_initial(&self) -> bool {
+
        matches!(self.state, State::Initial)
+
    }
+

    pub fn is_requesting(&self) -> bool {
        matches!(
            self.state,
@@ -275,12 +284,12 @@ impl Session {
        }
    }

-
    pub fn to_connecting(&mut self) {
+
    pub fn to_attempted(&mut self) {
        assert!(
-
            self.is_disconnected(),
-
            "Can only transition to 'connecting' state from 'disconnected' state"
+
            self.is_initial(),
+
            "Can only transition to 'attempted' state from 'initial' state"
        );
-
        self.state = State::Connecting;
+
        self.state = State::Attempted;
        self.attempts += 1;
    }

@@ -303,6 +312,16 @@ impl Session {
        self.state = State::Disconnected { since, retry_at };
    }

+
    /// Return to initial state from disconnected state. This state transition
+
    /// happens when we attempt to re-connect to a disconnected peer.
+
    pub fn to_initial(&mut self) {
+
        assert!(
+
            self.is_disconnected(),
+
            "Can only transition to 'initial' state from 'disconnected' state"
+
        );
+
        self.state = State::Initial;
+
    }
+

    pub fn requesting(&self) -> Option<Id> {
        if let State::Connected {
            protocol: Protocol::Gossip { requested },
modified radicle-node/src/test/peer.rs
@@ -21,6 +21,7 @@ use crate::service::*;
use crate::storage::git::transport::remote;
use crate::storage::{RemoteId, WriteStorage};
use crate::test::arbitrary;
+
use crate::test::assert_matches;
use crate::test::simulator;
use crate::test::storage::MockStorage;
use crate::Link;
@@ -272,6 +273,11 @@ where
        let remote_addr = simulator::Peer::<S, G>::addr(peer);

        self.initialize();
+
        self.service
+
            .command(Command::Connect(remote_id, remote_addr.clone()));
+

+
        assert_matches!(self.outbox().next(), Some(Io::Connect { .. }));
+

        self.service.attempted(remote_id, &remote_addr);
        self.service.connected(remote_id, Link::Outbound);

modified radicle-node/src/tests.rs
@@ -110,6 +110,23 @@ fn test_disconnecting_unresponsive_peer() {
}

#[test]
+
fn test_redundant_connect() {
+
    let mut alice = Peer::new("alice", [8, 8, 8, 8]);
+
    let bob = Peer::new("bob", [9, 9, 9, 9]);
+

+
    alice.command(Command::Connect(bob.id(), bob.address()));
+
    alice.command(Command::Connect(bob.id(), bob.address()));
+
    alice.command(Command::Connect(bob.id(), bob.address()));
+

+
    // Only one connection attempt is made.
+
    assert_matches!(
+
        alice.outbox().collect::<Vec<_>>().as_slice(),
+
        [Io::Connect(id, addr)]
+
        if *id == bob.id() && *addr == bob.addr()
+
    );
+
}
+

+
#[test]
fn test_connection_kept_alive() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8]);
    let mut bob = Peer::new("bob", [9, 9, 9, 9]);
@@ -854,14 +871,13 @@ fn test_persistent_peer_reconnect_success() {
            ..peer::Config::default()
        },
    );
-

-
    alice.attempted(bob.id(), &bob.addr());
-
    alice.connected(bob.id(), Link::Outbound);
+
    alice.connect_to(&bob);

    // A transient error such as this will cause Alice to attempt a reconnection.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
    alice.disconnected(bob.id(), &DisconnectReason::Connection(error));
    alice.elapse(service::MIN_RECONNECTION_DELTA);
+
    alice.elapse(service::MIN_RECONNECTION_DELTA); // Trigger a second wakeup to test idempotence.

    alice
        .outbox()