Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Get rid of `nakamoto::DisconnectReason`
Dr. Maxim Orlovsky committed 3 years ago
commit 08334f790ba788ad7716fe3e31ec6a1206efe3da
parent b7b33acbdf4bb5af004a138c248f165604843d4c
4 files changed +58 -69
modified radicle-node/src/service.rs
@@ -558,11 +558,7 @@ where
        }
    }

-
    pub fn disconnected(
-
        &mut self,
-
        remote: NodeId,
-
        reason: &nakamoto::DisconnectReason<DisconnectReason>,
-
    ) {
+
    pub fn disconnected(&mut self, remote: NodeId, reason: &DisconnectReason) {
        let since = self.local_time();

        debug!("Disconnected from {} ({})", remote, reason);
@@ -576,10 +572,8 @@ where
                    if reason.is_dial_err() {
                        return;
                    }
-
                    if let nakamoto::DisconnectReason::Protocol(r) = reason {
-
                        if !r.is_transient() {
-
                            return;
-
                        }
+
                    if !reason.is_transient() {
+
                        return;
                    }
                    // TODO: Eventually we want a delay before attempting a reconnection,
                    // with exponential back-off.
@@ -610,7 +604,7 @@ where
                // If there's an error, stop processing messages from this peer.
                // However, we still relay messages returned up to this point.
                self.reactor
-
                    .disconnect(remote, DisconnectReason::Error(err));
+
                    .disconnect(remote, DisconnectReason::Session(err));

                // FIXME: The peer should be set in a state such that we don'that
                // process further messages.
@@ -983,8 +977,10 @@ where
            .filter(|(_, session)| session.last_active < *now - STALE_CONNECTION_TIMEOUT);

        for (_, session) in stale {
-
            self.reactor
-
                .disconnect(session.id, DisconnectReason::Error(session::Error::Timeout));
+
            self.reactor.disconnect(
+
                session.id,
+
                DisconnectReason::Session(session::Error::Timeout),
+
            );
        }
    }

@@ -1086,35 +1082,43 @@ where
    }
}

+
/// Disconnect reason.
#[derive(Debug)]
pub enum DisconnectReason {
-
    User,
-
    Peer,
-
    Error(session::Error),
+
    /// Error while dialing the remote. This error occures before a connection is
+
    /// even established. Errors of this kind are usually not transient.
+
    Dial(Arc<dyn std::error::Error + Sync + Send>),
+
    /// Error with an underlying established connection. Sometimes, reconnecting
+
    /// after such an error is possible.
+
    Connection(Arc<dyn std::error::Error + Sync + Send>),
+
    // Session error.
+
    Session(session::Error),
}

impl DisconnectReason {
-
    fn is_transient(&self) -> bool {
-
        match self {
-
            Self::User => false,
-
            Self::Peer => false,
-
            Self::Error(..) => false,
-
        }
+
    pub fn is_dial_err(&self) -> bool {
+
        matches!(self, Self::Dial(_))
+
    }
+

+
    pub fn is_connection_err(&self) -> bool {
+
        matches!(self, Self::Connection(_))
    }
-
}

-
impl From<DisconnectReason> for nakamoto_net::DisconnectReason<DisconnectReason> {
-
    fn from(reason: DisconnectReason) -> Self {
-
        nakamoto_net::DisconnectReason::Protocol(reason)
+
    pub fn is_transient(&self) -> bool {
+
        match self {
+
            Self::Dial(_) => false,
+
            Self::Connection(_) => true,
+
            Self::Session(..) => false,
+
        }
    }
}

impl fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
-
            Self::User => write!(f, "user"),
-
            Self::Peer => write!(f, "peer"),
-
            Self::Error(err) => write!(f, "error: {}", err),
+
            Self::Dial(err) => write!(f, "{}", err),
+
            Self::Connection(err) => write!(f, "{}", err),
+
            Self::Session(err) => write!(f, "error: {}", err),
        }
    }
}
modified radicle-node/src/test/simulator.rs
@@ -6,10 +6,10 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut, Range};
use std::rc::Rc;
+
use std::sync::Arc;
use std::{fmt, io};

use log::*;
-
use nakamoto_net as nakamoto;
use nakamoto_net::{Link, LocalDuration, LocalTime};

