Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Revert "node: Use `std::time` for reactor and wire"
✗ CI failure Fintan Halpenny committed 5 months ago
commit a929a58f0881dd811e659e61a4f95566451d5a88
parent 58305cda321859656022770c86839049ffb8c3d8
1 passed 1 failed (2 total) View logs
3 files changed +52 -81
modified crates/radicle-node/src/reactor.rs
@@ -10,10 +10,11 @@ use std::fmt::{Debug, Display, Formatter};
use std::io::ErrorKind;
use std::sync::Arc;
use std::thread::JoinHandle;
-
use std::time::{Duration, Instant};
+
use std::time::Duration;
use std::{io, thread};

use crossbeam_channel::{unbounded, Receiver, TryRecvError};
+
use localtime::LocalTime;
use mio::event::{Event, Source};
use mio::{Events, Interest, Poll, Waker};
use thiserror::Error;
@@ -211,7 +212,7 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
    type Transport: EventHandler + Source + Send + Debug + WriteAtomic;

    /// Method called by the reactor on the start of each event loop once the poll has returned.
-
    fn tick(&mut self, instant: Instant);
+
    fn tick(&mut self, time: localtime::LocalTime);

    /// Method called by the reactor when a previously set timeout is fired.
    ///
@@ -226,7 +227,7 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
        &mut self,
        token: Token,
        reaction: <Self::Listener as EventHandler>::Reaction,
-
        instant: Instant,
+
        time: localtime::LocalTime,
    );

    /// Method called by the reactor upon a reaction to an I/O event on a transport resource.
@@ -234,7 +235,7 @@ pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::T
        &mut self,
        token: Token,
        reaction: <Self::Transport as EventHandler>::Reaction,
-
        instant: Instant,
+
        time: localtime::LocalTime,
    );

    /// Method called by the reactor when a given resource was successfully registered
