Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: add table with current event queue to front report page
Merged liw opened 1 year ago

Add a table of pending events in the event queue. except for ones for private repositories.

The public queue allows people to have a idea of how long they will have to wait for CI to run for their change.

The table leaks the event queue, commit IDs, branch names, and patch IDs, but if that’s a problem, we can make this a table configurable, on a per node basis.

4 files changed +108 -15 2a40a12a 07b3a48a
modified src/bin/cib.rs
@@ -209,8 +209,7 @@ impl ProcessEventsCmd {
        if let Some(dirname) = &config.report_dir {
            page.set_output_dir(dirname);
        }
-
        let page_updater =
-
            page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, false);
+
        let page_updater = page.update_in_thread(run_notification.rx(), profile, db, false);

        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
        let spec =
modified src/bin/cibtoolcmd/report.rs
@@ -23,8 +23,7 @@ impl Leaf for ReportCmd {
        page.set_output_dir(&self.output_dir);

        let mut run_notification = NotificationChannel::default();
-
        let thread =
-
            page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, true);
+
        let thread = page.update_in_thread(run_notification.rx(), profile, db, true);
        thread.join().unwrap()?;

        Ok(())
modified src/pages.rs
@@ -21,11 +21,15 @@ use log::{debug, info, trace, warn};
use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};

-
use radicle::prelude::RepoId;
+
use radicle::{
+
    prelude::RepoId,
+
    storage::{ReadRepository, ReadStorage},
+
    Profile,
+
};

