Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-node src reactor.rs
mod controller;
mod listener;
mod session;
mod timer;
mod token;
mod transport;

use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::io::ErrorKind;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::{io, thread};

use crossbeam_channel::{Receiver, TryRecvError, unbounded};
use mio::event::{Event, Source};
use mio::{Events, Interest, Poll, Waker};
use thiserror::Error;

use timer::Timer;
use token::WAKER;

use crate::wire;

pub(crate) use self::controller::{ControlMessage, Controller};
pub(crate) use listener::Listener;
pub use session::{NoiseSession, ProtocolArtifact, Socks5Session};
pub(crate) use token::{Token, Tokens};
pub(crate) use transport::{SessionEvent, Transport};

const SECONDS_IN_AN_HOUR: u64 = 60 * 60;

/// Maximum amount of time to wait for I/O.
const WAIT_TIMEOUT: Duration = Duration::from_secs(SECONDS_IN_AN_HOUR);

/// Maximum duration to accept the service to spend handling events (and errors,
/// ticking, etc.) without warning. Set to log whenever the service becomes so
/// is so slow to respond that it would not be able to handle at least 10
/// "requests" per second, i.e. `1s / 10 = 100ms`.
const LAG_TIMEOUT: Duration = Duration::from_millis(100);

/// A resource which can be managed by the reactor.
pub trait EventHandler {
    /// The type of reactions which this resource may generate upon receiving
    /// I/O from the reactor via [`EventHandler::handle`]. These events are
    /// passed to the reactor [`crate::reactor::ReactionHandler`].
    type Reaction;

    /// Method informing the reactor which types of events this resource is subscribed for.
    fn interests(&self) -> Option<Interest>;

    /// Method called by the reactor when an I/O readiness event
    /// is received for this resource.
    fn handle(&mut self, event: &Event) -> Vec<Self::Reaction>;
}

/// The trait guarantees that the data are either written in full or, in case
/// of an error, none of the data is written. Types implementing the trait must
/// also guarantee that multiple attempts to write do not result in
/// data to be written out of the initial ordering.
pub trait WriteAtomic: std::io::Write {
    /// Atomic non-blocking I/O write operation, which must either write the whole buffer to a
    /// resource without blocking or fail.
    ///
    /// # Panics
    ///
    /// If [`WriteAtomic::write_or_buf`] returns an [`std::io::Error`] of kind
    /// [`ErrorKind::Interrupted`], [`ErrorKind::WouldBlock`], [`ErrorKind::WriteZero`].
    /// In this case, [`WriteAtomic::write_or_buf`] is expected to buffer.
    fn write_atomic(&mut self, buf: &[u8]) -> io::Result<()> {
        use ErrorKind::*;

        if !self.is_ready_to_write() {
            panic!("WriteAtomic::write_atomic was called when the resource is not ready to write");
        }

        let result = self.write_or_buf(buf);

        debug_assert!(
            !matches!(
                result.as_ref().err().map(|err| err.kind()),
                Some(Interrupted | WouldBlock | WriteZero)
            ),
            "WriteAtomic::write_or_buf must handle errors of kind {Interrupted:?}, {WouldBlock:?}, {WriteZero:?} by buffering",
        );

        result
    }

    /// Checks whether resource can be written to without blocking.
    fn is_ready_to_write(&self) -> bool;

    /// Writes to the resource in a non-blocking way, buffering the data if necessary,
    /// or failing with a system-level error.
    ///
    /// This method shouldn't be called directly; call [`WriteAtomic::write_atomic`] instead.
    ///
    /// The method must handle [`std::io::Error`] of kind
    /// [`ErrorKind::Interrupted`], [`ErrorKind::WouldBlock`], [`ErrorKind::WriteZero`].
    /// and buffer the data in such cases.
    fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()>;
}

/// Reactor errors
#[derive(Error)]
pub enum Error<L: EventHandler, T: EventHandler> {
    #[error("listener {0:?} got disconnected during poll operation")]
    ListenerDisconnect(Token, L),

    #[error("transport {0:?} got disconnected during poll operation")]
    TransportDisconnect(Token, T),

    #[error("registration of a resource has failed: {0}")]
    Poll(io::Error),

