| |
/// Maximum amount of time to wait for I/O.
|
| |
const WAIT_TIMEOUT: Duration = Duration::from_secs(SECONDS_IN_AN_HOUR);
|
| |
|
| + |
/// Maximum duration to accept the service to spend handling events (and errors,
|
| + |
/// ticking, etc.) without warning. Set to log whenever the service becomes so
|
| + |
/// is so slow to respond that it would not be able to handle at least 10
|
| + |
/// "requests" per second, i.e. `1s / 10 = 100ms`.
|
| + |
const LAG_TIMEOUT: Duration = Duration::from_millis(100);
|
| + |
|
| |
/// A resource which can be managed by the reactor.
|
| |
pub trait EventHandler {
|
| |
/// The type of reactions which this resource may generate upon receiving
|
| |
|
| |
let mut events = Events::with_capacity(1024);
|
| |
|
| - |
// Blocking
|
| + |
// Block and wait for I/O events, wake by other threads, or timeout.
|
| |
let res = self.poll.poll(&mut events, Some(timeout));
|
| |
|
| - |
self.service.tick();
|
| + |
// This instant allows to measure the time spent by the service
|
| + |
// to handle the result of polling.
|
| |
let tick = Instant::now();
|
| |
|
| - |
// The way this is currently used basically ignores which keys have
|
| - |
// timed out. So as long as *something* timed out, we wake the service.
|
| - |
let timers_fired = self.timeouts.remove_expired_by(tick);
|
| - |
if timers_fired > 0 {
|
| - |
log::trace!(target: "reactor", "Timer has fired");
|
| - |
self.service.timer_reacted();
|
| - |
}
|
| + |
// Inform the service that time has advanced.
|
| + |
self.service.tick();
|
| |
|
| + |
// Inform the service about errors during polling.
|
| |
if let Err(err) = res {
|
| |
log::warn!(target: "reactor", "Failure during polling: {err}");
|
| |
self.service.handle_error(Error::Poll(err));
|
| |
}
|
| |
|
| - |
let awoken = self.handle_events(tick, events);
|
| - |
|
| - |
log::trace!(target: "reactor", "Duration between tick and events handled: {:?}", Instant::now().duration_since(tick));
|
| + |
// Inform the service that some timers have reacted.
|
| + |
// The way this is currently used basically ignores which
|
| + |
// timers have expired. As long as *something* timed out,
|
| + |
// the service is informed.
|
| + |
let timers_fired = self.timeouts.remove_expired_by(tick);
|
| + |
if timers_fired > 0 {
|
| + |
log::trace!(target: "reactor", "Timer has fired");
|
| + |
self.service.timer_reacted();
|
| + |
}
|
| |
|
| - |
// Process the commands only if we awoken by the waker.
|
| - |
if awoken {
|
| + |
if self.handle_events(tick, events) {
|
| + |
// If a wake event was emitted, eagerly consume all control messages.
|
| |
loop {
|
| + |
use ControlMessage::*;
|
| + |
use TryRecvError::*;
|
| + |
|
| |
match self.receiver.try_recv() {
|
| - |
Err(TryRecvError::Empty) => break,
|
| - |
Err(TryRecvError::Disconnected) => {
|
| - |
panic!("control channel disconnected unexpectedly")
|
| - |
}
|
| - |
Ok(ControlMessage::Shutdown) => return self.handle_shutdown(),
|
| - |
Ok(ControlMessage::Command(cmd)) => self.service.handle_command(*cmd),
|
| + |
Ok(Command(cmd)) => self.service.handle_command(*cmd),
|
| + |
Ok(Shutdown) => return self.handle_shutdown(),
|
| + |
Err(Empty) => break,
|
| + |
Err(Disconnected) => panic!("control channel disconnected unexpectedly"),
|
| |
}
|
| |
}
|
| |
}
|
| |
|
| + |
let duration = Instant::now().duration_since(tick);
|
| + |
if duration > LAG_TIMEOUT {
|
| + |
log::warn!(target: "reactor", "Service was busy {:?} which exceeds the timeout of {:?}", duration, LAG_TIMEOUT);
|
| + |
}
|
| + |
|
| |
self.handle_actions(tick);
|
| |
}
|
| |
}
|