Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Disconnect peers on failed handshakes
Dr. Maxim Orlovsky committed 3 years ago
commit 4d77f95c217e862b05066360df47f8bb1695a709
parent e9ae5897f77097ba6214865084a9beca8b92de67
2 files changed +17 -7
modified radicle-node/src/service/session.rs
@@ -51,6 +51,8 @@ pub enum Error {
    Misbehavior,
    #[error("peer timed out")]
    Timeout,
+
    #[error("handshake error")]
+
    Handshake(String),
}

/// A peer session. Each connected peer will have one session.
modified radicle-node/src/wire.rs
@@ -22,7 +22,7 @@ use crate::identity::Id;
use crate::node;
use crate::service;
use crate::service::reactor::Io;
-
use crate::service::{filter, routing};
+
use crate::service::{filter, routing, session};
use crate::storage::refs::Refs;
use crate::storage::refs::SignedRefs;
use crate::storage::WriteStorage;
@@ -433,7 +433,7 @@ pub struct Inbox<T: Transcode> {
#[derive(Debug)]
pub struct Wire<R, S, W, G, H: Handshake> {
    handshakes: HashMap<net::SocketAddr, H>,
-
    handshake_queue: VecDeque<(net::SocketAddr, Vec<u8>)>,
+
    inner_queue: VecDeque<nakamoto::Io<service::Event, service::DisconnectReason>>,
    inboxes: HashMap<net::SocketAddr, Inbox<H::Transcoder>>,
    inner: service::Service<R, S, W, G>,
}
@@ -442,7 +442,7 @@ impl<R, S, W, G, H: Handshake> Wire<R, S, W, G, H> {
    pub fn new(inner: service::Service<R, S, W, G>) -> Self {
        Self {
            handshakes: HashMap::new(),
-
            handshake_queue: Default::default(),
+
            inner_queue: Default::default(),
            inboxes: HashMap::new(),
            inner,
        }
@@ -504,14 +504,16 @@ where
                HandshakeResult::Next(handshake, reply) => {
                    self.handshakes.insert(*addr, handshake);
                    if !reply.is_empty() {
-
                        self.handshake_queue.push_back((*addr, reply));
+
                        self.inner_queue
+
                            .push_back(nakamoto::Io::Write(*addr, reply));
                    }
                    return;
                }
                HandshakeResult::Complete(transcoder, reply) => {
                    log::debug!("handshake with peer {} is complete", addr);
                    if !reply.is_empty() {
-
                        self.handshake_queue.push_back((*addr, reply));
+
                        self.inner_queue
+
                            .push_back(nakamoto::Io::Write(*addr, reply));
                    }
                    self.inboxes.insert(
                        *addr,
@@ -523,6 +525,12 @@ where
                }
                HandshakeResult::Error(err) => {
                    log::error!("invalid handshake input. Details: {}", err);
+
                    self.inner_queue.push_back(nakamoto::Io::Disconnect(
+
                        *addr,
+
                        service::DisconnectReason::Error(session::Error::Handshake(
+
                            err.to_string(),
+
                        )),
+
                    ));
                    return;
                }
            }
@@ -559,8 +567,8 @@ impl<R, S, W, G, H: Handshake> Iterator for Wire<R, S, W, G, H> {
    type Item = nakamoto::Io<service::Event, service::DisconnectReason>;

    fn next(&mut self) -> Option<Self::Item> {
-
        if let Some((addr, handshake_data)) = self.handshake_queue.pop_front() {
-
            return Some(nakamoto::Io::Write(addr, handshake_data));
+
        if let Some(event) = self.inner_queue.pop_front() {
+
            return Some(event);
        }

        match self.inner.next() {