Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
Revert "node: Use `std::time` for reactor and wire"
Archived fintohaps opened 4 months ago

This reverts commit e404f1038f461264f9395742ef74f5b710bef54a.

The commit above introduced the Epoch type to reduce the need for depending on the localtime crate. Eventually, node operators noticed that announcements would appear to be in the future and cause connection issues.

After more research, an issue1 was found that describes how on some platforms, primarily Linux and MacOS, Instant::duration_since would return a shortened duration. This is the cause of this apparent drift in time.

1

https://github.com/rust-lang/rust/issues/87906

3 files changed +52 -81 58305cda a929a58f
modified crates/radicle-node/src/reactor.rs
@@ -10,10 +10,11 @@ use std::fmt::{Debug, Display, Formatter};
use std::io::ErrorKind;
use std::sync::Arc;
use std::thread::JoinHandle;
-
use std::time::{Duration, Instant};
+
use std::time::Duration;
use std::{io, thread};

use crossbeam_channel::{unbounded, Receiver, TryRecvError};
+
use localtime::LocalTime;
use mio::event::{Event, Source};
use mio::{Events, Interest, Poll, Waker};
use thiserror::Error;
@@ -211,7 +212,7 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
    type Transport: EventHandler + Source + Send + Debug + WriteAtomic;

    /// Method called by the reactor on the start of each event loop once the poll has returned.
-
    fn tick(&mut self, instant: Instant);
+
    fn tick(&mut self, time: localtime::LocalTime);

    /// Method called by the reactor when a previously set timeout is fired.
    ///
@@ -226,7 +227,7 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
        &mut self,
        token: Token,
        reaction: <Self::Listener as EventHandler>::Reaction,
-
        instant: Instant,
+
        time: localtime::LocalTime,
    );

    /// Method called by the reactor upon a reaction to an I/O event on a transport resource.
@@ -234,7 +235,7 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
        &mut self,
        token: Token,
        reaction: <Self::Transport as EventHandler>::Reaction,
-
        instant: Instant,
+
        time: localtime::LocalTime,
    );

    /// Method called by the reactor when a given resource was successfully registered