use crate::crypto::{Negotiator, Signer};
@@ -55,7 +55,7 @@ pub enum Input {
        link: Link,
    },
    /// Disconnected from peer.
-
    Disconnected(NodeId, Rc<nakamoto::DisconnectReason<DisconnectReason>>),
+
    Disconnected(NodeId, Rc<DisconnectReason>),
    /// Received a message from a remote peer.
    Received(NodeId, Vec<Message>),
    /// Used to advance the state machine after some wall time has passed.
@@ -493,9 +493,9 @@ impl<S: WriteStorage + 'static, G: Signer + Negotiator> Simulation<S, G> {
                                remote,
                                input: Input::Disconnected(
                                    remote,
-
                                    Rc::new(nakamoto::DisconnectReason::ConnectionError(
-
                                        io::Error::from(io::ErrorKind::UnexpectedEof).into(),
-
                                    )),
+
                                    Rc::new(DisconnectReason::Connection(Arc::new(
+
                                        io::Error::from(io::ErrorKind::UnexpectedEof),
+
                                    ))),
                                ),
                            },
                        );
@@ -533,7 +533,7 @@ impl<S: WriteStorage + 'static, G: Signer + Negotiator> Simulation<S, G> {
                self.priority.push_back(Scheduled {
                    remote,
                    node,
-
                    input: Input::Disconnected(remote, Rc::new(reason.into())),
+
                    input: Input::Disconnected(remote, Rc::new(reason)),
                });

                // Nb. It's possible for disconnects to happen simultaneously from both ends, hence
@@ -556,9 +556,9 @@ impl<S: WriteStorage + 'static, G: Signer + Negotiator> Simulation<S, G> {
                        remote: node,
                        input: Input::Disconnected(
                            node,
-
                            Rc::new(nakamoto::DisconnectReason::ConnectionError(
-
                                io::Error::from(io::ErrorKind::ConnectionReset).into(),
-
                            )),
+
                            Rc::new(DisconnectReason::Connection(Arc::new(io::Error::from(
+
                                io::ErrorKind::ConnectionReset,
+
                            )))),
                        ),
                    },
                );
modified radicle-node/src/tests.rs
@@ -3,7 +3,6 @@ use std::io;
use std::sync::Arc;

use crossbeam_channel as chan;
-
use nakamoto_net as nakamoto;

