Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node/reactor: Correctly handle error events
✗ CI failure Lorenz Leutgeb committed 6 months ago
commit a5d05d02fc6392ab40beced01c9a2991bf3ee74c
parent 384c506489dd6a4cbf8c80b0370b2b2a8de7835b
3 failed (3 total) View logs
5 files changed +57 -40
modified crates/radicle-node/src/reactor.rs
@@ -203,13 +203,13 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
    /// Listener resources are resources which may spawn more resources and can't be written to. A
    /// typical example of a listener resource is a [`std::net::TcpListener`], however this may also
    /// be a special form of a peripheral device or something else.
-
    type Listener: EventHandler + Source + Send;
+
    type Listener: EventHandler + Source + Send + Debug;

    /// Type for a transport resource.
    ///
    /// Transport is a "full" resource which can be read from - and written to. Usual files, network
    /// connections, database connections etc are all fall into this category.
-
    type Transport: EventHandler + Source + Send + WriteAtomic;
+
    type Transport: EventHandler + Source + Send + Debug + WriteAtomic;

    /// Method called by the reactor on the start of each event loop once the poll has returned.
    fn tick(&mut self, time: localtime::LocalTime);
@@ -428,50 +428,60 @@ impl<H: ReactionHandler> Runtime<H> {
    ///
    /// Whether one of the events was originated from the waker.
    fn handle_events(&mut self, time: LocalTime, events: Events) -> bool {
+
        log::trace!(target: "reactor", "Handling events");
        let mut awoken = false;
+
        let mut deregistered = Vec::new();

        for event in events.into_iter() {
-
            let id = event.token();
+
            let token = event.token();

-
            if id == WAKER {
+
            if token == WAKER {
                log::trace!(target: "reactor", "Awoken by the controller");
                awoken = true;
-
            } else if self.listeners.contains_key(&id) {
-
                log::trace!(target: "reactor", event:debug; "From listener");
+
            } else if self.listeners.contains_key(&token) {
+
                log::trace!(target: "reactor", token=token.0; "Event from listener with token {}: {:?}", token.0, event);
                if !event.is_error() {
-
                    let listener = self.listeners.get_mut(&id).expect("resource disappeared");
+
                    let listener = self
+
                        .listeners
+
                        .get_mut(&token)
+
                        .expect("resource disappeared");
                    listener
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service.listener_reacted(id, service_event, time);
+
                            self.service.listener_reacted(token, service_event, time);
                        });
                } else {
-
                    let listener = self
-
                        .unregister_listener(id)
-
                        .expect("listener has disappeared");
+
                    let listener = self.deregister_listener(token).unwrap_or_else(|| {
+
                        panic!("listener with token {} has disappeared", token.0)
+
                    });
                    self.service
-
                        .handle_error(Error::ListenerDisconnect(id, listener));
+
                        .handle_error(Error::ListenerDisconnect(token, listener));
+
                    deregistered.push(token);
                }
-
            } else if self.transports.contains_key(&id) {
-
                log::trace!(target: "reactor", event:debug; "From transport");
+
            } else if self.transports.contains_key(&token) {
+
                log::trace!(target: "reactor", token=token.0; "Event from transport with token {}: {:?}", token.0, event);
                if !event.is_error() {
-
                    let transport = self.transports.get_mut(&id).expect("resource disappeared");
+
                    let transport = self
+
                        .transports
+
                        .get_mut(&token)
+
                        .expect("resource disappeared");
                    transport
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service.transport_reacted(id, service_event, time);
+
                            self.service.transport_reacted(token, service_event, time);
                        });
                } else {
-
                    let transport = self
-
                        .unregister_transport(id)
-
                        .expect("transport has disappeared");
+
                    let transport = self.deregister_transport(token).unwrap_or_else(|| {
+
                        panic!("transport with token {} has disappeared", token.0)
+
                    });
                    self.service
-
                        .handle_error(Error::TransportDisconnect(id, transport));
+
                        .handle_error(Error::TransportDisconnect(token, transport));
+
                    deregistered.push(token);
                }
-
            } else {
-
                panic!("token in poll which is not a known waker, listener or transport")
+
            } else if !deregistered.contains(&token) {
+
                log::warn!(target: "reactor", token=token.0; "Event from unknown token {}: {:?}", token.0, event);
            }
        }

