Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: make sure the HTML and JSON files get created
Lars Wirzenius committed 1 year ago
commit 7a11bedc001353d784199515ddfa19dda7632671
parent 413fd68642e79dbf7a3cacb35a75b17ae99355f3
5 files changed +77 -14
modified ci-broker.md
@@ -243,6 +243,8 @@ given file adapter.sh from dummy.sh
when I run chmod +x adapter.sh

when I run bash radenv.sh RAD_SOCKET=synt.sock cib --config broker.yaml process-events
+
then file reports/index.html exists
+
then file reports/status.json exists

given an installed cibtool
when I run cibtool --db ci-broker.db event list
modified src/bin/cib.rs
@@ -202,7 +202,8 @@ impl ProcessEventsCmd {
        if let Some(dirname) = &config.report_dir {
            page.set_output_dir(dirname);
        }
-
        page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, false);
+
        let page_updater =
+
            page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, false);

        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
        let spec =
@@ -229,6 +230,16 @@ impl ProcessEventsCmd {
            .join()
            .expect("wait for processor thread to finish")
            .map_err(CibError::process_queue)?;
+
        info!("event processing thread has ended");
+

+
        // The page updating thread ends when the channel for run
+
        // notifications is closed by the processor thread ending.
+
        info!("wait for page updater thread to end");
+
        page_updater
+
            .join()
+
            .expect("wait for page updater thread to finish")
+
            .expect("page updater thread succeeded");
+
        info!("page updater thread has ended");

        debug!("cib ends");
        Ok(())
modified src/pages.rs
@@ -17,7 +17,7 @@ use std::{
};

use html_page::{Element, HtmlPage, Tag};
-
use log::{debug, info};
+
use log::{debug, info, trace, warn};
use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};

@@ -34,6 +34,7 @@ use crate::{
const CSS: &str = include_str!("radicle-ci.css");
const REFERESH_INTERVAL: &str = "300";
const UPDATE_INTERVAL: Duration = Duration::from_secs(60);
+
const STATUS_JSON: &str = "status.json";

/// All possible errors returned from the status page module.
#[derive(Debug, thiserror::Error)]
@@ -108,6 +109,10 @@ struct PageData {
}

impl PageData {
+
    fn status_page_as_json(&self) -> Result<String, PageError> {
+
        StatusData::from(self).as_json()
+
    }
+

    fn status_page_as_html(&self) -> Result<HtmlPage, PageError> {
        let mut doc = HtmlPage::default();

@@ -148,8 +153,8 @@ impl PageData {
        doc.push_to_body(
            Element::new(Tag::P).with_child(
                Element::new(Tag::A)
-
                    .with_attribute("href", "status.json")
-
                    .with_text("status.json:"),
+
                    .with_attribute("href", STATUS_JSON)
+
                    .with_text(STATUS_JSON),
            ),
        );
        doc.push_to_body(
@@ -493,31 +498,49 @@ impl StatusPage {
        once: bool,
    ) -> JoinHandle<Result<(), PageError>> {
        if self.dirname.is_none() {
-
            info!("not writing HTML report pages as output directory has not been set");
+
            warn!("not writing HTML report pages as output directory has not been set");
        }
+

        self.node_alias = node_alias.into();
        info!(
            "wait about {} seconds to update HTML report pages again",
            UPDATE_INTERVAL.as_secs()
        );
-
        spawn(move || loop {
-
            loop {
+
        spawn(move || {
+
            'processing_loop: loop {
                self.update_and_write(&db)?;
                if once {
                    return Ok(());
                }

+
                trace!("wait for run notification or timeout");
                match run_rx.recv_timeout(UPDATE_INTERVAL) {
-
                    Ok(_) | Err(RecvTimeoutError::Timeout) => (),
-
                    Err(RecvTimeoutError::Disconnected) => break,
+
                    Ok(_) => trace!("page updater: got a run notification"),
+
                    Err(RecvTimeoutError::Timeout) => {
+
                        trace!("page updater: timeout on run notification channel")
+
                    }
+
                    Err(RecvTimeoutError::Disconnected) => {
+
                        debug!("page updater: run notification channel disconnected");
+
                        break 'processing_loop;
+
                    }
                }
            }
+

+
            // Make sure we update reports and status JSON at least once.
+
            info!("make sure we update reports and status JSON at least once");
+
            self.update_and_write(&db)?;
+

+
            info!("end page updater thread");
+
            Ok(())
        })
    }

    fn update_and_write(&mut self, db: &Db) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
-
            info!("write HTML report pages to {}", dirname.display());
+
            info!(
+
                "write HTML report pages and {STATUS_JSON} to {}",
+
                dirname.display()
+
            );

            let runs = db.get_all_runs()?;

@@ -560,6 +583,10 @@ impl StatusPage {
            for (filename, repopage) in repos {
                Self::write_file(&filename, &repopage)?;
            }
+

+
            let filename = dirname.join(STATUS_JSON);
+
            let json = data.status_page_as_json()?;
+
            Self::write_file(&filename, &json)?;
        }
        Ok(())
    }
modified src/queueadd.rs
@@ -62,6 +62,8 @@ impl QueueAdder {
    }

    pub fn add_events(&self) -> Result<(), AdderError> {
+
        info!("start thread to add events from node to event queue");
+

        let profile = Profile::load()?;
        debug!("loaded profile {profile:#?}");

modified src/queueproc.rs
@@ -2,7 +2,11 @@

#![allow(clippy::result_large_err)]

-
use std::thread::{spawn, JoinHandle};
+
use std::{
+
    sync::mpsc::RecvTimeoutError,
+
    thread::{spawn, JoinHandle},
+
    time::Duration,
+
};

use log::{debug, info};
use radicle::Profile;
@@ -15,6 +19,8 @@ use crate::{
    notif::{NotificationReceiver, NotificationSender},
};

+
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
+

#[derive(Default)]
pub struct QueueProcessorBuilder {
    db: Option<Db>,
@@ -69,23 +75,38 @@ impl QueueProcessor {
    }

    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
+
        info!("start thread to process events until a shutdown event");
        let mut done = false;
-
        while !done && self.events_rx.recv().is_ok() {
-
            debug!("Looking for an event to process");
+
        while !done {
+
            info!("Looking for an event to process");
            while let Some(qe) = self.pick_event()? {
                info!("picked event from queue: {}: {:?}", qe.id(), qe.event());
                done = self.process_event(qe.event())?;
                self.drop_event(qe.id())?;
                self.run_tx.send(()).map_err(|_| QueueError::NotifyRun)?;
            }
+
            info!("waiting for next event from node");
+
            match self.events_rx.recv_timeout(RECV_TIMEOUT) {
+
                Ok(_) => {
+
                    info!("got event notification");
+
                }
+
                Err(RecvTimeoutError::Timeout) => {
+
                    info!("timed out on event notification");
+
                }
+
                Err(RecvTimeoutError::Disconnected) => {
+
                    info!("event notification channel disconnected");
+
                    done = true;
+
                }
+
            }
        }

+
        info!("thread to process events ends");
        Ok(())
    }

    fn pick_event(&self) -> Result<Option<QueuedEvent>, QueueError> {
        let ids = self.db.queued_events().map_err(QueueError::db)?;
-
        debug!("event queue: {ids:?}");
+
        debug!("event queue has {} events", ids.len());

        let mut queue = vec![];
        for id in ids.iter() {