Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Change behavior on conflicting connections
cloudhead committed 2 years ago
commit 0bcec941ee29aaf88db7402491f0bfa53f6ae352
parent e695d06bbba0bcd32d9a76473e79b85f3caac766
7 files changed +273 -113
modified radicle-node/src/runtime/handle.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
use std::{fmt, io, time};

use crossbeam_channel as chan;
-
use radicle::node::{ConnectOptions, ConnectResult, Seeds};
+
use radicle::node::{ConnectOptions, ConnectResult, Link, Seeds};
use radicle::storage::refs::RefsAt;
use reactor::poller::popol::PopolWaker;
use thiserror::Error;
@@ -264,6 +264,11 @@ impl radicle::node::Handle for Handle {
                .iter()
                .map(|(nid, s)| radicle::node::Session {
                    nid: *nid,
+
                    link: if s.link.is_inbound() {
+
                        Link::Inbound
+
                    } else {
+
                        Link::Outbound
+
                    },
                    addr: s.addr.clone(),
                    state: s.state.clone(),
                })
modified radicle-node/src/service.rs
@@ -1148,8 +1148,19 @@ where
        }
    }

-
    pub fn disconnected(&mut self, remote: NodeId, reason: &DisconnectReason) {
+
    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
        let since = self.local_time();
+
        let Some(session) = self.sessions.get_mut(&remote) else {
+
            // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
+
            // disconnection event once the transport is dropped.
+
            trace!(target: "service", "Redundant disconnection for {} ({})", remote, reason);
+
            return;
+
        };
+
        // In cases of connection conflicts, there may be disconnections of one of the two
+
        // connections. In that case we don't want the service to remove the session.
+
        if session.link != link {
+
            return;
+
        }

        info!(target: "service", "Disconnected from {} ({})", remote, reason);
        self.emitter.emit(Event::PeerDisconnected {
@@ -1157,13 +1168,6 @@ where
            reason: reason.to_string(),
        });

-
        let Some(session) = self.sessions.get_mut(&remote) else {
-
            if cfg!(debug_assertions) {
-
                panic!("Service::disconnected: unknown session {remote}");
-
            } else {
-
                return;
-
            }
-
        };
        let link = session.link;
        let addr = session.addr.clone();

@@ -1210,7 +1214,9 @@ where
                    }
                }
                DisconnectReason::Session(e) => e.severity(),
-
                DisconnectReason::Command => Severity::Low,
+
                DisconnectReason::Command
+
                | DisconnectReason::Conflict
+
                | DisconnectReason::SelfConnection => Severity::Low,
            };

            if let Err(e) = self
@@ -2307,6 +2313,10 @@ pub enum DisconnectReason {
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
+
    /// Session conflicts with existing session.
+
    Conflict,
+
    /// Connection to self.
+
    SelfConnection,
    /// User requested disconnect
    Command,
}
@@ -2333,6 +2343,8 @@ impl fmt::Display for DisconnectReason {
            Self::Dial(err) => write!(f, "{err}"),
            Self::Connection(err) => write!(f, "{err}"),
            Self::Command => write!(f, "command"),
+
            Self::SelfConnection => write!(f, "self-connection"),
+
            Self::Conflict => write!(f, "conflict"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
modified radicle-node/src/test/simulator.rs
@@ -396,7 +396,8 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            log::error!(target: "sim", "Connection is attempted and connected at the same time");
                        }
                        if attempt || connection {
-
                            p.disconnected(id, &reason);
+
                            p.disconnected(id, Link::Inbound, &reason);
+
                            p.disconnected(id, Link::Outbound, &reason);
                        }
                    }
                    Input::Wake => p.wake(),