@@ -371,7 +372,7 @@ impl<H: ReactionHandler> Runtime<H> {

    fn run(mut self) {
        loop {
-
            let before_poll = Instant::now();
+
            let before_poll = LocalTime::now();
            let timeout = self
                .timeouts
                .next_expiring_from(before_poll)
@@ -387,12 +388,12 @@ impl<H: ReactionHandler> Runtime<H> {
            // Blocking
            let res = self.poll.poll(&mut events, Some(timeout));

-
            let tick = Instant::now();
-
            self.service.tick(tick);
+
            let now = LocalTime::now();
+
            self.service.tick(now);

            // The way this is currently used basically ignores which keys have
            // timed out. So as long as *something* timed out, we wake the service.
-
            let timers_fired = self.timeouts.remove_expired_by(tick);
+
            let timers_fired = self.timeouts.remove_expired_by(now);
            if timers_fired > 0 {
                log::trace!(target: "reactor", "Timer has fired");
                self.service.timer_reacted();
@@ -403,9 +404,7 @@ impl<H: ReactionHandler> Runtime<H> {
                self.service.handle_error(Error::Poll(err));
            }

-
            let awoken = self.handle_events(tick, events);
-

-
            log::trace!(target: "reactor", "Duration between tick and events handled: {:?}", Instant::now().duration_since(tick));
+
            let awoken = self.handle_events(now, events);

            // Process the commands only if we awoken by the waker.
            if awoken {
@@ -421,14 +420,14 @@ impl<H: ReactionHandler> Runtime<H> {
                }
            }

-
            self.handle_actions(tick);
+
            self.handle_actions(now);
        }
    }

    /// # Returns
    ///
    /// Whether one of the events was originated from the waker.
-
    fn handle_events(&mut self, instant: Instant, events: Events) -> bool {
+
    fn handle_events(&mut self, time: LocalTime, events: Events) -> bool {
        log::trace!(target: "reactor", "Handling events");
        let mut awoken = false;
        let mut deregistered = Vec::new();
@@ -450,7 +449,7 @@ impl<H: ReactionHandler> Runtime<H> {
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service.listener_reacted(token, service_event, instant);
+
                            self.service.listener_reacted(token, service_event, time);
                        });
                } else {
                    let listener = self.deregister_listener(token).unwrap_or_else(|| {
@@ -471,8 +470,7 @@ impl<H: ReactionHandler> Runtime<H> {
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service
-
                                .transport_reacted(token, service_event, instant);
+
                            self.service.transport_reacted(token, service_event, time);
                        });
                } else {
                    let transport = self.deregister_transport(token).unwrap_or_else(|| {
@@ -490,13 +488,13 @@ impl<H: ReactionHandler> Runtime<H> {
        awoken
    }

-
    fn handle_actions(&mut self, instant: Instant) {
+
    fn handle_actions(&mut self, time: LocalTime) {
        while let Some(action) = self.service.next() {
            log::trace!(target: "reactor", "Handling action {action} from the service");

            // Deadlock may happen here if the service will generate events over and over
            // in the handle_* calls we may never get out of this loop
-
            if let Err(err) = self.handle_action(action, instant) {
+
            if let Err(err) = self.handle_action(action, time) {
                log::error!(target: "reactor", "Error: {err}");
                self.service.handle_error(err);
            }
@@ -506,7 +504,7 @@ impl<H: ReactionHandler> Runtime<H> {
    fn handle_action(
        &mut self,
        action: Action<H::Listener, H::Transport>,
-
        instant: Instant,
+
        time: LocalTime,
    ) -> Result<(), Error<H::Listener, H::Transport>> {
        match action {
            Action::RegisterListener(token, mut listener) => {
@@ -564,7 +562,7 @@ impl<H: ReactionHandler> Runtime<H> {
            Action::SetTimer(duration) => {
                log::trace!(target: "reactor", "Adding timer {duration:?} from now");

-
                self.timeouts.set_timeout(duration, instant);
+
                self.timeouts.set_timeout(duration, time);
            }
        }
        Ok(())
modified crates/radicle-node/src/reactor/timer.rs
@@ -1,11 +1,13 @@
+
use std::collections::BTreeSet;
use std::time::Duration;
-
use std::{collections::BTreeSet, time::Instant};
+

+
use localtime::{LocalDuration, LocalTime};

/// Manages timers and triggers timeouts.
#[derive(Debug, Default)]
pub struct Timer {
    /// Timeouts are durations since the UNIX epoch.
-
    timeouts: BTreeSet<Instant>,
+
    timeouts: BTreeSet<localtime::LocalTime>,
}

impl Timer {
@@ -29,32 +31,32 @@ impl Timer {
    }

    /// Register a new timeout relative to a certain point in time.
-
    pub fn set_timeout(&mut self, timeout: Duration, after: Instant) {
-
        let time = after + timeout;
+
    pub fn set_timeout(&mut self, timeout: Duration, after: LocalTime) {
+
        let time = after + LocalDuration::from_millis(timeout.as_millis());
        self.timeouts.insert(time);
    }

    /// Get the first timeout expiring right at or after certain moment of time.
    /// Returns [`None`] if there are no timeouts.
-
    pub fn next_expiring_from(&self, time: impl Into<Instant>) -> Option<Duration> {
+
    pub fn next_expiring_from(&self, time: impl Into<LocalTime>) -> Option<Duration> {
        let time = time.into();
        let last = *self.timeouts.first()?;
        Some(if last >= time {
-
            last - time
+
            Duration::from_millis(last.as_millis() - time.as_millis())
        } else {
-
            Duration::default()
+
            Duration::from_secs(0)
        })
    }

    /// Removes timeouts which expire by a certain moment of time (inclusive),
    /// returning total number of timeouts which were removed.
-
    pub fn remove_expired_by(&mut self, instant: Instant) -> usize {
+
    pub fn remove_expired_by(&mut self, time: LocalTime) -> usize {
        // Since `split_off` returns everything *after* the given key, including the key,
        // if a timer is set for exactly the given time, it would remain in the "after"
        // set of unexpired keys. This isn't what we want, therefore we add `1` to the
        // given time value so that it is put in the "before" set that gets expired
        // and overwritten.
-
        let at = instant + Duration::from_millis(1);
+
        let at = time + LocalDuration::from_millis(1);
        let unexpired = self.timeouts.split_off(&at);
        let fired = self.timeouts.len();
        self.timeouts = unexpired;
@@ -70,12 +72,12 @@ mod tests {
    fn test_wake_exact() {
        let mut tm = Timer::new();

-
        let now = Instant::now();
+
        let now = LocalTime::now();
        tm.set_timeout(Duration::from_secs(8), now);
        tm.set_timeout(Duration::from_secs(9), now);
        tm.set_timeout(Duration::from_secs(10), now);

-
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 2);
+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(9)), 2);
        assert_eq!(tm.count(), 1);
    }

@@ -83,7 +85,7 @@ mod tests {
    fn test_wake() {
        let mut tm = Timer::new();

-
        let now = Instant::now();
+
        let now = LocalTime::now();
        tm.set_timeout(Duration::from_secs(8), now);
        tm.set_timeout(Duration::from_secs(16), now);
        tm.set_timeout(Duration::from_secs(64), now);
@@ -92,13 +94,13 @@ mod tests {
        assert_eq!(tm.remove_expired_by(now), 0);
        assert_eq!(tm.count(), 4);

-
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 1);
+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(9)), 1);
        assert_eq!(tm.count(), 3, "one timeout has expired");

-
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(66)), 2);
+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(66)), 2);
        assert_eq!(tm.count(), 1, "another two timeouts have expired");

-
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(96)), 1);
+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(96)), 1);
        assert!(!tm.has_timeouts(), "all timeouts have expired");
    }

@@ -106,17 +108,17 @@ mod tests {
    fn test_next() {
        let mut tm = Timer::new();

-
        let mut now = Instant::now();
+
        let mut now = LocalTime::now();
        tm.set_timeout(Duration::from_secs(3), now);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(3)));

-
        now += Duration::from_secs(2);
+
        now = now + LocalDuration::from_secs(2);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(1)));

-
        now += Duration::from_secs(1);
+
        now = now + LocalDuration::from_secs(1);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));

-
        now += Duration::from_secs(1);
+
        now = now + LocalDuration::from_secs(1);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));

        assert_eq!(tm.remove_expired_by(now), 1);
modified crates/radicle-node/src/wire.rs
@@ -5,7 +5,6 @@ use std::collections::hash_map::Entry;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
-
use std::time::{Instant, SystemTime};
use std::{io, net, time};

use crossbeam_channel as chan;
@@ -13,6 +12,7 @@ use cyphernet::addr::{HostName, InetHost, NetAddr};
use cyphernet::encrypt::noise::{HandshakePattern, Keyset, NoiseState};
use cyphernet::proxy::socks5;
use cyphernet::{Digest, EcSk, Ecdh, Sha256};
+
use localtime::LocalTime;
use mio::net::TcpStream;
use radicle::node::device::Device;

@@ -291,35 +291,6 @@ impl Peers {
    }
}

-
/// The epoch time of when the node started.
-
struct Epoch {
-
    /// The system time when the node started.
-
    started_time: SystemTime,
-
    /// The instant when the node started.
-
    started_at: Instant,
-
}
-

-
impl Epoch {
-
    /// Construct a new [`Epoch`].
-
    fn new(started_time: SystemTime, started_at: Instant) -> Self {
-
        Self {
-
            started_time,
-
            started_at,
-
        }
-
    }
-

-
    /// Construct an [`Epoch`] where both values are recorded using their
-
    /// equivalent `now` constructors.
-
    fn now() -> Self {
-
        Self::new(SystemTime::now(), Instant::now())
-
    }
-

-
    /// Get the elapsed [`SystemTime`] given a later [`Instant`].
-
    fn elapsed_time(&self, later: Instant) -> SystemTime {
-
        self.started_time + (later - self.started_at)
-
    }
-
}
-

/// Wire protocol implementation for a set of peers.
pub(crate) struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
    /// Backing service instance.
@@ -342,8 +313,6 @@ pub(crate) struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + E
    peers: Peers,
    /// A (practically) infinite source of tokens to identify transports and listeners.
    tokens: Tokens,
-
    /// Record of system time and instant when the node started.
-
    epoch: Epoch,
}

impl<D, S, G> Wire<D, S, G>
@@ -366,14 +335,9 @@ where
            listening: RandomMap::default(),
            peers: Peers(RandomMap::default()),
            tokens: Tokens::default(),
-
            epoch: Epoch::now(),
        }
    }