use crate::collections::{HashMap, HashSet};
use crate::crypto::test::signer::MockSigner;
@@ -379,7 +378,7 @@ fn test_inventory_relay_bad_timestamp() {
    );
    assert_matches!(
        alice.outbox().next(),
-
        Some(Io::Disconnect(addr, DisconnectReason::Error(session::Error::InvalidTimestamp(t))))
+
        Some(Io::Disconnect(addr, DisconnectReason::Session(session::Error::InvalidTimestamp(t))))
        if addr == bob.id() && t == timestamp
    );
}
@@ -735,17 +734,11 @@ fn test_persistent_peer_reconnect() {

    // A non-transient disconnect, such as one requested by the user will not trigger
    // a reconnection.
-
    alice.disconnected(
-
        eve.id(),
-
        &nakamoto::DisconnectReason::DialError(error.clone()),
-
    );
+
    alice.disconnected(eve.id(), &DisconnectReason::Dial(error.clone()));
    assert_matches!(alice.outbox().next(), None);

    for _ in 0..MAX_CONNECTION_ATTEMPTS {
-
        alice.disconnected(
-
            bob.id(),
-
            &nakamoto::DisconnectReason::ConnectionError(error.clone()),
-
        );
+
        alice.disconnected(bob.id(), &DisconnectReason::Connection(error.clone()));
        assert_matches!(alice.outbox().next(), Some(Io::Connect(a, _)) if a == bob.id());
        assert_matches!(alice.outbox().next(), None);

@@ -753,10 +746,7 @@ fn test_persistent_peer_reconnect() {
    }

    // After the max connection attempts, a disconnect doesn't trigger a reconnect.
-
    alice.disconnected(
-
        bob.id(),
-
        &nakamoto::DisconnectReason::ConnectionError(error),
-
    );
+
    alice.disconnected(bob.id(), &DisconnectReason::Connection(error));
    assert_matches!(alice.outbox().next(), None);
}

@@ -790,10 +780,7 @@ fn test_maintain_connections() {
    // A transient error such as this will cause Alice to attempt a reconnection.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
    for peer in connected.iter() {
-
        alice.disconnected(
-
            peer.id(),
-
            &nakamoto::DisconnectReason::ConnectionError(error.clone()),
-
        );
+
        alice.disconnected(peer.id(), &DisconnectReason::Connection(error.clone()));

        let id = alice
            .outbox()
modified radicle-node/src/wire/transport.rs
@@ -11,7 +11,7 @@ use std::{io, net};

use crossbeam_channel as chan;
use cyphernet::addr::{Addr as _, HostAddr, PeerAddr};
-
use nakamoto_net::{DisconnectReason, Link, LocalTime};
+
use nakamoto_net::{Link, LocalTime};
use netservices::noise::NoiseXk;
use netservices::wire::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
use netservices::NetSession;
@@ -23,7 +23,7 @@ use radicle::storage::WriteStorage;

use crate::crypto::Signer;
use crate::service::reactor::{Fetch, Io};
-
use crate::service::{routing, session, Message, Service};
+
use crate::service::{routing, session, DisconnectReason, Message, Service};
use crate::wire::{Decode, Encode};
use crate::worker::{WorkerReq, WorkerResp};
use crate::{address, service};
@@ -43,7 +43,7 @@ enum Peer<G: Negotiator> {
    /// or once connected.
    Disconnected {
        id: NodeId,
-
        reason: DisconnectReason<service::DisconnectReason>,
+
        reason: DisconnectReason,
    },
    /// The state after we've started the process of upgraded the peer for a fetch.
    /// The request to handover the socket was made to the reactor.
@@ -76,7 +76,7 @@ impl<G: Negotiator> Peer<G> {
    }

    /// Switch to disconnected state.
-
    fn disconnected(&mut self, reason: DisconnectReason<service::DisconnectReason>) {
+
    fn disconnected(&mut self, reason: DisconnectReason) {
        if let Self::Connected { id, .. } = self {
            *self = Self::Disconnected { id: *id, reason };
        } else {
@@ -188,7 +188,7 @@ where
        })
    }

-
    fn disconnect(&mut self, fd: RawFd, reason: DisconnectReason<service::DisconnectReason>) {
+
    fn disconnect(&mut self, fd: RawFd, reason: DisconnectReason) {
        let Some(peer) = self.peers.get_mut(&fd) else {
            log::error!(target: "transport", "Peer with fd {fd} was not found");
            return;
@@ -342,9 +342,9 @@ where
                    );
                    self.disconnect(
                        fd,
-
                        DisconnectReason::DialError(
-
                            io::Error::from(io::ErrorKind::AlreadyExists).into(),
-
                        ),
+
                        DisconnectReason::Dial(Arc::new(io::Error::from(
+
                            io::ErrorKind::AlreadyExists,
+
                        ))),
                    );
                }

@@ -385,9 +385,7 @@ where
                                log::error!(target: "transport", "Invalid message from {}: {err}", id);
                                self.disconnect(
                                    fd,
-
                                    DisconnectReason::Protocol(service::DisconnectReason::Error(
-
                                        session::Error::Misbehavior,
-
                                    )),
+
                                    DisconnectReason::Session(session::Error::Misbehavior),
                                );
                                break;
                            }
@@ -398,7 +396,7 @@ where
                }
            }
            SessionEvent::Terminated(err) => {
-
                self.disconnect(fd, DisconnectReason::ConnectionError(Arc::new(err)));
+
                self.disconnect(fd, DisconnectReason::Connection(Arc::new(err)));
            }
        }
    }
@@ -513,14 +511,14 @@ where
                        }
                        Err(err) => {
                            self.service
-
                                .disconnected(node_id, &DisconnectReason::DialError(Arc::new(err)));
+
                                .disconnected(node_id, &DisconnectReason::Dial(Arc::new(err)));
                            break;
                        }
                    }
                }
                Io::Disconnect(node_id, reason) => {
                    let fd = self.by_id(&node_id);
-
                    self.disconnect(fd, DisconnectReason::Protocol(reason));
+
                    self.disconnect(fd, reason);

                    return self.actions.pop_back();
                }