Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node/reactor: Rewrite `Runtime::run` and introduce `LAG_TIMEOUT`
Merged lorenz opened 2 months ago

Since the service performs further I/O (e.g. uses SQLite), it can keep the reactor runtime thread busy for long periods. Emit a warning if that is the case.

100 ms is quite relaxed, this is to only catch severe cases and avoid spamming the log.

1 file changed +36 -22 4d7b942b 30701cc6
modified crates/radicle-node/src/reactor.rs
@@ -34,6 +34,12 @@ const SECONDS_IN_AN_HOUR: u64 = 60 * 60;
/// Maximum amount of time to wait for I/O.
const WAIT_TIMEOUT: Duration = Duration::from_secs(SECONDS_IN_AN_HOUR);

+
/// Maximum duration to accept the service to spend handling events (and errors,
+
/// ticking, etc.) without warning. Set to log whenever the service becomes so
+
/// is so slow to respond that it would not be able to handle at least 10
+
/// "requests" per second, i.e. `1s / 10 = 100ms`.
+
const LAG_TIMEOUT: Duration = Duration::from_millis(100);
+

/// A resource which can be managed by the reactor.
pub trait EventHandler {
    /// The type of reactions which this resource may generate upon receiving
@@ -371,10 +377,9 @@ impl<H: ReactionHandler> Runtime<H> {

    fn run(mut self) {
        loop {
-
            let before_poll = Instant::now();
            let timeout = self
                .timeouts
-
                .next_expiring_from(before_poll)
+
                .next_expiring_from(Instant::now())
                .unwrap_or(WAIT_TIMEOUT);

            self.register_interests()
@@ -384,43 +389,52 @@ impl<H: ReactionHandler> Runtime<H> {

            let mut events = Events::with_capacity(1024);

-
            // Blocking
+
            // Block and wait for I/O events, wake by other threads, or timeout.
            let res = self.poll.poll(&mut events, Some(timeout));

-
            self.service.tick();
+
            // This instant allows to measure the time spent by the service
+
            // to handle the result of polling.
            let tick = Instant::now();

-
            // The way this is currently used basically ignores which keys have
-
            // timed out. So as long as *something* timed out, we wake the service.
-
            let timers_fired = self.timeouts.remove_expired_by(tick);
-
            if timers_fired > 0 {
-
                log::trace!(target: "reactor", "Timer has fired");
-
                self.service.timer_reacted();
-
            }
+
            // Inform the service that time has advanced.
+
            self.service.tick();

+
            // Inform the service about errors during polling.
            if let Err(err) = res {
                log::warn!(target: "reactor", "Failure during polling: {err}");
                self.service.handle_error(Error::Poll(err));
            }

-
            let awoken = self.handle_events(tick, events);
-

-
            log::trace!(target: "reactor", "Duration between tick and events handled: {:?}", Instant::now().duration_since(tick));
+
            // Inform the service that some timers have reacted.
+
            // The way this is currently used basically ignores which
+
            // timers have expired. As long as *something* timed out,
+
            // the service is informed.
+
            let timers_fired = self.timeouts.remove_expired_by(tick);
+
            if timers_fired > 0 {
+
                log::trace!(target: "reactor", "Timer has fired");
+
                self.service.timer_reacted();
+
            }

-
            // Process the commands only if we awoken by the waker.
-
            if awoken {
+
            if self.handle_events(tick, events) {
+
                // If a wake event was emitted, eagerly consume all control messages.
                loop {
+
                    use ControlMessage::*;
+
                    use TryRecvError::*;
+

                    match self.receiver.try_recv() {
-
                        Err(TryRecvError::Empty) => break,
-
                        Err(TryRecvError::Disconnected) => {
-
                            panic!("control channel disconnected unexpectedly")
-
                        }
-
                        Ok(ControlMessage::Shutdown) => return self.handle_shutdown(),
-
                        Ok(ControlMessage::Command(cmd)) => self.service.handle_command(*cmd),
+
                        Ok(Command(cmd)) => self.service.handle_command(*cmd),
+
                        Ok(Shutdown) => return self.handle_shutdown(),
+
                        Err(Empty) => break,
+
                        Err(Disconnected) => panic!("control channel disconnected unexpectedly"),
                    }
                }
            }

+
            let duration = Instant::now().duration_since(tick);
+
            if duration > LAG_TIMEOUT {
+
                log::warn!(target: "reactor", "Service was busy {:?} which exceeds the timeout of {:?}", duration, LAG_TIMEOUT);
+
            }
+

            self.handle_actions(tick);
        }
    }