| |
/// Listener resources are resources which may spawn more resources and can't be written to. A
|
| |
/// typical example of a listener resource is a [`std::net::TcpListener`], however this may also
|
| |
/// be a special form of a peripheral device or something else.
|
| - |
type Listener: EventHandler + Source + Send;
|
| + |
type Listener: EventHandler + Source + Send + Debug;
|
| |
|
| |
/// Type for a transport resource.
|
| |
///
|
| |
/// Transport is a "full" resource which can be read from - and written to. Usual files, network
|
| |
/// connections, database connections etc are all fall into this category.
|
| - |
type Transport: EventHandler + Source + Send + WriteAtomic;
|
| + |
type Transport: EventHandler + Source + Send + Debug + WriteAtomic;
|
| |
|
| |
/// Method called by the reactor on the start of each event loop once the poll has returned.
|
| |
fn tick(&mut self, time: localtime::LocalTime);
|
| |
///
|
| |
/// Whether one of the events was originated from the waker.
|
| |
fn handle_events(&mut self, time: LocalTime, events: Events) -> bool {
|
| + |
log::trace!(target: "reactor", "Handling events");
|
| |
let mut awoken = false;
|
| + |
let mut deregistered = Vec::new();
|
| |
|
| |
for event in events.into_iter() {
|
| - |
let id = event.token();
|
| + |
let token = event.token();
|
| |
|
| - |
if id == WAKER {
|
| + |
if token == WAKER {
|
| |
log::trace!(target: "reactor", "Awoken by the controller");
|
| |
awoken = true;
|
| - |
} else if self.listeners.contains_key(&id) {
|
| - |
log::trace!(target: "reactor", event:debug; "From listener");
|
| + |
} else if self.listeners.contains_key(&token) {
|
| + |
log::trace!(target: "reactor", token=token.0; "Event from listener with token {}: {:?}", token.0, event);
|
| |
if !event.is_error() {
|
| - |
let listener = self.listeners.get_mut(&id).expect("resource disappeared");
|
| + |
let listener = self
|
| + |
.listeners
|
| + |
.get_mut(&token)
|
| + |
.expect("resource disappeared");
|
| |
listener
|
| |
.handle(event)
|
| |
.into_iter()
|
| |
.for_each(|service_event| {
|
| - |
self.service.listener_reacted(id, service_event, time);
|
| + |
self.service.listener_reacted(token, service_event, time);
|
| |
});
|
| |
} else {
|
| - |
let listener = self
|
| - |
.unregister_listener(id)
|
| - |
.expect("listener has disappeared");
|
| + |
let listener = self.deregister_listener(token).unwrap_or_else(|| panic!("listener with token {} has disappeared", token.0));
|
| |
self.service
|
| - |
.handle_error(Error::ListenerDisconnect(id, listener));
|
| + |
.handle_error(Error::ListenerDisconnect(token, listener));
|
| + |
deregistered.push(token);
|
| |
}
|
| - |
} else if self.transports.contains_key(&id) {
|
| - |
log::trace!(target: "reactor", event:debug; "From transport");
|
| + |
} else if self.transports.contains_key(&token) {
|
| + |
log::trace!(target: "reactor", token=token.0; "Event from transport with token {}: {:?}", token.0, event);
|
| |
if !event.is_error() {
|
| - |
let transport = self.transports.get_mut(&id).expect("resource disappeared");
|
| + |
let transport = self
|
| + |
.transports
|
| + |
.get_mut(&token)
|
| + |
.expect("resource disappeared");
|
| |
transport
|
| |
.handle(event)
|
| |
.into_iter()
|
| |
.for_each(|service_event| {
|
| - |
self.service.transport_reacted(id, service_event, time);
|
| + |
self.service.transport_reacted(token, service_event, time);
|
| |
});
|
| |
} else {
|
| - |
let transport = self
|
| - |
.unregister_transport(id)
|
| - |
.expect("transport has disappeared");
|
| + |
let transport = self.deregister_transport(token).unwrap_or_else(|| panic!("transport with token {} has disappeared", token.0));
|
| |
self.service
|
| - |
.handle_error(Error::TransportDisconnect(id, transport));
|
| + |
.handle_error(Error::TransportDisconnect(token, transport));
|
| + |
deregistered.push(token);
|
| |
}
|
| - |
} else {
|
| - |
panic!("token in poll which is not a known waker, listener or transport")
|
| + |
} else if !deregistered.contains(&token) {
|
| + |
log::error!(target: "reactor", token=token.0; "Event from unknown token {}: {:?}", token.0, event);
|
| |
}
|
| |
}
|
| |
|
| |
.transport_registered(token, &self.transports[&token]);
|
| |
}
|
| |
Action::UnregisterListener(token) => {
|
| - |
let Some(listener) = self.unregister_listener(token) else {
|
| + |
let Some(listener) = self.deregister_listener(token) else {
|
| |
return Ok(());
|
| |
};
|
| |
|
| - |
log::debug!(target: "reactor", token=token.0; "Handing over listener");
|
| + |
log::debug!(target: "reactor", token=token.0; "Handing over listener {listener:?} with token {}", token.0);
|
| |
self.service.handover_listener(token, listener);
|
| |
}
|
| |
Action::UnregisterTransport(token) => {
|
| - |
let Some(transport) = self.unregister_transport(token) else {
|
| + |
let Some(transport) = self.deregister_transport(token) else {
|
| |
return Ok(());
|
| |
};
|
| |
|
| - |
log::debug!(target: "reactor", token=token.0; "Handing over transport");
|
| + |
log::debug!(target: "reactor", token=token.0; "Handing over transport {transport:?} with token {}", token.0);
|
| |
self.service.handover_transport(token, transport);
|
| |
}
|
| |
Action::Send(token, data) => {
|
| - |
log::trace!(target: "reactor", "Sending {} bytes to {token:?}", data.len());
|
| + |
log::trace!(target: "reactor", token=token.0; "Sending {} bytes to {token:?}", data.len());
|
| |
|
| |
if let Some(transport) = self.transports.get_mut(&token) {
|
| |
if let Err(e) = transport.write_atomic(&data) {
|
| |
log::error!(target: "reactor", "Fatal error writing to transport {token:?}, disconnecting. Error details: {e:?}");
|
| - |
if let Some(transport) = self.unregister_transport(token) {
|
| + |
if let Some(transport) = self.deregister_transport(token) {
|
| |
return Err(Error::TransportDisconnect(token, transport));
|
| |
}
|
| |
}
|
| |
} else {
|
| - |
log::error!(target: "reactor", "Transport {token:?} is not in the reactor");
|
| + |
log::error!(target: "reactor", token=token.0; "No transport with token {token:?} is known!");
|
| |
}
|
| |
}
|
| |
Action::SetTimer(duration) => {
|
| |
log::info!(target: "reactor", "Shutdown");
|
| |
}
|
| |
|
| - |
fn unregister_listener(&mut self, token: Token) -> Option<H::Listener> {
|
| + |
fn deregister_listener(&mut self, token: Token) -> Option<H::Listener> {
|
| |
let Some(mut source) = self.listeners.remove(&token) else {
|
| - |
log::warn!(target: "reactor", token=token.0; "Unregistering non-registered listener");
|
| + |
log::warn!(target: "reactor", token=token.0; "Deregistering non-registered listener with token {}", token.0);
|
| |
return None;
|
| |
};
|
| |
|
| |
if let Err(err) = self.poll.registry().deregister(&mut source) {
|
| - |
log::warn!(target: "reactor", token=token.0; "Failed to deregister listener from mio: {err}");
|
| + |
log::warn!(target: "reactor", token=token.0; "Failed to deregister listener with token {} from mio: {err}", token.0);
|
| |
}
|
| |
|
| |
Some(source)
|
| |
}
|
| |
|
| - |
fn unregister_transport(&mut self, token: Token) -> Option<H::Transport> {
|
| + |
fn deregister_transport(&mut self, token: Token) -> Option<H::Transport> {
|
| |
let Some(mut source) = self.transports.remove(&token) else {
|
| - |
log::warn!(target: "reactor", token=token.0; "Unregistering non-registered transport");
|
| + |
log::warn!(target: "reactor", token=token.0; "Deregistering non-registered transport with token {}", token.0);
|
| |
return None;
|
| |
};
|
| |
|
| |
if let Err(err) = self.poll.registry().deregister(&mut source) {
|
| - |
log::warn!(target: "reactor", token=token.0; "Failed to deregister transport from mio: {err}");
|
| + |
log::warn!(target: "reactor", token=token.0; "Failed to deregister transport with token {} from mio: {err}", token.0);
|
| |
}
|
| |
|
| |
Some(source)
|