Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Use Mio
Lorenz Leutgeb committed 7 months ago
commit 8324de4b6335db657f0c64654baaa83f51dbbc0d
parent df7878365f6806f951e1ba7afd7bc9995f210b95
14 files changed +1833 -278
modified Cargo.lock
@@ -685,7 +685,6 @@ checksum = "b67c16c8ef5ddcdab57aab83fd8e770540ea3682ccdae09642c63575b0da2184"
dependencies = [
 "amplify",
 "ec25519",
-
 "multibase",
 "sha2",
]

@@ -1928,18 +1927,6 @@ dependencies = [
]

[[package]]
-
name = "io-reactor"
-
version = "0.5.2"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "77d78c3e630f04a61ec86ba171c0bbd161434a7f2e8e4a67728320d4ce7c6c79"
-
dependencies = [
-
 "amplify",
-
 "crossbeam-channel",
-
 "libc",
-
 "popol",
-
]
-

-
[[package]]
name = "io-uring"
version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2232,20 +2219,6 @@ dependencies = [
]

[[package]]
-
name = "netservices"
-
version = "0.8.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "af0f91a10aaddcc3b76770c3bf5c17680829aa0828e5ffc69c62d58bfbe9c48c"
-
dependencies = [
-
 "amplify",
-
 "cyphernet",
-
 "io-reactor",
-
 "libc",
-
 "rand",
-
 "socket2",
-
]
-

-
[[package]]
name = "newline-converter"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2624,15 +2597,6 @@ dependencies = [
]

[[package]]
-
name = "popol"
-
version = "3.0.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "93406933502e4446250941cf95d5e62851feb62a25b742acf7ffce96755c53e3"
-
dependencies = [
-
 "libc",
-
]
-

-
[[package]]
name = "portable-atomic"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2963,12 +2927,10 @@ dependencies = [
 "crossbeam-channel",
 "cyphernet",
 "fastrand",
-
 "io-reactor",
 "lexopt",
-
 "libc",
 "localtime",
 "log",
-
 "netservices",
+
 "mio 1.0.4",
 "nonempty 0.9.0",
 "qcheck",
 "qcheck-macros",
modified crates/radicle-node/Cargo.toml
@@ -10,7 +10,7 @@ build = "build.rs"
rust-version.workspace = true

[features]
-
default = ["backtrace", "systemd", "structured-logger"]
+
default = ["backtrace", "systemd", "structured-logger", "socket2"]
systemd = ["dep:radicle-systemd"]
test = ["radicle/test", "radicle-crypto/test", "radicle-crypto/cyphernet", "radicle-protocol/test", "qcheck", "snapbox"]

@@ -22,14 +22,12 @@ bytes = { workspace = true }
chrono = { workspace = true, features = ["clock"] }
colored = { workspace = true }
crossbeam-channel = { workspace = true }
-
cyphernet = { workspace = true, features = ["tor", "dns", "ed25519", "p2p-ed25519"] }
+
cyphernet = { workspace = true, features = ["tor", "dns", "ed25519", "p2p-ed25519", "noise-framework", "noise_sha2"] }
fastrand = { workspace = true }
-
io-reactor = { version = "0.5.1", features = ["popol"] }
lexopt = { workspace = true }
-
libc = { workspace = true }
-
log = { workspace = true, features = ["std"] }
+
log = { workspace = true, features = ["kv", "std"] }
localtime = { workspace = true }
-
netservices = { version = "0.8.0", features = ["io-reactor", "socket2"] }
+
mio = { version = "1", features = ["net", "os-poll"] }
nonempty = { workspace = true, features = ["serialize"] }
qcheck = { workspace = true, optional = true }
radicle = { workspace = true, features = ["logger"] }
@@ -44,7 +42,7 @@ scrypt = { version = "0.11.0", default-features = false }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = ["preserve_order"] }
snapbox = { workspace = true, optional = true }
-
socket2 = "0.5.7"
+
socket2 = { version = "0.5.7", features = ["all"], optional = true }
structured-logger = { version = "1.0.4", optional = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
@@ -56,6 +54,7 @@ radicle-systemd = { workspace = true, optional = true }
winpipe = { workspace = true }

[dev-dependencies]
+
mio = { version = "1", features = ["os-ext"] }
qcheck = { workspace = true }
qcheck-macros = { workspace = true }
radicle = { workspace = true, features = ["test"] }
modified crates/radicle-node/src/lib.rs
@@ -8,6 +8,7 @@ use std::sync::LazyLock;

pub mod control;
pub mod fingerprint;
+
pub mod reactor;
pub mod runtime;
pub(crate) use radicle_protocol::service;
#[cfg(any(test, feature = "test"))]
added crates/radicle-node/src/reactor.rs
@@ -0,0 +1,613 @@
+
mod controller;
+
mod listener;
+
mod session;
+
mod timer;
+
mod token;
+
mod transport;
+

+
use std::collections::HashMap;
+
use std::fmt::{Debug, Display, Formatter};
+
use std::io::ErrorKind;
+
use std::sync::Arc;
+
use std::thread::JoinHandle;
+
use std::time::Duration;
+
use std::{io, thread};
+

+
use crossbeam_channel::{unbounded, Receiver, TryRecvError};
+
use localtime::LocalTime;
+
use mio::event::{Event, Source};
+
use mio::{Events, Interest, Poll, Waker};
+
use thiserror::Error;
+

+
use timer::Timer;
+
use token::WAKER;
+

+
pub(crate) use self::controller::{ControlMessage, Controller};
+
pub(crate) use listener::Listener;
+
pub use session::{NoiseSession, ProtocolArtifact, Socks5Session};
+
pub(crate) use token::{Token, Tokens};
+
pub(crate) use transport::{SessionEvent, Transport};
+

+
const SECONDS_IN_AN_HOUR: u64 = 60 * 60;
+

+
/// Maximum amount of time to wait for I/O.
+
const WAIT_TIMEOUT: Duration = Duration::from_secs(SECONDS_IN_AN_HOUR);
+

+
/// A resource which can be managed by the reactor.
+
pub trait EventHandler {
+
    /// The type of reactions which this resource may generate upon receiving
+
    /// I/O from the reactor via [`EventHandler::handle`]. These events are
+
    /// passed to the reactor [`crate::reactor::ReactionHandler`].
+
    type Reaction;
+

+
    /// Method informing the reactor which types of events this resource is subscribed for.
+
    fn interests(&self) -> Option<Interest>;
+

+
    /// Method called by the reactor when an I/O readiness event
+
    /// is received for this resource.
+
    fn handle(&mut self, event: &Event) -> Vec<Self::Reaction>;
+
}
+

+
/// The trait guarantees that the data are either written in full or, in case
+
/// of an error, none of the data is written. Types implementing the trait must
+
/// also guarantee that multiple attempts to write do not result in
+
/// data to be written out of the initial ordering.
+
pub trait WriteAtomic: std::io::Write {
+
    /// Atomic non-blocking I/O write operation, which must either write the whole buffer to a
+
    /// resource without blocking or fail.
+
    ///
+
    /// # Panics
+
    ///
+
    /// If [`WriteAtomic::write_or_buf`] returns an [`std::io::Error`] of kind
+
    /// [`ErrorKind::Interrupted`], [`ErrorKind::WouldBlock`], [`ErrorKind::WriteZero`].
+
    /// In this case, [`WriteAtomic::write_or_buf`] is expected to buffer.
+
    fn write_atomic(&mut self, buf: &[u8]) -> io::Result<()> {
+
        use ErrorKind::*;
+

+
        if !self.is_ready_to_write() {
+
            panic!("WriteAtomic::write_atomic was called when the resource is not ready to write");
+
        }
+

+
        let result = self.write_or_buf(buf);
+

+
        debug_assert!(
+
            !matches!(
+
                result.as_ref().err().map(|err| err.kind()),
+
                Some(Interrupted | WouldBlock | WriteZero)
+
            ),
+
            "WriteAtomic::write_or_buf must handle erros of kind {Interrupted:?}, {WouldBlock:?}, {WriteZero:?} by buffering",
+
        );
+

+
        result
+
    }
+

+
    /// Checks whether resource can be written to without blocking.
+
    fn is_ready_to_write(&self) -> bool;
+

+
    /// Writes to the resource in a non-blocking way, buffering the data if necessary,
+
    /// or failing with a system-level error.
+
    ///
+
    /// This method shouldn't be called directly; call [`WriteAtomic::write_atomic`] instead.
+
    ///
+
    /// The method must handle [`std::io::Error`] of kind
+
    /// [`ErrorKind::Interrupted`], [`ErrorKind::WouldBlock`], [`ErrorKind::WriteZero`].
+
    /// and buffer the data in such cases.
+
    fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()>;
+
}
+

+
/// Reactor errors
+
#[derive(Error)]
+
pub enum Error<L: EventHandler, T: EventHandler> {
+
    #[error("listener {0:?} got disconnected during poll operation")]
+
    ListenerDisconnect(Token, L),
+

+
    #[error("transport {0:?} got disconnected during poll operation")]
+
    TransportDisconnect(Token, T),
+

+
    #[error("registration of a resource has failed: {0}")]
+
    Poll(io::Error),
+

+
    #[error("registration of a resource has failed: {0}")]
+
    Registration(io::Error),
+
}
+

+
impl<L: EventHandler, T: EventHandler> Debug for Error<L, T> {
+
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+
        Display::fmt(self, f)
+
    }
+
}
+

+
/// Actions which can be provided to the [`Reactor`] by the [`ReactionHandler`].
+
///
+
/// Reactor reads actions on each event loop using [`ReactionHandler`] iterator interface.
+
pub enum Action<L, T> {
+
    /// Register a new listener resource for the reactor poll.
+
    ///
+
    /// Reactor can't instantiate the resource, like bind a network listener.
+
    /// Reactor only can register already active resource for polling in the event loop.
+
    RegisterListener(Token, L),
+

+
    /// Register a new transport resource for the reactor poll.
+
    ///
+
    /// Reactor can't instantiate the resource, like open a file or establish network connection.
+
    /// Reactor only can register already active resource for polling in the event loop.
+
    RegisterTransport(Token, T),
+

+
    /// Unregister listener resource from the reactor poll and handover it to the [`ReactionHandler`] via
+
    /// [`ReactionHandler::handover_listener`].
+
    ///
+
    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
+
    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
+
    /// handled by the handler upon the handover event.
+
    #[allow(dead_code)] // For future use
+
    UnregisterListener(Token),
+

+
    /// Unregister transport resource from the reactor poll and handover it to the [`ReactionHandler`] via
+
    /// [`ReactionHandler::handover_transport`].
+
    ///
+
    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
+
    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
+
    /// handled by the handler upon the handover event.
+
    UnregisterTransport(Token),
+

+
    /// Write the data to one of the transport resources using [`io::Write`].
+
    Send(Token, Vec<u8>),
+

+
    /// Set a new timer for a given duration from this moment.
+
    ///
+
    /// When the timer fires reactor will timeout poll syscall and call
+
    /// [`ReactionHandler::timer_reacted`].
+
    SetTimer(Duration),
+
}
+

+
impl<L: EventHandler, T: EventHandler> Display for Action<L, T> {
+
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+
        match self {
+
            Action::RegisterListener(token, _listener) => f
+
                .debug_struct("RegisterListener")
+
                .field("token", token)
+
                .field("listener", &"<omitted>")
+
                .finish(),
+
            Action::RegisterTransport(token, _transport) => f
+
                .debug_struct("RegisterTransport")
+
                .field("token", token)
+
                .field("transport", &"<omitted>")
+
                .finish(),
+
            Action::UnregisterListener(token) => f
+
                .debug_struct("UnregisterListener")
+
                .field("token", token)
+
                .finish(),
+
            Action::UnregisterTransport(token) => f
+
                .debug_struct("UnregisterTransport")
+
                .field("token", token)
+
                .finish(),
+
            Action::Send(token, _data) => f
+
                .debug_struct("Send")
+
                .field("token", token)
+
                .field("data", &"<omitted>")
+
                .finish(),
+
            Action::SetTimer(duration) => f
+
                .debug_struct("SetTimer")
+
                .field("duration", duration)
+
                .finish(),
+
        }
+
    }
+
}
+

