Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node/reactor: Correctly handle error events
Merged lorenz opened 6 months ago

fn handle_events would panic, if there were multiple events for one token, and the first one that happened to be handled was an error. Indeed it is concerning if a token is encountered that was never registered before. However, tokens that were just deregistered must be tracked.

Using Vec here seems a bit costly, in the future, smallvec::SmallVec could be considered.

The “unregister” methods are renamed to “deregister” to better line up with mio vocabulary.

Log stamtements that helped analysis of the panic that occurred here is overhauled and improved, requiring a Debug bound on types that obviously implement it.

5 files changed +57 -40 72cf3d19 0b342485
modified crates/radicle-node/src/reactor.rs
@@ -203,13 +203,13 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
    /// Listener resources are resources which may spawn more resources and can't be written to. A
    /// typical example of a listener resource is a [`std::net::TcpListener`], however this may also
    /// be a special form of a peripheral device or something else.
-
    type Listener: EventHandler + Source + Send;
+
    type Listener: EventHandler + Source + Send + Debug;

    /// Type for a transport resource.
    ///
    /// Transport is a "full" resource which can be read from - and written to. Usual files, network
    /// connections, database connections etc are all fall into this category.
-
    type Transport: EventHandler + Source + Send + WriteAtomic;
+
    type Transport: EventHandler + Source + Send + Debug + WriteAtomic;

    /// Method called by the reactor on the start of each event loop once the poll has returned.
    fn tick(&mut self, time: localtime::LocalTime);
@@ -428,50 +428,60 @@ impl<H: ReactionHandler> Runtime<H> {
    ///
    /// Whether one of the events was originated from the waker.
    fn handle_events(&mut self, time: LocalTime, events: Events) -> bool {
+
        log::trace!(target: "reactor", "Handling events");
        let mut awoken = false;
+
        let mut deregistered = Vec::new();

        for event in events.into_iter() {
-
            let id = event.token();
+
            let token = event.token();

-
            if id == WAKER {
+
            if token == WAKER {
                log::trace!(target: "reactor", "Awoken by the controller");
                awoken = true;
-
            } else if self.listeners.contains_key(&id) {
-
                log::trace!(target: "reactor", event:debug; "From listener");
+
            } else if self.listeners.contains_key(&token) {
+
                log::trace!(target: "reactor", token=token.0; "Event from listener with token {}: {:?}", token.0, event);
                if !event.is_error() {
-
                    let listener = self.listeners.get_mut(&id).expect("resource disappeared");
+
                    let listener = self
+
                        .listeners
+
                        .get_mut(&token)
+
                        .expect("resource disappeared");
                    listener
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service.listener_reacted(id, service_event, time);
+
                            self.service.listener_reacted(token, service_event, time);
                        });
                } else {
-
                    let listener = self
-
                        .unregister_listener(id)
-
                        .expect("listener has disappeared");
+
                    let listener = self.deregister_listener(token).unwrap_or_else(|| {
+
                        panic!("listener with token {} has disappeared", token.0)
+
                    });
                    self.service
-
                        .handle_error(Error::ListenerDisconnect(id, listener));
+
                        .handle_error(Error::ListenerDisconnect(token, listener));
+
                    deregistered.push(token);
                }
-
            } else if self.transports.contains_key(&id) {
-
                log::trace!(target: "reactor", event:debug; "From transport");
+
            } else if self.transports.contains_key(&token) {
+
                log::trace!(target: "reactor", token=token.0; "Event from transport with token {}: {:?}", token.0, event);
                if !event.is_error() {
-
                    let transport = self.transports.get_mut(&id).expect("resource disappeared");
+
                    let transport = self
+
                        .transports
+
                        .get_mut(&token)
+
                        .expect("resource disappeared");
                    transport
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service.transport_reacted(id, service_event, time);
+
                            self.service.transport_reacted(token, service_event, time);
                        });
                } else {
-
                    let transport = self
-
                        .unregister_transport(id)
-
                        .expect("transport has disappeared");
+
                    let transport = self.deregister_transport(token).unwrap_or_else(|| {
+
                        panic!("transport with token {} has disappeared", token.0)
+
                    });
                    self.service
-
                        .handle_error(Error::TransportDisconnect(id, transport));
+
                        .handle_error(Error::TransportDisconnect(token, transport));
+
                    deregistered.push(token);
                }
-
            } else {
-
                panic!("token in poll which is not a known waker, listener or transport")
+
            } else if !deregistered.contains(&token) {
+
                log::warn!(target: "reactor", token=token.0; "Event from unknown token {}: {:?}", token.0, event);
            }
        }

