Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: add table with current event queue to front report page
Lars Wirzenius committed 1 year ago
commit 07b3a48a5e1edbfa031d6c2b77dc3c7f9aeda870
parent 2a40a12a9def72594cbab2ce783cae949619e5e5
4 files changed +108 -15
modified src/bin/cib.rs
@@ -209,8 +209,7 @@ impl ProcessEventsCmd {
        if let Some(dirname) = &config.report_dir {
            page.set_output_dir(dirname);
        }
-
        let page_updater =
-
            page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, false);
+
        let page_updater = page.update_in_thread(run_notification.rx(), profile, db, false);

        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
        let spec =
modified src/bin/cibtoolcmd/report.rs
@@ -23,8 +23,7 @@ impl Leaf for ReportCmd {
        page.set_output_dir(&self.output_dir);

        let mut run_notification = NotificationChannel::default();
-
        let thread =
-
            page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, true);
+
        let thread = page.update_in_thread(run_notification.rx(), profile, db, true);
        thread.join().unwrap()?;

        Ok(())
modified src/pages.rs
@@ -21,11 +21,15 @@ use log::{debug, info, trace, warn};
use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};

-
use radicle::prelude::RepoId;
+
use radicle::{
+
    prelude::RepoId,
+
    storage::{ReadRepository, ReadStorage},
+
    Profile,
+
};