+
/// A service which handles reactions to the events generated in the [`Reactor`].
+
pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::Transport>> {
+
    /// Type for a listener resource.
+
    ///
+
    /// Listener resources are resources which may spawn more resources and can't be written to. A
+
    /// typical example of a listener resource is a [`std::net::TcpListener`], however this may also
+
    /// be a special form of a peripheral device or something else.
+
    type Listener: EventHandler + Source + Send;
+

+
    /// Type for a transport resource.
+
    ///
+
    /// Transport is a "full" resource which can be read from - and written to. Usual files, network
+
    /// connections, database connections etc are all fall into this category.
+
    type Transport: EventHandler + Source + Send + WriteAtomic;
+

+
    /// A command which may be sent to the [`ReactionHandler`] from outside of the [`Reactor`],
+
    /// including other threads.
+
    ///
+
    /// The handler object is owned by the reactor runtime and executes always in the context of the
+
    /// reactor runtime thread. Thus, if other (micro)services within the app needs to communicate
+
    /// to the handler they have to use this data type, which usually is an enumeration for a set of
+
    /// commands supported by the handler.
+
    ///
+
    /// The commands are sent by using reactor [`Controller`] API.
+
    type Command: Debug + Send;
+

+
    /// Method called by the reactor on the start of each event loop once the poll has returned.
+
    fn tick(&mut self, time: localtime::LocalTime);
+

+
    /// Method called by the reactor when a previously set timeout is fired.
+
    ///
+
    /// Related: [`Action::SetTimer`].
+
    fn timer_reacted(&mut self);
+

+
    /// Method called by the reactor upon a reaction to an I/O event on a listener resource.
+
    ///
+
    /// Since listener doesn't support writing, it can be only a read event (indicating that a new
+
    /// resource can be spawned from the listener).
+
    fn listener_reacted(
+
        &mut self,
+
        token: Token,
+
        reaction: <Self::Listener as EventHandler>::Reaction,
+
        time: localtime::LocalTime,
+
    );
+

+
    /// Method called by the reactor upon a reaction to an I/O event on a transport resource.
+
    fn transport_reacted(
+
        &mut self,
+
        token: Token,
+
        reaction: <Self::Transport as EventHandler>::Reaction,
+
        time: localtime::LocalTime,
+
    );
+

+
    /// Method called by the reactor when a given resource was successfully registered
+
    /// for given token.
+
    ///
+
    /// The token will be used later in [`ReactionHandler::listener_reacted`]
+
    /// and [`ReactionHandler::handover_listener`] calls to the handler.
+
    fn listener_registered(&mut self, token: Token, listener: &Self::Listener);
+

+
    /// Method called by the reactor when a given resource was successfully registered
+
    /// for given token.
+
    ///
+
    /// The token will be used later in [`ReactionHandler::transport_reacted`],
+
    /// [`ReactionHandler::handover_transport`] calls to the handler.
+
    fn transport_registered(&mut self, token: Token, transport: &Self::Transport);
+

+
    /// Method called by the reactor when a [`ReactionHandler::Command`] is received for the
+
    /// [`ReactionHandler`].
+
    ///
+
    /// The commands are sent via [`Controller`] from outside of the reactor, including other
+
    /// threads.
+
    fn handle_command(&mut self, cmd: Self::Command);
+

+
    /// Method called by the reactor on any kind of error during the event loop, including errors of
+
    /// the poll syscall or I/O errors returned as a part of the poll result events.
+
    ///
+
    /// See [`enum@Error`] for the details on errors which may happen.
+
    fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>);
+

+
    /// Method called by the reactor upon receiving [`Action::UnregisterListener`].
+
    ///
+
    /// Passes the listener resource to the [`ReactionHandler`] when it is already not a part of the reactor
+
    /// poll. From this point of time it is safe to send the resource to other threads (like
+
    /// workers) or close the resource.
+
    fn handover_listener(&mut self, token: Token, listener: Self::Listener);
+

+
    /// Method called by the reactor upon receiving [`Action::UnregisterTransport`].
+
    ///
+
    /// Passes the transport resource to the [`ReactionHandler`] when it is already not a part of the
+
    /// reactor poll. From this point of time it is safe to send the resource to other threads
+
    /// (like workers) or close the resource.
+
    fn handover_transport(&mut self, token: Token, transport: Self::Transport);
+
}
+

+
/// High-level reactor API wrapping reactor [`Runtime`] into a thread and providing basic thread
+
/// management for it.
+
///
+
/// Apps running the [`Reactor`] can interface it and a [`ReactionHandler`] via use of the [`Controller`]
+
/// API.
+
pub struct Reactor<C> {
+
    thread: JoinHandle<()>,
+
    controller: Controller<C>,
+
}
+

+
impl<C> Reactor<C> {
+
    /// Creates new reactor and a service exposing [`ReactionHandler`] API to
+
    /// the reactor.
+
    ///
+
    /// Similar to the [`Reactor::new`], but allows to specify the name for the reactor thread.
+
    /// The service is sent to the newly created reactor thread which runs the
+
    /// reactor [`Runtime`].
+
    pub fn new<H>(service: H, thread_name: String) -> Result<Self, io::Error>
+
    where
+
        H: 'static + ReactionHandler<Command = C>,
+
        C: 'static + Send,
+
    {
+
        let builder = thread::Builder::new().name(thread_name);
+
        let (sender, receiver) = unbounded();
+
        let poll = Poll::new()?;
+
        let controller = Controller::new(sender, Arc::new(Waker::new(poll.registry(), WAKER)?));
+

+
        log::debug!(target: "reactor-controller", "Initializing reactor thread...");
+
        let thread = builder.spawn(move || {
+
            let runtime = Runtime {
+
                service,
+
                poll,
+
                receiver,
+
                listeners: HashMap::new(),
+
                transports: HashMap::new(),
+
                timeouts: Timer::new(),
+
            };
+

+
            log::info!(target: "reactor", "Entering reactor event loop");
+

+
            runtime.run();
+
        })?;
+

+
        // Waking up to consume actions which were provided by the service on launch
+
        controller.wake()?;
+

+
        Ok(Self { thread, controller })
+
    }
+

+
    /// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service
+
    /// running inside of its thread.
+
    ///
+
    /// See [`ReactionHandler::Command`] for the details.
+
    pub fn controller(&self) -> Controller<C> {
+
        self.controller.clone()
+
    }
+

+
    /// Joins the reactor thread.
+
    pub fn join(self) -> thread::Result<()> {
+
        self.thread.join()
+
    }
+
}
+

+
/// Internal [`Reactor`] runtime which is run in a dedicated thread.
+
///
+
/// Use this structure directly only if you'd like to have the full
+
/// control over the reactor thread.
+
///
+
/// This runtime structure **does not** spawn a thread and is **blocking**.
+
/// It implements the actual reactor event loop.
+
pub struct Runtime<H: ReactionHandler> {
+
    service: H,
+
    poll: Poll,
+
    receiver: Receiver<ControlMessage<H::Command>>,
+
    listeners: HashMap<Token, H::Listener>,
+
    transports: HashMap<Token, H::Transport>,
+
    timeouts: Timer,
+
}
+

+
impl<H: ReactionHandler> Runtime<H> {
+
    fn register_interests(&mut self) -> io::Result<()> {
+
        let registry = self.poll.registry();
+
        for (id, res) in self.listeners.iter_mut() {
+
            match res.interests() {
+
                None => registry.deregister(res)?,
+
                Some(interests) => registry.reregister(res, *id, interests)?,
+
            };
+
        }
+
        for (id, res) in self.transports.iter_mut() {
+
            match res.interests() {
+
                None => registry.deregister(res)?,
+
                Some(interests) => registry.reregister(res, *id, interests)?,
+
            };
+
        }
+
        Ok(())
+
    }
+

+
    fn run(mut self) {
+
        loop {
+
            let before_poll = LocalTime::now();
+
            let timeout = self
+
                .timeouts
+
                .next_expiring_from(before_poll)
+
                .unwrap_or(WAIT_TIMEOUT);
+

+
            self.register_interests()
+
                .expect("registering interests must work to ensure correct operation");
+

+
            log::trace!(target: "reactor", "Polling with timeout {timeout:?}");
+

+
            let mut events = Events::with_capacity(1024);
+

+
            // Blocking
+
            let res = self.poll.poll(&mut events, Some(timeout));
+

+
            let now = LocalTime::now();
+
            self.service.tick(now);
+

+
            // The way this is currently used basically ignores which keys have
+
            // timed out. So as long as *something* timed out, we wake the service.
+
            let timers_fired = self.timeouts.remove_expired_by(now);
+
            if timers_fired > 0 {
+
                log::trace!(target: "reactor", "Timer has fired");
+
                self.service.timer_reacted();
+
            }
+

+
            if let Err(err) = res {
+
                log::error!(target: "reactor", "Error during polling: {err}");
+
                self.service.handle_error(Error::Poll(err));
+
            }
+

+
            let awoken = self.handle_events(now, events);
+

+
            // Process the commands only if we awaken by the waker
+
            if awoken {
+
                loop {
+
                    match self.receiver.try_recv() {
+
                        Err(TryRecvError::Empty) => break,
+
                        Err(TryRecvError::Disconnected) => {
+
                            panic!("control channel disconnected unexpectedly")
+
                        }
+
                        Ok(ControlMessage::Shutdown) => return self.handle_shutdown(),
+
                        Ok(ControlMessage::Command(cmd)) => self.service.handle_command(cmd),
+
                    }
+
                }
+
            }
+

+
            self.handle_actions(now);
+
        }
+
    }
+

+
    /// # Returns
+
    ///
+
    /// Whether it was awakened by a waker
+
    fn handle_events(&mut self, time: LocalTime, events: Events) -> bool {
+
        let mut awoken = false;
+

+
        for event in events.into_iter() {
+
            let id = event.token();
+

+
            if id == WAKER {
+
                log::trace!(target: "reactor", "Awoken by the controller");
+
                awoken = true;
+
            } else if self.listeners.contains_key(&id) {
+
                log::trace!(target: "reactor", event:debug; "From listener");
+
                if !event.is_error() {
+
                    let listener = self.listeners.get_mut(&id).expect("resource disappeared");
+
                    listener
+
                        .handle(event)
+
                        .into_iter()
+
                        .for_each(|service_event| {
+
                            self.service.listener_reacted(id, service_event, time);
+
                        });
+
                } else {
+
                    let listener = self
+
                        .unregister_listener(id)
+
                        .expect("listener has disappeared");
+
                    self.service
+
                        .handle_error(Error::ListenerDisconnect(id, listener));
+
                }
+
            } else if self.transports.contains_key(&id) {
+
                log::trace!(target: "reactor", event:debug; "From transport");
+
                if !event.is_error() {
+
                    let transport = self.transports.get_mut(&id).expect("resource disappeared");
+
                    transport
+
                        .handle(event)
+
                        .into_iter()
+
                        .for_each(|service_event| {
+
                            self.service.transport_reacted(id, service_event, time);
+
                        });
+
                } else {
+
                    let transport = self
+
                        .unregister_transport(id)
+
                        .expect("transport has disappeared");
+
                    self.service
+
                        .handle_error(Error::TransportDisconnect(id, transport));
+
                }
+
            } else {
+
                panic!("token in poll which is not a known waker, listener or transport")
+
            }
+
        }
+

+
        awoken
+
    }
+

+
    fn handle_actions(&mut self, time: LocalTime) {
+
        while let Some(action) = self.service.next() {
+
            log::trace!(target: "reactor", "Handling action {action} from the service");
+

+
            // Deadlock may happen here if the service will generate events over and over
+
            // in the handle_* calls we may never get out of this loop
+
            if let Err(err) = self.handle_action(action, time) {
+
                log::error!(target: "reactor", "Error: {err}");
+
                self.service.handle_error(err);
+
            }
+
        }
+
    }
+

+
    /// # Safety
+
    ///
+
    /// Panics on [`Action::Send`] for read-only resources or resources which are not ready for a
+
    /// write operation (i.e. returning `false` from [`WriteAtomic::is_ready_to_write`])
+
    /// implementation.
+
    fn handle_action(
+
        &mut self,
+
        action: Action<H::Listener, H::Transport>,
+
        time: LocalTime,
+
    ) -> Result<(), Error<H::Listener, H::Transport>> {
+
        match action {
+
            Action::RegisterListener(token, mut listener) => {
+
                log::debug!(target: "reactor", token=token.0; "Registering listener");
+

+
                self.poll
+
                    .registry()
+
                    .register(&mut listener, token, Interest::READABLE)
+
                    .map_err(Error::Registration)?;
+
                self.listeners.insert(token, listener);
+
                self.service
+
                    .listener_registered(token, &self.listeners[&token]);
+
            }
+
            Action::RegisterTransport(token, mut transport) => {
+
                log::debug!(target: "reactor", token=token.0; "Registering transport");
+

+
                self.poll
+
                    .registry()
+
                    .register(&mut transport, token, Interest::READABLE)
+
                    .map_err(Error::Registration)?;
+
                self.transports.insert(token, transport);
+
                self.service
+
                    .transport_registered(token, &self.transports[&token]);
+
            }
+
            Action::UnregisterListener(token) => {
+
                let Some(listener) = self.unregister_listener(token) else {
+
                    return Ok(());
+
                };
+

+
                log::debug!(target: "reactor", token=token.0; "Handing over listener");
+
                self.service.handover_listener(token, listener);
+
            }
+
            Action::UnregisterTransport(token) => {
+
                let Some(transport) = self.unregister_transport(token) else {
+
                    return Ok(());
+
                };
+

+
                log::debug!(target: "reactor", token=token.0; "Handing over transport");
+
                self.service.handover_transport(token, transport);
+
            }
+
            Action::Send(token, data) => {
+
                log::trace!(target: "reactor", "Sending {} bytes to {token:?}", data.len());
+

+
                if let Some(transport) = self.transports.get_mut(&token) {
+
                    if let Err(e) = transport.write_atomic(&data) {
+
                        log::error!(target: "reactor", "Fatal error writing to transport {token:?}, disconnecting. Error details: {e:?}");
+
                        if let Some(transport) = self.unregister_transport(token) {
+
                            return Err(Error::TransportDisconnect(token, transport));
+
                        }
+
                    }
+
                } else {
+
                    log::error!(target: "reactor", "Transport {token:?} is not in the reactor");
+
                }
+
            }
+
            Action::SetTimer(duration) => {
+
                log::trace!(target: "reactor", "Adding timer {duration:?} from now");
+

+
                self.timeouts.set_timeout(duration, time);
+
            }
+
        }
+
        Ok(())
+
    }
+

+
    fn handle_shutdown(self) {
+
        log::info!(target: "reactor", "Shutdown");
+

+
        // We just drop here?
+
    }
+

+
    fn unregister_listener(&mut self, token: Token) -> Option<H::Listener> {
+
        let Some(mut source) = self.listeners.remove(&token) else {
+
            log::warn!(target: "reactor", token=token.0; "Unregistering non-registered listener");
+
            return None;
+
        };
+

+
        if let Err(err) = self.poll.registry().deregister(&mut source) {
+
            log::warn!(target: "reactor", token=token.0; "Failed to deregister listener from mio: {err}");
+
        }
+

+
        Some(source)
+
    }
+

+
    fn unregister_transport(&mut self, token: Token) -> Option<H::Transport> {
+
        let Some(mut source) = self.transports.remove(&token) else {
+
            log::warn!(target: "reactor", token=token.0; "Unregistering non-registered transport");
+
            return None;
+
        };
+

+
        if let Err(err) = self.poll.registry().deregister(&mut source) {
+
            log::warn!(target: "reactor", token=token.0; "Failed to deregister transport from mio: {err}");
+
        }
+

+
        Some(source)
+
    }
+
}
added crates/radicle-node/src/reactor/controller.rs
@@ -0,0 +1,78 @@
+
use crossbeam_channel::Sender;
+
use mio::Waker;
+
use std::fmt::Debug;
+
use std::io;
+
use std::io::ErrorKind;
+
use std::sync::Arc;
+