-
    fn time(&self, instant: Instant) -> SystemTime {
-
        self.epoch.elapsed_time(instant)
-
    }
-

    pub fn listen(&mut self, socket: Listener) {
        let token = self.tokens.advance();
        self.listening.insert(token, socket.local_addr());
@@ -532,7 +496,7 @@ where
    type Listener = Listener;
    type Transport = Transport<WireSession<G>>;

-
    fn tick(&mut self, time: Instant) {
+
    fn tick(&mut self, time: LocalTime) {
        self.metrics.open_channels = self
            .peers
            .iter()
@@ -545,8 +509,10 @@ where
            })
            .sum();
        self.metrics.worker_queue_size = self.worker.len();
-

-
        self.service.tick(self.time(time).into(), &self.metrics);
+
        self.service.tick(
+
            LocalTime::from_millis(time.as_millis() as u128),
+
            &self.metrics,
+
        );
    }

    fn timer_reacted(&mut self) {
@@ -557,7 +523,7 @@ where
        &mut self,
        _: Token, // Note that this is the token of the listener socket.
        event: io::Result<(TcpStream, std::net::SocketAddr)>,
-
        _: Instant,
+
        _: LocalTime,
    ) {
        match event {
            Ok((connection, peer)) => {
@@ -621,7 +587,12 @@ where
        }
    }

-
    fn transport_reacted(&mut self, token: Token, event: SessionEvent<WireSession<G>>, _: Instant) {
+
    fn transport_reacted(
+
        &mut self,
+
        token: Token,
+
        event: SessionEvent<WireSession<G>>,
+
        _: LocalTime,
+
    ) {
        match event {
            SessionEvent::Established(ProtocolArtifact { state, session }) => {
                // SAFETY: With the NoiseXK protocol, there is always a remote static key.