use crate::{
-
    db::{Db, DbError},
-
    event::BrokerEvent,
+
    db::{Db, DbError, QueuedEvent},
+
    event::{parse_ref, BrokerEvent, ParsedRef},
    msg::RunId,
    notif::NotificationReceiver,
    run::{Run, RunState, Whence},
@@ -68,6 +72,7 @@ pub enum PageError {
pub struct PageBuilder {
    node_alias: Option<String>,
    runs: Vec<Run>,
+
    events: Vec<QueuedEvent>,
}

impl PageBuilder {
@@ -81,6 +86,11 @@ impl PageBuilder {
        self
    }

+
    pub fn events(mut self, events: Vec<QueuedEvent>) -> Self {
+
        self.events = events;
+
        self
+
    }
+

    pub fn build(self) -> Result<StatusPage, PageError> {
        let mut runs = HashMap::new();
        for run in self.runs.iter() {
@@ -103,6 +113,7 @@ struct PageData {
    ci_broker_git_commit: &'static str,
    node_alias: String,
    runs: HashMap<RunId, Run>,
+
    events: Vec<QueuedEvent>,
    broker_event_counter: usize,
    latest_broker_event: Option<BrokerEvent>,
    latest_ci_run: Option<Run>,
@@ -175,6 +186,55 @@ impl PageData {
        }
        doc.push_to_body(table);

+
        Self::h1(&mut doc, "Event queue");
+
        let mut table = Element::new(Tag::Table)
+
            .with_class("event-queue")
+
            .with_child(
+
                Element::new(Tag::Tr)
+
                    .with_child(Element::new(Tag::Th).with_text("Queue id"))
+
                    .with_child(Element::new(Tag::Th).with_text("Timestamp"))
+
                    .with_child(Element::new(Tag::Th).with_text("Event")),
+
            );
+
        for event in self.events.iter() {
+
            table.push_child(
+
                Element::new(Tag::Tr)
+
                    .with_child(Element::new(Tag::Td).with_text(&event.id().to_string()))
+
                    .with_child(Element::new(Tag::Td).with_text(event.timestamp()))
+
                    .with_child(
+
                        Element::new(Tag::Td).with_child(match event.event() {
+
                            BrokerEvent::Shutdown => Element::new(Tag::Span).with_text("shutdown"),
+
                            BrokerEvent::RefChanged {
+
                                rid,
+
                                name,
+
                                oid,
+
                                old: _,
+
                            } => Element::new(Tag::Span)
+
                                .with_child(
+
                                    Element::new(Tag::Span)
+
                                        .with_class("repoid")
+
                                        .with_text(&rid.to_string()),
+
                                )
+
                                .with_child(Element::new(Tag::Br))
+
                                .with_child(Element::new(Tag::Span).with_class("ref").with_text(
+
                                    &match parse_ref(name) {
+
                                        Ok(Some(ParsedRef::Push(branch))) => branch,
+
                                        Ok(Some(ParsedRef::Patch(patch))) => patch.to_string(),
+
                                        Ok(None) => "unknown".into(),
+
                                        Err(_) => "unknown".into(),
+
                                    },
+
                                ))
+
                                .with_child(Element::new(Tag::Br))
+
                                .with_child(
+
                                    Element::new(Tag::Span)
+
                                        .with_class("commit")
+
                                        .with_text(&oid.to_string()),
+
                                ),
+
                        }),
+
                    ),
+
            );
+
        }
+
        doc.push_to_body(table);
+

        Self::h1(&mut doc, "Recent status");
        let status = StatusData::from(self).as_json()?;
        doc.push_to_body(
@@ -461,22 +521,22 @@ impl StatusPage {
    pub fn update_in_thread(
        mut self,
        run_rx: NotificationReceiver,
+
        profile: Profile,
        db: Db,
-
        node_alias: &str,
        once: bool,
    ) -> JoinHandle<Result<(), PageError>> {
        if self.dirname.is_none() {
            warn!("not writing HTML report pages as output directory has not been set");
        }

-
        self.node_alias = node_alias.into();
+
        self.node_alias = profile.config.alias().to_string();
        info!(
            "wait about {} seconds to update HTML report pages again",
            UPDATE_INTERVAL.as_secs()
        );
        spawn(move || {
            'processing_loop: loop {
-
                self.update_and_write(&db)?;
+
                self.update_and_write(&profile, &db)?;
                if once {
                    return Ok(());
                }
@@ -496,14 +556,14 @@ impl StatusPage {

            // Make sure we update reports and status JSON at least once.
            info!("make sure we update reports and status JSON at least once");
-
            self.update_and_write(&db)?;
+
            self.update_and_write(&profile, &db)?;

            info!("end page updater thread");
            Ok(())
        })
    }

-
    fn update_and_write(&mut self, db: &Db) -> Result<(), PageError> {
+
    fn update_and_write(&mut self, profile: &Profile, db: &Db) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
            info!(
                "write HTML report pages and {STATUS_JSON} to {}",
@@ -512,6 +572,29 @@ impl StatusPage {

            let runs = db.get_all_runs()?;

+
            // Create list of events, except ones for private
+
            // repositories.
+
            let events: Result<Vec<QueuedEvent>, PageError> = db
+
                .queued_events()?
+
                .iter()
+
                .filter_map(|id| match db.get_queued_event(id) {
+
                    Ok(Some(event)) => match event.event() {
+
                        BrokerEvent::Shutdown => Some(Ok(event)),
+
                        BrokerEvent::RefChanged { rid, .. } => {
+
                            if Self::is_public_repo(profile, rid) {
+
                                Some(Ok(event))
+
                            } else {
+
                                None
+
                            }
+
                        }
+
                    },
+
                    Ok(None) => None, // Event is (no longer?) in database.
+
                    Err(_) => None,   // We ignore error here on purpose.
+
                })
+
                .collect();
+
            let mut events = events?;
+
            events.sort_by_cached_key(|e| e.timestamp().to_string());
+

            let data = PageData {
                timestamp: now()?,
                ci_broker_version: env!("CARGO_PKG_VERSION"),
@@ -521,6 +604,7 @@ impl StatusPage {
                    runs.iter()
                        .map(|run| (run.broker_run_id().clone(), run.clone())),
                ),
+
                events,
                broker_event_counter: 0,
                latest_broker_event: None,
                latest_ci_run: None,
@@ -559,6 +643,17 @@ impl StatusPage {
        Ok(())
    }

+
    fn is_public_repo(profile: &Profile, rid: &RepoId) -> bool {
+
        if let Ok(repo) = profile.storage.repository(*rid) {
+
            if let Ok(id_doc) = repo.canonical_identity_doc() {
+
                if id_doc.doc.visibility.is_public() {
+
                    return true;
+
                }
+
            }
+
        }
+
        false
+
    }
+

    fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
        debug!("write file {}", filename.display());
        write(filename, text).map_err(|e| PageError::Write(filename.into(), e))?;
modified src/radicle-ci.css
@@ -34,7 +34,7 @@ code.patch {
    font-weight: bold;
}

-
code.commit {
+
code.commit, span.commit {
    font-weight: bold;
}

@@ -42,7 +42,7 @@ code.revision {
    font-weight: bold;
}

-
code.repoid {
+
code.repoid, span.repoid {
    font-weight: bold;
}

@@ -70,7 +70,7 @@ span.who {
}

table {
-
    width: 90%;
+
    width: 100%;
    table-layout: fixed;
}