Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
refactor: use worker abstraction in pages
Lars Wirzenius committed 10 months ago
commit a1de334746fa5cb2aceeb71ea50f49d3b1cdf266
parent 663de60
3 files changed +62 -58
modified src/bin/cib.rs
@@ -203,16 +203,16 @@ impl QueuedCmd {
            .map_err(CibError::process_queue)?;

        let db = args.open_db(config)?;
-
        let mut page = StatusPage::default();
-
        if let Some(dirname) = config.report_dir() {
-
            page.set_output_dir(dirname);
-
        }
-
        let page_updater = page.update_in_thread(
+
        let mut page = StatusPage::new(
            run_notifications.rx().map_err(CibError::notification)?,
            profile,
            db,
            true,
        );
+
        if let Some(dirname) = config.report_dir() {
+
            page.set_output_dir(dirname);
+
        }
+
        let page_updater = start_thread(page);
        page_updater
            .join()
            .expect("wait for page updater thread to finish")
@@ -248,16 +248,16 @@ impl ProcessEventsCmd {

        let db = args.open_db(config)?;

-
        let mut page = StatusPage::default();
-
        if let Some(dirname) = config.report_dir() {
-
            page.set_output_dir(dirname);
-
        }
-
        let page_updater = page.update_in_thread(
+
        let mut page = StatusPage::new(
            run_notification.rx().map_err(CibError::notification)?,
            profile,
            db,
            false,
        );
+
        if let Some(dirname) = config.report_dir() {
+
            page.set_output_dir(dirname);
+
        }
+
        let page_updater = start_thread(page);

        let adapters = config.to_adapters().map_err(CibError::Adapters)?;

modified src/bin/cibtoolcmd/report.rs
@@ -1,4 +1,4 @@
-
use radicle_ci_broker::pages::StatusPage;
+
use radicle_ci_broker::{pages::StatusPage, worker::start_thread};

use super::*;

@@ -17,16 +17,16 @@ impl Leaf for ReportCmd {

        let db = args.open_db()?;

-
        let mut page = StatusPage::default();
-
        page.set_output_dir(&self.output_dir);
-

        let mut run_notification = NotificationChannel::new_run();
-
        let thread = page.update_in_thread(
+
        let mut page = StatusPage::new(
            run_notification.rx().map_err(CibToolError::Notification)?,
            profile,
            db,
            true,
        );
+
        page.set_output_dir(&self.output_dir);
+

+
        let thread = start_thread(page);
        thread.join().unwrap()?;

        Ok(())
modified src/pages.rs
@@ -12,8 +12,6 @@ use std::{
    fs::write,
    path::{Path, PathBuf},
    sync::mpsc::RecvTimeoutError,
-
    thread::{spawn, JoinHandle},
-
    time::Duration,
};

use html_page::{Element, HtmlPage, Tag};
@@ -36,6 +34,7 @@ use crate::{
    notif::NotificationReceiver,
    run::{Run, RunState, Whence},
    util::{parse_timestamp, rfc822_timestamp},
+
    worker::Worker,
};

const MAX_RSS_ENTRIES: usize = 10;
@@ -44,7 +43,6 @@ const FAILURE_RSS: &str = "failed.rss";
const UNFINISHED_RSS: &str = "unfinished.rss";
const CSS: &str = include_str!("radicle-ci.css");
const REFERESH_INTERVAL: &str = "300";
-
const UPDATE_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_JSON: &str = "status.json";

/// All possible errors returned from the status page module.
@@ -760,10 +758,17 @@ impl RssEntry {
/// of repositories for which the broker has run CI. Then there is a
/// page per such repository, with a list of CI runs for that
/// repository.
-
#[derive(Default)]
pub struct StatusPage {
    node_alias: String,
    dirname: Option<PathBuf>,
+
    args: PageArgs,
+
}
+

+
struct PageArgs {
+
    run_rx: NotificationReceiver,
+
    profile: Profile,
+
    db: Db,
+
    once: bool,
}

impl StatusPage {
@@ -771,46 +776,27 @@ impl StatusPage {
        self.dirname = Some(dirname.into());
    }

-
    pub fn update_in_thread(
-
        mut self,
-
        run_rx: NotificationReceiver,
-
        profile: Profile,
-
        db: Db,
-
        once: bool,
-
    ) -> JoinHandle<Result<(), PageError>> {
-
        match &self.dirname {
-
            None => logger::pages_directory_unset(),
-
            Some(report_dir) if !report_dir.exists() => {
-
                logger::pages_directory_does_not_exist(report_dir)
-
            }
-
            Some(_) => (),
+
    pub fn new(run_rx: NotificationReceiver, profile: Profile, db: Db, once: bool) -> Self {
+
        Self {
+
            node_alias: "".into(),
+
            dirname: None,
+
            args: PageArgs {
+
                run_rx,
+
                profile,
+
                db,
+
                once,
+
            },
        }
-

-
        self.node_alias = profile.config.alias().to_string();
-
        logger::pages_interval(UPDATE_INTERVAL);
-

-
        spawn(move || {
-
            logger::pages_start();
-
            let result = self.update_loop(run_rx, profile, db, once);
-
            logger::pages_end(&result);
-
            result
-
        })
    }

-
    fn update_loop(
-
        mut self,
-
        run_rx: NotificationReceiver,
-
        profile: Profile,
-
        db: Db,
-
        once: bool,
-
    ) -> Result<(), PageError> {
+
    fn update_loop(&mut self) -> Result<(), PageError> {
        'processing_loop: loop {
-
            self.update_and_write(&profile, &db)?;
-
            if once {
+
            self.update_and_write()?;
+
            if self.args.once {
                return Ok(());
            }

-
            match run_rx.wait_for_notification() {
+
            match self.args.run_rx.wait_for_notification() {
                Ok(_) => (),
                Err(RecvTimeoutError::Timeout) => (),
                Err(RecvTimeoutError::Disconnected) => {
@@ -821,29 +807,31 @@ impl StatusPage {
        }

        // Make sure we update reports and status JSON at least once.
-
        self.update_and_write(&profile, &db)?;
+
        self.update_and_write()?;

        Ok(())
    }

-
    fn update_and_write(&mut self, profile: &Profile, db: &Db) -> Result<(), PageError> {
+
    fn update_and_write(&mut self) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
            if dirname.exists() {
-
                let runs = db.get_all_runs()?;
+
                let runs = self.args.db.get_all_runs()?;

                // Create list of events, except ones for private
                // repositories.
-
                let events: Result<Vec<QueuedCiEvent>, PageError> = db
+
                let events: Result<Vec<QueuedCiEvent>, PageError> = self
+
                    .args
+
                    .db
                    .queued_ci_events()?
                    .iter()
-
                    .filter_map(|id| match db.get_queued_ci_event(id) {
+
                    .filter_map(|id| match self.args.db.get_queued_ci_event(id) {
                        Ok(Some(event)) => match event.event() {
                            CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
                            CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
                            | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
                            | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
                            | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
-
                                if Self::is_public_repo(profile, repo) {
+
                                if Self::is_public_repo(&self.args.profile, repo) {
                                    Some(Ok(event))
                                } else {
                                    None
@@ -939,6 +927,22 @@ impl StatusPage {
    }
}

+
impl Worker for StatusPage {
+
    const NAME: &str = "status-page";
+
    type Error = PageError;
+
    fn work(&mut self) -> Result<(), PageError> {
+
        match &self.dirname {
+
            None => logger::pages_directory_unset(),
+
            Some(report_dir) if !report_dir.exists() => {
+
                logger::pages_directory_does_not_exist(report_dir)
+
            }
+
            Some(_) => (),
+
        }
+

+
        self.update_loop()
+
    }
+
}
+

#[derive(Debug, Clone, Serialize)]
struct StatusData {
    timestamp: String,