Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Use `std::time` for reactor and wire
Merged lorenz opened 6 months ago

This reduces the exposure to the localtime crate, using std instead.

3 files changed +52 -51 9bcdd353 e404f103
modified crates/radicle-node/src/reactor.rs
@@ -10,11 +10,10 @@ use std::fmt::{Debug, Display, Formatter};
use std::io::ErrorKind;
use std::sync::Arc;
use std::thread::JoinHandle;
-
use std::time::Duration;
+
use std::time::{Duration, Instant};
use std::{io, thread};

use crossbeam_channel::{unbounded, Receiver, TryRecvError};
-
use localtime::LocalTime;
use mio::event::{Event, Source};
use mio::{Events, Interest, Poll, Waker};
use thiserror::Error;
@@ -212,7 +211,7 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
    type Transport: EventHandler + Source + Send + Debug + WriteAtomic;

    /// Method called by the reactor on the start of each event loop once the poll has returned.
-
    fn tick(&mut self, time: localtime::LocalTime);
+
    fn tick(&mut self, instant: Instant);

    /// Method called by the reactor when a previously set timeout is fired.
    ///
@@ -227,7 +226,7 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
        &mut self,
        token: Token,
        reaction: <Self::Listener as EventHandler>::Reaction,
-
        time: localtime::LocalTime,
+
        instant: Instant,
    );

    /// Method called by the reactor upon a reaction to an I/O event on a transport resource.
@@ -235,7 +234,7 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
        &mut self,
        token: Token,
        reaction: <Self::Transport as EventHandler>::Reaction,
-
        time: localtime::LocalTime,
+
        instant: Instant,
    );

    /// Method called by the reactor when a given resource was successfully registered
