Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: make sure the HTML and JSON files get created
Merged liw opened 1 year ago

This is the most rudimentary requirement. It does not verify they get updated as the CI broker continues running and doing things, as that’s a little difficult to arrange a test for.

This revealed a problem in how the cib process-events threads work together, so fix that too. Changed the page updater thread to output the HTML report and JSON status file at least once, even if there were not events to process. In addition, the event processing thread uses a timeout to receive notification of new events, so that it doesn’t get stuck, if there aren’t any events coming from the node control socket.

Added some logging to make it easier to debug these things in the future.

Signed-off-by: Lars Wirzenius liw@liw.fi

5 files changed +77 -14 413fd686 7a11bedc
modified ci-broker.md
@@ -243,6 +243,8 @@ given file adapter.sh from dummy.sh
when I run chmod +x adapter.sh

when I run bash radenv.sh RAD_SOCKET=synt.sock cib --config broker.yaml process-events
+
then file reports/index.html exists
+
then file reports/status.json exists

given an installed cibtool
when I run cibtool --db ci-broker.db event list
modified src/bin/cib.rs
@@ -202,7 +202,8 @@ impl ProcessEventsCmd {
        if let Some(dirname) = &config.report_dir {
            page.set_output_dir(dirname);
        }
-
        page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, false);
+
        let page_updater =
+
            page.update_in_thread(run_notification.rx(), db, &profile.config.node.alias, false);

        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
        let spec =
@@ -229,6 +230,16 @@ impl ProcessEventsCmd {
            .join()
            .expect("wait for processor thread to finish")
            .map_err(CibError::process_queue)?;
+
        info!("event processing thread has ended");
+

+
        // The page updating thread ends when the channel for run
+
        // notifications is closed by the processor thread ending.
+
        info!("wait for page updater thread to end");
+
        page_updater
+
            .join()
+
            .expect("wait for page updater thread to finish")
+
            .expect("page updater thread succeeded");
+
        info!("page updater thread has ended");

        debug!("cib ends");
        Ok(())
modified src/pages.rs
@@ -17,7 +17,7 @@ use std::{
};

use html_page::{Element, HtmlPage, Tag};
-
use log::{debug, info};
+
use log::{debug, info, trace, warn};
use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};