    #[error("registration of a resource has failed: {0}")]
    Registration(io::Error),
}

impl<L: EventHandler, T: EventHandler> Debug for Error<L, T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(self, f)
    }
}

/// Actions which can be provided to the [`Reactor`] by the [`ReactionHandler`].
///
/// Reactor reads actions on each event loop using [`ReactionHandler`] iterator interface.
pub enum Action<L, T> {
    /// Register a new listener resource for the reactor poll.
    ///
    /// Reactor can't instantiate the resource, like bind a network listener.
    /// Reactor only can register already active resource for polling in the event loop.
    RegisterListener(Token, L),

    /// Register a new transport resource for the reactor poll.
    ///
    /// Reactor can't instantiate the resource, like open a file or establish network connection.
    /// Reactor only can register already active resource for polling in the event loop.
    RegisterTransport(Token, T),

    /// Unregister listener resource from the reactor poll and handover it to the [`ReactionHandler`] via
    /// [`ReactionHandler::handover_listener`].
    ///
    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
    /// handled by the handler upon the handover event.
    #[allow(dead_code)] // For future use
    UnregisterListener(Token),

    /// Unregister transport resource from the reactor poll and handover it to the [`ReactionHandler`] via
    /// [`ReactionHandler::handover_transport`].
    ///
    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
    /// handled by the handler upon the handover event.
    UnregisterTransport(Token),

    /// Write the data to one of the transport resources using [`io::Write`].
    Send(Token, Vec<u8>),

    /// Set a new timer for a given duration from this moment.
    ///
    /// When the timer elapses, the reactor will timeout from poll and call
    /// [`ReactionHandler::timer_reacted`].
    SetTimer(Duration),
}

impl<L: EventHandler, T: EventHandler> Display for Action<L, T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Action::RegisterListener(token, _listener) => f
                .debug_struct("RegisterListener")
                .field("token", token)
                .field("listener", &"<omitted>")
                .finish(),
            Action::RegisterTransport(token, _transport) => f
                .debug_struct("RegisterTransport")
                .field("token", token)
                .field("transport", &"<omitted>")
                .finish(),
            Action::UnregisterListener(token) => f
                .debug_struct("UnregisterListener")
                .field("token", token)
                .finish(),
            Action::UnregisterTransport(token) => f
                .debug_struct("UnregisterTransport")
                .field("token", token)
                .finish(),
            Action::Send(token, _data) => f
                .debug_struct("Send")
                .field("token", token)
                .field("data", &"<omitted>")
                .finish(),
            Action::SetTimer(duration) => f
                .debug_struct("SetTimer")
                .field("duration", duration)
                .finish(),
        }
    }
}