modified radicle-node/src/tests.rs
@@ -900,7 +900,11 @@ fn test_refs_announcement_offline() {
        .addresses_mut()
        .remove(&bob.id)
        .unwrap(); // Make sure we don't reconnect automatically.
-
    alice.disconnected(bob.id, &DisconnectReason::Session(session::Error::Timeout));
+
    alice.disconnected(
+
        bob.id,
+
        Link::Outbound,
+
        &DisconnectReason::Session(session::Error::Timeout),
+
    );
    alice.outbox().for_each(drop);
    alice.restart();
    alice.connect_to(&bob);
@@ -1079,7 +1083,7 @@ fn test_persistent_peer_reconnect_attempt() {
    let reason = DisconnectReason::Session(session::Error::Misbehavior);

    for _ in 0..3 {
-
        alice.disconnected(bob.id(), &reason);
+
        alice.disconnected(bob.id(), Link::Outbound, &reason);
        alice.elapse(service::MAX_RECONNECTION_DELTA);
        alice
            .outbox()
@@ -1116,7 +1120,11 @@ fn test_persistent_peer_reconnect_success() {

    // A transient error such as this will cause Alice to attempt a reconnection.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
-
    alice.disconnected(bob.id(), &DisconnectReason::Connection(error));
+
    alice.disconnected(
+
        bob.id(),
+
        Link::Outbound,
+
        &DisconnectReason::Connection(error),
+
    );
    alice.elapse(service::MIN_RECONNECTION_DELTA);
    alice.elapse(service::MIN_RECONNECTION_DELTA); // Trigger a second wakeup to test idempotence.

@@ -1163,7 +1171,7 @@ fn test_maintain_connections() {
    // A non-transient error such as this will cause Alice to attempt a different peer.
    let error = session::Error::Misbehavior;
    for peer in connected.iter() {
-
        alice.disconnected(peer.id(), &DisconnectReason::Session(error));
+
        alice.disconnected(peer.id(), Link::Outbound, &DisconnectReason::Session(error));

        let id = alice
            .outbox()
@@ -1197,7 +1205,11 @@ fn test_maintain_connections_transient() {
    // A transient error such as this will cause Alice to attempt a reconnection.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
    for peer in connected.iter() {
-
        alice.disconnected(peer.id(), &DisconnectReason::Connection(error.clone()));
+
        alice.disconnected(
+
            peer.id(),
+
            Link::Outbound,
+
            &DisconnectReason::Connection(error.clone()),
+
        );
        alice
            .outbox()
            .find(|o| matches!(o, Io::Connect(id, _) if id == &peer.id()))
@@ -1212,9 +1224,9 @@ fn test_maintain_connections_failed_attempt() {
    let reason =
        DisconnectReason::Connection(Arc::new(io::Error::from(io::ErrorKind::ConnectionReset)));

-
    alice.connect_to(&eve);
    // Make sure Alice knows about Eve.
-
    alice.disconnected(eve.id(), &reason);
+
    alice.connect_to(&eve);
+
    alice.disconnected(eve.id(), Link::Outbound, &reason);
    alice
        .outbox()
        .find(|o| matches!(o, Io::Connect(id, _) if id == &eve.id))
@@ -1222,7 +1234,7 @@ fn test_maintain_connections_failed_attempt() {
    alice.attempted(eve.id, eve.addr());

    // Disconnect Eve and make sure Alice doesn't try to re-connect immediately.
-
    alice.disconnected(eve.id(), &reason);
+
    alice.disconnected(eve.id(), Link::Outbound, &reason);
    assert_matches!(
        alice.outbox().find(|o| matches!(o, Io::Connect(_, _))),
        None
@@ -1236,7 +1248,7 @@ fn test_maintain_connections_failed_attempt() {
        .expect("Alice attempts Eve again");

    // Disconnect Eve and make sure Alice doesn't try to re-connect immediately.
-
    alice.disconnected(eve.id(), &reason);
+
    alice.disconnected(eve.id(), Link::Outbound, &reason);
    assert!(!alice.outbox().any(|o| matches!(o, Io::Connect(_, _))));
    // Or even after some short time..
    alice.elapse(MIN_RECONNECTION_DELTA);
modified radicle-node/src/tests/e2e.rs
@@ -1,7 +1,7 @@
use std::{collections::HashSet, thread, time};

use radicle::crypto::{test::signer::MockSigner, Signer};
-
use radicle::node::{Alias, FetchResult, Handle as _, DEFAULT_TIMEOUT};
+
use radicle::node::{Alias, ConnectResult, FetchResult, Handle as _, DEFAULT_TIMEOUT};
use radicle::storage::{
    ReadRepository, ReadStorage, RefUpdate, RemoteRepository, SignRepository, ValidateRepository,
    WriteRepository, WriteStorage,
@@ -769,10 +769,6 @@ fn test_concurrent_fetches() {
}

#[test]
-
#[ignore = "failing"]
-
#[should_panic]
-
// TODO: This test currently passes but the behavior is wrong. The test should not panic.
-
// We should figure out why we end up with no sessions established.
fn test_connection_crossing() {
    logger::init(log::Level::Debug);

@@ -780,33 +776,61 @@ fn test_connection_crossing() {
    let alice = Node::init(tmp.path(), Config::test(Alias::new("alice")));
    let bob = Node::init(tmp.path(), Config::test(Alias::new("bob")));

-
    let mut alice = alice.spawn();
-
    let mut bob = bob.spawn();
+
    let alice = alice.spawn();
+
    let bob = bob.spawn();
+
    let preferred = alice.id.max(bob.id);

-
    alice
-
        .handle
-
        .connect(bob.id, bob.addr.into(), ConnectOptions::default())
-
        .unwrap();
-
    bob.handle
-
        .connect(alice.id, alice.addr.into(), ConnectOptions::default())
-
        .unwrap();
+
    log::debug!(target: "test", "Preferred peer is {preferred}");
+

+
    let t1 = thread::spawn({
+
        let mut alice = alice.handle.clone();
+

+
        move || {
+
            alice
+
                .connect(bob.id, bob.addr.into(), ConnectOptions::default())
+
                .unwrap()
+
        }
+
    });
+
    let t2 = thread::spawn({
+
        let mut bob = bob.handle.clone();
+
        move || {
+
            bob.connect(alice.id, alice.addr.into(), ConnectOptions::default())
+
                .unwrap()
+
        }
+
    });
+

+
    let r1 = t1.join().unwrap();
+
    let r2 = t2.join().unwrap();
+

+
    // Note that the non-preferred peer will have their outbound connection fail, and this
+
    // could already show up as the result of the call here (but not always).
+
    if preferred == alice.id {
+
        assert_matches!(r1, ConnectResult::Connected);
+
    } else {
+
        assert_matches!(r2, ConnectResult::Connected);
+
    }

    thread::sleep(time::Duration::from_secs(1));

-
    let s1 = alice
-
        .handle
-
        .sessions()
-
        .unwrap()
-
        .iter()
-
        .any(|s| s.nid == bob.id);
-
    let s2 = bob
-
        .handle
-
        .sessions()
-
        .unwrap()
-
        .iter()
-
        .any(|s| s.nid == alice.id);
+
    let alice_s = alice.handle.sessions().unwrap();
+
    let bob_s = bob.handle.sessions().unwrap();

-
    assert!(s1 ^ s2, "Exactly one session should be established");
+
    // Both sessions are established.
+
    let s1 = alice_s.iter().find(|s| s.nid == bob.id).unwrap();
+
    let s2 = bob_s.iter().find(|s| s.nid == alice.id).unwrap();
+

+
    log::debug!(target: "test", "{:?}", alice.handle.sessions());
+
    log::debug!(target: "test", "{:?}", bob.handle.sessions());
+

+
    if preferred == alice.id {
+
        assert_eq!(s1.link, radicle::node::Link::Outbound);
+
        assert_eq!(s2.link, radicle::node::Link::Inbound);
+
    } else {
+
        assert_eq!(s1.link, radicle::node::Link::Inbound);
+
        assert_eq!(s2.link, radicle::node::Link::Outbound);
+
    }
+
    assert_eq!(alice_s.len(), 1);
+
    assert_eq!(bob_s.len(), 1);
}

#[test]
modified radicle-node/src/wire/protocol.rs
@@ -178,6 +178,7 @@ enum Peer {
    /// The peer was scheduled for disconnection. Once the transport is handed over
    /// by the reactor, we can consider it disconnected.
    Disconnecting {
+
        link: Link,
        nid: Option<NodeId>,
        reason: DisconnectReason,
    },
@@ -201,6 +202,13 @@ impl Peer {
        }
    }

+
    fn link(&self) -> Link {
+
        match self {
+
            Peer::Connected { link, .. } => *link,
+
            Peer::Disconnecting { link, .. } => *link,
+
        }
+
    }
+

    /// Connected peer.
    fn connected(nid: NodeId, addr: NetAddr<HostName>, link: Link) -> Self {
        Self::Connected {
@@ -211,20 +219,6 @@ impl Peer {
            streams: Streams::new(link),
        }
    }
-

-
    /// Switch to disconnecting state.
-
    fn disconnecting(&mut self, reason: DisconnectReason) {
-
        if let Self::Connected { nid, streams, .. } = self {
-
            streams.shutdown();
-

-
            *self = Self::Disconnecting {
-
                nid: Some(*nid),
-
                reason,
-
            };
-
        } else {
-
            panic!("Peer::disconnected: session is not connected ({self:?})");
-
        }
-
    }
}

/// Holds connected peers.
@@ -263,9 +257,9 @@ impl Peers {
            .map(|(fd, peer)| (*fd, peer))
    }

-
    fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId)> {
+
    fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId, Link)> {
        self.0.iter().filter_map(|(id, peer)| match peer {
-
            Peer::Connected { nid, .. } => Some((*id, nid)),
+
            Peer::Connected { nid, link, .. } => Some((*id, nid, *link)),
            Peer::Disconnecting { .. } => None,
        })
    }
@@ -336,22 +330,44 @@ where
        self.actions.push_back(Action::RegisterListener(socket));
    }

-
    fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) {
-
        match self.peers.get_mut(&id) {
-
            Some(Peer::Disconnecting { .. }) => {
-
                log::error!(target: "wire", "Peer with id={id} is already disconnecting");
-
            }
-
            Some(peer) => {
-
                log::debug!(target: "wire", "Disconnecting peer with id={id}: {reason}");
-

-
                peer.disconnecting(reason);
-
                self.actions.push_back(Action::UnregisterTransport(id));
-
            }
-
            None => {
+
    fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) -> Option<(NodeId, Link)> {
+
        match self.peers.entry(id) {
+
            Entry::Vacant(_) => {
                // Connecting peer with no session.
                log::debug!(target: "wire", "Disconnecting pending peer with id={id}: {reason}");
                self.actions.push_back(Action::UnregisterTransport(id));
+

+
                // Check for attempted outbound connections. Unestablished inbound connections don't
+
                // have an NID yet.
+
                self.outbound
+
                    .values()
+
                    .find(|o| o.id == Some(id))
+
                    .map(|o| (o.nid, Link::Outbound))
            }
+
            Entry::Occupied(mut e) => match e.get_mut() {
+
                Peer::Disconnecting { nid, link, .. } => {
+
                    log::error!(target: "wire", "Peer with id={id} is already disconnecting");
+

+
                    nid.map(|n| (n, *link))
+
                }
+
                Peer::Connected {
+
                    nid, streams, link, ..
+
                } => {
+
                    log::debug!(target: "wire", "Disconnecting peer with id={id}: {reason}");
+
                    let nid = *nid;
+
                    let link = *link;
+

+
                    streams.shutdown();
+
                    e.insert(Peer::Disconnecting {
+
                        nid: Some(nid),
+
                        link,
+
                        reason,
+
                    });
+
                    self.actions.push_back(Action::UnregisterTransport(id));
+

+
                    Some((nid, link))
+
                }
+
            },
        }
    }

@@ -435,8 +451,11 @@ where
            log::debug!(target: "wire", "Cleaning up inbound peer state with id={id} (fd={fd})");
        } else if let Some(outbound) = self.outbound.remove(&fd) {
            log::debug!(target: "wire", "Cleaning up outbound peer state with id={id} (fd={fd})");
-
            self.service
-
                .disconnected(outbound.nid, &DisconnectReason::connection());
+
            self.service.disconnected(
+
                outbound.nid,
+
                Link::Outbound,
+
                &DisconnectReason::connection(),
+
            );
        } else {
            log::warn!(target: "wire", "Tried to cleanup unknown peer with id={id} (fd={fd})");
        }
@@ -542,36 +561,10 @@ where
                // Make sure we don't try to connect to ourselves by mistake.
                if &nid == self.signer.public_key() {
                    log::error!(target: "wire", "Self-connection detected, disconnecting..");
+
                    self.disconnect(id, DisconnectReason::SelfConnection);

-
                    self.disconnect(
-
                        id,
-
                        DisconnectReason::Dial(Arc::new(io::Error::from(
-
                            io::ErrorKind::AlreadyExists,
-
                        ))),
-
                    );
                    return;
                }
-
                log::debug!(target: "wire", "Session established with {nid} (id={id}) (fd={fd})");
-

-
                let conflicting = self
-
                    .peers
-
                    .active()
-
                    .filter(|(other, d)| **d == nid && *other != id)
-
                    .map(|(id, _)| id)
-
                    .collect::<Vec<_>>();
-

-
                for id in conflicting {
-
                    log::warn!(
-
                        target: "wire", "Closing conflicting session with {nid} (id={id})"
-
                    );
-
                    self.disconnect(
-
                        id,
-
                        DisconnectReason::Dial(Arc::new(io::Error::from(
-
                            io::ErrorKind::AlreadyExists,
-
                        ))),
-
                    );
-
                }
-

                let (addr, link) = if let Some(peer) = self.inbound.remove(&fd) {
                    (peer.addr, Link::Inbound)
                } else if let Some(peer) = self.outbound.remove(&fd) {
@@ -581,9 +574,97 @@ where
                    log::error!(target: "wire", "Session for {nid} (id={id}) not found");
                    return;
                };
-
                self.peers
-
                    .insert(id, Peer::connected(nid, addr.clone(), link));
-
                self.service.connected(nid, addr.into(), link);
+
                log::debug!(
+
                    target: "wire",
+
                    "Session established with {nid} (id={id}) (fd={fd}) ({})",
+
                    if link.is_inbound() { "inbound" } else { "outbound" }
+
                );
+

+
                // Connections to close.
+
                let mut disconnect = Vec::new();
+

+
                // Handle conflicting connections.
+
                // This is typical when nodes have mutually configured their nodes to connect to
+
                // each other on startup. We handle this by deterministically choosing one node
+
                // whos outbound connection is the one that is kept. The other connections are
+
                // dropped.
+
                {
+
                    // Whether we have precedence in case of conflicting connections.
+
                    // Having precedence means that our outbound connection will win over
+
                    // the other node's outbound connection.
+
                    let precedence = *self.signer.public_key() > nid;
+

+
                    // Pre-existing connections that conflict with this newly established session.
+
                    // Note that we can't know whether a connection is conflicting before we get the
+
                    // remote static key.
+
                    let mut conflicting = Vec::new();
+

+
                    // Active sessions with the same NID but a different Resource ID are conflicting.
+
                    conflicting.extend(
+
                        self.peers
+
                            .active()
+
                            .filter(|(c_id, d, _)| **d == nid && *c_id != id)
+
                            .map(|(c_id, _, link)| (c_id, link)),
+
                    );
+

+
                    // Outbound connection attempts with the same remote key but a different file
+
                    // descriptor are conflicting.
+
                    conflicting.extend(self.outbound.iter().filter_map(|(c_fd, other)| {
+
                        if other.nid == nid && *c_fd != fd {
+
                            other.id.map(|c_id| (c_id, Link::Outbound))
+
                        } else {
+
                            None
+
                        }
+
                    }));
+

+
                    for (c_id, c_link) in conflicting {
+
                        // If we have precedence, the inbound connection is closed.
+
                        // In the case where both connections are inbound or outbound,
+
                        // we close the newer connection, ie. the one with the higher
+
                        // resource id.
+
                        let close = match (link, c_link) {
+
                            (Link::Inbound, Link::Outbound) => {
+
                                if precedence {
+
                                    id
+
                                } else {
+
                                    c_id
+
                                }
+
                            }
+
                            (Link::Outbound, Link::Inbound) => {
+
                                if precedence {
+
                                    c_id
+
                                } else {
+
                                    id
+
                                }
+
                            }
+
                            (Link::Inbound, Link::Inbound) => id.max(c_id),
+
                            (Link::Outbound, Link::Outbound) => id.max(c_id),
+
                        };
+

+
                        log::warn!(
+
                            target: "wire", "Established session (id={id}) conflicts with existing session for {nid} (id={c_id})"
+
                        );
+
                        disconnect.push(close);
+
                    }
+
                }
+
                for id in &disconnect {
+
                    log::warn!(
+
                        target: "wire", "Closing conflicting session (id={id}) with {nid}.."
+
                    );
+
                    // Disconnect and return the associated NID of the peer, if available.
+
                    if let Some((nid, link)) = self.disconnect(*id, DisconnectReason::Conflict) {
+
                        // We disconnect the session eagerly because otherwise we will get the new
+
                        // `connected` event before the `disconnect`, resulting in a duplicate
+
                        // connection.
+
                        self.service
+
                            .disconnected(nid, link, &DisconnectReason::Conflict);
+
                    }
+
                }
+
                if !disconnect.contains(&id) {
+
                    self.peers
+
                        .insert(id, Peer::connected(nid, addr.clone(), link));
+
                    self.service.connected(nid, addr.into(), link);
+
                }
            }
            SessionEvent::Data(data) => {
                if let Some(Peer::Connected {
@@ -726,8 +807,11 @@ where
                        }

                        if let Some(id) = peer.id() {
-
                            self.service
-
                                .disconnected(*id, &DisconnectReason::connection());
+
                            self.service.disconnected(
+
                                *id,
+
                                peer.link(),
+
                                &DisconnectReason::connection(),
+
                            );
                        } else {
                            log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
                        }
@@ -748,7 +832,9 @@ where
        match self.peers.entry(id) {
            Entry::Occupied(e) => {
                match e.get() {
-
                    Peer::Disconnecting { nid, reason, .. } => {
+
                    Peer::Disconnecting {
+
                        nid, reason, link, ..
+
                    } => {
                        log::debug!(target: "wire", "Transport handover for disconnecting peer with id={id} (fd={fd})");

                        // Disconnect TCP stream.
@@ -756,7 +842,13 @@ where

                        // If there is no NID, the service is not aware of the peer.
                        if let Some(nid) = nid {
-
                            self.service.disconnected(*nid, reason);
+
                            // In the case of a conflicting connection, there will be two resources
+
                            // for the peer. However, at the service level, there is only one, and
+
                            // it is identified by NID.
+
                            //
+
                            // Therefore, we specify which of the connections we're closing by
+
                            // passing the `link`.
+
                            self.service.disconnected(*nid, *link, reason);
                        }
                        e.remove();
                    }
@@ -847,8 +939,11 @@ where
                        Err(err) => {
                            log::error!(target: "wire", "Error establishing connection to {addr}: {err}");

-
                            self.service
-
                                .disconnected(node_id, &DisconnectReason::Dial(Arc::new(err)));
+
                            self.service.disconnected(
+
                                node_id,
+
                                Link::Outbound,
+
                                &DisconnectReason::Dial(Arc::new(err)),
+
                            );
                        }
                    }
                }
modified radicle/src/node.rs
@@ -506,10 +506,21 @@ impl Command {
    }
}

+
/// Connection link direction.
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
#[serde(rename_all = "camelCase")]
+
pub enum Link {
+
    /// Outgoing connection.
+
    Outbound,
+
    /// Incoming connection.
+
    Inbound,
+
}
+

/// An established network connection with a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    pub nid: NodeId,
+
    pub link: Link,
    pub addr: Address,
    pub state: State,
}