+
pub enum ControlMessage<C> {
+
    Command(C),
+
    Shutdown,
+
}
+

+
/// Used by the [`crate::reactor::Reactor`] to inform the
+
/// [`crate::reactor::ReactionHandler`] about
+
/// incoming commands, sent via this [`Controller`] API
+
/// (see [`crate::reactor::ReactionHandler::Command`] for the details).
+
pub struct Controller<C> {
+
    sender: Sender<ControlMessage<C>>,
+
    waker: Arc<Waker>,
+
}
+

+
impl<C> Clone for Controller<C> {
+
    fn clone(&self) -> Self {
+
        Controller {
+
            sender: self.sender.clone(),
+
            waker: self.waker.clone(),
+
        }
+
    }
+
}
+

+
impl<C> Controller<C> {
+
    pub fn new(sender: Sender<ControlMessage<C>>, waker: Arc<Waker>) -> Self {
+
        Self { sender, waker }
+
    }
+

+
    pub fn cmd(&self, mut command: C) -> io::Result<()>
+
    where
+
        C: 'static,
+
    {
+
        {
+
            use std::any::Any;
+

+
            let cmd = Box::new(command);
+
            let any = cmd as Box<dyn Any>;
+
            let any = match any.downcast::<Box<dyn Debug>>() {
+
                Err(any) => {
+
                    log::debug!(target: "reactor::controller", "Sending command to the reactor");
+
                    any
+
                }
+
                Ok(debug) => {
+
                    log::debug!(target: "reactor::controller", "Sending command {debug:?} to the reactor");
+
                    debug
+
                }
+
            };
+
            command = *any.downcast().expect("from upcast");
+
        }
+

+
        self.sender
+
            .send(ControlMessage::Command(command))
+
            .map_err(|_| ErrorKind::BrokenPipe)?;
+
        self.wake()?;
+
        Ok(())
+
    }
+

+
    /// Shut down the reactor.
+
    pub fn shutdown(self) -> Result<(), Self> {
+
        log::info!(target: "reactor::controller", "Initiating reactor shutdown...");
+

+
        let res1 = self.sender.send(ControlMessage::Shutdown);
+
        let res2 = self.wake();
+
        res1.or(res2).map_err(|_| self)
+
    }
+

+
    pub fn wake(&self) -> io::Result<()> {
+
        log::trace!(target: "reactor::controller", "Wakening the reactor");
+
        self.waker.wake()
+
    }
+
}
added crates/radicle-node/src/reactor/listener.rs
@@ -0,0 +1,79 @@
+
use mio::event::{Event, Source};
+
use mio::net::{TcpListener, TcpStream};
+
use mio::{Interest, Registry, Token};
+
use std::io::Result;
+

+
use std::net::SocketAddr;
+
use std::time::Duration;
+

+
use crate::reactor::EventHandler;
+

+
/// A reactor-manageable TCP listener which can
+
/// be aware of additional encryption, authentication and other forms of
+
/// transport-layer protocols which will be automatically injected into accepted
+
/// connections.
+
#[derive(Debug)]
+
pub struct Listener {
+
    inner: TcpListener,
+
}
+

+
impl Source for Listener {
+
    fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> {
+
        self.inner.register(registry, token, interests)
+
    }
+

+
    fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> {
+
        self.inner.reregister(registry, token, interests)
+
    }
+

+
    fn deregister(&mut self, registry: &Registry) -> Result<()> {
+
        self.inner.deregister(registry)
+
    }
+
}
+

+
impl Listener {
+
    pub fn bind(addr: SocketAddr) -> Result<Self> {
+
        Ok(Self {
+
            inner: TcpListener::bind(addr)?,
+
        })
+
    }
+

+
    /// Returns the local [`std::net::SocketAddr`] on which self accepts
+
    /// connections.
+
    pub fn local_addr(&self) -> std::net::SocketAddr {
+
        self.inner
+
            .local_addr()
+
            .expect("TCP listener doesn't have local address")
+
    }
+

+
    fn accept(&mut self) -> Result<(TcpStream, SocketAddr)> {
+
        /// Maximum time to wait when reading from a socket.
+
        const READ_TIMEOUT: Duration = Duration::from_secs(6);
+

+
        /// Maximum time to wait when writing to a socket.
+
        const WRITE_TIMEOUT: Duration = Duration::from_secs(3);
+

+
        let (stream, peer) = self.inner.accept()?;
+
        let stream = std::net::TcpStream::from(stream);
+
        stream.set_read_timeout(Some(READ_TIMEOUT))?;
+
        stream.set_write_timeout(Some(WRITE_TIMEOUT))?;
+
        stream.set_nonblocking(true)?;
+
        Ok((TcpStream::from_std(stream), peer))
+
    }
+
}
+

+
impl EventHandler for Listener {
+
    type Reaction = Result<(TcpStream, SocketAddr)>;
+

+
    fn interests(&self) -> Option<Interest> {
+
        Some(Interest::READABLE)
+
    }
+

+
    fn handle(&mut self, event: &Event) -> Vec<Self::Reaction> {
+
        if !event.is_readable() {
+
            return vec![];
+
        }
+

+
        vec![self.accept()]
+
    }
+
}
added crates/radicle-node/src/reactor/session.rs
@@ -0,0 +1,320 @@
+
use std::error;
+
use std::fmt::{Debug, Display};
+
use std::io;
+
use std::io::{Read, Write};
+
use std::net::{Shutdown, SocketAddr};
+

+
use cyphernet::encrypt::noise::NoiseState;
+
use cyphernet::proxy::socks5;
+

+
use mio::event::Source;
+
use mio::net::TcpStream;
+
use mio::{Interest, Registry, Token};
+

+
pub type NoiseSession<E, D, S> = Protocol<NoiseState<E, D>, S>;
+
pub type Socks5Session<S> = Protocol<socks5::Socks5, S>;
+

+
pub trait Session: Send + Read + Write {
+
    type Inner: Session;
+
    type Artifact: Display;
+

+
    fn is_established(&self) -> bool {
+
        self.artifact().is_some()
+
    }
+

+
    fn run_handshake(&mut self) -> io::Result<()> {
+
        Ok(())
+
    }
+

+
    fn display(&self) -> String {
+
        self.artifact()
+
            .map(|artifact| artifact.to_string())
+
            .unwrap_or_else(|| "<no-id>".to_string())
+
    }
+

+
    fn artifact(&self) -> Option<Self::Artifact>;
+

+
    fn stream(&mut self) -> &mut TcpStream;
+

+
    fn disconnect(self) -> io::Result<()>;
+
}
+

+
pub trait StateMachine: Sized + Send {
+
    const NAME: &'static str;
+

+
    type Artifact;
+

+
    type Error: error::Error + Send + Sync + 'static;
+

+
    fn next_read_len(&self) -> usize;
+

+
    fn advance(&mut self, input: &[u8]) -> Result<Vec<u8>, Self::Error>;
+

+
    fn artifact(&self) -> Option<Self::Artifact>;
+

+
    // Blocking
+
    fn run_handshake<RW>(&mut self, stream: &mut RW) -> io::Result<()>
+
    where
+
        RW: Read + Write,
+
    {
+
        let mut input = vec![];
+
        while !self.is_complete() {
+
            let act = self.advance(&input).map_err(|err| {
+
                log::error!(target: Self::NAME, "Handshake failure: {err}");
+
                io::Error::other(err)
+
            })?;
+
            if !act.is_empty() {
+
                log::trace!(target: Self::NAME, "Sending handshake act {act:02x?}");
+

+
                stream.write_all(&act)?;
+
            }
+
            if !self.is_complete() {
+
                input = vec![0u8; self.next_read_len()];
+
                stream.read_exact(&mut input)?;
+

+
                log::trace!(target: Self::NAME, "Receiving handshake act {input:02x?}");
+
            }
+
        }
+

+
        log::debug!(target: Self::NAME, "Handshake protocol {} successfully completed", Self::NAME);
+
        Ok(())
+
    }
+

+
    fn is_complete(&self) -> bool {
+
        self.artifact().is_some()
+
    }
+
}
+

+
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
+
pub struct ProtocolArtifact<M: StateMachine, S: Session> {
+
    pub(crate) session: S::Artifact,
+
    pub(crate) state: M::Artifact,
+
}
+

+
impl<M: StateMachine, S: Session> Display for ProtocolArtifact<M, S> {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        f.debug_struct("ProtocolArtifact")
+
            .field("session", &"<omitted>")
+
            .field("state", &"<omitted>")
+
            .finish()
+
    }
