Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat(src/db.rs): add queue for CI events
Lars Wirzenius committed 1 year ago
commit ef8384b05b48b4bc3148154ae6f8feac5cb3e9cc
parent 1274368a2095f32fce9397b5837cefcd5706fa14
1 file changed +67 -13
modified src/db.rs
@@ -21,7 +21,7 @@ use sqlite::{Connection, State, Statement};
use time::{macros::format_description, OffsetDateTime};
use uuid::Uuid;

-
use crate::{event::BrokerEvent, msg::RunId, run::Run};
+
use crate::{ci_event::CiEvent, event::BrokerEvent, msg::RunId, run::Run};

// how long to retry when SQL fails for busy database
const MAX_WAIT: Duration = Duration::from_millis(60_000);
@@ -58,6 +58,7 @@ impl Db {
        const TABLES: &[&str] = &[
            "CREATE TABLE IF NOT EXISTS counter_test (counter INT)",
            "CREATE TABLE IF NOT EXISTS event_queue (id TEXT PRIMARY KEY, timestamp TEXT, event TEXT)",
+
            "CREATE TABLE IF NOT EXISTS ci_event_queue (id TEXT PRIMARY KEY, timestamp TEXT, event TEXT)",
            "CREATE TABLE IF NOT EXISTS ci_runs (broker_run_id TEXT PRIMARY KEY, json TEXT)",
        ];

@@ -196,16 +197,44 @@ impl Db {
        Ok(ids)
    }

+
    /// Return list of identifiers for CI events currently in the queue.
+
    pub fn queued_ci_events(&self) -> Result<Vec<QueueId>, DbError> {
+
        let mut select = self.prepare("SELECT id FROM ci_event_queue")?;
+

+
        let mut ids = vec![];
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    let id: String = select
+
                        .stmt
+
                        .read("id")
+
                        .map_err(|e| DbError::list_events(&select.sql, e))?;
+
                    ids.push(QueueId::from(&id));
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) => {
+
                    return Err(DbError::list_events(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(ids)
+
    }
+

    /// Return a specific event, given is id, if one exists.
-
    pub fn get_queued_event(&self, id: &QueueId) -> Result<Option<QueuedEvent>, DbError> {
-
        let mut select = self.prepare("SELECT timestamp, event FROM event_queue WHERE id = :id")?;
+
    pub fn get_queued_ci_event(&self, id: &QueueId) -> Result<Option<QueuedCiEvent>, DbError> {
+
        let mut select =
+
            self.prepare("SELECT timestamp, event FROM ci_event_queue WHERE id = :id")?;
        select
            .stmt
            .bind((":id", id.as_str()))
            .map_err(|e| DbError::bind(&select.sql, e))?;

        let mut timestamp: Option<String> = None;
-
        let mut event: Option<BrokerEvent> = None;
+
        let mut event: Option<CiEvent> = None;

        loop {
            match select.stmt.next() {
@@ -235,7 +264,7 @@ impl Db {
        }

        if let (Some(ts), Some(ev)) = (timestamp, event) {
-
            let qe = QueuedEvent::new(id.clone(), ts, ev);
+
            let qe = QueuedCiEvent::new(id.clone(), ts, ev);
            Ok(Some(qe))
        } else {
            Ok(None)
@@ -243,14 +272,14 @@ impl Db {
    }

    /// Add a new event to the event queue, returning its id.
-
    pub fn push_queued_event(&self, event: BrokerEvent) -> Result<QueueId, DbError> {
+
    pub fn push_queued_ci_event(&self, event: CiEvent) -> Result<QueueId, DbError> {
        let json = serde_json::to_string(&event).map_err(DbError::event_to_json)?;

        let id = QueueId::default();
        let ts = now().map_err(DbError::time_format)?;

-
        let mut insert =
-
            self.prepare("INSERT INTO event_queue (id, timestamp, event) VALUES (:id, :ts, :e)")?;
+
        let mut insert = self
+
            .prepare("INSERT INTO ci_event_queue (id, timestamp, event) VALUES (:id, :ts, :e)")?;
        insert
            .stmt
            .bind((":id", id.as_str()))
@@ -271,10 +300,10 @@ impl Db {
        Ok(id)
    }

-
    /// Remove event from queue, given its id. It's OK if the event is
+
    /// Remove CI event from queue, given its id. It's OK if the event is
    /// not in the queue, that is just silently ignored.
-
    pub fn remove_queued_event(&self, id: &QueueId) -> Result<(), DbError> {
-
        let mut remove = self.prepare("DELETE FROM event_queue WHERE id = :id")?;
+
    pub fn remove_queued_ci_event(&self, id: &QueueId) -> Result<(), DbError> {
+
        let mut remove = self.prepare("DELETE FROM ci_event_queue WHERE id = :id")?;
        remove
            .stmt
            .bind((":id", id.as_str()))
@@ -531,7 +560,32 @@ pub struct QueuedEvent {
}

impl QueuedEvent {
-
    fn new(id: QueueId, ts: String, event: BrokerEvent) -> Self {
+
    // fn new(id: QueueId, ts: String, event: BrokerEvent) -> Self {
+
    //     Self { id, ts, event }
+
    // }
+

+
    pub fn id(&self) -> &QueueId {
+
        &self.id
+
    }
+

+
    pub fn timestamp(&self) -> &str {
+
        &self.ts
+
    }
+

+
    pub fn event(&self) -> &BrokerEvent {
+
        &self.event
+
    }
+
}
+

+
#[derive(Clone, Debug, Serialize, Deserialize)]
+
pub struct QueuedCiEvent {
+
    id: QueueId,
+
    ts: String,
+
    event: CiEvent,
+
}
+

+
impl QueuedCiEvent {
+
    fn new(id: QueueId, ts: String, event: CiEvent) -> Self {
        Self { id, ts, event }
    }

@@ -543,7 +597,7 @@ impl QueuedEvent {
        &self.ts
    }

-
    pub fn event(&self) -> &BrokerEvent {
+
    pub fn event(&self) -> &CiEvent {
        &self.event
    }
}