@@ -498,7 +508,7 @@ impl<H: ReactionHandler> Runtime<H> {
    ) -> Result<(), Error<H::Listener, H::Transport>> {
        match action {
            Action::RegisterListener(token, mut listener) => {
-
                log::debug!(target: "reactor", token=token.0; "Registering listener");
+
                log::trace!(target: "reactor", token=token.0; "Registering listener {:?} with token {}", listener, token.0);

                self.poll
                    .registry()
@@ -520,33 +530,33 @@ impl<H: ReactionHandler> Runtime<H> {
                    .transport_registered(token, &self.transports[&token]);
            }
            Action::UnregisterListener(token) => {
-
                let Some(listener) = self.unregister_listener(token) else {
+
                let Some(listener) = self.deregister_listener(token) else {
                    return Ok(());
                };

-
                log::debug!(target: "reactor", token=token.0; "Handing over listener");
+
                log::debug!(target: "reactor", token=token.0; "Handing over listener {listener:?} with token {}", token.0);
                self.service.handover_listener(token, listener);
            }
            Action::UnregisterTransport(token) => {
-
                let Some(transport) = self.unregister_transport(token) else {
+
                let Some(transport) = self.deregister_transport(token) else {
                    return Ok(());
                };

-
                log::debug!(target: "reactor", token=token.0; "Handing over transport");
+
                log::debug!(target: "reactor", token=token.0; "Handing over transport {transport:?} with token {}", token.0);
                self.service.handover_transport(token, transport);
            }
            Action::Send(token, data) => {
-
                log::trace!(target: "reactor", "Sending {} bytes to {token:?}", data.len());
+
                log::trace!(target: "reactor", token=token.0; "Sending {} bytes to {token:?}", data.len());

                if let Some(transport) = self.transports.get_mut(&token) {
                    if let Err(e) = transport.write_atomic(&data) {
                        log::error!(target: "reactor", "Fatal error writing to transport {token:?}, disconnecting. Error details: {e:?}");
-
                        if let Some(transport) = self.unregister_transport(token) {
+
                        if let Some(transport) = self.deregister_transport(token) {
                            return Err(Error::TransportDisconnect(token, transport));
                        }
                    }
                } else {
-
                    log::error!(target: "reactor", "Transport {token:?} is not in the reactor");
+
                    log::error!(target: "reactor", token=token.0; "No transport with token {token:?} is known!");
                }
            }
            Action::SetTimer(duration) => {
@@ -562,27 +572,27 @@ impl<H: ReactionHandler> Runtime<H> {
        log::info!(target: "reactor", "Shutdown");
    }

-
    fn unregister_listener(&mut self, token: Token) -> Option<H::Listener> {
+
    fn deregister_listener(&mut self, token: Token) -> Option<H::Listener> {
        let Some(mut source) = self.listeners.remove(&token) else {
-
            log::warn!(target: "reactor", token=token.0; "Unregistering non-registered listener");
+
            log::warn!(target: "reactor", token=token.0; "Deregistering non-registered listener with token {}", token.0);
            return None;
        };

        if let Err(err) = self.poll.registry().deregister(&mut source) {
-
            log::warn!(target: "reactor", token=token.0; "Failed to deregister listener from mio: {err}");
+
            log::warn!(target: "reactor", token=token.0; "Failed to deregister listener with token {} from mio: {err}", token.0);
        }

        Some(source)
    }

-
    fn unregister_transport(&mut self, token: Token) -> Option<H::Transport> {
+
    fn deregister_transport(&mut self, token: Token) -> Option<H::Transport> {
        let Some(mut source) = self.transports.remove(&token) else {
-
            log::warn!(target: "reactor", token=token.0; "Unregistering non-registered transport");
+
            log::warn!(target: "reactor", token=token.0; "Deregistering non-registered transport with token {}", token.0);
            return None;
        };

        if let Err(err) = self.poll.registry().deregister(&mut source) {
-
            log::warn!(target: "reactor", token=token.0; "Failed to deregister transport from mio: {err}");
+
            log::warn!(target: "reactor", token=token.0; "Failed to deregister transport with token {} from mio: {err}", token.0);
        }

        Some(source)
modified crates/radicle-node/src/reactor/session.rs
@@ -100,7 +100,7 @@ impl<M: StateMachine, S: Session> Display for ProtocolArtifact<M, S> {
    }
}

-
#[derive(Copy, Clone, Eq, PartialEq)]
+
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct Protocol<M: StateMachine, S: Session> {
    pub(crate) state: M,
    pub(crate) session: S,
modified crates/radicle-node/src/runtime.rs
@@ -1,6 +1,7 @@
pub mod handle;
pub mod thread;

+
use std::fmt::Debug;
use std::path::PathBuf;
use std::{fs, io, net};

@@ -131,7 +132,11 @@ impl Runtime {
        signer: Device<G>,
    ) -> Result<Runtime, Error>
    where
-
        G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + 'static,
+
        G: crypto::signature::Signer<crypto::Signature>
+
            + Ecdh<Pk = NodeId>
+
            + Clone
+
            + Debug
+
            + 'static,
    {
        let id = *signer.public_key();
        let alias = config.alias.clone();
modified crates/radicle-node/src/test/node.rs
@@ -1,3 +1,4 @@
+
use std::fmt::Debug;
use std::io::BufRead as _;
use std::mem::ManuallyDrop;
use std::path::Path;
@@ -455,7 +456,7 @@ impl Node<MockSigner> {
    }
}

-
impl<G: cyphernet::Ecdh<Pk = NodeId> + Signer<Signature> + Clone> Node<G> {
+
impl<G: cyphernet::Ecdh<Pk = NodeId> + Signer<Signature> + Clone + Debug> Node<G> {
    /// Spawn a node in its own thread.
    pub fn spawn(self) -> NodeHandle<G> {
        let alias = self.config.alias.clone();
modified crates/radicle-node/src/wire.rs
@@ -3,6 +3,7 @@
//! We use the Noise XK handshake pattern to establish an encrypted stream with a remote peer.
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
+
use std::fmt::Debug;
use std::sync::Arc;
use std::{io, net, time};

@@ -490,7 +491,7 @@ impl<D, S, G> reactor::ReactionHandler for Wire<D, S, G>
where
    D: service::Store + Send,
    S: WriteStorage + Send + 'static,
-
    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send,
+
    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send + Debug,
{
    type Listener = Listener;
    type Transport = Transport<WireSession<G>>;