+
}
+

+
#[derive(Copy, Clone, Eq, PartialEq)]
+
pub struct Protocol<M: StateMachine, S: Session> {
+
    pub(crate) state: M,
+
    pub(crate) session: S,
+
}
+

+
impl<M: StateMachine, S: Session> Protocol<M, S> {
+
    pub fn new(session: S, state_machine: M) -> Self {
+
        Self {
+
            state: state_machine,
+
            session,
+
        }
+
    }
+
}
+

+
impl<M: StateMachine, S: Session> io::Read for Protocol<M, S> {
+
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
        log::trace!(target: M::NAME, "Reading event");
+

+
        if self.state.is_complete() || !self.session.is_established() {
+
            log::trace!(target: M::NAME, "Passing reading to inner not yet established session");
+
            return self.session.read(buf);
+
        }
+

+
        let len = self.state.next_read_len();
+
        let mut input = vec![0u8; len];
+
        self.session.read_exact(&mut input)?;
+

+
        log::trace!(target: M::NAME, "Received handshake act: {input:02x?}");
+

+
        if !input.is_empty() {
+
            let output = self.state.advance(&input).map_err(|err| {
+
                log::error!(target: M::NAME, "Handshake failure: {err}");
+
                io::Error::other(err)
+
            })?;
+

+
            if !output.is_empty() {
+
                log::trace!(target: M::NAME, "Sending handshake act on read: {output:02x?}");
+
                self.session.write_all(&output)?;
+
            }
+
        }
+

+
        Ok(0)
+
    }
+
}
+

+
impl<M: StateMachine, S: Session> Write for Protocol<M, S> {
+
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+
        log::trace!(target: M::NAME, "Writing event (state_complete={}, session_established={})", self.state.is_complete(), self.session.is_established());
+

+
        if self.state.is_complete() || !self.session.is_established() {
+
            log::trace!(target: M::NAME, "Passing writing to inner session");
+
            return self.session.write(buf);
+
        }
+

+
        if self.state.next_read_len() == 0 {
+
            log::trace!(target: M::NAME, "Starting handshake protocol");
+

+
            let act = self.state.advance(&[]).map_err(|err| {
+
                log::error!(target: M::NAME, "Handshake failure: {err}");
+
                io::Error::other(err)
+
            })?;
+

+
            if !act.is_empty() {
+
                log::trace!(target: M::NAME, "Sending handshake act on write: {act:02x?}");
+
                self.session.write_all(&act)?;
+
            } else {
+
                log::trace!(target: M::NAME, "Handshake complete, passing data to inner session");
+
                return self.session.write(buf);
+
            }
+
        }
+

+
        if buf.is_empty() {
+
            Ok(0)
+
        } else {
+
            Err(io::ErrorKind::Interrupted.into())
+
        }
+
    }
+

+
    fn flush(&mut self) -> io::Result<()> {
+
        self.session.flush()
+
    }
+
}
+

+
impl<M: StateMachine, S: Session> Session for Protocol<M, S> {
+
    type Inner = S;
+
    type Artifact = ProtocolArtifact<M, S>;
+

+
    fn run_handshake(&mut self) -> io::Result<()> {
+
        log::debug!(target: M::NAME, "Starting handshake protocol {}", M::NAME);
+

+
        if !self.session.is_established() {
+
            self.session.run_handshake()?;
+
        }
+

+
        self.state.run_handshake(self.session.stream())
+
    }
+

+
    fn artifact(&self) -> Option<Self::Artifact> {
+
        Some(ProtocolArtifact {
+
            session: self.session.artifact()?,
+
            state: self.state.artifact()?,
+
        })
+
    }
+

+
    fn stream(&mut self) -> &mut TcpStream {
+
        self.session.stream()
+
    }
+

+
    fn disconnect(self) -> io::Result<()> {
+
        self.session.disconnect()
+
    }
+
}
+

+
impl<M: StateMachine, S: Session + Source> Source for Protocol<M, S> {
+
    fn register(
+
        &mut self,
+
        registry: &Registry,
+
        token: Token,
+
        interests: Interest,
+
    ) -> io::Result<()> {
+
        self.session.register(registry, token, interests)
+
    }
+

+
    fn reregister(
+
        &mut self,
+
        registry: &Registry,
+
        token: Token,
+
        interests: Interest,
+
    ) -> io::Result<()> {
+
        self.session.reregister(registry, token, interests)
+
    }
+

+
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+
        self.session.deregister(registry)
+
    }
+
}
+

+
impl Session for TcpStream {
+
    type Inner = Self;
+
    type Artifact = SocketAddr;
+

+
    fn artifact(&self) -> Option<Self::Artifact> {
+
        self.peer_addr().ok()
+
    }
+

+
    fn stream(&mut self) -> &mut TcpStream {
+
        self
+
    }
+

+
    fn disconnect(self) -> io::Result<()> {
+
        self.shutdown(Shutdown::Both)
+
    }
+
}
+

+
mod impl_noise {
+
    use cyphernet::encrypt::noise::{error::NoiseError as Error, NoiseState as Noise};
+
    use cyphernet::{Digest, Ecdh};
+

+
    use super::*;
+

+
    #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
+
    pub struct NoiseArtifact<E: Ecdh, D: Digest> {
+
        pub handshake_hash: D::Output,
+
        pub remote_static_key: Option<E::Pk>,
+
    }
+

+
    impl<E: Ecdh, D: Digest> StateMachine for Noise<E, D> {
+
        const NAME: &'static str = "noise";
+
        type Artifact = NoiseArtifact<E, D>;
+
        type Error = Error;
+

+
        fn next_read_len(&self) -> usize {
+
            self.next_read_len()
+
        }
+

+
        fn advance(&mut self, input: &[u8]) -> Result<Vec<u8>, Self::Error> {
+
            self.advance(input)
+
        }
+

+
        fn artifact(&self) -> Option<Self::Artifact> {
+
            self.get_handshake_hash().map(|hh| NoiseArtifact {
+
                handshake_hash: hh,
+
                remote_static_key: self.get_remote_static_key(),
+
            })
+
        }
+
    }
+
}
+

+
mod impl_socks5 {
+
    use cyphernet::addr::{Host as _, HostName, NetAddr};
+
    use cyphernet::proxy::socks5::{Error, Socks5};
+

+
    use super::*;
+

+
    impl StateMachine for Socks5 {
+
        const NAME: &'static str = "socks5";
+

+
        type Artifact = NetAddr<HostName>;
+
        type Error = Error;
+

+
        fn next_read_len(&self) -> usize {
+
            self.next_read_len()
+
        }
+

+
        fn advance(&mut self, input: &[u8]) -> Result<Vec<u8>, Self::Error> {
+
            self.advance(input)
+
        }
+

+
        fn artifact(&self) -> Option<Self::Artifact> {
+
            match self {
+
                Socks5::Initial(addr, false) if !addr.requires_proxy() => Some(addr.clone()),
+
                Socks5::Active(addr) => Some(addr.clone()),
+
                _ => None,
+
            }
+
        }
+
    }
+
}
added crates/radicle-node/src/reactor/timer.rs
@@ -0,0 +1,128 @@
+
use std::collections::BTreeSet;
+
use std::time::Duration;
+

+
use localtime::{LocalDuration, LocalTime};
+

+
/// Manages timers and triggers timeouts.
+
#[derive(Debug, Default)]
+
pub struct Timer {
+
    /// Timeouts are durations since the UNIX epoch.
+
    timeouts: BTreeSet<localtime::LocalTime>,
+
}
+

+
impl Timer {
+
    /// Create a new timer containing no timeouts.
+
    pub fn new() -> Self {
+
        Self {
+
            timeouts: BTreeSet::new(),
+
        }
+
    }
+

+
    /// Return the number of timeouts being tracked.
+
    #[cfg(test)]
+
    pub fn count(&self) -> usize {
+
        self.timeouts.len()
+
    }
+

+
    /// Check whether there are timeouts being tracked.
+
    #[cfg(test)]
+
    pub fn has_timeouts(&self) -> bool {
+
        !self.timeouts.is_empty()
+
    }
+

+
    /// Register a new timeout relative to a certain point in time.
+
    pub fn set_timeout(&mut self, timeout: Duration, after: LocalTime) {
+
        let time = after + LocalDuration::from_millis(timeout.as_millis());
+
        self.timeouts.insert(time);
+
    }
+

+
    /// Get the first timeout expiring right at or after certain moment of time.
+
    /// Returns [`None`] if there are no timeouts.
+
    pub fn next_expiring_from(&self, time: impl Into<LocalTime>) -> Option<Duration> {
+
        let time = time.into();
+
        let last = *self.timeouts.first()?;
+
        Some(if last >= time {
+
            Duration::from_millis(last.as_millis() - time.as_millis())
+
        } else {
+
            Duration::from_secs(0)
+
        })
+
    }
+

+
    /// Removes timeouts which expire by a certain moment of time (inclusive),
+
    /// returning total number of timeouts which were removed.
+
    pub fn remove_expired_by(&mut self, time: LocalTime) -> usize {
+
        // Since `split_off` returns everything *after* the given key, including the key,
+
        // if a timer is set for exactly the given time, it would remain in the "after"
+
        // set of unexpired keys. This isn't what we want, therefore we add `1` to the
+
        // given time value so that it is put in the "before" set that gets expired
+
        // and overwritten.
+
        let at = time + LocalDuration::from_millis(1);
+
        let unexpired = self.timeouts.split_off(&at);
+
        let fired = self.timeouts.len();
+
        self.timeouts = unexpired;
+
        fired
+
    }
+
}
+

+
#[cfg(test)]
+
mod tests {
+
    use super::*;
+

+
    #[test]
+
    fn test_wake_exact() {
+
        let mut tm = Timer::new();
+

+
        let now = LocalTime::now();
+
        tm.set_timeout(Duration::from_secs(8), now);
+
        tm.set_timeout(Duration::from_secs(9), now);
+
        tm.set_timeout(Duration::from_secs(10), now);
+

+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(9)), 2);
+
        assert_eq!(tm.count(), 1);
+
    }
+

+
    #[test]
+
    fn test_wake() {
+
        let mut tm = Timer::new();
+

+
        let now = LocalTime::now();
+
        tm.set_timeout(Duration::from_secs(8), now);
+
        tm.set_timeout(Duration::from_secs(16), now);
+
        tm.set_timeout(Duration::from_secs(64), now);
+
        tm.set_timeout(Duration::from_secs(72), now);
+

+
        assert_eq!(tm.remove_expired_by(now), 0);
+
        assert_eq!(tm.count(), 4);
+

+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(9)), 1);
+
        assert_eq!(tm.count(), 3, "one timeout has expired");
+

+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(66)), 2);
+
        assert_eq!(tm.count(), 1, "another two timeouts have expired");
+

+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(96)), 1);
+
        assert!(!tm.has_timeouts(), "all timeouts have expired");
+
    }
+

+
    #[test]
+
    fn test_next() {
+
        let mut tm = Timer::new();
+

+
        let mut now = LocalTime::now();
+
        tm.set_timeout(Duration::from_secs(3), now);
+
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(3)));
+

+
        now = now + LocalDuration::from_secs(2);
+
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(1)));
+

+
        now = now + LocalDuration::from_secs(1);
+
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));
+

+
        now = now + LocalDuration::from_secs(1);
+
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));
+

+
        assert_eq!(tm.remove_expired_by(now), 1);
+
        assert_eq!(tm.count(), 0);
+
        assert_eq!(tm.next_expiring_from(now), None);
+
    }
+
}
added crates/radicle-node/src/reactor/token.rs
@@ -0,0 +1,47 @@
+
pub use mio::Token;
+

+
pub const WAKER: Token = Token(0);
+

+
#[derive(Clone, Debug)]
+
pub struct Tokens {
+
    initial: usize,
+
    current: usize,
+
}
+

+
impl Tokens {
+
    pub fn new(initial: usize) -> Self {
+
        Tokens {
+
            initial,
+
            current: initial,
+
        }
+
    }
+

+
    /// Returns the next id for the resource.
+
    #[inline]
+
    pub fn advance(&mut self) -> Token {
+
        let current = self.current;
+

+
        self.current = {
+
            let candidate = current.wrapping_add(1);
+

+
            if candidate == usize::MIN {
+
                // If we overflowed, reset to the initial value.
+
                // The range of `usize` is so large that likely
+
                // a few years have passed since the early tokens
+
                // were used.
+
                log::info!(target = "reactor"; "Tokens wrapped.");
+
                self.initial
+
            } else {
+
                candidate
+
            }
+
        };
+

+
        Token(current)
+
    }
+
}
+