@@ -371,7 +372,7 @@ impl<H: ReactionHandler> Runtime<H> {

    fn run(mut self) {
        loop {
-
            let before_poll = Instant::now();
+
            let before_poll = LocalTime::now();
            let timeout = self
                .timeouts
                .next_expiring_from(before_poll)
@@ -387,12 +388,12 @@ impl<H: ReactionHandler> Runtime<H> {
            // Blocking
            let res = self.poll.poll(&mut events, Some(timeout));

-
            let tick = Instant::now();
-
            self.service.tick(tick);
+
            let now = LocalTime::now();
+
            self.service.tick(now);

            // The way this is currently used basically ignores which keys have
            // timed out. So as long as *something* timed out, we wake the service.
-
            let timers_fired = self.timeouts.remove_expired_by(tick);
+
            let timers_fired = self.timeouts.remove_expired_by(now);
            if timers_fired > 0 {
                log::trace!(target: "reactor", "Timer has fired");
                self.service.timer_reacted();
@@ -403,9 +404,7 @@ impl<H: ReactionHandler> Runtime<H> {
                self.service.handle_error(Error::Poll(err));
            }

-
            let awoken = self.handle_events(tick, events);
-

-
            log::trace!(target: "reactor", "Duration between tick and events handled: {:?}", Instant::now().duration_since(tick));
+
            let awoken = self.handle_events(now, events);

            // Process the commands only if we awoken by the waker.
            if awoken {
@@ -421,14 +420,14 @@ impl<H: ReactionHandler> Runtime<H> {
                }
            }

-
            self.handle_actions(tick);
+
            self.handle_actions(now);
        }
    }

    /// # Returns
    ///
    /// Whether one of the events was originated from the waker.
-
    fn handle_events(&mut self, instant: Instant, events: Events) -> bool {
+
    fn handle_events(&mut self, time: LocalTime, events: Events) -> bool {
        log::trace!(target: "reactor", "Handling events");
        let mut awoken = false;
        let mut deregistered = Vec::new();
@@ -450,7 +449,7 @@ impl<H: ReactionHandler> Runtime<H> {
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service.listener_reacted(token, service_event, instant);
+
                            self.service.listener_reacted(token, service_event, time);
                        });
                } else {
                    let listener = self.deregister_listener(token).unwrap_or_else(|| {
@@ -471,8 +470,7 @@ impl<H: ReactionHandler> Runtime<H> {
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
-
                            self.service
-
                                .transport_reacted(token, service_event, instant);
+
                            self.service.transport_reacted(token, service_event, time);
                        });
                } else {
                    let transport = self.deregister_transport(token).unwrap_or_else(|| {
@@ -490,13 +488,13 @@ impl<H: ReactionHandler> Runtime<H> {
        awoken
    }

-
    fn handle_actions(&mut self, instant: Instant) {
+
    fn handle_actions(&mut self, time: LocalTime) {
        while let Some(action) = self.service.next() {
            log::trace!(target: "reactor", "Handling action {action} from the service");

            // Deadlock may happen here if the service will generate events over and over
            // in the handle_* calls we may never get out of this loop
-
            if let Err(err) = self.handle_action(action, instant) {
+
            if let Err(err) = self.handle_action(action, time) {
                log::error!(target: "reactor", "Error: {err}");
                self.service.handle_error(err);
            }
@@ -506,7 +504,7 @@ impl<H: ReactionHandler> Runtime<H> {
    fn handle_action(
        &mut self,
        action: Action<H::Listener, H::Transport>,
-
        instant: Instant,
+
        time: LocalTime,
    ) -> Result<(), Error<H::Listener, H::Transport>> {
        match action {
            Action::RegisterListener(token, mut listener) => {
@@ -564,7 +562,7 @@ impl<H: ReactionHandler> Runtime<H> {
            Action::SetTimer(duration) => {
                log::trace!(target: "reactor", "Adding timer {duration:?} from now");

-
                self.timeouts.set_timeout(duration, instant);
+
                self.timeouts.set_timeout(duration, time);
            }
        }
        Ok(())
modified crates/radicle-node/src/reactor/timer.rs
@@ -1,11 +1,13 @@
+
use std::collections::BTreeSet;
use std::time::Duration;
-
use std::{collections::BTreeSet, time::Instant};
+

+
use localtime::{LocalDuration, LocalTime};

/// Manages timers and triggers timeouts.
#[derive(Debug, Default)]
pub struct Timer {
    /// Timeouts are durations since the UNIX epoch.
-
    timeouts: BTreeSet<Instant>,
+
    timeouts: BTreeSet<localtime::LocalTime>,
}

impl Timer {
@@ -29,32 +31,32 @@ impl Timer {
    }

    /// Register a new timeout relative to a certain point in time.
-
    pub fn set_timeout(&mut self, timeout: Duration, after: Instant) {
-
        let time = after + timeout;
+
    pub fn set_timeout(&mut self, timeout: Duration, after: LocalTime) {
+
        let time = after + LocalDuration::from_millis(timeout.as_millis());
        self.timeouts.insert(time);
    }

    /// Get the first timeout expiring right at or after certain moment of time.
    /// Returns [`None`] if there are no timeouts.
-
    pub fn next_expiring_from(&self, time: impl Into<Instant>) -> Option<Duration> {
+
    pub fn next_expiring_from(&self, time: impl Into<LocalTime>) -> Option<Duration> {
        let time = time.into();
        let last = *self.timeouts.first()?;
        Some(if last >= time {
-
            last - time
+
            Duration::from_millis(last.as_millis() - time.as_millis())
        } else {
-
            Duration::default()
+
            Duration::from_secs(0)
        })
    }

    /// Removes timeouts which expire by a certain moment of time (inclusive),
    /// returning total number of timeouts which were removed.
-
    pub fn remove_expired_by(&mut self, instant: Instant) -> usize {
+
    pub fn remove_expired_by(&mut self, time: LocalTime) -> usize {
        // Since `split_off` returns everything *after* the given key, including the key,
        // if a timer is set for exactly the given time, it would remain in the "after"
        // set of unexpired keys. This isn't what we want, therefore we add `1` to the
        // given time value so that it is put in the "before" set that gets expired
        // and overwritten.
-
        let at = instant + Duration::from_millis(1);
+
        let at = time + LocalDuration::from_millis(1);
        let unexpired = self.timeouts.split_off(&at);
        let fired = self.timeouts.len();
        self.timeouts = unexpired;
@@ -70,12 +72,12 @@ mod tests {
    fn test_wake_exact() {
        let mut tm = Timer::new();

-
        let now = Instant::now();
+
        let now = LocalTime::now();
        tm.set_timeout(Duration::from_secs(8), now);
        tm.set_timeout(Duration::from_secs(9), now);
        tm.set_timeout(Duration::from_secs(10), now);

-
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 2);
+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(9)), 2);
        assert_eq!(tm.count(), 1);
    }

@@ -83,7 +85,7 @@ mod tests {
    fn test_wake() {
        let mut tm = Timer::new();

-
        let now = Instant::now();
+
        let now = LocalTime::now();
        tm.set_timeout(Duration::from_secs(8), now);
        tm.set_timeout(Duration::from_secs(16), now);
        tm.set_timeout(Duration::from_secs(64), now);
@@ -92,13 +94,13 @@ mod tests {
        assert_eq!(tm.remove_expired_by(now), 0);
        assert_eq!(tm.count(), 4);

-
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 1);
+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(9)), 1);
        assert_eq!(tm.count(), 3, "one timeout has expired");

-
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(66)), 2);
+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(66)), 2);
        assert_eq!(tm.count(), 1, "another two timeouts have expired");

-
        assert_eq!(tm.remove_expired_by(now + Duration::from_secs(96)), 1);
+
        assert_eq!(tm.remove_expired_by(now + LocalDuration::from_secs(96)), 1);
        assert!(!tm.has_timeouts(), "all timeouts have expired");
    }

@@ -106,17 +108,17 @@ mod tests {
    fn test_next() {
        let mut tm = Timer::new();

-
        let mut now = Instant::now();
+
        let mut now = LocalTime::now();
        tm.set_timeout(Duration::from_secs(3), now);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(3)));

-
        now += Duration::from_secs(2);
+
        now = now + LocalDuration::from_secs(2);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(1)));

-
        now += Duration::from_secs(1);
+
        now = now + LocalDuration::from_secs(1);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));

-
        now += Duration::from_secs(1);
+
        now = now + LocalDuration::from_secs(1);
        assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));

        assert_eq!(tm.remove_expired_by(now), 1);
modified crates/radicle-node/src/wire.rs
@@ -5,7 +5,6 @@ use std::collections::hash_map::Entry;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
-
use std::time::{Instant, SystemTime};
use std::{io, net, time};

use crossbeam_channel as chan;
@@ -13,6 +12,7 @@ use cyphernet::addr::{HostName, InetHost, NetAddr};
use cyphernet::encrypt::noise::{HandshakePattern, Keyset, NoiseState};
use cyphernet::proxy::socks5;
use cyphernet::{Digest, EcSk, Ecdh, Sha256};
+
use localtime::LocalTime;
use mio::net::TcpStream;
use radicle::node::device::Device;

@@ -291,35 +291,6 @@ impl Peers {
    }
}

-
/// The epoch time of when the node started.
-
struct Epoch {
-
    /// The system time when the node started.
-
    started_time: SystemTime,
-
    /// The instant when the node started.
-
    started_at: Instant,
-
}
-

-
impl Epoch {
-
    /// Construct a new [`Epoch`].
-
    fn new(started_time: SystemTime, started_at: Instant) -> Self {
-
        Self {
-
            started_time,
-
            started_at,
-
        }
-
    }
-

-
    /// Construct an [`Epoch`] where both values are recorded using their
-
    /// equivalent `now` constructors.
-
    fn now() -> Self {
-
        Self::new(SystemTime::now(), Instant::now())
-
    }
-

-
    /// Get the elapsed [`SystemTime`] given a later [`Instant`].
-
    fn elapsed_time(&self, later: Instant) -> SystemTime {
-
        self.started_time + (later - self.started_at)
-
    }
-
}
-

/// Wire protocol implementation for a set of peers.
pub(crate) struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
    /// Backing service instance.
@@ -342,8 +313,6 @@ pub(crate) struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + E
    peers: Peers,
    /// A (practically) infinite source of tokens to identify transports and listeners.
    tokens: Tokens,
-
    /// Record of system time and instant when the node started.
-
    epoch: Epoch,
}

impl<D, S, G> Wire<D, S, G>
@@ -366,14 +335,9 @@ where
            listening: RandomMap::default(),
            peers: Peers(RandomMap::default()),
            tokens: Tokens::default(),
-
            epoch: Epoch::now(),
        }
    }

-
    fn time(&self, instant: Instant) -> SystemTime {
-
        self.epoch.elapsed_time(instant)
-
    }
-

    pub fn listen(&mut self, socket: Listener) {
        let token = self.tokens.advance();
        self.listening.insert(token, socket.local_addr());
@@ -532,7 +496,7 @@ where
    type Listener = Listener;
    type Transport = Transport<WireSession<G>>;

-
    fn tick(&mut self, time: Instant) {
+
    fn tick(&mut self, time: LocalTime) {
        self.metrics.open_channels = self
            .peers
            .iter()
@@ -545,8 +509,10 @@ where
            })
            .sum();
        self.metrics.worker_queue_size = self.worker.len();
-

-
        self.service.tick(self.time(time).into(), &self.metrics);
+
        self.service.tick(
+
            LocalTime::from_millis(time.as_millis() as u128),
+
            &self.metrics,
+
        );
    }

    fn timer_reacted(&mut self) {
@@ -557,7 +523,7 @@ where
        &mut self,
        _: Token, // Note that this is the token of the listener socket.
        event: io::Result<(TcpStream, std::net::SocketAddr)>,
-
        _: Instant,
+
        _: LocalTime,
    ) {
        match event {
            Ok((connection, peer)) => {
@@ -621,7 +587,12 @@ where
        }
    }

-
    fn transport_reacted(&mut self, token: Token, event: SessionEvent<WireSession<G>>, _: Instant) {
+
    fn transport_reacted(
+
        &mut self,
+
        token: Token,
+
        event: SessionEvent<WireSession<G>>,
+
        _: LocalTime,
+
    ) {
        match event {
            SessionEvent::Established(ProtocolArtifact { state, session }) => {
                // SAFETY: With the NoiseXK protocol, there is always a remote static key.