/// A service which handles reactions to the events generated in the [`Reactor`].
pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::Transport>> {
    /// Type for a listener resource.
    ///
    /// Listener resources are resources which may spawn more resources and can't be written to. A
    /// typical example of a listener resource is a [`std::net::TcpListener`], however this may also
    /// be a special form of a peripheral device or something else.
    type Listener: EventHandler + Source + Send + Debug;

    /// Type for a transport resource.
    ///
    /// Transport is a "full" resource which can be read from - and written to. Usual files, network
    /// connections, database connections etc are all fall into this category.
    type Transport: EventHandler + Source + Send + Debug + WriteAtomic;

    /// Method called by the reactor on the start of each event loop once the poll has returned.
    fn tick(&mut self);

    /// Method called by the reactor when a previously set timeout is fired.
    ///
    /// Related: [`Action::SetTimer`].
    fn timer_reacted(&mut self);

    /// Method called by the reactor upon a reaction to an I/O event on a listener resource.
    ///
    /// Since listener doesn't support writing, it can be only a read event (indicating that a new
    /// resource can be spawned from the listener).
    fn listener_reacted(
        &mut self,
        token: Token,
        reaction: <Self::Listener as EventHandler>::Reaction,
        instant: Instant,
    );

    /// Method called by the reactor upon a reaction to an I/O event on a transport resource.
    fn transport_reacted(
        &mut self,
        token: Token,
        reaction: <Self::Transport as EventHandler>::Reaction,
        instant: Instant,
    );

    /// Method called by the reactor when a given resource was successfully registered
    /// for given token.
    ///
    /// The token will be used later in [`ReactionHandler::listener_reacted`]
    /// and [`ReactionHandler::handover_listener`] calls to the handler.
    fn listener_registered(&mut self, token: Token, listener: &Self::Listener);

    /// Method called by the reactor when a given resource was successfully registered
    /// for given token.
    ///
    /// The token will be used later in [`ReactionHandler::transport_reacted`],
    /// [`ReactionHandler::handover_transport`] calls to the handler.
    fn transport_registered(&mut self, token: Token, transport: &Self::Transport);

    /// Method called by the reactor when a command is received for the
    /// [`ReactionHandler`].
    ///
    /// The commands are sent via `Controller` from outside of the reactor, including other
    /// threads.
    fn handle_command(&mut self, cmd: wire::Control);

    /// Method called by the reactor on any kind of error during the event loop, including errors of
    /// the poll syscall or I/O errors returned as a part of the poll result events.
    ///
    /// See [`enum@Error`] for the details on errors which may happen.
    fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>);

    /// Method called by the reactor upon receiving [`Action::UnregisterListener`].
    ///
    /// Passes the listener resource to the [`ReactionHandler`] when it is already not a part of the reactor
    /// poll. From this point of time it is safe to send the resource to other threads (like
    /// workers) or close the resource.
    fn handover_listener(&mut self, token: Token, listener: Self::Listener);

    /// Method called by the reactor upon receiving [`Action::UnregisterTransport`].
    ///
    /// Passes the transport resource to the [`ReactionHandler`] when it is already not a part of the
    /// reactor poll. From this point of time it is safe to send the resource to other threads
    /// (like workers) or close the resource.
    fn handover_transport(&mut self, token: Token, transport: Self::Transport);
}

/// High-level reactor API wrapping reactor [`Runtime`] into a thread and providing basic thread
/// management for it.
///
/// Apps running the [`Reactor`] can interface it and a [`ReactionHandler`] via use of the `Controller`
/// API.
pub struct Reactor {
    thread: JoinHandle<()>,
    controller: Controller,
}

impl Reactor {
    /// Creates new reactor and a service exposing the [`ReactionHandler`] to
    /// the reactor.
    ///
    /// The service is sent to the newly created reactor thread which runs the
    /// reactor [`Runtime`].
    pub fn new<H>(service: H, thread_name: String) -> Result<Self, io::Error>
    where
        H: 'static + ReactionHandler,
    {
        let builder = thread::Builder::new().name(thread_name);
        let (sender, receiver) = unbounded();
        let poll = Poll::new()?;
        let controller = Controller::new(sender, Arc::new(Waker::new(poll.registry(), WAKER)?));

        log::debug!(target: "reactor-controller", "Initializing reactor thread...");
        let thread = builder.spawn(move || {
            let runtime = Runtime {
                service,
                poll,
                receiver,
                listeners: HashMap::new(),
                transports: HashMap::new(),
                timeouts: Timer::new(),
            };

            log::info!(target: "reactor", "Entering reactor event loop");

            runtime.run();
        })?;

        // Waking up to consume actions which were provided by the service on launch
        controller.wake()?;

        Ok(Self { thread, controller })
    }

    /// Provides a `Controller` that can be used to send events to
    /// [`ReactionHandler`] via self.
    pub fn controller(&self) -> Controller {
        self.controller.clone()
    }

    /// Joins the reactor thread.
    pub fn join(self) -> thread::Result<()> {
        self.thread.join()
    }
}

/// Internal [`Reactor`] runtime which is run in a dedicated thread.
///
/// This runtime structure *does not* spawn a thread and is *blocking*.
/// It implements the actual reactor event loop.
pub struct Runtime<H: ReactionHandler> {
    service: H,
    poll: Poll,
    receiver: Receiver<ControlMessage>,
    listeners: HashMap<Token, H::Listener>,
    transports: HashMap<Token, H::Transport>,
    timeouts: Timer,
}

