Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix some connection-related logic
Alexis Sellier committed 3 years ago
commit f5f36191d9320e7d570b0c5ce29d791878db70e1
parent 7bdedd0ffd6c22c53b61ff432612121e7469c9ba
5 files changed +88 -40
modified radicle-node/src/runtime.rs
@@ -194,7 +194,9 @@ impl<G: Signer + EcSign + 'static> Runtime<G> {
                return Err(err.into());
            }
        };
-
        let control = thread::spawn(move || control::listen(listener, self.handle));
+
        let control = thread::Builder::new()
+
            .name(self.id.to_human())
+
            .spawn(move || control::listen(listener, self.handle))?;

        log::info!(target: "node", "Spawning git daemon at {}..", self.storage.path().display());

modified radicle-node/src/service.rs
@@ -586,7 +586,7 @@ where
        } else {
            match self.sessions.entry(remote) {
                Entry::Occupied(e) => {
-
                    error!(
+
                    warn!(
                        target: "service",
                        "Connecting peer {remote} already has a session open ({})", e.get()
                    );
modified radicle-node/src/test/environment.rs
@@ -58,6 +58,7 @@ impl Environment {
        let signer = MemorySigner::load(&profile.keystore, "radicle".to_owned().into()).unwrap();

        Node {
+
            id: *profile.id(),
            home: profile.home,
            signer,
            storage: profile.storage,
@@ -93,6 +94,7 @@ impl Environment {

/// A node that can be run.
pub struct Node<G> {
+
    pub id: NodeId,
    pub home: Home,
    pub signer: G,
    pub storage: Storage,
@@ -172,6 +174,7 @@ impl Node<MockSigner> {
        let storage = Storage::open(home.storage()).unwrap();

        Self {
+
            id: *signer.public_key(),
            home,
            signer,
            storage,
modified radicle-node/src/tests/e2e.rs
@@ -1,3 +1,5 @@
+
use std::{thread, time};
+

use radicle::crypto::Signer;
use radicle::node::{FetchResult, Handle as _};
use radicle::storage::{ReadRepository, WriteStorage};
@@ -267,3 +269,29 @@ fn test_fetch_up_to_date() {
    let result = alice.handle.fetch(acme, bob.id).unwrap();
    assert_eq!(result.success(), Some(vec![]));
}
+

+
#[test]
+
#[ignore = "failing"]
+
#[should_panic]
+
// TODO: This test currently passes but the behavior is wrong. The test should not panic.
+
// We should figure out why we end up with no sessions established.
+
fn test_connection_crossing() {
+
    logger::init(log::Level::Debug);
+

+
    let tmp = tempfile::tempdir().unwrap();
+
    let alice = Node::init(tmp.path());
+
    let bob = Node::init(tmp.path());
+

+
    let mut alice = alice.spawn(service::Config::default());
+
    let mut bob = bob.spawn(service::Config::default());
+

+
    alice.handle.connect(bob.id, bob.addr.into()).unwrap();
+
    bob.handle.connect(alice.id, alice.addr.into()).unwrap();
+

+
    thread::sleep(time::Duration::from_secs(1));
+

+
    let s1 = alice.handle.sessions().unwrap().contains_key(&bob.id);
+
    let s2 = bob.handle.sessions().unwrap().contains_key(&alice.id);
+

+
    assert!(s1 ^ s2, "Exactly one session should be established");
+
}
modified radicle-node/src/wire/protocol.rs
@@ -62,7 +62,7 @@ type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSes
/// Peer connection state machine.
enum Peer {
    /// The initial state before handshake is completed.
-
    Connecting { link: Link },
+
    Connecting { link: Link, id: Option<NodeId> },
    /// The state after handshake is completed.
    /// Peers in this state are handled by the underlying service.
    Connected { link: Link, id: NodeId },
@@ -86,9 +86,10 @@ enum Peer {
impl std::fmt::Debug for Peer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
-
            Self::Connecting { link } => write!(f, "Connecting({link:?})"),
+
            Self::Connecting { link, id: Some(id) } => write!(f, "Connecting({link:?}, {id})"),
+
            Self::Connecting { link, id: None } => write!(f, "Connecting({link:?})"),
            Self::Connected { link, id } => write!(f, "Connected({link:?}, {id})"),
-
            Self::Disconnected { reason, id } => write!(f, "Disconnected({reason}, {id:?})"),
+
            Self::Disconnected { .. } => write!(f, "Disconnected"),
            Self::Upgrading { fetch, link, id } => write!(
                f,
                "Upgrading(initiated={}, {link:?}, {id})",
@@ -111,14 +112,14 @@ impl Peer {
        }
    }

-
    /// Return a new connecting peer.
-
    fn connecting(link: Link) -> Self {
-
        Self::Connecting { link }
+
    /// Return a new inbound connecting peer.
+
    fn connecting(link: Link, id: Option<NodeId>) -> Self {
+
        Self::Connecting { link, id }
    }

    /// Switch to connected state.
    fn connected(&mut self, id: NodeId) {
-
        if let Self::Connecting { link } = self {
+
        if let Self::Connecting { link, .. } = self {
            *self = Self::Connected { link: *link, id };
        } else {
            panic!("Peer::connected: session for {id} is already established");
@@ -132,8 +133,8 @@ impl Peer {
                id: Some(*id),
                reason,
            };
-
        } else if let Self::Connecting { .. } = self {
-
            *self = Self::Disconnected { id: None, reason };
+
        } else if let Self::Connecting { id, .. } = self {
+
            *self = Self::Disconnected { id: *id, reason };
        } else {
            panic!("Peer::disconnected: session is not connected ({self:?})");
        }
@@ -262,6 +263,17 @@ where
        }
    }

+
    fn active(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
+
        self.peers.iter().filter_map(|(fd, peer)| match peer {
+
            Peer::Connecting { id: Some(id), .. } => Some((*fd, id)),
+
            Peer::Connecting { id: None, .. } => None,
+
            Peer::Connected { id, .. } => Some((*fd, id)),
+
            Peer::Upgrading { id, .. } => Some((*fd, id)),
+
            Peer::Upgraded { id, .. } => Some((*fd, id)),
+
            Peer::Disconnected { .. } => None,
+
        })
+
    }
+

    fn connected(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
        self.peers.iter().filter_map(|(fd, peer)| {
            if let Peer::Connected { id, .. } = peer {
@@ -379,8 +391,10 @@ where
                    "Accepting inbound peer connection from {}..",
                    connection.remote_addr()
                );
-
                self.peers
-
                    .insert(connection.as_raw_fd(), Peer::connecting(Link::Inbound));
+
                self.peers.insert(
+
                    connection.as_raw_fd(),
+
                    Peer::connecting(Link::Inbound, None),
+
                );

                let session = WireSession::accept::<{ Sha256::OUTPUT_LEN }>(
                    connection,
@@ -416,11 +430,11 @@ where
                state: Cert { pk: node_id, .. },
                ..
            }) => {
-
                log::debug!(target: "wire", "Session established with {node_id}");
+
                log::debug!(target: "wire", "Session established with {node_id} (fd={fd})");

                let conflicting = self
-
                    .connected()
-
                    .filter(|(_, id)| **id == node_id)
+
                    .active()
+
                    .filter(|(other, id)| **id == node_id && *other != fd)
                    .map(|(fd, _)| fd)
                    .collect::<Vec<_>>();

@@ -440,13 +454,15 @@ where
                    log::error!(target: "wire", "Session not found for fd {fd}");
                    return;
                };
-
                let Peer::Connecting { link } = peer else {
+
                let Peer::Connecting { link, .. } = peer else {
                    log::error!(
                        target: "wire",
                        "Session for {node_id} was either not found, or in an invalid state"
                    );
                    return;
                };
+
                log::debug!(target: "wire", "Found connecting peer ({:?})..", link);
+

                let link = *link;

                peer.connected(node_id);
@@ -479,7 +495,6 @@ where
                }
            }
            SessionEvent::Terminated(err) => {
-
                log::debug!(target: "wire", "Session for fd {fd} terminated: {err}");
                self.disconnect(fd, DisconnectReason::Connection(Arc::new(err)));
            }
        }
@@ -518,24 +533,12 @@ where
                // TODO: This should be a fatal error, there's nothing we can do here.
                log::error!(target: "wire", "Received error: listener {} disconnected: {}", id, err);
            }
-
            reactor::Error::TransportPollError(id, err) => {
-
                log::error!(target: "wire", "Received error: peer {} disconnected: {}", id, err);
-
                self.actions.push_back(Action::UnregisterTransport(*id));
+
            reactor::Error::TransportPollError(fd, err) => {
+
                log::error!(target: "wire", "Received error: peer (fd={fd}) disconnected: {err}");
+
                self.actions.push_back(Action::UnregisterTransport(*fd));
            }
-
            // TODO: Why is the error an `i16`?
-
            reactor::Error::TransportDisconnect(id, _, err) => {
-
                if let Some(remote) = self.peers.get(id) {
-
                    if let Some(id) = remote.id() {
-
                        self.service.disconnected(
-
                            *id,
-
                            &DisconnectReason::Connection(Arc::new(io::Error::from(
-
                                io::ErrorKind::ConnectionReset,
-
                            ))),
-
                        );
-
                    }
-
                }
-
                // TODO: Notify service.
-
                log::error!(target: "wire", "Received error: peer {} disconnected: {}", id, err);
+
            reactor::Error::TransportDisconnect(fd, _, err) => {
+
                log::error!(target: "wire", "Received error: peer (fd={fd}) disconnected: {err}");
            }
            reactor::Error::WriteFailure(id, err) => {
                // TODO: Disconnect peer?
@@ -554,6 +557,7 @@ where

    fn handover_transport(&mut self, transport: Self::Transport) {
        let fd = transport.as_raw_fd();
+
        log::debug!(target: "wire", "Received transport handover (fd={fd})");

        match self.peers.get(&fd) {
            Some(Peer::Disconnected { id, reason }) => {
@@ -568,8 +572,6 @@ where
                }
            }
            Some(Peer::Upgrading { .. }) => {
-
                log::debug!(target: "wire", "Received handover of transport with fd {fd}");
-

                self.upgraded(transport);
            }
            Some(_) => {
@@ -595,10 +597,19 @@ where
        while let Some(ev) = self.service.next() {
            match ev {
                Io::Write(node_id, msgs) => {
+
                    let fd = match self.fd_by_id(&node_id) {
+
                        (fd, Peer::Connected { .. }) => fd,
+
                        (_, peer) => {
+
                            // If the peer is disconnected by the wire protocol, the service may
+
                            // not be aware of this yet, and may continue to write messages to it.
+
                            log::debug!(target: "wire", "Dropping {} message(s) to {node_id} ({peer:?})", msgs.len());
+
                            continue;
+
                        }
+
                    };
                    log::trace!(
                        target: "wire", "Writing {} message(s) to {}", msgs.len(), node_id
                    );
-
                    let fd = self.connected_fd_by_id(&node_id);
+

                    let mut data = Vec::new();
                    for msg in msgs {
                        msg.encode(&mut data).expect("in-memory writes never fail");
@@ -634,13 +645,17 @@ where
                            self.service.attempted(node_id, &addr);
                            // TODO: Keep track of peer address for when peer disconnects before
                            // handshake is complete.
-
                            self.peers
-
                                .insert(transport.as_raw_fd(), Peer::connecting(Link::Outbound));
+
                            self.peers.insert(
+
                                transport.as_raw_fd(),
+
                                Peer::connecting(Link::Outbound, Some(node_id)),
+
                            );

                            self.actions
                                .push_back(reactor::Action::RegisterTransport(transport));
                        }
                        Err(err) => {
+
                            log::error!(target: "wire", "Error establishing connection: {err}");
+

                            self.service
                                .disconnected(node_id, &DisconnectReason::Dial(Arc::new(err)));
                            break;