+
impl Default for Tokens {
+
    fn default() -> Self {
+
        Tokens::new(1)
+
    }
+
}
added crates/radicle-node/src/reactor/transport.rs
@@ -0,0 +1,328 @@
+
use std::collections::VecDeque;
+
use std::fmt::{Debug, Display, Formatter};
+
use std::io::Write;
+
use std::{fmt, io};
+

+
use mio::event::{Event, Source};
+
use mio::{Interest, Registry, Token};
+
use radicle::node::Link;
+

+
use crate::reactor::session::Session;
+
use crate::reactor::{EventHandler, WriteAtomic};
+

+
const READ_BUFFER_SIZE: usize = u16::MAX as usize;
+

+
/// An event happening for a [`Transport`] network transport and delivered to
+
/// a [`crate::reactor::ReactionHandler`].
+
pub enum SessionEvent<S: Session> {
+
    Established(S::Artifact),
+
    Data(Vec<u8>),
+
    Terminated(io::Error),
+
}
+

+
/// A state of [`Transport`] network transport.
+
#[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
+
pub enum TransportState {
+
    /// The transport is initiated, but the connection has not established yet.
+
    /// This happens only for outgoing connections due to the use of
+
    /// non-blocking version of a `connect` syscall. The state is switched once
+
    /// we receive first notification on a `write` event on this resource from
+
    /// the reactor `poll`.
+
    Init,
+

+
    /// The connection is established, but the session handshake is still in
+
    /// progress. This happens while encryption handshake, authentication and
+
    /// other protocols injected into the session haven't completed yet.
+
    Handshake,
+

+
    /// The session is active; all handshakes had completed.
+
    Active,
+

+
    /// Session was terminated by any reason: local shutdown, remote orderly
+
    /// shutdown, connectivity issue, dropped connections, encryption or
+
    /// authentication problem etc. Reading and writing from the resource in
+
    /// this state will result in an error ([`io::Error`]).
+
    Terminated,
+
}
+

+
/// Transport is an adaptor a around specific [`Session`] (implementing
+
/// session management, including optional handshake, encoding, etc.) to be used
+
/// as a transport resource in a [`crate::reactor::Reactor`].
+
#[derive(Debug)]
+
pub struct Transport<S: Session> {
+
    state: TransportState,
+
    session: S,
+
    link_direction: Link,
+
    write_intent: bool,
+
    read_buffer: Box<[u8; READ_BUFFER_SIZE]>,
+
    write_buffer: VecDeque<u8>,
+
}
+

+
impl<S: Session + Source> Source for Transport<S> {
+
    fn register(
+
        &mut self,
+
        registry: &Registry,
+
        token: Token,
+
        interests: Interest,
+
    ) -> io::Result<()> {
+
        self.session.register(registry, token, interests)
+
    }
+

+
    fn reregister(
+
        &mut self,
+
        registry: &Registry,
+
        token: Token,
+
        interests: Interest,
+
    ) -> io::Result<()> {
+
        self.session.reregister(registry, token, interests)
+
    }
+

+
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+
        self.session.deregister(registry)
+
    }
+
}
+

+
impl<S: Session> Display for Transport<S> {
+
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+
        match self.session.artifact() {
+
            None => f
+
                .debug_struct("Transport")
+
                .field("state", &self.state)
+
                .field("link_direction", &self.link_direction)
+
                .field("write_intent", &self.write_intent)
+
                .finish(),
+
            Some(id) => Display::fmt(&id, f),
+
        }
+
    }
+
}
+

+
impl<S: Session> Transport<S> {
+
    /// Constructs reactor-managed resource around an existing [`Session`].
+
    ///
+
    /// NB: Must not be called for connections created in a non-blocking mode!
+
    ///
+
    /// # Errors
+
    ///
+
    /// If a session can be put into a non-blocking mode.
+
    pub fn with_session(session: S, link_direction: Link) -> io::Result<Self> {
+
        let state = if session.is_established() {
+
            // If we are disconnected, we will get instantly updated from the
+
            // reactor and the state will change automatically
+
            TransportState::Active
+
        } else {
+
            TransportState::Handshake
+
        };
+
        Ok(Self {
+
            state,
+
            session,
+
            link_direction,
+
            write_intent: true,
+
            read_buffer: Box::new([0u8; READ_BUFFER_SIZE]),
+
            write_buffer: VecDeque::new(),
+
        })
+
    }
+

+
    pub fn display(&self) -> impl Display {
+
        self.session.display()
+
    }
+

+
    fn terminate(&mut self, reason: io::Error) -> SessionEvent<S> {
+
        log::trace!(target: "transport", "Terminating session {self} due to {reason:?}");
+

+
        self.state = TransportState::Terminated;
+
        SessionEvent::Terminated(reason)
+
    }
+

+
    fn handle_io(&mut self, interest: Interest) -> Option<SessionEvent<S>> {
+
        if self.state == TransportState::Terminated {
+
            log::warn!(target: "transport", "Transport {self} is terminated, ignoring I/O event");
+
            return None;
+
        }
+

+
        let mut force_write_intent = false;
+
        if self.state == TransportState::Init {
+
            log::debug!(target: "transport", "Transport {self} is connected, initializing handshake");
+

+
            force_write_intent = true;
+
            self.state = TransportState::Handshake;
+
        } else if self.state == TransportState::Handshake {
+
            debug_assert!(!self.session.is_established());
+

+
            log::trace!(target: "transport", "Transport {self} got I/O while in handshake mode");
+
        }
+

+
        let resp = match interest {
+
            Interest::READABLE => self.handle_readable(),
+
            Interest::WRITABLE => self.handle_writable(),
+
            _ => unreachable!(),
+
        };
+

+
        if force_write_intent {
+
            self.write_intent = true;
+
        } else if self.state == TransportState::Handshake {
+
            // During handshake, after each read we need to write and then wait
+
            self.write_intent = interest == Interest::READABLE;
+
        }
+

+
        if matches!(&resp, Some(SessionEvent::Terminated(e)) if e.kind() == io::ErrorKind::ConnectionReset)
+
            && self.state != TransportState::Handshake
+
        {
+
            log::debug!(target: "transport", "Peer {self} has reset the connection");
+

+
            self.state = TransportState::Terminated;
+
            resp
+
        } else if self.session.is_established() && self.state == TransportState::Handshake {
+
            log::debug!(target: "transport", "Handshake with {self} is complete");
+

+
            // We just got connected; may need to send output
+
            self.write_intent = true;
+
            self.state = TransportState::Active;
+
            Some(SessionEvent::Established(
+
                self.session.artifact().expect("session is established"),
+
            ))
+
        } else {
+
            resp
+
        }
+
    }
+

+
    fn handle_writable(&mut self) -> Option<SessionEvent<S>> {
+
        if !self.session.is_established() {
+
            let _ = self.session.write(&[]);
+
            self.write_intent = true;
+
            return None;
+
        }
+
        match self.flush() {
+
            Ok(_) => None,
+
            // In this case, the write couldn't complete. Leave `needs_flush` set
+
            // to be notified when the socket is ready to write again.
+
            Err(err)
+
                if [
+
                    io::ErrorKind::WouldBlock,
+
                    io::ErrorKind::WriteZero,
+
                    io::ErrorKind::OutOfMemory,
+
                    io::ErrorKind::Interrupted,
+
                ]
+
                .contains(&err.kind()) =>
+
            {
+
                log::warn!(target: "transport", "Resource {} was not able to consume any data even though it has announced its write readiness", self.display());
+
                self.write_intent = true;
+
                None
+
            }
+
            Err(err) => Some(self.terminate(err)),
+
        }
+
    }
+

+
    fn handle_readable(&mut self) -> Option<SessionEvent<S>> {
+
        // Nb. Since `poll`, which this reactor is based on, is *level-triggered*,
+
        // we will be notified again if there is still data to be read on the socket.
+
        // Hence, there is no use in putting this socket read in a loop, as the second
+
        // invocation would likely block.
+
        match self.session.read(self.read_buffer.as_mut()) {
+
            Ok(0) if !self.session.is_established() => None,
+
            Ok(0) => Some(SessionEvent::Terminated(
+
                io::ErrorKind::ConnectionReset.into(),
+
            )),
+
            Ok(len) => Some(SessionEvent::Data(self.read_buffer[..len].to_vec())),
+
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
+
                // This shouldn't normally happen, since this function is only called
+
                // when there's data on the socket. We leave it here in case external
+
                // conditions change.
+

+
                log::warn!(target: "transport",
+
                    "WOULD_BLOCK on resource which had read intent - probably normal thing to happen"
+
                );
+
                None
+
            }
+
            Err(err) => Some(self.terminate(err)),
+
        }
+
    }
+

+
    fn flush_buffer(&mut self) -> io::Result<()> {
+
        let orig_len = self.write_buffer.len();
+

+
        log::trace!(target: "transport", "Resource {} is flushing its buffer of {orig_len} bytes", self.display());
+
        let len =
+
            self.session.write(self.write_buffer.make_contiguous()).or_else(|err| {
+
                match err.kind() {
+
                    io::ErrorKind::WouldBlock
+
                    | io::ErrorKind::OutOfMemory
+
                    | io::ErrorKind::WriteZero
+
                    | io::ErrorKind::Interrupted => {
+
                        log::warn!(target: "transport", "Resource {} kernel buffer is fulled (system message is '{err}')", self.display());
+
                        Ok(0)
+
                    },
+
                    _ => {
+
                        log::error!(target: "transport", "Resource {} failed write operation with message '{err}'", self.display());
+
                        Err(err)
+
                    },
+
                }
+
            })?;
+
        if orig_len > len {
+
            log::debug!(target: "transport", "Resource {} was able to consume only a part of the buffered data ({len} of {orig_len} bytes)", self.display());
+
            self.write_intent = true;
+
        } else {
+
            log::trace!(target: "transport", "Resource {} was able to consume all of the buffered data ({len} of {orig_len} bytes)", self.display());
+
            self.write_intent = false;
+
        }
+
        self.write_buffer.drain(..len);
+
        Ok(())
+
    }
+
}
+

+
impl<S: Session + Source> EventHandler for Transport<S> {
+
    type Reaction = SessionEvent<S>;
+

+
    fn interests(&self) -> Option<Interest> {
+
        use mio::Interest;
+
        use TransportState::*;
+

+
        match self.state {
+
            Init => Some(Interest::WRITABLE),
+
            Active | Handshake if self.write_intent => {
+
                Some(Interest::READABLE | Interest::WRITABLE)
+
            }
+
            Active | Handshake => Some(Interest::READABLE),
+
            Terminated => None,
+
        }
+
    }
+

+
    fn handle(&mut self, event: &Event) -> Vec<Self::Reaction> {
+
        let mut events = Vec::with_capacity(2);
+
        if event.is_writable() {
+
            if let Some(event) = self.handle_io(Interest::WRITABLE) {
+
                events.push(event);
+
            }
+
        }
+
        if event.is_readable() {
+
            if let Some(event) = self.handle_io(Interest::READABLE) {
+
                events.push(event);
+
            }
+
        }
+
        events
+
    }
+
}
+

+
impl<S: Session> Write for Transport<S> {
+
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+
        self.write_atomic(buf).map(|_| buf.len())
+
    }
+

+
    fn flush(&mut self) -> io::Result<()> {
+
        let res = self.flush_buffer();
+
        self.session.flush().and(res)
+
    }
+
}
+

+
impl<S: Session> WriteAtomic for Transport<S> {
+
    fn is_ready_to_write(&self) -> bool {
+
        self.state == TransportState::Active
+
    }
+

+
    fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()> {
+
        if buf.is_empty() {
+
            return Ok(());
+
        }
+
        self.write_buffer.extend(buf);
+
        self.flush_buffer()
+
    }
+
}
modified crates/radicle-node/src/runtime.rs
@@ -11,14 +11,11 @@ use winpipe::WinListener as Listener;