impl<H: ReactionHandler> Runtime<H> {
    fn register_interests(&mut self) -> io::Result<()> {
        let registry = self.poll.registry();
        for (id, res) in self.listeners.iter_mut() {
            match res.interests() {
                None => registry.deregister(res)?,
                Some(interests) => registry.reregister(res, *id, interests)?,
            };
        }
        for (id, res) in self.transports.iter_mut() {
            match res.interests() {
                None => registry.deregister(res)?,
                Some(interests) => registry.reregister(res, *id, interests)?,
            };
        }
        Ok(())
    }

    fn run(mut self) {
        loop {
            let timeout = self
                .timeouts
                .next_expiring_from(Instant::now())
                .unwrap_or(WAIT_TIMEOUT);

            self.register_interests()
                .expect("registering interests must work to ensure correct operation");

            log::trace!(target: "reactor", "Polling with timeout {timeout:?}");

            let mut events = Events::with_capacity(1024);

            // Block and wait for I/O events, wake by other threads, or timeout.
            let res = self.poll.poll(&mut events, Some(timeout));

            // This instant allows to measure the time spent by the service
            // to handle the result of polling.
            let tick = Instant::now();

            // Inform the service that time has advanced.
            self.service.tick();

            // Inform the service about errors during polling.
            if let Err(err) = res {
                log::warn!(target: "reactor", "Failure during polling: {err}");
                self.service.handle_error(Error::Poll(err));
            }

            // Inform the service that some timers have reacted.
            // The way this is currently used basically ignores which
            // timers have expired. As long as *something* timed out,
            // the service is informed.
            let timers_fired = self.timeouts.remove_expired_by(tick);
            if timers_fired > 0 {
                log::trace!(target: "reactor", "Timer has fired");
                self.service.timer_reacted();
            }

            if self.handle_events(tick, events) {
                // If a wake event was emitted, eagerly consume all control messages.
                loop {
                    use ControlMessage::*;
                    use TryRecvError::*;

                    match self.receiver.try_recv() {
                        Ok(Command(cmd)) => self.service.handle_command(*cmd),
                        Ok(Shutdown) => return self.handle_shutdown(),
                        Err(Empty) => break,
                        Err(Disconnected) => panic!("control channel disconnected unexpectedly"),
                    }
                }
            }

            let duration = Instant::now().duration_since(tick);
            if duration > LAG_TIMEOUT {
                log::debug!(target: "reactor", "Service was busy {:?} which exceeds the timeout of {:?}", duration, LAG_TIMEOUT);
            }

            self.handle_actions(tick);
        }
    }

