Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node/reactor: Rewrite `Runtime::run`
Lorenz Leutgeb committed 2 months ago
commit ae06111e07560ca36357dc2d1cd3d4d8cdd81b71
parent 4d7b942
1 file changed +18 -16
modified crates/radicle-node/src/reactor.rs
@@ -371,10 +371,9 @@ impl<H: ReactionHandler> Runtime<H> {

    fn run(mut self) {
        loop {
-
            let before_poll = Instant::now();
            let timeout = self
                .timeouts
-
                .next_expiring_from(before_poll)
+
                .next_expiring_from(Instant::now())
                .unwrap_or(WAIT_TIMEOUT);

            self.register_interests()
@@ -384,31 +383,34 @@ impl<H: ReactionHandler> Runtime<H> {

            let mut events = Events::with_capacity(1024);

-
            // Blocking
+
            // Block and wait for I/O events, wake by other threads, or timeout.
            let res = self.poll.poll(&mut events, Some(timeout));

-
            self.service.tick();
+
            // This instant allows to measure the time spent by the service
+
            // to handle the result of polling.
            let tick = Instant::now();

-
            // The way this is currently used basically ignores which keys have
-
            // timed out. So as long as *something* timed out, we wake the service.
-
            let timers_fired = self.timeouts.remove_expired_by(tick);
-
            if timers_fired > 0 {
-
                log::trace!(target: "reactor", "Timer has fired");
-
                self.service.timer_reacted();
-
            }
+
            // Inform the service that time has advanced.
+
            self.service.tick();

+
            // Inform the service about errors during polling.
            if let Err(err) = res {
                log::warn!(target: "reactor", "Failure during polling: {err}");
                self.service.handle_error(Error::Poll(err));
            }

-
            let awoken = self.handle_events(tick, events);
-

-
            log::trace!(target: "reactor", "Duration between tick and events handled: {:?}", Instant::now().duration_since(tick));
+
            // Inform the service that some timers have reacted.
+
            // The way this is currently used basically ignores which
+
            // timers have expired. As long as *something* timed out,
+
            // the service is informed.
+
            let timers_fired = self.timeouts.remove_expired_by(tick);
+
            if timers_fired > 0 {
+
                log::trace!(target: "reactor", "Timer has fired");
+
                self.service.timer_reacted();
+
            }

-
            // Process the commands only if we awoken by the waker.
-
            if awoken {
+
            if self.handle_events(tick, events) {
+
                // If a wake event was emitted, eagerly consume all control messages.
                loop {
                    match self.receiver.try_recv() {
                        Err(TryRecvError::Empty) => break,