Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
REVIEW
◌ CI pending Fintan Halpenny committed 7 months ago
commit b135cd5b77feb46b43e10c56b889358a80de90e5
parent 6d9fea39446f301f318b10a329f519b168de18e2
1 passed 1 pending (2 total) View logs
6 files changed +37 -35
modified crates/radicle-node/src/reactor.rs
@@ -32,8 +32,8 @@ const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60);

/// A resource which can be managed by the reactor.
pub trait EventHandler: Source + Send {
-
    /// Events which this resource may generate upon receiving I/O from the reactor via
-
    /// [`Resource::handle`]. These events are passed to the reactor [`crate::reactor::Handler`].
+
    /// Events which this resource may generate upon receiving I/O from the reactor.
+
    /// These events are passed to the reactor [`ReactionHandler`].
    type Reaction;

    /// Method informing the reactor which types of events this resource is subscribed for.
@@ -113,9 +113,9 @@ impl<L: EventHandler, T: EventHandler> Debug for Error<L, T> {
    }
}

-
/// Actions which can be provided to the reactor by the [`Handler`].
+
/// Actions which can be provided to the reactor by the [`ReactionHandler`].
///
-
/// Reactor reads actions on each event loop using [`Handler`] iterator interface.
+
/// Reactor reads actions on each event loop using [`ReactionHandler`] iterator interface.
pub enum Action<L: EventHandler, T: EventHandler> {
    /// Register a new listener resource for the reactor poll.
    ///
@@ -131,8 +131,8 @@ pub enum Action<L: EventHandler, T: EventHandler> {
    // #[display("register_transport")]
    RegisterTransport(Token, T),

-
    /// Unregister listener resource from the reactor poll and handover it to the [`Handler`] via
-
    /// [`Handler::handover_listener`].
+
    /// Unregister listener resource from the reactor poll and handover it to the [`ReactionHandler`] via
+
    /// [`ReactionHandler::handover_listener`].
    ///
    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
@@ -140,8 +140,8 @@ pub enum Action<L: EventHandler, T: EventHandler> {
    #[allow(dead_code)] // For future use
    UnregisterListener(Token),

-
    /// Unregister transport resource from the reactor poll and handover it to the [`Handler`] via
-
    /// [`Handler::handover_transport`].
+
    /// Unregister transport resource from the reactor poll and handover it to the [`ReactionHandler`] via
+
    /// [`ReactionHandler::handover_transport`].
    ///
    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
@@ -153,7 +153,7 @@ pub enum Action<L: EventHandler, T: EventHandler> {

    /// Set a new timer for a given duration from this moment.
    ///
-
    /// When the timer fires reactor will timeout poll syscall and call [`Handler::handle_timer`].
+
    /// When the timer fires reactor will timeout poll syscall and call [`ReactionHandler::timer_reacted`].
    SetTimer(Duration),
}

modified crates/radicle-node/src/reactor/controller.rs
@@ -12,10 +12,14 @@ pub enum ControlMessage<C> {

/// Control API to the service which is run inside a reactor.
///
-
/// The service is passed to the [`Reactor`] constructor as a parameter and also exposes [`Handler`]
+
/// The service is passed to the [`Reactor`] constructor as a parameter and also exposes [`ReactionHandler`]
/// API to the reactor itself for receiving reactor-generated events. This API is used by the
/// reactor to inform the service about incoming commands, sent via this [`Controller`] API (see
-
/// [`Handler::Command`] for the details).
+
/// [`ReactionHandler::Command`] for the details).
+
///
+
/// [`Reactor`]: super::Reactor
+
/// [`ReactionHandler`]: crate::reactor::ReactionHandler
+
/// [`ReactionHandler::Command`]: crate::reactor::ReactionHandler::Command
pub struct Controller<C> {
    sender: Sender<ControlMessage<C>>,
    waker: Arc<Waker>,
@@ -36,6 +40,9 @@ impl<C> Controller<C> {
    }

    /// Send a command to the service inside a [`Reactor`] or a reactor [`Runtime`].
+
    ///
+
    /// [`Reactor`]: super::Reactor
+
    /// [`Runtime`]: super::Runtime
    pub fn cmd(&self, mut command: C) -> io::Result<()>
    where
        C: 'static,
modified crates/radicle-node/src/reactor/listener.rs
@@ -32,14 +32,7 @@ impl Source for Listener {
}

impl Listener {
-
    /// Binds listener to the provided socket address(es) with a given context.
-
    ///
-
    /// The `session_context` object provides information for encryption,
-
    /// authentication and other protocols which are a part of the application-
-
    /// specific transport layer and are automatically injected into the
-
    /// new sessions constructed by this listener before they are inserted into
-
    /// the [`crate::reactor::Reactor`] and notifications are delivered to
-
    /// [`crate::reactor::Handler`].
+
    /// Binds listener to the provided socket address.
    pub fn bind(addr: SocketAddr) -> Result<Self> {
        Ok(Self {
            inner: TcpListener::bind(addr)?,
modified crates/radicle-node/src/reactor/transport.rs
@@ -14,7 +14,9 @@ const READ_BUFFER_SIZE: usize = u16::MAX as usize;
const HEAP_BUFFER_SIZE: usize = u16::MAX as usize;

/// An event happening for a [`Transport`] network transport and delivered to
-
/// a [`reactor::Handler`].
+
/// a [`ReactionHandler`].
+
///
+
/// [`ReactionHandler`]: crate::reactor::ReactionHandler
pub enum SessionEvent<S: Session> {
    Established(S::Artifact),
    Data(Vec<u8>),
@@ -24,7 +26,7 @@ pub enum SessionEvent<S: Session> {
/// A state of [`Transport`] network transport.
#[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
pub enum TransportState {
-
    /// The transport is initiated, but the connection has not established yet.
+
    /// The transport is initiated, but the connection has not been established yet.
    /// This happens only for outgoing connections due to the use of
    /// non-blocking version of a `connect` sys-call. The state is switched once
    /// we receive first notification on a `write` event on this resource from
@@ -36,17 +38,17 @@ pub enum TransportState {
    /// other protocols injected into the session haven't completed yet.
    Handshake,

-
    /// The session is active; all handshakes had completed.
+
    /// The session is active; all handshakes have completed.
    Active,

    /// Session was terminated by any reason: local shutdown, remote orderly
-
    /// shutdown, connectivity issue, dropped connections, encryption or
+
    /// shutdown, connectivity issue, dropped connections, encryption, or
    /// authentication problem etc. Reading and writing from the resource in
    /// this state will result in an error ([`io::Error`]).
    Terminated,
}

-
/// Transport is an adaptor a around specific [`Session`] (implementing
+
/// Transport is an adaptor around a specific `Session` (implementing
/// session management, including optional handshake, encoding, etc.) to be used
/// as a transport resource in a [`crate::reactor::Reactor`].
#[derive(Debug)]
@@ -98,7 +100,7 @@ impl<S: Session> Display for Transport<S> {
}

impl<S: Session> Transport<S> {
-
    /// Constructs reactor-managed resource around an existing [`Session`].
+
    /// Constructs reactor-managed resource around an existing `Session`.
    ///
    /// NB: Must not be called for connections created in a non-blocking mode!
    ///
@@ -197,13 +199,13 @@ impl<S: Session> Transport<S> {
            // In this case, the write couldn't complete. Leave `needs_flush` set
            // to be notified when the socket is ready to write again.
            Err(err)
-
                if [
-
                    io::ErrorKind::WouldBlock,
-
                    io::ErrorKind::WriteZero,
-
                    io::ErrorKind::OutOfMemory,
-
                    io::ErrorKind::Interrupted,
-
                ]
-
                .contains(&err.kind()) =>
+
                if matches!(
+
                    err.kind(),
+
                    io::ErrorKind::WouldBlock
+
                        | io::ErrorKind::WriteZero
+
                        | io::ErrorKind::OutOfMemory
+
                        | io::ErrorKind::Interrupted
+
                ) =>
            {
                log::warn!(target: "transport", "Resource {} was not able to consume any data even though it has announced its write readiness", self.display());
                self.write_intent = true;
@@ -249,7 +251,7 @@ impl<S: Session> Transport<S> {
                    | io::ErrorKind::OutOfMemory
                    | io::ErrorKind::WriteZero
                    | io::ErrorKind::Interrupted => {
-
                        log::warn!(target: "transport", "Resource {} kernel buffer is fulled (system message is '{err}')", self.display());
+
                        log::warn!(target: "transport", "Resource {} kernel buffer is full (system message is '{err}')", self.display());
                        Ok(0)
                    },
                    _ => {
modified crates/radicle-node/src/runtime.rs
@@ -99,7 +99,7 @@ impl From<service::Error> for Error {
    }
}

-
/// Wraps a [`UnixListener`] but tracks its origin.
+
/// Wraps a [`Listener`] but tracks its origin.
pub enum ControlSocket {
    /// The listener was created by binding to it.
    Bound(Listener, PathBuf),
modified crates/radicle-node/src/wire.rs
@@ -1139,7 +1139,7 @@ pub fn dial<G: Ecdh<Pk = NodeId>>(
            .ok_or(io::ErrorKind::AddrNotAvailable)?
    };

-
    // NOTE: Previously, here was a not about setting the timeout for connecting
+
    // NOTE: Previously, here was a note about setting the timeout for connecting
    // to DEFAULT_DIAL_TIMEOUT, for which we have not figured out a way yet.
    // Generally, we should understand what happens if the following call to
    // `connect` fails. How do we learn about it? Where's the leak?