@@ -372,7 +371,7 @@ impl<H: ReactionHandler> Runtime<H> {

    fn run(mut self) {
        loop {
-
            let before_poll = LocalTime::now();
+
            let before_poll = Instant::now();
            let timeout = self
                .timeouts
                .next_expiring_from(before_poll)
@@ -388,12 +387,12 @@ impl<H: ReactionHandler> Runtime<H> {
            // Blocking
            let res = self.poll.poll(&mut events, Some(timeout));

-
            let now = LocalTime::now();
-
            self.service.tick(now);
+
            let tick = Instant::now();
+
            self.service.tick(tick);

            // The way this is currently used basically ignores which keys have
            // timed out. So as long as *something* timed out, we wake the service.
-
            let timers_fired = self.timeouts.remove_expired_by(now);
+
            let timers_fired = self.timeouts.remove_expired_by(tick);
            if timers_fired > 0 {
                log::trace!(target: "reactor", "Timer has fired");
                self.service.timer_reacted();
@@ -404,7 +403,9 @@ impl<H: ReactionHandler> Runtime<H> {
                self.service.handle_error(Error::Poll(err));
            }

-
            let awoken = self.handle_events(now, events);
+
            let awoken = self.handle_events(tick, events);
+

+
            log::trace!(target: "reactor", "Duration between tick and events handled: {:?}", Instant::now().duration_since(tick));

            // Process the commands only if we awoken by the waker.
            if awoken {
@@ -420,14 +421,14 @@ impl<H: ReactionHandler> Runtime<H> {
                }
            }

-
            self.handle_actions(now);
+
            self.handle_actions(tick);
        }
    }

    /// # Returns
    ///
    /// Whether one of the events was originated from the waker.
-
    fn handle_events(&mut self, time: LocalTime, events: Events) -> bool {
+
    fn handle_events(&mut self, instant: Instant, events: Events) -> bool {
        log::trace!(target: "reactor", "Handling events");
        let mut awoken = false;
        let mut deregistered = Vec::new();
@@ -449,7 +450,7 @@ impl<H: ReactionHandler> Runtime<H> {
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service.listener_reacted(token, service_event, time);
+
                            self.service.listener_reacted(token, service_event, instant);
                        });
                } else {
                    let listener = self.deregister_listener(token).unwrap_or_else(|| {
@@ -470,7 +471,8 @@ impl<H: ReactionHandler> Runtime<H> {
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service.transport_reacted(token, service_event, time);
+
                            self.service
+
                                .transport_reacted(token, service_event, instant);
                        });
                } else {
                    let transport = self.deregister_transport(token).unwrap_or_else(|| {
@@ -488,13 +490,13 @@ impl<H: ReactionHandler> Runtime<H> {
        awoken
    }

-
    fn handle_actions(&mut self, time: LocalTime) {
+
    fn handle_actions(&mut self, instant: Instant) {
        while let Some(action) = self.service.next() {
            log::trace!(target: "reactor", "Handling action {action} from the service");

            // Deadlock may happen here if the service will generate events over and over
            // in the handle_* calls we may never get out of this loop
-
            if let Err(err) = self.handle_action(action, time) {
+
            if let Err(err) = self.handle_action(action, instant) {
                log::error!(target: "reactor", "Error: {err}");
                self.service.handle_error(err);
            }
@@ -504,7 +506,7 @@ impl<H: ReactionHandler> Runtime<H> {
    fn handle_action(
        &mut self,
        action: Action<H::Listener, H::Transport>,
-
        time: LocalTime,
+
        instant: Instant,
    ) -> Result<(), Error<H::Listener, H::Transport>> {
        match action {
            Action::RegisterListener(token, mut listener) => {
@@ -562,7 +564,7 @@ impl<H: ReactionHandler> Runtime<H> {
            Action::SetTimer(duration) => {
                log::trace!(target: "reactor", "Adding timer {duration:?} from now");

-
                self.timeouts.set_timeout(duration, time);
+
                self.timeouts.set_timeout(duration, instant);
            }
        }
        Ok(())
modified crates/radicle-node/src/reactor/timer.rs
@@ -1,13 +1,12 @@
-
use std::collections::BTreeSet;
use std::time::Duration;
+
use std::{collections::BTreeSet, time::Instant};

-
use localtime::{LocalDuration, LocalTime};

/// Manages timers and triggers timeouts.
#[derive(Debug, Default)]
pub struct Timer {
    /// Timeouts are durations since the UNIX epoch.
-
    timeouts: BTreeSet<localtime::LocalTime>,
+
    timeouts: BTreeSet<Instant>,
}

impl Timer {
@@ -31,32 +30,32 @@ impl Timer {
    }

    /// Register a new timeout relative to a certain point in time.
-
    pub fn set_timeout(&mut self, timeout: Duration, after: LocalTime) {
-
        let time = after + LocalDuration::from_millis(timeout.as_millis());
+
    pub fn set_timeout(&mut self, timeout: Duration, after: Instant) {
+
        let time = after + timeout;
        self.timeouts.insert(time);
    }

    /// Get the first timeout expiring right at or after certain moment of time.
    /// Returns [`None`] if there are no timeouts.
-
    pub fn next_expiring_from(&self, time: impl Into<LocalTime>) -> Option<Duration> {
+
    pub fn next_expiring_from(&self, time: impl Into<Instant>) -> Option<Duration> {
        let time = time.into();
        let last = *self.timeouts.first()?;
        Some(if last >= time {
-
            Duration::from_millis(last.as_millis() - time.as_millis())
+
            last - time
        } else {
-
            Duration::from_secs(0)
+
            Duration::default()
        })
    }

    /// Removes timeouts which expire by a certain moment of time (inclusive),
    /// returning total number of timeouts which were removed.
-
    pub fn remove_expired_by(&mut self, time: LocalTime) -> usize {
+
    pub fn remove_expired_by(&mut self, instant: Instant) -> usize {
        // Since `split_off` returns everything *after* the given key, including the key,
        // if a timer is set for exactly the given time, it would remain in the "after"
        // set of unexpired keys. This isn't what we want, therefore we add `1` to the
        // given time value so that it is put in the "before" set that gets expired
        // and overwritten.
-
        let at = time + LocalDuration::from_millis(1);
+
        let at = instant + Duration::from_millis(1);
        let unexpired = self.timeouts.split_off(&at);
        let fired = self.timeouts.len();
        self.timeouts = unexpired;
@@ -72,12 +71,12 @@ mod tests {
    fn test_wake_exact() {
        let mut tm = Timer::new();

-
        let now = LocalTime::now();
+
        let now = Instant::now();
        tm.set_timeout(Duration::from_secs(8), now);
        tm.set_timeout(Duration::from_secs(9), now);
        tm.set_timeout(Duration::from_secs(10), now);

-
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(9)), 2);
+
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 2);
        assert_eq!(tm.count(), 1);
    }

@@ -85,7 +84,7 @@ mod tests {
    fn test_wake() {
        let mut tm = Timer::new();

-
        let now = LocalTime::now();
+
        let now = Instant::now();
        tm.set_timeout(Duration::from_secs(8), now);
        tm.set_timeout(Duration::from_secs(16), now);
        tm.set_timeout(Duration::from_secs(64), now);
@@ -94,13 +93,13 @@ mod tests {
        assert_eq!(tm.remove_expired_by(now), 0);
        assert_eq!(tm.count(), 4);

-
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(9)), 1);
+
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 1);
        assert_eq!(tm.count(), 3, "one timeout has expired");

-
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(66)), 2);
+
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(66)), 2);
        assert_eq!(tm.count(), 1, "another two timeouts have expired");

-
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(96)), 1);
+
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(96)), 1);
        assert!(!tm.has_timeouts(), "all timeouts have expired");
    }

@@ -108,17 +107,17 @@ mod tests {
    fn test_next() {
        let mut tm = Timer::new();

-
        let mut now = LocalTime::now();
+
        let mut now = Instant::now();
        tm.set_timeout(Duration::from_secs(3), now);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(3)));

-
        now = now + LocalDuration::from_secs(2);
+
        now += Duration::from_secs(2);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(1)));

-
        now = now + LocalDuration::from_secs(1);
+
        now += Duration::from_secs(1);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));

-
        now = now + LocalDuration::from_secs(1);
+
        now += Duration::from_secs(1);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));

        assert_eq!(tm.remove_expired_by(now), 1);
modified crates/radicle-node/src/wire.rs
@@ -5,6 +5,7 @@ use std::collections::hash_map::Entry;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
+
use std::time::{Instant, SystemTime};
use std::{io, net, time};

use crossbeam_channel as chan;
@@ -12,7 +13,6 @@ use cyphernet::addr::{HostName, InetHost, NetAddr};
use cyphernet::encrypt::noise::{HandshakePattern, Keyset, NoiseState};
use cyphernet::proxy::socks5;
use cyphernet::{Digest, EcSk, Ecdh, Sha256};
-
use localtime::LocalTime;
use mio::net::TcpStream;
use radicle::node::device::Device;

@@ -313,6 +313,8 @@ pub(crate) struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + E
    peers: Peers,
    /// A (practically) infinite source of tokens to identify transports and listeners.
    tokens: Tokens,
+
    /// Record of system time and instant when the node started.
+
    epoch: (SystemTime, Instant),
}

impl<D, S, G> Wire<D, S, G>
@@ -335,9 +337,14 @@ where
            listening: RandomMap::default(),
            peers: Peers(RandomMap::default()),
            tokens: Tokens::default(),
+
            epoch: (SystemTime::now(), Instant::now()),
        }
    }