use crossbeam_channel as chan;
use cyphernet::Ecdh;
-
use netservices::resource::NetAccept;
use radicle::cob::migrate;
use radicle::crypto;
use radicle::node::device::Device;
use radicle_fetch::FetchLimit;
use radicle_signals::Signal;
-
use reactor::poller::popol;
-
use reactor::Reactor;
use thiserror::Error;

use radicle::node;
@@ -33,6 +30,8 @@ use radicle::{cob, git, storage, Storage};

use crate::control;
use crate::node::{routing, NodeId};
+
use crate::reactor;
+
use crate::reactor::Reactor;
use crate::service::gossip;
use crate::wire;
use crate::wire::Wire;
@@ -115,7 +114,7 @@ pub struct Runtime {
    pub control: ControlSocket,
    pub handle: Handle,
    pub storage: Storage,
-
    pub reactor: Reactor<wire::Control, popol::Poller>,
+
    pub reactor: Reactor<wire::Control>,
    pub pool: worker::Pool,
    pub local_addrs: Vec<net::SocketAddr>,
    pub signals: chan::Receiver<Signal>,
@@ -225,13 +224,13 @@ impl Runtime {
        let mut local_addrs = Vec::new();

        for addr in listen {
-
            let listener = NetAccept::bind(&addr)?;
+
            let listener = reactor::Listener::bind(addr)?;
            let local_addr = listener.local_addr();

            local_addrs.push(local_addr);
            wire.listen(listener);
        }
-
        let reactor = Reactor::named(wire, popol::Poller::new(), thread::name(&id, "service"))?;
+
        let reactor = Reactor::new(wire, thread::name(&id, "service"))?;
        let handle = Handle::new(home.clone(), reactor.controller(), emitter);

        let nid = *signer.public_key();
modified crates/radicle-node/src/runtime/handle.rs
@@ -13,13 +13,13 @@ use radicle::node::events::{Event, Events};
use radicle::node::policy;
use radicle::node::{Config, NodeId};
use radicle::node::{ConnectOptions, ConnectResult, Seeds};
-
use reactor::poller::popol::PopolWaker;
use serde_json::json;
use thiserror::Error;

use crate::identity::RepoId;
use crate::node::{Alias, Command, FetchResult};
use crate::profile::Home;
+
use crate::reactor;
use crate::runtime::Emitter;
use crate::service;
use crate::service::{CommandError, QueryState};
@@ -68,7 +68,7 @@ impl<T> From<chan::SendError<T>> for Error {

pub struct Handle {
    pub(crate) home: Home,
-
    pub(crate) controller: reactor::Controller<wire::Control, PopolWaker>,
+
    pub(crate) controller: reactor::Controller<wire::Control>,

    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
    shutdown: Arc<AtomicBool>,
@@ -103,7 +103,7 @@ impl Clone for Handle {
impl Handle {
    pub fn new(
        home: Home,
-
        controller: reactor::Controller<wire::Control, PopolWaker>,
+
        controller: reactor::Controller<wire::Control>,
        emitter: Emitter<Event>,
    ) -> Self {
        Self {
modified crates/radicle-node/src/wire.rs
@@ -1,10 +1,8 @@
//! Implementation of the transport protocol.
//!
//! We use the Noise XK handshake pattern to establish an encrypted stream with a remote peer.
-
//! The handshake itself is implemented in the external [`cyphernet`] and [`netservices`] crates.
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
-
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use std::{io, net, time};

@@ -15,13 +13,10 @@ use cyphernet::encrypt::noise::{HandshakePattern, Keyset, NoiseState};
use cyphernet::proxy::socks5;
use cyphernet::{Digest, EcSk, Ecdh, Sha256};
use localtime::LocalTime;
-
use netservices::resource::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
-
use netservices::session::{NoiseSession, ProtocolArtifact, Socks5Session};
-
use netservices::{NetConnection, NetReader, NetWriter};
+
use mio::net::TcpStream;
use radicle::node::device::Device;
-
use reactor::{ResourceId, ResourceType, Timestamp};

-
use radicle::collections::RandomMap;
+
use radicle::collections::{RandomMap, RandomSet};
use radicle::crypto;
use radicle::node::config::AddressConfig;
use radicle::node::Link;
@@ -33,6 +28,10 @@ pub use radicle_protocol::wire::frame::{Frame, FrameData, StreamId};
pub use radicle_protocol::wire::*;
use radicle_protocol::worker::{FetchRequest, FetchResult};

+
use crate::reactor;
+
use crate::reactor::{Listener, Transport};
+
use crate::reactor::{NoiseSession, ProtocolArtifact, SessionEvent, Socks5Session};
+
use crate::reactor::{Token, Tokens};
use crate::service;
use crate::service::io::Io;
use crate::service::FETCH_TIMEOUT;
@@ -69,14 +68,10 @@ pub enum Control {
}

/// Peer session type.
-
pub type WireSession<G> = NoiseSession<G, Sha256, Socks5Session<net::TcpStream>>;
-
/// Peer session type (read-only).
-
pub type WireReader = NetReader<Socks5Session<net::TcpStream>>;
-
/// Peer session type (write-only).
-
pub type WireWriter<G> = NetWriter<NoiseState<G, Sha256>, Socks5Session<net::TcpStream>>;
+
type WireSession<G> = NoiseSession<G, Sha256, Socks5Session<TcpStream>>;

/// Reactor action.
-
type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;
+
type Action<G> = reactor::Action<Listener, Transport<WireSession<G>>>;

/// A worker stream.
struct Stream {
@@ -175,23 +170,14 @@ impl Streams {
/// The initial state of an outbound peer before handshake is completed.
#[derive(Debug)]
struct Outbound {
-
    /// Resource ID, if registered.
-
    id: Option<ResourceId>,
+
    /// Token for I/O event notification.
+
    token: Token,
    /// Remote address.
    addr: NetAddr<HostName>,
    /// Remote Node ID.
    nid: NodeId,
}

-
/// The initial state of an inbound peer before handshake is completed.
-
#[derive(Debug)]
-
struct Inbound {
-
    /// Resource ID, if registered.
-
    id: Option<ResourceId>,
-
    /// Remote address.
-
    addr: NetAddr<HostName>,
-
}
-

/// Peer connection state machine.
enum Peer {
    /// The state after handshake is completed.
@@ -251,49 +237,49 @@ impl Peer {
}

/// Holds connected peers.
-
struct Peers(RandomMap<ResourceId, Peer>);
+
struct Peers(RandomMap<Token, Peer>);

impl Peers {
-
    fn get_mut(&mut self, id: &ResourceId) -> Option<&mut Peer> {
-
        self.0.get_mut(id)
+
    fn get_mut(&mut self, token: &Token) -> Option<&mut Peer> {
+
        self.0.get_mut(token)
    }

-
    fn entry(&mut self, id: ResourceId) -> Entry<ResourceId, Peer> {
-
        self.0.entry(id)
+
    fn entry(&mut self, token: Token) -> Entry<Token, Peer> {
+
        self.0.entry(token)
    }

-
    fn insert(&mut self, id: ResourceId, peer: Peer) {
-
        if self.0.insert(id, peer).is_some() {
-
            log::warn!(target: "wire", "Replacing existing peer id={id}");
+
    fn insert(&mut self, token: Token, peer: Peer) {
+
        if self.0.insert(token, peer).is_some() {
+
            log::warn!(target: "wire", token=token.0; "Replacing existing peer");
        }
    }

-
    fn remove(&mut self, id: &ResourceId) -> Option<Peer> {
+
    fn remove(&mut self, id: &Token) -> Option<Peer> {
        self.0.remove(id)
    }

-
    fn lookup(&self, node_id: &NodeId) -> Option<(ResourceId, &Peer)> {
+
    fn lookup(&self, id: &NodeId) -> Option<(Token, &Peer)> {
        self.0
            .iter()
-
            .find(|(_, peer)| peer.id() == Some(node_id))
-
            .map(|(fd, peer)| (*fd, peer))
+
            .find(|(_, peer)| peer.id() == Some(id))
+
            .map(|(token, peer)| (*token, peer))
    }

-
    fn lookup_mut(&mut self, node_id: &NodeId) -> Option<(ResourceId, &mut Peer)> {
+
    fn lookup_mut(&mut self, id: &NodeId) -> Option<(Token, &mut Peer)> {
        self.0
            .iter_mut()
-
            .find(|(_, peer)| peer.id() == Some(node_id))
+
            .find(|(_, peer)| peer.id() == Some(id))
            .map(|(fd, peer)| (*fd, peer))
    }

-
    fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId, Link)> {
+
    fn active(&self) -> impl Iterator<Item = (Token, &NodeId, Link)> {
        self.0.iter().filter_map(|(id, peer)| match peer {
            Peer::Connected { nid, link, .. } => Some((*id, nid, *link)),
            Peer::Disconnecting { .. } => None,
        })
    }

-
    fn connected(&self) -> impl Iterator<Item = (ResourceId, &NodeId)> {
+
    fn connected(&self) -> impl Iterator<Item = (Token, &NodeId)> {
        self.0.iter().filter_map(|(id, peer)| {
            if let Peer::Connected { nid, .. } = peer {
                Some((*id, nid))
@@ -309,7 +295,7 @@ impl Peers {
}

/// Wire protocol implementation for a set of peers.
-
pub struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
+
pub(crate) struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
    /// Backing service instance.
    service: Service<D, S, G>,
    /// Worker pool interface.
@@ -321,13 +307,15 @@ pub struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
    /// Internal queue of actions to send to the reactor.
    actions: VecDeque<Action<G>>,
    /// Outbound attempted peers without a session.
-
    outbound: RandomMap<RawFd, Outbound>,
+
    outbound: RandomMap<Token, Outbound>,
    /// Inbound peers without a session.
-
    inbound: RandomMap<RawFd, Inbound>,
+
    inbound: RandomSet<Token>,
    /// Listening addresses that are not yet registered.
-
    listening: RandomMap<RawFd, net::SocketAddr>,
+
    listening: RandomMap<Token, net::SocketAddr>,
    /// Peer (established) sessions.
    peers: Peers,
+
    /// A (practically) infinite source of tokens to identify transports and listeners.
+
    tokens: Tokens,
}

impl<D, S, G> Wire<D, S, G>
@@ -345,43 +333,45 @@ where
            signer,
            metrics: Metrics::default(),
            actions: VecDeque::new(),
-
            inbound: RandomMap::default(),
+
            inbound: RandomSet::default(),
            outbound: RandomMap::default(),
            listening: RandomMap::default(),
            peers: Peers(RandomMap::default()),
+
            tokens: Tokens::default(),
        }
    }

-
    pub fn listen(&mut self, socket: NetAccept<WireSession<G>>) {
-
        self.listening
-
            .insert(socket.as_raw_fd(), socket.local_addr());
-
        self.actions.push_back(Action::RegisterListener(socket));
+
    pub fn listen(&mut self, socket: Listener) {
+
        let token = self.tokens.advance();
+
        self.listening.insert(token, socket.local_addr());
+
        self.actions
+
            .push_back(Action::RegisterListener(token, socket));
    }

-
    fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) -> Option<(NodeId, Link)> {
-
        match self.peers.entry(id) {
+
    fn disconnect(&mut self, token: Token, reason: DisconnectReason) -> Option<(NodeId, Link)> {
+
        match self.peers.entry(token) {
            Entry::Vacant(_) => {
                // Connecting peer with no session.
-
                log::debug!(target: "wire", "Disconnecting pending peer with id={id}: {reason}");
-
                self.actions.push_back(Action::UnregisterTransport(id));
+
                log::debug!(target: "wire", token=token.0; "Disconnecting pending peer: {reason}");
+
                self.actions.push_back(Action::UnregisterTransport(token));

                // Check for attempted outbound connections. Unestablished inbound connections don't
                // have an NID yet.
                self.outbound
                    .values()
-
                    .find(|o| o.id == Some(id))
+
                    .find(|o| o.token == token)
                    .map(|o| (o.nid, Link::Outbound))
            }
            Entry::Occupied(mut e) => match e.get_mut() {
                Peer::Disconnecting { nid, link, .. } => {
-
                    log::error!(target: "wire", "Peer with id={id} is already disconnecting");
+
                    log::error!(target: "wire", token=token.0; "Peer is already disconnecting");

                    nid.map(|n| (n, *link))
                }
                Peer::Connected {
                    nid, streams, link, ..
                } => {
-
                    log::debug!(target: "wire", "Disconnecting peer with id={id}: {reason}");
+
                    log::debug!(target: "wire", token=token.0; "Disconnecting peer: {reason}");
                    let nid = *nid;
                    let link = *link;

@@ -391,7 +381,7 @@ where
                        link,
                        reason,
                    });
-
                    self.actions.push_back(Action::UnregisterTransport(id));
+
                    self.actions.push_back(Action::UnregisterTransport(token));

                    Some((nid, link))
                }
@@ -484,33 +474,33 @@ where
        }
    }

-
    fn cleanup(&mut self, id: ResourceId, fd: RawFd) {
-
        if self.inbound.remove(&fd).is_some() {
-
            log::debug!(target: "wire", "Cleaning up inbound peer state with id={id} (fd={fd})");
-
        } else if let Some(outbound) = self.outbound.remove(&fd) {
-
            log::debug!(target: "wire", "Cleaning up outbound peer state with id={id} (fd={fd})");
+
    fn cleanup(&mut self, token: Token) {
+
        if self.inbound.remove(&token) {
+
            log::debug!(target: "wire", token=token.0; "Cleaning up inbound peer state");
+
        } else if let Some(outbound) = self.outbound.remove(&token) {
+
            log::debug!(target: "wire", token=token.0; "Cleaning up outbound peer state");
            self.service.disconnected(
                outbound.nid,
                Link::Outbound,
                &DisconnectReason::connection(),
            );
        } else {
-
            log::debug!(target: "wire", "Tried to cleanup unknown peer with id={id} (fd={fd})");
+
            log::debug!(target: "wire", token=token.0; "Tried to cleanup unknown peer");
        }
    }
}

-
impl<D, S, G> reactor::Handler for Wire<D, S, G>
+
impl<D, S, G> reactor::ReactionHandler for Wire<D, S, G>
where
    D: service::Store + Send,
    S: WriteStorage + Send + 'static,
    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send,
{
-
    type Listener = NetAccept<WireSession<G>>;
-
    type Transport = NetTransport<WireSession<G>>;
+
    type Listener = Listener;
+
    type Transport = Transport<WireSession<G>>;
    type Command = Control;

-
    fn tick(&mut self, time: Timestamp) {
+
    fn tick(&mut self, time: LocalTime) {
        self.metrics.open_channels = self
            .peers
            .iter()
@@ -529,133 +519,109 @@ where
        );
    }

-
    fn handle_timer(&mut self) {
+
    fn timer_reacted(&mut self) {
        self.service.wake();
    }

-
    fn handle_listener_event(
+
    fn listener_reacted(
        &mut self,
-
        _: ResourceId, // Nb. This is the ID of the listener socket.
-
        event: ListenerEvent<WireSession<G>>,
-
        _: Timestamp,
+
        _: Token, // Note that this is the token of the listener socket.
+
        event: io::Result<(TcpStream, std::net::SocketAddr)>,
+
        _: LocalTime,
    ) {
        match event {
-
            ListenerEvent::Accepted(connection) => {
-
                let Ok(remote) = connection.remote_addr() else {
-
                    log::warn!(target: "wire", "Accepted connection doesn't have remote address; dropping..");
-
                    drop(connection);
-

-
                    return;
-
                };
+
            Ok((connection, peer)) => {
+
                let remote = NetAddr::from(peer);
                let InetHost::Ip(ip) = remote.host else {
                    log::error!(target: "wire", "Unexpected host type for inbound connection {remote}; dropping..");
                    drop(connection);

                    return;
                };
-
                let fd = connection.as_raw_fd();
-
                log::debug!(target: "wire", "Inbound connection from {remote} (fd={fd})..");
+
                log::debug!(target: "wire", "Inbound connection from {remote}..");

                // If the service doesn't want to accept this connection,
                // we drop the connection here, which disconnects the socket.
                if !self.service.accepted(ip) {
-
                    log::debug!(target: "wire", "Rejecting inbound connection from {ip} (fd={fd})..");
+
                    log::debug!(target: "wire", "Rejecting inbound connection from {ip}..");
                    drop(connection);

                    return;
                }

-
                let session = match accept::<G>(
+
                let session = accept::<G>(
                    remote.clone().into(),
                    connection,
                    self.signer.clone().into_inner(),
-
                ) {
-
                    Ok(s) => s,
-
                    Err(e) => {
-
                        log::error!(target: "wire", "Error creating session for {ip}: {e}");
-
                        return;
-
                    }
-
                };
-
                let transport = match NetTransport::with_session(
-
                    session,
-
                    netservices::Direction::Inbound,
-
                ) {
+
                );
+
                let transport = match Transport::with_session(session, Link::Inbound) {
                    Ok(transport) => transport,
                    Err(err) => {
                        log::error!(target: "wire", "Failed to create transport for accepted connection: {err}");
                        return;
                    }
                };
-
                log::debug!(target: "wire", "Accepted inbound connection from {remote} (fd={fd})..");

-
                self.inbound.insert(
-
                    fd,
-
                    Inbound {
-
                        id: None,
-
                        addr: remote.into(),
-
                    },
-
                );
+
                let token = self.tokens.advance();
+
                log::debug!(target: "wire", token=token.0; "Accepted inbound connection from {remote}..");
+

+
                self.inbound.insert(token);
                self.actions
-
                    .push_back(reactor::Action::RegisterTransport(transport))
+
                    .push_back(reactor::Action::RegisterTransport(token, transport))
            }
-
            ListenerEvent::Failure(err) => {
+
            Err(err) => {
                log::error!(target: "wire", "Error listening for inbound connections: {err}");
            }
        }
    }

-
    fn handle_registered(&mut self, fd: RawFd, id: ResourceId, typ: ResourceType) {
-
        match typ {
-
            ResourceType::Listener => {
-
                if let Some(local_addr) = self.listening.remove(&fd) {
-
                    self.service.listening(local_addr);
-
                }
-
            }
-
            ResourceType::Transport => {
-
                if let Some(outbound) = self.outbound.get_mut(&fd) {
-
                    log::debug!(target: "wire", "Outbound peer resource registered for {} with id={id} (fd={fd})", outbound.nid);
-
                    outbound.id = Some(id);
-
                } else if let Some(inbound) = self.inbound.get_mut(&fd) {
-
                    log::debug!(target: "wire", "Inbound peer resource registered with id={id} (fd={fd})");
-
                    inbound.id = Some(id);
-
                } else {
-
                    log::warn!(target: "wire", "Unknown peer registered with fd={fd} and id={id}");
-
                }
-
            }
+
    fn listener_registered(&mut self, token: Token, _listener: &Self::Listener) {
+
        if let Some(local_addr) = self.listening.remove(&token) {
+
            self.service.listening(local_addr);
        }
    }

-
    fn handle_transport_event(
+
    fn transport_registered(&mut self, token: Token, _transport: &Self::Transport) {
+
        if let Some(outbound) = self.outbound.get_mut(&token) {
+
            log::debug!(target: "wire", token=token.0; "Outbound peer resource registered for {}", outbound.nid);
+
        } else if self.inbound.contains(&token) {
+
            log::debug!(target: "wire", token=token.0; "Inbound peer resource registered");
+
        } else {
+
            log::warn!(target: "wire", token=token.0; "Unknown peer registered");
+
        }
+
    }
+

+
    fn transport_reacted(
        &mut self,
-
        id: ResourceId,
+
        token: Token,
        event: SessionEvent<WireSession<G>>,
-
        _: Timestamp,
+
        _: LocalTime,
    ) {
        match event {
-
            SessionEvent::Established(fd, ProtocolArtifact { state, .. }) => {
+
            SessionEvent::Established(ProtocolArtifact { state, session }) => {
                // SAFETY: With the NoiseXK protocol, there is always a remote static key.
                let nid: NodeId = state.remote_static_key.unwrap();
                // Make sure we don't try to connect to ourselves by mistake.
                if &nid == self.signer.public_key() {
                    log::error!(target: "wire", "Self-connection detected, disconnecting..");
-
                    self.disconnect(id, DisconnectReason::SelfConnection);
+
                    self.disconnect(token, DisconnectReason::SelfConnection);

                    return;
                }
-
                let (addr, link) = if let Some(peer) = self.inbound.remove(&fd) {
+

+
                let established_addr: NetAddr<HostName> = session.state;
+
                let (addr, link) = if self.inbound.remove(&token) {
                    self.metrics.peer(nid).inbound_connection_attempts += 1;
-
                    (peer.addr, Link::Inbound)
-
                } else if let Some(peer) = self.outbound.remove(&fd) {
+
                    (established_addr, Link::Inbound)
+
                } else if let Some(peer) = self.outbound.remove(&token) {
                    assert_eq!(nid, peer.nid);
                    (peer.addr, Link::Outbound)
                } else {
-
                    log::error!(target: "wire", "Session for {nid} (id={id}) not found");
+
                    log::error!(target: "wire", token=token.0; "Session for {nid} not found");
                    return;
                };
                log::debug!(
-
                    target: "wire",
-
                    "Session established with {nid} (id={id}) (fd={fd}) ({})",
-
                    if link.is_inbound() { "inbound" } else { "outbound" }
+
                    target: "wire", token=token.0, direction:display=link; "Session established with {nid}"
                );

                // Connections to close.
@@ -681,21 +647,17 @@ where
                    conflicting.extend(
                        self.peers
                            .active()
-
                            .filter(|(c_id, d, _)| **d == nid && *c_id != id)
+
                            .filter(|(c_id, d, _)| **d == nid && *c_id != token)
                            .map(|(c_id, _, link)| (c_id, link)),
                    );

                    // Outbound connection attempts with the same remote key but a different file
                    // descriptor are conflicting.
-
                    conflicting.extend(self.outbound.iter().filter_map(|(c_fd, other)| {
-
                        if other.nid == nid && *c_fd != fd {
-
                            other.id.map(|c_id| (c_id, Link::Outbound))
-
                        } else {
-
                            None
-
                        }
+
                    conflicting.extend(self.outbound.iter().filter_map(|(c_id, other)| {
+
                        (other.nid == nid && *c_id != token).then_some((*c_id, Link::Outbound))
                    }));

-
                    for (c_id, c_link) in conflicting {
+
                    for (c_token, c_link) in conflicting {
                        // If we have precedence, the inbound connection is closed.
                        // In the case where both connections are inbound or outbound,
                        // we close the newer connection, ie. the one with the higher
@@ -703,31 +665,31 @@ where
                        let close = match (link, c_link) {
                            (Link::Inbound, Link::Outbound) => {
                                if precedence {
-
                                    id
+
                                    token
                                } else {
-
                                    c_id
+
                                    c_token
                                }
                            }
                            (Link::Outbound, Link::Inbound) => {
                                if precedence {
-
                                    c_id
+
                                    c_token
                                } else {
-
                                    id
+
                                    token
                                }
                            }
-
                            (Link::Inbound, Link::Inbound) => id.max(c_id),
-
                            (Link::Outbound, Link::Outbound) => id.max(c_id),
+
                            (Link::Inbound, Link::Inbound) => token.max(c_token),
+
                            (Link::Outbound, Link::Outbound) => token.max(c_token),
                        };

                        log::warn!(
-
                            target: "wire", "Established session (id={id}) conflicts with existing session for {nid} (id={c_id})"
+
                            target: "wire", "Established session with token {} conflicts with existing session with token {} for {nid}", token.0, c_token.0
                        );
                        disconnect.push(close);
                    }
                }
                for id in &disconnect {
                    log::warn!(
-
                        target: "wire", "Closing conflicting session (id={id}) with {nid}.."
+
                        target: "wire", token=token.0; "Closing conflicting session with {nid}.."
                    );
                    // Disconnect and return the associated NID of the peer, if available.
                    if let Some((nid, link)) = self.disconnect(*id, DisconnectReason::Conflict) {
@@ -738,9 +700,9 @@ where
                            .disconnected(nid, link, &DisconnectReason::Conflict);
                    }
                }
-
                if !disconnect.contains(&id) {
+
                if !disconnect.contains(&token) {
                    self.peers
-
                        .insert(id, Peer::connected(nid, addr.clone(), link));
+
                        .insert(token, Peer::connected(nid, addr.clone(), link));
                    self.service.connected(nid, addr.into(), link);
                }
            }
@@ -750,7 +712,7 @@ where
                    inbox,
                    streams,
                    ..
-
                }) = self.peers.get_mut(&id)
+
                }) = self.peers.get_mut(&token)
                {
                    let metrics = self.metrics.peer(*nid);
                    metrics.received_bytes += data.len();
@@ -758,7 +720,10 @@ where
                    if inbox.input(&data).is_err() {
                        log::error!(target: "wire", "Maximum inbox size ({MAX_INBOX_SIZE}) reached for peer {nid}");
                        log::error!(target: "wire", "Unable to process messages fast enough for peer {nid}; disconnecting..");
-
                        self.disconnect(id, DisconnectReason::Session(session::Error::Misbehavior));
+
                        self.disconnect(
+
                            token,
+
                            DisconnectReason::Session(session::Error::Misbehavior),
+
                        );

                        return;
                    }
@@ -859,7 +824,7 @@ where
                                    log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.len());
                                }
                                self.disconnect(
-
                                    id,
+
                                    token,
                                    DisconnectReason::Session(session::Error::Misbehavior),
                                );
                                break;
@@ -867,11 +832,11 @@ where
                        }
                    }
                } else {
-
                    log::warn!(target: "wire", "Dropping message from unconnected peer (id={id})");
+
                    log::warn!(target: "wire", token=token.0; "Dropping message from unconnected peer");
                }
            }
            SessionEvent::Terminated(err) => {
-
                self.disconnect(id, DisconnectReason::Connection(Arc::new(err)));
+
                self.disconnect(token, DisconnectReason::Connection(Arc::new(err)));
            }
        }
    }
@@ -884,22 +849,18 @@ where
        }
    }

-
    fn handle_error(
-
        &mut self,
-
        err: reactor::Error<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>,
-
    ) {
+
    fn handle_error(&mut self, err: reactor::Error<Listener, Transport<WireSession<G>>>) {
        match err {
-
            reactor::Error::Poll(err) => {
+
            reactor::Error::Poll(err) | reactor::Error::Registration(err) => {
                // TODO: This should be a fatal error, there's nothing we can do here.
                log::error!(target: "wire", "Can't poll connections: {err}");
            }
-
            reactor::Error::ListenerDisconnect(id, _) => {
+
            reactor::Error::ListenerDisconnect(token, _) => {
                // TODO: This should be a fatal error, there's nothing we can do here.
-
                log::error!(target: "wire", "Listener {id} disconnected");
+
                log::error!(target: "wire", token=token.0; "Listener disconnected");
            }
-
            reactor::Error::TransportDisconnect(id, transport) => {
-
                let fd = transport.as_raw_fd();
-
                log::error!(target: "wire", "Peer id={id} (fd={fd}) disconnected");
+
            reactor::Error::TransportDisconnect(token, transport) => {
+
                log::error!(target: "wire", token=token.0; "Peer disconnected");

                // We're dropping the TCP connection here.
                drop(transport);
@@ -907,7 +868,7 @@ where
                // The peer transport is already disconnected and removed from the reactor;
                // therefore there is no need to initiate a disconnection. We simply remove
                // the peer from the map.
-
                match self.peers.remove(&id) {
+
                match self.peers.remove(&token) {
                    Some(mut peer) => {
                        if let Peer::Connected { streams, .. } = &mut peer {
                            streams.shutdown();
@@ -923,26 +884,24 @@ where
                            log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
                        }
                    }
-
                    None => self.cleanup(id, fd),
+
                    None => self.cleanup(token),
                }
            }
        }
    }

-
    fn handover_listener(&mut self, id: ResourceId, _listener: Self::Listener) {
-
        log::error!(target: "wire", "Listener handover is not supported (id={id})");
+
    fn handover_listener(&mut self, token: Token, _listener: Self::Listener) {
+
        log::error!(target: "wire", token=token.0; "Listener handover is not supported");
    }

-
    fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport) {
-
        let fd = transport.as_raw_fd();
-

-
        match self.peers.entry(id) {
+
    fn handover_transport(&mut self, token: Token, transport: Self::Transport) {
+
        match self.peers.entry(token) {
            Entry::Occupied(e) => {
                match e.get() {
                    Peer::Disconnecting {
                        nid, reason, link, ..
                    } => {
-
                        log::debug!(target: "wire", "Transport handover for disconnecting peer with id={id} (fd={fd})");
+
                        log::debug!(target: "wire", token=token.0; "Transport handover for disconnecting peer");

                        // Disconnect TCP stream.
                        drop(transport);
@@ -960,11 +919,11 @@ where
                        e.remove();
                    }
                    Peer::Connected { nid, .. } => {
-
                        panic!("Wire::handover_transport: Unexpected handover of connected peer {nid} with id={id} (fd={fd})");
+
                        panic!("Wire::handover_transport: Unexpected handover of connected peer {nid} with token {}", token.0);
                    }
                }
            }
-
            Entry::Vacant(_) => self.cleanup(id, fd),
+
            Entry::Vacant(_) => self.cleanup(token),
        }
    }
}
@@ -1028,27 +987,24 @@ where
                        self.service.config(),
                    )
                    .and_then(|session| {
-
                        NetTransport::<WireSession<G>>::with_session(
-
                            session,
-
                            netservices::Direction::Outbound,
-
                        )
+
                        Transport::<WireSession<G>>::with_session(session, Link::Outbound)
                    }) {
                        Ok(transport) => {
+
                            let token = self.tokens.advance();
                            self.outbound.insert(
-
                                transport.as_raw_fd(),
+
                                token,
                                Outbound {
-
                                    id: None,
+
                                    token,
                                    nid: node_id,
                                    addr: addr.to_inner(),
                                },
                            );
                            log::debug!(
                                target: "wire",
-
                                "Registering outbound transport for {node_id} (fd={})..",
-
                                transport.as_raw_fd()
+
                                "Registering outbound transport for {node_id}.."
                            );
                            self.actions
-
                                .push_back(reactor::Action::RegisterTransport(transport));
+
                                .push_back(reactor::Action::RegisterTransport(token, transport));
                        }
                        Err(err) => {
                            log::error!(target: "wire", "Error establishing connection to {addr}: {err}");
@@ -1180,27 +1136,41 @@ pub fn dial<G: Ecdh<Pk = NodeId>>(
            ));
        }
    };
-
    // Nb. This timeout is currently not used by the underlying library due to the
-
    // `socket2` library not supporting non-blocking connect with timeout.
-
    let connection = net::TcpStream::connect_nonblocking(inet_addr, DEFAULT_DIAL_TIMEOUT)?;
+

+
    let addr = {
+
        use std::net::ToSocketAddrs as _;
+

+
        inet_addr
+
            .to_socket_addrs()?
+
            .next()
+
            .ok_or(io::ErrorKind::AddrNotAvailable)?
+
    };
+

+
    // NOTE: Previously, here was a not about setting the timeout for connecting
+
    // to DEFAULT_DIAL_TIMEOUT, for which we have not figured out a way yet.
+
    // Generally, we should understand what happens if the following call to
+
    // `connect` fails. How do we learn about it? Where's the leak?
+

+
    let connection = TcpStream::connect(addr)?;
+

    // Whether to tunnel regular connections through the proxy.
    let force_proxy = config.proxy.is_some();

-
    session::<G>(
+
    Ok(session::<G>(
        remote_addr,
        Some(remote_id),
        connection,
        force_proxy,
        signer,
-
    )
+
    ))
}

/// Accept a new connection.
pub fn accept<G: Ecdh<Pk = NodeId>>(
    remote_addr: NetAddr<HostName>,
-
    connection: net::TcpStream,
+
    connection: TcpStream,
    signer: G,
-
) -> io::Result<WireSession<G>> {
+
) -> WireSession<G> {
    session::<G>(remote_addr, None, connection, false, signer)
}

@@ -1208,42 +1178,64 @@ pub fn accept<G: Ecdh<Pk = NodeId>>(
fn session<G: Ecdh<Pk = NodeId>>(
    remote_addr: NetAddr<HostName>,
    remote_id: Option<NodeId>,
-
    connection: net::TcpStream,
+
    connection: TcpStream,
    force_proxy: bool,
    signer: G,
-
) -> io::Result<WireSession<G>> {
-
    // There are issues with setting TCP_NODELAY on WSL. Not a big deal.
+
) -> WireSession<G> {
    if let Err(e) = connection.set_nodelay(true) {
-
        log::warn!(target: "wire", "Unable to set TCP_NODELAY on fd {}: {e}", connection.as_raw_fd());
+
        log::warn!(target: "wire", "Unable to set TCP_NODELAY on socket {connection:?}: {e}");
    }
-
    connection.set_read_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
-
    connection.set_write_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
-

-
    let sock = socket2::Socket::from(connection);
-
    let ka = socket2::TcpKeepalive::new()
-
        .with_time(time::Duration::from_secs(30))
-
        .with_interval(time::Duration::from_secs(10))
-
        .with_retries(3);
-
    if let Err(e) = sock.set_tcp_keepalive(&ka) {
-
        log::warn!(target: "wire", "Unable to set TCP_KEEPALIVE on fd {}: {e}", sock.as_raw_fd());
+

+
    let connection = std::net::TcpStream::from(connection);
+

+
    if let Err(e) = connection.set_read_timeout(Some(DEFAULT_CONNECTION_TIMEOUT)) {
+
        log::warn!(target: "wire", "Unable to set TCP read timeout on socket {connection:?}: {e}");
+
    }
+

+
    if let Err(e) = connection.set_write_timeout(Some(DEFAULT_CONNECTION_TIMEOUT)) {
+
        log::warn!(target: "wire", "Unable to set TCP write timeout on socket {connection:?}: {e}");
    }

-
    let socks5 = socks5::Socks5::with(remote_addr, force_proxy);
-
    let proxy = Socks5Session::with(sock.into(), socks5);
-
    let pair = G::generate_keypair();
-
    let keyset = Keyset {
-
        e: pair.0,
-
        s: Some(signer),
-
        re: None,
-
        rs: remote_id,
+
    #[cfg(feature = "socket2")]
+
    {
+
        let connection = socket2::SockRef::from(&connection);
+

+
        let ka = socket2::TcpKeepalive::new()
+
            .with_time(time::Duration::from_secs(30))
+
            .with_interval(time::Duration::from_secs(10));
+

+
        #[cfg(not(windows))]
+
        let ka = ka.with_retries(3);
+

+
        if let Err(e) = connection.set_tcp_keepalive(&ka) {
+
            log::warn!(target: "wire", "Failed to set TCP_KEEPALIVE on socket {connection:?}: {e}");
+
        }
+
    }
+

+
    #[cfg(not(feature = "socket2"))]
+
    log::debug!(target: "wire", "Not attempting to set TCP_KEEPALIVE on socket {connection:?}");
+

+
    let connection = TcpStream::from_std(connection);
+

+
    let proxy = {
+
        let socks5 = socks5::Socks5::with(remote_addr, force_proxy);
+
        Socks5Session::new(connection, socks5)
    };
-
    let noise = NoiseState::initialize::<{ Sha256::OUTPUT_LEN }>(
-
        NOISE_XK,
-
        remote_id.is_some(),
-
        &[],
-
        keyset,
-
    );
-
    Ok(WireSession::with(proxy, noise))
+

+
    let noise = {
+
        let pair = G::generate_keypair();
+

+
        let keyset = Keyset {
+
            e: pair.0,
+
            s: Some(signer),
+
            re: None,
+
            rs: remote_id,
+
        };
+

+
        NoiseState::initialize::<{ Sha256::OUTPUT_LEN }>(NOISE_XK, remote_id.is_some(), &[], keyset)
+
    };
+

+
    WireSession::new(proxy, noise)
}

#[cfg(test)]
modified crates/radicle/src/node.rs
@@ -751,6 +751,15 @@ impl Link {
    }
}

+
impl std::fmt::Display for Link {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        match self {
+
            Link::Outbound => write!(f, "outbound"),
+
            Link::Inbound => write!(f, "inbound"),
+
        }
+
    }
+
}
+

/// An established network connection with a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]