use crate::{
-
    db::{Db, DbError},
-
    event::BrokerEvent,
+
    db::{Db, DbError, QueuedEvent},
+
    event::{parse_ref, BrokerEvent, ParsedRef},
    msg::RunId,
    notif::NotificationReceiver,
    run::{Run, RunState, Whence},
@@ -68,6 +72,7 @@ pub enum PageError {
pub struct PageBuilder {
    node_alias: Option<String>,
    runs: Vec<Run>,
+
    events: Vec<QueuedEvent>,
}

impl PageBuilder {
@@ -81,6 +86,11 @@ impl PageBuilder {
        self
    }

+
    pub fn events(mut self, events: Vec<QueuedEvent>) -> Self {
+
        self.events = events;
+
        self
+
    }
+

    pub fn build(self) -> Result<StatusPage, PageError> {
        let mut runs = HashMap::new();
        for run in self.runs.iter() {
@@ -103,6 +113,7 @@ struct PageData {
    ci_broker_git_commit: &'static str,
    node_alias: String,
    runs: HashMap<RunId, Run>,
+
    events: Vec<QueuedEvent>,
    broker_event_counter: usize,
    latest_broker_event: Option<BrokerEvent>,
    latest_ci_run: Option<Run>,
@@ -175,6 +186,55 @@ impl PageData {
        }
        doc.push_to_body(table);

+
        Self::h1(&mut doc, "Event queue");
+
        let mut table = Element::new(Tag::Table)
+
            .with_class("event-queue")
+
            .with_child(
+
                Element::new(Tag::Tr)
+
                    .with_child(Element::new(Tag::Th).with_text("Queue id"))
+
                    .with_child(Element::new(Tag::Th).with_text("Timestamp"))
+
                    .with_child(Element::new(Tag::Th).with_text("Event")),
+
            );
+
        for event in self.events.iter() {
+
            table.push_child(
+
                Element::new(Tag::Tr)
+
                    .with_child(Element::new(Tag::Td).with_text(&event.id().to_string()))
+
                    .with_child(Element::new(Tag::Td).with_text(event.timestamp()))
+
                    .with_child(
+
                        Element::new(Tag::Td).with_child(match event.event() {
+
                            BrokerEvent::Shutdown => Element::new(Tag::Span).with_text("shutdown"),
+
                            BrokerEvent::RefChanged {
+
                                rid,
+
                                name,
+
                                oid,
+
                                old: _,
+
                            } => Element::new(Tag::Span)
+
                                .with_child(
+
                                    Element::new(Tag::Span)
+
                                        .with_class("repoid")
+
                                        .with_text(&rid.to_string()),
+
                                )
+
                                .with_child(Element::new(Tag::Br))
+
                                .with_child(Element::new(Tag::Span).with_class("ref").with_text(
+
                                    &match parse_ref(name) {
+
                                        Ok(Some(ParsedRef::Push(branch))) => branch,
+
                                        Ok(Some(ParsedRef::Patch(patch))) => patch.to_string(),
+
                                        Ok(None) => "unknown".into(),
+
                                        Err(_) => "unknown".into(),
+
                                    },
+
                                ))
+
                                .with_child(Element::new(Tag::Br))
+
                                .with_child(
+
                                    Element::new(Tag::Span)
+
                                        .with_class("commit")
+
                                        .with_text(&oid.to_string()),
+
                                ),
+
                        }),
+
                    ),
+
            );
+
        }
+
        doc.push_to_body(table);
+

        Self::h1(&mut doc, "Recent status");
        let status = StatusData::from(self).as_json()?;
        doc.push_to_body(
@@ -461,22 +521,22 @@ impl StatusPage {
    pub fn update_in_thread(
        mut self,
        run_rx: NotificationReceiver,
+
        profile: Profile,
        db: Db,
-
        node_alias: &str,
        once: bool,
    ) -> JoinHandle<Result<(), PageError>> {
        if self.dirname.is_none() {
            warn!("not writing HTML report pages as output directory has not been set");
        }

-
        self.node_alias = node_alias.into();
+
        self.node_alias = profile.config.alias().to_string();
        info!(
            "wait about {} seconds to update HTML report pages again",
            UPDATE_INTERVAL.as_secs()
        );
        spawn(move || {
            'processing_loop: loop {
-
                self.update_and_write(&db)?;
+
                self.update_and_write(&profile, &db)?;
                if once {
                    return Ok(());
                }
@@ -496,14 +556,14 @@ impl StatusPage {

            // Make sure we update reports and status JSON at least once.
            info!("make sure we update reports and status JSON at least once");
-
            self.update_and_write(&db)?;
+
            self.update_and_write(&profile, &db)?;

            info!("end page updater thread");
            Ok(())
        })
    }

-
    fn update_and_write(&mut self, db: &Db) -> Result<(), PageError> {
+
    fn update_and_write(&mut self, profile: &Profile, db: &Db) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
            info!(
                "write HTML report pages and {STATUS_JSON} to {}",
@@ -512,6 +572,29 @@ impl StatusPage {

            let runs = db.get_all_runs()?;

+
            // Create list of events, except ones for private
+
            // repositories.
+
            let events: Result<Vec<QueuedEvent>, PageError> = db
+
                .queued_events()?
+
                .iter()
+
                .filter_map(|id| match db.get_queued_event(id) {
+
                    Ok(Some(event)) => match event.event() {
+
                        BrokerEvent::Shutdown => Some(Ok(event)),
+
                        BrokerEvent::RefChanged { rid, .. } => {
+
                            if Self::is_public_repo(profile, rid) {
+
                                Some(Ok(event))
+
                            } else {
+
                                None
+
                            }
+
                        }
+
                    },
+
                    Ok(None) => None, // Event is (no longer?) in database.
+
                    Err(_) => None,   // We ignore error here on purpose.
+
                })
+
                .collect();
+
            let mut events = events?;
+
            events.sort_by_cached_key(|e| e.timestamp().to_string());
+

            let data = PageData {
                timestamp: now()?,
                ci_broker_version: env!("CARGO_PKG_VERSION"),
@@ -521,6 +604,7 @@ impl StatusPage {
                    runs.iter()
                        .map(|run| (run.broker_run_id().clone(), run.clone())),
                ),
+
                events,
                broker_event_counter: 0,
                latest_broker_event: None,
                latest_ci_run: None,
@@ -559,6 +643,17 @@ impl StatusPage {
        Ok(())
    }

+
    fn is_public_repo(profile: &Profile, rid: &RepoId) -> bool {
+
        if let Ok(repo) = profile.storage.repository(*rid) {
+
            if let Ok(id_doc) = repo.canonical_identity_doc() {
+
                if id_doc.doc.visibility.is_public() {
+
                    return true;
+
                }
+
            }
+
        }
+
        false
+
    }
+

    fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
        debug!("write file {}", filename.display());
        write(filename, text).map_err(|e| PageError::Write(filename.into(), e))?;
modified src/radicle-ci.css
@@ -34,7 +34,7 @@ code.patch {
    font-weight: bold;
}

-
code.commit {
+
code.commit, span.commit {
    font-weight: bold;
}

@@ -42,7 +42,7 @@ code.revision {
    font-weight: bold;
}

-
code.repoid {
+
code.repoid, span.repoid {
    font-weight: bold;
}

@@ -70,7 +70,7 @@ span.who {
}

table {
-
    width: 90%;
+
    width: 100%;
    table-layout: fixed;
}