@@ -498,7 +508,7 @@ impl<H: ReactionHandler> Runtime<H> {
    ) -> Result<(), Error<H::Listener, H::Transport>> {
        match action {
            Action::RegisterListener(token, mut listener) => {
-
                log::debug!(target: "reactor", token=token.0; "Registering listener");
+
                log::trace!(target: "reactor", token=token.0; "Registering listener {:?} with token {}", listener, token.0);

                self.poll
                    .registry()
@@ -520,33 +530,33 @@ impl<H: ReactionHandler> Runtime<H> {
                    .transport_registered(token, &self.transports[&token]);
            }
            Action::UnregisterListener(token) => {
-
                let Some(listener) = self.unregister_listener(token) else {
+
                let Some(listener) = self.deregister_listener(token) else {
                    return Ok(());
                };

-
                log::debug!(target: "reactor", token=token.0; "Handing over listener");
+
                log::debug!(target: "reactor", token=token.0; "Handing over listener {listener:?} with token {}", token.0);
                self.service.handover_listener(token, listener);
            }
            Action::UnregisterTransport(token) => {
-
                let Some(transport) = self.unregister_transport(token) else {
+
                let Some(transport) = self.deregister_transport(token) else {
                    return Ok(());
                };

-
                log::debug!(target: "reactor", token=token.0; "Handing over transport");
+
                log::debug!(target: "reactor", token=token.0; "Handing over transport {transport:?} with token {}", token.0);
                self.service.handover_transport(token, transport);
            }
            Action::Send(token, data) => {
-
                log::trace!(target: "reactor", "Sending {} bytes to {token:?}", data.len());
+
                log::trace!(target: "reactor", token=token.0; "Sending {} bytes to {token:?}", data.len());

                if let Some(transport) = self.transports.get_mut(&token) {
                    if let Err(e) = transport.write_atomic(&data) {
                        log::error!(target: "reactor", "Fatal error writing to transport {token:?}, disconnecting. Error details: {e:?}");
-
                        if let Some(transport) = self.unregister_transport(token) {
+
                        if let Some(transport) = self.deregister_transport(token) {
                            return Err(Error::TransportDisconnect(token, transport));
                        }
                    }
                } else {
-
                    log::error!(target: "reactor", "Transport {token:?} is not in the reactor");
+
                    log::error!(target: "reactor", token=token.0; "No transport with token {token:?} is known!");
                }
            }
            Action::SetTimer(duration) => {
@@ -562,27 +572,27 @@ impl<H: ReactionHandler> Runtime<H> {
        log::info!(target: "reactor", "Shutdown");
    }

-
    fn unregister_listener(&mut self, token: Token) -> Option<H::Listener> {
+
    fn deregister_listener(&mut self, token: Token) -> Option<H::Listener> {
        let Some(mut source) = self.listeners.remove(&token) else {
-
            log::warn!(target: "reactor", token=token.0; "Unregistering non-registered listener");
+
            log::warn!(target: "reactor", token=token.0; "Deregistering non-registered listener with token {}", token.0);
            return None;
        };

        if let Err(err) = self.poll.registry().deregister(&mut source) {
-
            log::warn!(target: "reactor", token=token.0; "Failed to deregister listener from mio: {err}");
+
            log::warn!(target: "reactor", token=token.0; "Failed to deregister listener with token {} from mio: {err}", token.0);
        }

        Some(source)
    }

-
    fn unregister_transport(&mut self, token: Token) -> Option<H::Transport> {
+
    fn deregister_transport(&mut self, token: Token) -> Option<H::Transport> {
        let Some(mut source) = self.transports.remove(&token) else {
-
            log::warn!(target: "reactor", token=token.0; "Unregistering non-registered transport");
+
            log::warn!(target: "reactor", token=token.0; "Deregistering non-registered transport with token {}", token.0);
            return None;
        };

        if let Err(err) = self.poll.registry().deregister(&mut source) {
-
            log::warn!(target: "reactor", token=token.0; "Failed to deregister transport from mio: {err}");
+
            log::warn!(target: "reactor", token=token.0; "Failed to deregister transport with token {} from mio: {err}", token.0);
        }

        Some(source)
modified crates/radicle-node/src/reactor/session.rs
@@ -100,7 +100,7 @@ impl<M: StateMachine, S: Session> Display for ProtocolArtifact<M, S> {
    }
}

-
#[derive(Copy, Clone, Eq, PartialEq)]
+
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct Protocol<M: StateMachine, S: Session> {
    pub(crate) state: M,
    pub(crate) session: S,
modified crates/radicle-node/src/runtime.rs
@@ -1,6 +1,7 @@
pub mod handle;
pub mod thread;

+
use std::fmt::Debug;
use std::path::PathBuf;
use std::{fs, io, net};

@@ -131,7 +132,11 @@ impl Runtime {
        signer: Device<G>,
    ) -> Result<Runtime, Error>
    where
-
        G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + 'static,
+
        G: crypto::signature::Signer<crypto::Signature>
+
            + Ecdh<Pk = NodeId>
+
            + Clone
+
            + Debug
+
            + 'static,
    {
        let id = *signer.public_key();
        let alias = config.alias.clone();
modified crates/radicle-node/src/test/node.rs
@@ -1,3 +1,4 @@
+
use std::fmt::Debug;
use std::io::BufRead as _;
use std::mem::ManuallyDrop;
use std::path::Path;
@@ -455,7 +456,7 @@ impl Node<MockSigner> {
    }
}

-
impl<G: cyphernet::Ecdh<Pk = NodeId> + Signer<Signature> + Clone> Node<G> {
+
impl<G: cyphernet::Ecdh<Pk = NodeId> + Signer<Signature> + Clone + Debug> Node<G> {
    /// Spawn a node in its own thread.
    pub fn spawn(self) -> NodeHandle<G> {
        let alias = self.config.alias.clone();
modified crates/radicle-node/src/wire.rs
@@ -3,6 +3,7 @@
//! We use the Noise XK handshake pattern to establish an encrypted stream with a remote peer.
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
+
use std::fmt::Debug;
use std::sync::Arc;
use std::{io, net, time};

@@ -490,7 +491,7 @@ impl<D, S, G> reactor::ReactionHandler for Wire<D, S, G>
where
    D: service::Store + Send,
    S: WriteStorage + Send + 'static,
-
    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send,
+
    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send + Debug,
{
    type Listener = Listener;
    type Transport = Transport<WireSession<G>>;