    /// # Returns
    ///
    /// Whether one of the events was originated from the waker.
    fn handle_events(&mut self, instant: Instant, events: Events) -> bool {
        log::trace!(target: "reactor", "Handling events");
        let mut awoken = false;
        let mut deregistered = Vec::new();

        for event in events.into_iter() {
            let token = event.token();

            if token == WAKER {
                log::trace!(target: "reactor", "Awoken by the controller");
                awoken = true;
            } else if self.listeners.contains_key(&token) {
                log::trace!(target: "reactor", token=token.0; "Event from listener with token {}: {:?}", token.0, event);
                if !event.is_error() {
                    let listener = self
                        .listeners
                        .get_mut(&token)
                        .expect("resource disappeared");
                    listener
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
                            self.service.listener_reacted(token, service_event, instant);
                        });
                } else {
                    let listener = self.deregister_listener(token).unwrap_or_else(|| {
                        panic!("listener with token {} has disappeared", token.0)
                    });
                    self.service
                        .handle_error(Error::ListenerDisconnect(token, listener));
                    deregistered.push(token);
                }
            } else if self.transports.contains_key(&token) {
                log::trace!(target: "reactor", token=token.0; "Event from transport with token {}: {:?}", token.0, event);
                if !event.is_error() {
                    let transport = self
                        .transports
                        .get_mut(&token)
                        .expect("resource disappeared");
                    transport
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
                            self.service
                                .transport_reacted(token, service_event, instant);
                        });
                } else {
                    let transport = self.deregister_transport(token).unwrap_or_else(|| {
                        panic!("transport with token {} has disappeared", token.0)
                    });
                    self.service
                        .handle_error(Error::TransportDisconnect(token, transport));
                    deregistered.push(token);
                }
            } else if !deregistered.contains(&token) {
                log::debug!(target: "reactor", token=token.0; "Event from unknown token {}: {:?}", token.0, event);
            }
        }

        awoken
    }

    fn handle_actions(&mut self, instant: Instant) {
        while let Some(action) = self.service.next() {
            log::trace!(target: "reactor", "Handling action {action} from the service");

            // Deadlock may happen here if the service will generate events over and over
            // in the handle_* calls we may never get out of this loop
            if let Err(err) = self.handle_action(action, instant) {
                log::warn!(target: "reactor", "Failure: {err}");
                self.service.handle_error(err);
            }
        }
    }

    fn handle_action(
        &mut self,
        action: Action<H::Listener, H::Transport>,
        instant: Instant,
    ) -> Result<(), Error<H::Listener, H::Transport>> {
        match action {
            Action::RegisterListener(token, mut listener) => {
                log::trace!(target: "reactor", token=token.0; "Registering listener {:?} with token {}", listener, token.0);

                self.poll
                    .registry()
                    .register(&mut listener, token, Interest::READABLE)
                    .map_err(Error::Registration)?;
                self.listeners.insert(token, listener);
                self.service
                    .listener_registered(token, &self.listeners[&token]);
            }
            Action::RegisterTransport(token, mut transport) => {
                log::debug!(target: "reactor", token=token.0; "Registering transport");

                self.poll
                    .registry()
                    .register(&mut transport, token, Interest::READABLE)
                    .map_err(Error::Registration)?;
                self.transports.insert(token, transport);
                self.service
                    .transport_registered(token, &self.transports[&token]);
            }
            Action::UnregisterListener(token) => {
                let Some(listener) = self.deregister_listener(token) else {
                    return Ok(());
                };

                log::debug!(target: "reactor", token=token.0; "Handing over listener {listener:?} with token {}", token.0);
                self.service.handover_listener(token, listener);
            }
            Action::UnregisterTransport(token) => {
                let Some(transport) = self.deregister_transport(token) else {
                    return Ok(());
                };

                log::debug!(target: "reactor", token=token.0; "Handing over transport {transport:?} with token {}", token.0);
                self.service.handover_transport(token, transport);
            }
            Action::Send(token, data) => {
                log::trace!(target: "reactor", token=token.0; "Sending {} bytes to {token:?}", data.len());

                if let Some(transport) = self.transports.get_mut(&token) {
                    if let Err(e) = transport.write_atomic(&data) {
                        log::error!(target: "reactor", "Fatal error writing to transport {token:?}, disconnecting. Error details: {e:?}");
                        if let Some(transport) = self.deregister_transport(token) {
                            return Err(Error::TransportDisconnect(token, transport));
                        }
                    }
                } else {
                    log::debug!(target: "reactor", token=token.0; "No transport with token {token:?} is known!");
                }
            }
            Action::SetTimer(duration) => {
                log::trace!(target: "reactor", "Adding timer {duration:?} from now");

                self.timeouts.set_timeout(duration, instant);
            }
        }
        Ok(())
    }

    fn handle_shutdown(self) {
        log::info!(target: "reactor", "Shutdown");
    }

    fn deregister_listener(&mut self, token: Token) -> Option<H::Listener> {
        let Some(mut source) = self.listeners.remove(&token) else {
            log::debug!(target: "reactor", token=token.0; "Deregistering non-registered listener with token {}", token.0);
            return None;
        };

        if let Err(err) = self.poll.registry().deregister(&mut source) {
            log::debug!(target: "reactor", token=token.0; "Failed to deregister listener with token {} from mio: {err}", token.0);
        }

        Some(source)
    }

    fn deregister_transport(&mut self, token: Token) -> Option<H::Transport> {
        let Some(mut source) = self.transports.remove(&token) else {
            log::debug!(target: "reactor", token=token.0; "Deregistering non-registered transport with token {}", token.0);
            return None;
        };

        if let Err(err) = self.poll.registry().deregister(&mut source) {
            log::debug!(target: "reactor", token=token.0; "Failed to deregister transport with token {} from mio: {err}", token.0);
        }

        Some(source)
    }
}