| + |
//! Process events in the persistent event queue.
|
| + |
|
| + |
#![allow(clippy::result_large_err)]
|
| + |
|
| + |
use std::{
|
| + |
thread::{sleep, spawn, JoinHandle},
|
| + |
time::Duration,
|
| + |
};
|
| + |
|
| + |
use log::debug;
|
| + |
use radicle::Profile;
|
| + |
|
| + |
use crate::{
|
| + |
broker::Broker,
|
| + |
db::{Db, DbError, QueueId, QueuedEvent},
|
| + |
error::BrokerError,
|
| + |
event::BrokerEvent,
|
| + |
msg::{MessageError, RequestBuilder},
|
| + |
pages::StatusPage,
|
| + |
};
|
| + |
|
| + |
const WAIT_FOR_EVENTS_DURATION: u64 = 10_000;
|
| + |
|
| + |
#[derive(Default)]
|
| + |
pub struct QueueProcessorBuilder {
|
| + |
db: Option<Db>,
|
| + |
broker: Option<Broker>,
|
| + |
page: Option<StatusPage>,
|
| + |
}
|
| + |
|
| + |
impl QueueProcessorBuilder {
|
| + |
pub fn build(self) -> Result<QueueProcessor, QueueError> {
|
| + |
Ok(QueueProcessor {
|
| + |
db: self.db.ok_or(QueueError::Missing("db"))?,
|
| + |
profile: Profile::load().map_err(QueueError::Profile)?,
|
| + |
broker: self.broker.ok_or(QueueError::Missing("broker"))?,
|
| + |
page: self.page.ok_or(QueueError::Missing("page"))?,
|
| + |
})
|
| + |
}
|
| + |
|
| + |
pub fn db(mut self, db: Db) -> Self {
|
| + |
self.db = Some(db);
|
| + |
self
|
| + |
}
|
| + |
|
| + |
pub fn broker(mut self, broker: Broker) -> Self {
|
| + |
self.broker = Some(broker);
|
| + |
self
|
| + |
}
|
| + |
|
| + |
pub fn page(mut self, page: StatusPage) -> Self {
|
| + |
self.page = Some(page);
|
| + |
self
|
| + |
}
|
| + |
}
|
| + |
|
| + |
pub struct QueueProcessor {
|
| + |
db: Db,
|
| + |
profile: Profile,
|
| + |
broker: Broker,
|
| + |
page: StatusPage,
|
| + |
}
|
| + |
|
| + |
impl QueueProcessor {
|
| + |
pub fn process_in_thread(mut self) -> JoinHandle<Result<(), QueueError>> {
|
| + |
spawn(move || self.process_until_shutdown())
|
| + |
}
|
| + |
|
| + |
fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
|
| + |
let mut done = false;
|
| + |
while !done {
|
| + |
if let Some(qe) = self.pick_event()? {
|
| + |
debug!("picked event from queue: {}: {:#?}", qe.id(), qe.event());
|
| + |
done = self.process_event(qe.event())?;
|
| + |
self.drop_event(qe.id())?;
|
| + |
} else {
|
| + |
self.wait_for_events();
|
| + |
}
|
| + |
}
|
| + |
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
fn pick_event(&self) -> Result<Option<QueuedEvent>, QueueError> {
|
| + |
let ids = self.db.queued_events().map_err(QueueError::db)?;
|
| + |
debug!("event queue: {ids:?}");
|
| + |
|
| + |
let mut queue = vec![];
|
| + |
for id in ids.iter() {
|
| + |
if let Some(qe) = self.db.get_queued_event(id).map_err(QueueError::db)? {
|
| + |
queue.push(qe);
|
| + |
}
|
| + |
}
|
| + |
queue.sort_by_cached_key(|qe| qe.timestamp().to_string());
|
| + |
|
| + |
if let Some(qe) = queue.first() {
|
| + |
Ok(Some(qe.clone()))
|
| + |
} else {
|
| + |
Ok(None)
|
| + |
}
|
| + |
}
|
| + |
|
| + |
fn process_event(&mut self, event: &BrokerEvent) -> Result<bool, QueueError> {
|
| + |
match event {
|
| + |
BrokerEvent::RefChanged {
|
| + |
rid,
|
| + |
name: _,
|
| + |
oid,
|
| + |
old: _,
|
| + |
} => {
|
| + |
debug!("Action: run: {rid} {oid}");
|
| + |
|
| + |
let trigger = RequestBuilder::default()
|
| + |
.profile(&self.profile)
|
| + |
.broker_event(event)
|
| + |
.build_trigger()
|
| + |
.map_err(|e| QueueError::build_trigger(event, e))?;
|
| + |
self.broker
|
| + |
.execute_ci(&trigger, &mut self.page)
|
| + |
.map_err(QueueError::execute_ci)?;
|
| + |
Ok(false)
|
| + |
}
|
| + |
BrokerEvent::Shutdown => {
|
| + |
debug!("Action: shutdown");
|
| + |
Ok(true)
|
| + |
}
|
| + |
}
|
| + |
}
|
| + |
|
| + |
fn drop_event(&mut self, id: &QueueId) -> Result<(), QueueError> {
|
| + |
debug!("remove event {id}");
|
| + |
self.db.remove_queued_event(id).map_err(QueueError::db)?;
|
| + |
Ok(())
|
| + |
}
|
| + |
|
| + |
fn wait_for_events(&self) {
|
| + |
sleep(Duration::from_millis(WAIT_FOR_EVENTS_DURATION));
|
| + |
}
|
| + |
}
|
| + |
|
| + |
#[derive(Debug, thiserror::Error)]
|
| + |
pub enum QueueError {
|
| + |
#[error("failed to load node profile")]
|
| + |
Profile(#[source] radicle::profile::Error),
|
| + |
|
| + |
#[error("programming error: QueueProcessorBuilder field {0} was not set")]
|
| + |
Missing(&'static str),
|
| + |
|
| + |
#[error("failed to use SQLite database")]
|
| + |
Db(#[source] DbError),
|
| + |
|
| + |
#[error("failed to create a trigger message from broker event {0:?}")]
|
| + |
BuildTrigger(BrokerEvent, #[source] MessageError),
|
| + |
|
| + |
#[error("failed to run CI")]
|
| + |
ExecuteCi(#[source] BrokerError),
|
| + |
}
|
| + |
|
| + |
impl QueueError {
|
| + |
fn db(e: DbError) -> Self {
|
| + |
Self::Db(e)
|
| + |
}
|
| + |
|
| + |
fn build_trigger(event: &BrokerEvent, err: MessageError) -> Self {
|
| + |
Self::BuildTrigger(event.clone(), err)
|
| + |
}
|
| + |
|
| + |
fn execute_ci(e: BrokerError) -> Self {
|
| + |
Self::ExecuteCi(e)
|
| + |
}
|
| + |
}
|