Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix bug with persistent peer reconnect
Alexis Sellier committed 3 years ago
commit 7bdedd0ffd6c22c53b61ff432612121e7469c9ba
parent 2ceb39553e0e3f3c898c00c1a035bd56d62be14c
5 files changed +142 -26
modified radicle-node/src/service.rs
@@ -337,7 +337,7 @@ where
        // Connect to configured peers.
        let addrs = self.config.connect.clone();
        for (id, addr) in addrs {
-
            self.reactor.connect(id, addr);
+
            self.connect(id, addr);
        }
        // Ensure that our inventory is recorded in our routing table.
        for id in self.storage.inventory()? {
@@ -397,7 +397,9 @@ where
        debug!(target: "service", "Received command {:?}", cmd);

        match cmd {
-
            Command::Connect(id, addr) => self.reactor.connect(id, addr),
+
            Command::Connect(id, addr) => {
+
                self.connect(id, addr);
+
            }
            Command::Seeds(rid, resp) => {
                let (connected, unconnected) = match self.routing.get(&rid) {
                    Ok(seeds) => seeds
@@ -558,8 +560,8 @@ where
        let persistent = self.config.is_persistent(&id);
        self.sessions
            .entry(id)
-
            .or_insert_with(|| Session::connecting(id, persistent, self.rng.clone()))
-
            .attempted();
+
            .and_modify(|sess| sess.to_connecting())
+
            .or_insert_with(|| Session::connecting(id, persistent, self.rng.clone()));
    }

    pub fn connected(&mut self, remote: NodeId, link: Link) {
@@ -567,7 +569,6 @@ where

        // For outbound connections, we are the first to say "Hello".
        // For inbound connections, we wait for the remote to say "Hello" first.
-
        // TODO: How should we deal with multiple peers connecting from the same IP address?
        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                self.reactor.write_all(
@@ -583,16 +584,23 @@ where
                peer.to_connected(self.clock);
            }
        } else {
-
            self.sessions.insert(
-
                remote,
-
                Session::connected(
-
                    remote,
-
                    Link::Inbound,
-
                    self.config.is_persistent(&remote),
-
                    self.rng.clone(),
-
                    self.clock,
-
                ),
-
            );
+
            match self.sessions.entry(remote) {
+
                Entry::Occupied(e) => {
+
                    error!(
+
                        target: "service",
+
                        "Connecting peer {remote} already has a session open ({})", e.get()
+
                    );
+
                }
+
                Entry::Vacant(e) => {
+
                    e.insert(Session::connected(
+
                        remote,
+
                        Link::Inbound,
+
                        self.config.is_persistent(&remote),
+
                        self.rng.clone(),
+
                        self.clock,
+
                    ));
+
                }
+
            }
        }
    }

@@ -624,7 +632,7 @@ where
                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
                    // even a successful attempt means that we're unlikely to be able to reconnect.

-
                    self.reactor.connect(remote, address.clone());
+
                    self.connect(remote, address.clone());
                }
            } else {
                self.sessions.remove(&remote);
@@ -1010,6 +1018,16 @@ where
        Ok(())
    }

+
    fn connect(&mut self, node: NodeId, addr: Address) -> bool {
+
        if self.sessions.is_unconnected(&node) {
+
            self.reactor.connect(node, addr);
+
            return true;
+
        }
+
        log::warn!(target: "service", "Attempted connection to peer {node} which already has a session");
+

+
        false
+
    }
+

    ////////////////////////////////////////////////////////////////////////////
    // Periodic tasks
    ////////////////////////////////////////////////////////////////////////////
@@ -1096,7 +1114,7 @@ where
            debug!(target: "service", "No eligible peers available to connect to");
        }
        for (id, addr) in addrs {
-
            self.reactor.connect(id, addr.clone());
+
            self.connect(id, addr.clone());
        }
    }
}
@@ -1307,6 +1325,11 @@ impl Sessions {
    pub fn is_negotiated(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }
+

+
    /// Return whether this node can be connected to.
+
    pub fn is_unconnected(&self, id: &NodeId) -> bool {
+
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
+
    }
}

impl Deref for Sessions {
modified radicle-node/src/service/reactor.rs
@@ -53,7 +53,6 @@ impl Reactor {

    /// Connect to a peer.
    pub fn connect(&mut self, id: NodeId, addr: Address) {
-
        // TODO: Make sure we don't try to connect more than once to the same address.
        self.io.push_back(Io::Connect(id, addr));
    }

modified radicle-node/src/service/session.rs
@@ -1,3 +1,5 @@
+
use std::fmt;
+

use crate::service::message;
use crate::service::message::Message;
use crate::service::storage;
@@ -52,6 +54,27 @@ pub enum State {
    Disconnected { since: LocalTime },
}

+
impl fmt::Display for State {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match self {
+
            Self::Connecting => {
+
                write!(f, "connecting")
+
            }
+
            Self::Connected { protocol, .. } => match protocol {
+
                Protocol::Gossip { .. } => {
+
                    write!(f, "connected <gossip>")
+
                }
+
                Protocol::Fetch => {
+
                    write!(f, "connected <fetch>")
+
                }
+
            },
+
            Self::Disconnected { .. } => {
+
                write!(f, "disconnected")
+
            }
+
        }
+
    }
+
}
+

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("wrong protocol version in message: {0}")]
@@ -96,6 +119,25 @@ pub struct Session {
    rng: Rng,
}