+
    fn time(&self, instant: Instant) -> SystemTime {
+
        self.epoch.0 + (instant - self.epoch.1)
+
    }
+

    pub fn listen(&mut self, socket: Listener) {
        let token = self.tokens.advance();
        self.listening.insert(token, socket.local_addr());
@@ -496,7 +503,7 @@ where
    type Listener = Listener;
    type Transport = Transport<WireSession<G>>;

-
    fn tick(&mut self, time: LocalTime) {
+
    fn tick(&mut self, time: Instant) {
        self.metrics.open_channels = self
            .peers
            .iter()
@@ -509,10 +516,8 @@ where
            })
            .sum();
        self.metrics.worker_queue_size = self.worker.len();
-
        self.service.tick(
-
            LocalTime::from_millis(time.as_millis() as u128),
-
            &self.metrics,
-
        );
+

+
        self.service.tick(self.time(time).into(), &self.metrics);
    }

    fn timer_reacted(&mut self) {
@@ -523,7 +528,7 @@ where
        &mut self,
        _: Token, // Note that this is the token of the listener socket.
        event: io::Result<(TcpStream, std::net::SocketAddr)>,
-
        _: LocalTime,
+
        _: Instant,
    ) {
        match event {
            Ok((connection, peer)) => {
@@ -587,12 +592,7 @@ where
        }
    }

-
    fn transport_reacted(
-
        &mut self,
-
        token: Token,
-
        event: SessionEvent<WireSession<G>>,
-
        _: LocalTime,
-
    ) {
+
    fn transport_reacted(&mut self, token: Token, event: SessionEvent<WireSession<G>>, _: Instant) {
        match event {
            SessionEvent::Established(ProtocolArtifact { state, session }) => {
                // SAFETY: With the NoiseXK protocol, there is always a remote static key.