@@ -34,6 +34,7 @@ use crate::{
const CSS: &str = include_str!("radicle-ci.css");
const REFERESH_INTERVAL: &str = "300";
const UPDATE_INTERVAL: Duration = Duration::from_secs(60);
+
const STATUS_JSON: &str = "status.json";

/// All possible errors returned from the status page module.
#[derive(Debug, thiserror::Error)]
@@ -108,6 +109,10 @@ struct PageData {
}

impl PageData {
+
    fn status_page_as_json(&self) -> Result<String, PageError> {
+
        StatusData::from(self).as_json()
+
    }
+

    fn status_page_as_html(&self) -> Result<HtmlPage, PageError> {
        let mut doc = HtmlPage::default();

@@ -148,8 +153,8 @@ impl PageData {
        doc.push_to_body(
            Element::new(Tag::P).with_child(
                Element::new(Tag::A)
-
                    .with_attribute("href", "status.json")
-
                    .with_text("status.json:"),
+
                    .with_attribute("href", STATUS_JSON)
+
                    .with_text(STATUS_JSON),
            ),
        );
        doc.push_to_body(
@@ -493,31 +498,49 @@ impl StatusPage {
        once: bool,
    ) -> JoinHandle<Result<(), PageError>> {
        if self.dirname.is_none() {
-
            info!("not writing HTML report pages as output directory has not been set");
+
            warn!("not writing HTML report pages as output directory has not been set");
        }
+

        self.node_alias = node_alias.into();
        info!(
            "wait about {} seconds to update HTML report pages again",
            UPDATE_INTERVAL.as_secs()
        );
-
        spawn(move || loop {
-
            loop {
+
        spawn(move || {
+
            'processing_loop: loop {
                self.update_and_write(&db)?;
                if once {
                    return Ok(());
                }

+
                trace!("wait for run notification or timeout");
                match run_rx.recv_timeout(UPDATE_INTERVAL) {
-
                    Ok(_) | Err(RecvTimeoutError::Timeout) => (),
-
                    Err(RecvTimeoutError::Disconnected) => break,
+
                    Ok(_) => trace!("page updater: got a run notification"),
+
                    Err(RecvTimeoutError::Timeout) => {
+
                        trace!("page updater: timeout on run notification channel")
+
                    }
+
                    Err(RecvTimeoutError::Disconnected) => {
+
                        debug!("page updater: run notification channel disconnected");
+
                        break 'processing_loop;
+
                    }
                }
            }
+

+
            // Make sure we update reports and status JSON at least once.
+
            info!("make sure we update reports and status JSON at least once");
+
            self.update_and_write(&db)?;
+

+
            info!("end page updater thread");
+
            Ok(())
        })
    }

    fn update_and_write(&mut self, db: &Db) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
-
            info!("write HTML report pages to {}", dirname.display());
+
            info!(
+
                "write HTML report pages and {STATUS_JSON} to {}",
+
                dirname.display()
+
            );

            let runs = db.get_all_runs()?;

@@ -560,6 +583,10 @@ impl StatusPage {
            for (filename, repopage) in repos {
                Self::write_file(&filename, &repopage)?;
            }
+

+
            let filename = dirname.join(STATUS_JSON);
+
            let json = data.status_page_as_json()?;
+
            Self::write_file(&filename, &json)?;
        }
        Ok(())
    }
modified src/queueadd.rs
@@ -62,6 +62,8 @@ impl QueueAdder {
    }

    pub fn add_events(&self) -> Result<(), AdderError> {
+
        info!("start thread to add events from node to event queue");
+

        let profile = Profile::load()?;
        debug!("loaded profile {profile:#?}");

modified src/queueproc.rs
@@ -2,7 +2,11 @@

#![allow(clippy::result_large_err)]

-
use std::thread::{spawn, JoinHandle};
+
use std::{
+
    sync::mpsc::RecvTimeoutError,
+
    thread::{spawn, JoinHandle},
+
    time::Duration,
+
};

use log::{debug, info};
use radicle::Profile;
@@ -15,6 +19,8 @@ use crate::{
    notif::{NotificationReceiver, NotificationSender},
};

+
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
+

#[derive(Default)]
pub struct QueueProcessorBuilder {
    db: Option<Db>,
@@ -69,23 +75,38 @@ impl QueueProcessor {
    }

    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
+
        info!("start thread to process events until a shutdown event");
        let mut done = false;
-
        while !done && self.events_rx.recv().is_ok() {
-
            debug!("Looking for an event to process");
+
        while !done {
+
            info!("Looking for an event to process");
            while let Some(qe) = self.pick_event()? {
                info!("picked event from queue: {}: {:?}", qe.id(), qe.event());
                done = self.process_event(qe.event())?;
                self.drop_event(qe.id())?;
                self.run_tx.send(()).map_err(|_| QueueError::NotifyRun)?;
            }
+
            info!("waiting for next event from node");
+
            match self.events_rx.recv_timeout(RECV_TIMEOUT) {
+
                Ok(_) => {
+
                    info!("got event notification");
+
                }
+
                Err(RecvTimeoutError::Timeout) => {
+
                    info!("timed out on event notification");
+
                }
+
                Err(RecvTimeoutError::Disconnected) => {
+
                    info!("event notification channel disconnected");
+
                    done = true;
+
                }
+
            }
        }

+
        info!("thread to process events ends");
        Ok(())
    }

    fn pick_event(&self) -> Result<Option<QueuedEvent>, QueueError> {
        let ids = self.db.queued_events().map_err(QueueError::db)?;
-
        debug!("event queue: {ids:?}");
+
        debug!("event queue has {} events", ids.len());

        let mut queue = vec![];
        for id in ids.iter() {