+
impl fmt::Display for Session {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        let mut attrs = Vec::new();
+
        let state = self.state.to_string();
+

+
        if self.link.is_inbound() {
+
            attrs.push("inbound");
+
        } else {
+
            attrs.push("outbound");
+
        }
+
        if self.persistent {
+
            attrs.push("persistent");
+
        }
+
        attrs.push(state.as_str());
+

+
        write!(f, "{}", attrs.join(" "))
+
    }
+
}
+

impl Session {
    pub fn connecting(id: NodeId, persistent: bool, rng: Rng) -> Self {
        Self {
@@ -105,7 +147,7 @@ impl Session {
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
-
            attempts: 0,
+
            attempts: 1,
            rng,
        }
    }
@@ -136,6 +178,10 @@ impl Session {
        matches!(self.state, State::Connected { .. })
    }

+
    pub fn is_disconnected(&self) -> bool {
+
        matches!(self.state, State::Disconnected { .. })
+
    }
+

    pub fn is_negotiated(&self) -> bool {
        matches!(
            self.state,
@@ -150,10 +196,6 @@ impl Session {
        self.attempts
    }

-
    pub fn attempted(&mut self) {
-
        self.attempts += 1;
-
    }
-

    pub fn fetch(&mut self, rid: Id) -> Option<Message> {
        if let State::Connected { protocol, .. } = &mut self.state {
            if *protocol == (Protocol::Gossip { requested: None }) {
@@ -168,6 +210,15 @@ impl Session {
        None
    }

+
    pub fn to_connecting(&mut self) {
+
        assert!(
+
            self.is_disconnected(),
+
            "Can only transition to 'connecting' state from 'disconnected' state"
+
        );
+
        self.state = State::Connecting;
+
        self.attempts += 1;
+
    }
+

    pub fn to_connected(&mut self, since: LocalTime) {
        assert!(
            self.is_connecting(),
modified radicle-node/src/test/peer.rs
@@ -141,7 +141,10 @@ where

    pub fn initialize(&mut self) {
        if !self.initialized {
-
            info!("{}: Initializing: address = {}", self.name, self.ip);
+
            info!(
+
                "{}: Initializing: id = {}, address = {}",
+
                self.name, self.id, self.ip
+
            );

            self.initialized = true;
            self.service.initialize(LocalTime::now()).unwrap();
modified radicle-node/src/tests.rs
@@ -5,6 +5,7 @@ use std::io;
use std::sync::Arc;

use crossbeam_channel as chan;
+
use netservices::LinkDirection as Link;

use crate::collections::{HashMap, HashSet};
use crate::crypto::test::signer::MockSigner;
@@ -688,7 +689,7 @@ fn test_inventory_relay() {
}

#[test]
-
fn test_persistent_peer_reconnect() {
+
fn test_persistent_peer_reconnect_attempt() {
    let mut bob = Peer::new("bob", [8, 8, 8, 8]);
    let mut eve = Peer::new("eve", [9, 9, 9, 9]);
    let mut alice = Peer::config(
@@ -747,6 +748,46 @@ fn test_persistent_peer_reconnect() {
}

#[test]
+
fn test_persistent_peer_reconnect_success() {
+
    let bob = Peer::config(
+
        "bob",
+
        [9, 9, 9, 9],
+
        MockStorage::empty(),
+
        peer::Config::default(),
+
    );
+
    let mut alice = Peer::config(
+
        "alice",
+
        [7, 7, 7, 7],
+
        MockStorage::empty(),
+
        peer::Config {
+
            config: Config {
+
                connect: vec![(bob.id, bob.addr())],
+
                ..Config::default()
+
            },
+
            ..peer::Config::default()
+
        },
+
    );
+

+
    alice.attempted(bob.id(), &bob.addr());
+
    alice.connected(bob.id(), Link::Outbound);
+

+
    // A transient error such as this will cause Alice to attempt a reconnection.
+
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
+
    alice.disconnected(bob.id(), &DisconnectReason::Connection(error));
+

+
    alice
+
        .outbox()
+
        .find_map(|o| match o {
+
            Io::Connect(id, _) => Some(id),
+
            _ => None,
+
        })
+
        .expect("Alice attempts a re-connection");
+

+
    alice.attempted(bob.id(), &bob.addr());
+
    alice.connected(bob.id(), Link::Outbound);
+
}
+

+
#[test]
fn test_maintain_connections() {
    // Peers alice starts out connected to.
    let connected = vec![
@@ -923,7 +964,6 @@ fn prop_inventory_exchange_dense() {
        bob.command(Command::Connect(alice.id(), alice.address()));
        bob.command(Command::Connect(eve.id(), eve.address()));
        eve.command(Command::Connect(alice.id(), alice.address()));
-
        eve.command(Command::Connect(bob.id(), bob.address()));

        let mut peers: HashMap<_, _> = [
            (alice.node_id(), alice),