Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
radicle-ci-broker src pages.rs
//! Status and report pages for CI broker.
//!
//! This module generates an HTML status page for the CI broker, as
//! well as per-repository pages for any repository for which the CI
//! broker has mediated to run CI. The status page gives the latest
//! known status of the broker, plus lists the repositories that CI
//! has run for. The per-repository pages list all the runs for that
//! repository.

use std::{
    collections::{HashMap, HashSet},
    path::{Path, PathBuf},
    sync::mpsc::RecvTimeoutError,
};

use html_page::{Element, HtmlPage, Tag};
use rss::{Channel, ChannelBuilder, Guid, Item, ItemBuilder};
use serde::Serialize;
use time::{OffsetDateTime, macros::format_description};

use radicle::{
    Profile,
    prelude::RepoId,
    storage::{ReadRepository, ReadStorage},
};

use crate::{
    ci_event::{CiEvent, CiEventV1},
    db::{Db, DbError, QueuedCiEvent},
    ergo::{self, Oid},
    logger,
    msg::{RunId, RunResult},
    notif::NotificationReceiver,
    run::{Run, RunState, Whence},
    util::{parse_timestamp, rfc822_timestamp, safely_overwrite},
    worker::Worker,
};

/// Maximum number of items put into each generated RSS feed.
const MAX_RSS_ENTRIES: usize = 10;
/// File name linked from the status page as the "all" RSS feed.
const BROKER_RSS: &str = "index.rss";
/// File name linked from the status page as the "failed" RSS feed.
const FAILURE_RSS: &str = "failed.rss";
/// Not referenced in the visible part of this file; presumably the
/// file name of the feed of unfinished runs -- TODO confirm.
const UNFINISHED_RSS: &str = "unfinished.rss";
/// Style sheet embedded into the `<style>` element of every report page.
const CSS: &str = include_str!("radicle-ci.css");
/// Value of the `content` attribute of the `http-equiv="refresh"`
/// meta element added to every report page.
// NOTE(review): the name is misspelled ("REFERESH"); renaming it
// requires updating its user in `ReportPage::new` at the same time.
const REFERESH_INTERVAL: &str = "5";
/// File name of the status JSON file linked from the "Recent status"
/// section of the status page.
const STATUS_JSON: &str = "status.json";

/// All possible errors returned from the status page module.
#[derive(Debug, thiserror::Error)]
pub enum PageError {
    /// Error formatting a time as a string.
    #[error("failed to format time as a string")]
    Timeformat(#[from] Box<time::error::Format>),

    /// Writing a page failed; carries the target path and the
    /// underlying utility error.
    #[error("failed to write status page to {0}")]
    Write(PathBuf, #[source] crate::util::UtilError),

    /// No node alias was set before it was needed.
    #[error("no node alias has been set for builder")]
    NoAlias,

    /// No status data was set before it was needed.
    #[error("no status data has been set for builder")]
    NoStatusData,

    /// An error from the CI broker database, passed through as-is.
    #[error(transparent)]
    Db(#[from] DbError),

    /// Locking the page data failed. The string payload is not
    /// included in the error message.
    #[error("failed to lock page data structure")]
    Lock(&'static str),

    /// Serializing the status data to JSON failed.
    #[error("failed to represent status page data as JSON")]
    StatusToJson(#[source] Box<serde_json::Error>),

    /// A CI run timestamp (the string payload) could not be converted
    /// into an RSS time stamp.
    #[error("failed to create RSS time stamp from CI run timestamp: {0}")]
    RssTimestamp(String, #[source] crate::util::UtilError),
}

impl PageError {
    fn time_format(err: time::error::Format) -> Self {
        Self::Timeformat(Box::new(err))
    }

    fn status_to_json(err: serde_json::Error) -> Self {
        Self::StatusToJson(Box::new(err))
    }
}

/// Return the current UTC time formatted as
/// `YYYY-MM-DD HH:MM:SSZ`.
///
/// # Errors
///
/// Returns [`PageError::Timeformat`] if formatting fails.
fn now() -> Result<String, PageError> {
    let layout = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]Z");
    let current = OffsetDateTime::now_utc();
    match current.format(layout) {
        Ok(text) => Ok(text),
        Err(err) => Err(PageError::time_format(err)),
    }
}

/// Snapshot of everything needed to render the status pages, the
/// status JSON, and the RSS feeds.
struct PageData {
    /// Shown as the "Last updated" time on the status page.
    timestamp: String,
    /// Shown as the CI broker version on the status page.
    ci_broker_version: &'static str,
    /// Shown as the CI broker commit on the status page.
    ci_broker_git_commit: &'static str,
    /// Node alias, used in the title of the status page.
    node_alias: String,
    /// All known CI runs, keyed by run ID.
    runs: HashMap<RunId, Run>,
    /// Queued CI events, rendered in the "Event queue" table.
    events: Vec<QueuedCiEvent>,
    // The next three fields are not read in the visible part of this
    // file; presumably they feed the status JSON -- TODO confirm.
    broker_event_counter: usize,
    latest_broker_event: Option<CiEvent>,
    latest_ci_run: Option<Run>,
    /// Optional HTML snippet inserted near the top of the status page.
    desc_snippet: Option<String>,
}

impl PageData {
    /// Render the status data derived from this page data as JSON.
    fn status_page_as_json(&self) -> Result<String, PageError> {
        let status = StatusData::from(self);
        status.as_json()
    }

    /// Build the front status page.
    ///
    /// The page contains, in order: the optional description snippet,
    /// the RSS feed links, the broker status, the repository table,
    /// the event queue, and the recent status as JSON.
    fn status_page_as_html(&self) -> Result<HtmlPage, PageError> {
        let mut report = ReportPage::new(&self.node_alias);

        if let Some(html) = self.desc_snippet.as_deref() {
            report.desc(html);
        }

        report.rss_feeds();
        self.broker_status(&mut report);
        self.repo_table(&mut report);
        self.event_queue(&mut report);
        self.recent_status(&mut report)?;

        Ok(report.page())
    }

    /// Append the "Broker status" section: the time of the last
    /// update plus the broker version and commit.
    fn broker_status(&self, doc: &mut ReportPage) {
        doc.h2("Broker status");

        let commit = Element::new(Tag::Code).with_text(self.ci_broker_git_commit);
        let line_break = Element::new(Tag::Br);

        let summary = Element::new(Tag::P)
            .with_text("Last updated: ")
            .with_text(&self.timestamp)
            .with_child(line_break)
            .with_text("CI broker version: ")
            .with_text(self.ci_broker_version)
            .with_text(" (commit ")
            .with_child(commit)
            .with_text(")");

        doc.push(summary);
    }

    /// Append the "Repositories" section: overall run counts and a
    /// table with one row per repository showing its latest run.
    fn repo_table(&self, doc: &mut ReportPage) {
        doc.h2("Repositories");
        doc.push(Element::new(Tag::P).with_text("Latest CI run for each repository."));

        let failed = self
            .runs
            .values()
            .filter(|run| run.result() == Some(&RunResult::Failure))
            .count();
        let total = self.runs.len();
        let summary = format!("Total {total} CI runs recorded, of which {failed} failed.");
        doc.push(Element::new(Tag::P).with_text(summary));

        let header = Element::new(Tag::Tr)
            .with_child(Element::new(Tag::Th).with_text("Repository"))
            .with_child(Element::new(Tag::Th).with_text("Run ID"))
            .with_child(Element::new(Tag::Th).with_text("Status"))
            .with_child(Element::new(Tag::Th).with_text("Info"));
        let mut table = Element::new(Tag::Table)
            .with_class("repolist")
            .with_child(header);

        for (alias, rid) in self.repos() {
            // A repository without any known run gets empty cells
            // and an "unknown" status.
            let latest = self.latest_run(rid);
            let run_ids = Self::run_ids(latest);
            let status = match latest {
                Some(run) => Self::run_state(run),
                None => Self::run_state_unknown(),
            };
            let info_url = Self::info_url(latest);

            let repo_cell = Self::repository(rid, &alias, self.runs(rid));
            let status_cell = Element::new(Tag::Span)
                .with_class("run-status")
                .with_child(status);

            let row = Element::new(Tag::Tr)
                .with_child(Element::new(Tag::Td).with_child(repo_cell))
                .with_child(Element::new(Tag::Td).with_child(run_ids))
                .with_child(Element::new(Tag::Td).with_child(status_cell))
                .with_child(Element::new(Tag::Td).with_child(info_url));
            table.push_child(row);
        }

        doc.push(table);
    }

    fn event_queue(&self, doc: &mut ReportPage) {
        doc.h2("Event queue");
        let mut table = Element::new(Tag::Table)
            .with_class("event-queue")
            .with_child(
                Element::new(Tag::Tr)
                    .with_child(Element::new(Tag::Th).with_text("Queue id"))
                    .with_child(Element::new(Tag::Th).with_text("Timestamp"))
                    .with_child(Element::new(Tag::Th).with_text("Event")),
            );
        for event in self.events.iter() {
            fn render_event(
                repo: &RepoId,
                alias: Option<String>,
                refname: &str,
                commit: &Oid,
            ) -> Element {
                let alias = if let Some(alias) = alias {
                    Element::new(Tag::Span).with_child(
                        Element::new(Tag::Span)
                            .with_class("alias")
                            .with_text(&alias),
                    )
                } else {
                    Element::new(Tag::Span)
                };
                Element::new(Tag::Span)
                    .with_child(alias)
                    .with_child(Element::new(Tag::Br))
                    .with_child(
                        Element::new(Tag::Span)
                            .with_class("repoid")
                            .with_text(repo.to_string()),
                    )
                    .with_child(Element::new(Tag::Br))
                    .with_child(Element::new(Tag::Span).with_class("ref").with_text(refname))
                    .with_child(Element::new(Tag::Br))
                    .with_child(
                        Element::new(Tag::Span)
                            .with_class("commit")
                            .with_text(commit.to_string()),
                    )
            }

            let event_element = match event.event() {
                CiEvent::V1(CiEventV1::Shutdown) => Element::new(Tag::Span).with_text("shutdown"),
                CiEvent::V1(CiEventV1::Terminate(_)) => {
                    Element::new(Tag::Span).with_text("terminate")
                }
                CiEvent::V1(CiEventV1::BranchCreated {
                    from_node: _,
                    repo,
                    branch,
                    tip,
                }) => render_event(repo, self.repo_alias(*repo), branch, tip),
                CiEvent::V1(CiEventV1::BranchUpdated {
                    from_node: _,
                    repo,
                    branch,
                    tip,
                    old_tip: _,
                }) => render_event(repo, self.repo_alias(*repo), branch, tip),
                CiEvent::V1(CiEventV1::BranchDeleted {
                    repo, branch, tip, ..
                }) => render_event(repo, self.repo_alias(*repo), branch, tip),
                CiEvent::V1(CiEventV1::TagCreated {
                    from_node: _,
                    repo,
                    tag,
                    tip,
                }) => render_event(repo, self.repo_alias(*repo), tag.as_str(), tip),
                CiEvent::V1(CiEventV1::TagUpdated {
                    from_node: _,
                    repo,
                    tag,
                    tip,
                    old_tip: _,
                }) => render_event(repo, self.repo_alias(*repo), tag.as_str(), tip),
                CiEvent::V1(CiEventV1::TagDeleted { repo, tag, tip, .. }) => {
                    render_event(repo, self.repo_alias(*repo), tag.as_str(), tip)
                }
                CiEvent::V1(CiEventV1::PatchCreated {
                    from_node: _,
                    repo,
                    patch,
                    new_tip,
                }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
                CiEvent::V1(CiEventV1::PatchUpdated {
                    from_node: _,
                    repo,
                    patch,
                    new_tip,
                }) => render_event(repo, self.repo_alias(*repo), &patch.to_string(), new_tip),
                CiEvent::V1(CiEventV1::CanonicalRefUpdated {
                    from_node: _,
                    repo,
                    refname,
                    target,
                }) => render_event(repo, self.repo_alias(*repo), refname.as_str(), target),
            };

            table.push_child(
                Element::new(Tag::Tr)
                    .with_child(Element::new(Tag::Td).with_text(event.id().to_string()))
                    .with_child(Element::new(Tag::Td).with_text(event.timestamp()))
                    .with_child(Element::new(Tag::Td).with_child(event_element)),
            );
        }
        doc.push(table);
    }

    /// Append the "Recent status" section: a link to the status JSON
    /// file followed by the same JSON embedded in the page.
    ///
    /// # Errors
    ///
    /// Fails if the status data cannot be serialized to JSON.
    fn recent_status(&self, doc: &mut ReportPage) -> Result<(), PageError> {
        doc.h2("Recent status");

        let json = StatusData::from(self).as_json()?;

        let link = Element::new(Tag::A)
            .with_attribute("href", STATUS_JSON)
            .with_child(Element::new(Tag::Code).with_text(STATUS_JSON));
        let intro = Element::new(Tag::P)
            .with_text("See also as a separate file ")
            .with_child(link)
            .with_text(": ");
        doc.push(intro);

        let listing = Element::new(Tag::Pre).with_text(&json);
        doc.push(Element::new(Tag::Blockquote).with_child(listing));

        Ok(())
    }

    /// Render the state of a run as a `<span>` whose class and text
    /// are both the state description.
    ///
    /// For a finished run the description also includes the result,
    /// or notes that the result is unknown.
    fn run_state(run: &Run) -> Element {
        let label = if let RunState::Finished = run.state() {
            match run.result() {
                Some(result) => format!("{}, {}", run.state(), result),
                None => format!("{} with unknown result", run.state()),
            }
        } else {
            run.state().to_string()
        };

        let span = Element::new(Tag::Span).with_class(&label);
        span.with_text(label)
    }

    /// Placeholder status element for a repository without a known run.
    fn run_state_unknown() -> Element {
        let placeholder = Element::new(Tag::Span).with_class("run-status");
        placeholder.with_text("unknown")
    }

    fn per_repo_page_as_html(&self, rid: RepoId, alias: &str, timestamp: &str) -> HtmlPage {
        let mut doc = ReportPage::new(alias);
        doc.push(
            Element::new(Tag::P).with_child(
                Element::new(Tag::A)
                    .with_attribute("href", "./index.html")
                    .with_text("Front page"),
            ),
        );

        doc.push(
            Element::new(Tag::P).with_text("Repository ID ").with_child(
                Element::new(Tag::Code)
                    .with_class("repoid")
                    .with_text(rid.to_string()),
            ),
        );
        doc.push(Element::new(Tag::P).with_text(format!("Last updated: {timestamp}")));

        let mut table = Element::new(Tag::Table).with_class("run-list").with_child(
            Element::new(Tag::Tr)
                .with_child(Element::new(Tag::Th).with_text("Run ID"))
                .with_child(Element::new(Tag::Th).with_text("Whence"))
                .with_child(Element::new(Tag::Th).with_text("Status"))
                .with_child(Element::new(Tag::Th).with_text("Info")),
        );

        let mut runs = self.runs(rid);
        runs.sort_by_cached_key(|run| run.timestamp());
        runs.reverse();

        for run in runs {
            let result = if let Some(result) = run.result() {
                result.to_string()
            } else {
                "unknown".into()
            };
            let mut status = Element::new(Tag::Span)
                .with_class(&result)
                .with_text(&result);
            if run.timed_out() {
                status.push_text(" (timed out)");
            }

            let current = match run.state() {
                RunState::Triggered => Element::new(Tag::Span)
                    .with_attribute("state", "triggered")
                    .with_text("triggered"),
                RunState::Running => Element::new(Tag::Span)
                    .with_class("running)")
                    .with_text("running"),
                RunState::Finished => status,
            };

            table.push_child(
                Element::new(Tag::Tr)
                    .with_child(Element::new(Tag::Td).with_child(Self::run_ids(Some(run))))
                    .with_child(
                        Element::new(Tag::Td).with_child(Self::whence_as_html(run.whence())),
                    )
                    .with_child(Element::new(Tag::Td).with_child(current))
                    .with_child(Element::new(Tag::Td).with_child(Self::info_url(Some(run)))),
            );
        }

        doc.push(table);

        doc.page()
    }

    /// Render the repository cell of the repository table: the alias
    /// as a link to the per-repository page, followed by the number
    /// of failed runs (overall and among the most recent ones) and
    /// the total number of runs.
    ///
    /// `runs` may be given in any order; it is sorted by timestamp
    /// here so that "recently" refers to the newest runs.
    fn repository(repo_id: RepoId, alias: &str, mut runs: Vec<&Run>) -> Element {
        /// How many of the newest runs count as "recent".
        const RECENT_RUNS: usize = 5;

        /// Count failed runs among the last `n` entries of `runs`
        /// (all of them if there are fewer than `n`).
        fn failed_recently(runs: &[&Run], n: usize) -> usize {
            runs.iter()
                .rev()
                .take(n)
                .filter(|run| run.result() == Some(&RunResult::Failure))
                .count()
        }

        // The caller collects runs from a hash map, so their order is
        // arbitrary. Sort oldest first so the tail holds the newest runs.
        runs.sort_by_cached_key(|run| run.timestamp());

        let failed = runs
            .iter()
            .filter(|run| run.result() == Some(&RunResult::Failure))
            .count();
        let total = runs.len();

        Element::new(Tag::Span)
            .with_child(
                Element::new(Tag::A)
                    .with_child(
                        // The class used to be "alias)" with a stray
                        // parenthesis.
                        Element::new(Tag::Code)
                            .with_class("alias")
                            .with_text(alias),
                    )
                    .with_attribute("href", format!("{}.html", rid_to_basename(repo_id))),
            )
            .with_child(Element::new(Tag::Br))
            .with_child(Element::new(Tag::Span).with_text(format!(
                "{failed} failed runs ({} recently)",
                failed_recently(&runs, RECENT_RUNS)
            )))
            .with_child(Element::new(Tag::Br))
            .with_child(Element::new(Tag::Span).with_text(format!("{total} total runs")))
    }

    /// Render the identifiers of a run: adapter run ID, broker run
    /// ID, start timestamp, and job ID.
    ///
    /// Returns an empty `<span>` when there is no run. A missing
    /// adapter run ID or job ID is rendered as empty text.
    fn run_ids(run: Option<&Run>) -> Element {
        let Some(run) = run else {
            return Element::new(Tag::Span);
        };

        let adapter_id = match run.adapter_run_id() {
            Some(id) => Element::new(Tag::Span).with_text(id.as_str()),
            None => Element::new(Tag::Span),
        };
        let adapter = Element::new(Tag::Span)
            .with_class("adapter-run-id")
            .with_text("Adapter: ")
            .with_child(adapter_id)
            .with_child(Element::new(Tag::Br));

        let broker = Element::new(Tag::Span)
            .with_class("broker-run-id")
            .with_text("Broker: ")
            .with_text(run.broker_run_id().to_string());

        let started = Element::new(Tag::Span)
            .with_class("timestamp")
            .with_text("Started: ")
            .with_text(run.timestamp());

        let job_id = run.job_id().map(|id| id.to_string()).unwrap_or_default();
        let job = Element::new(Tag::Span)
            .with_class("job-cob-id")
            .with_text("Job: ")
            .with_text(job_id);

        Element::new(Tag::Span)
            .with_child(adapter)
            .with_child(broker)
            .with_child(Element::new(Tag::Br))
            .with_child(started)
            .with_child(Element::new(Tag::Br))
            .with_child(job)
    }

    /// Render a link labelled "info" to the adapter's info URL for a
    /// run, or an empty `<span>` if there is no run or no URL.
    fn info_url(run: Option<&Run>) -> Element {
        match run.and_then(|run| run.adapter_info_url()) {
            Some(url) => Element::new(Tag::A)
                .with_attribute("href", url)
                .with_text("info"),
            None => Element::new(Tag::Span),
        }
    }

    fn whence_as_html(whence: &Whence) -> Element {
        match whence {
            Whence::Branch {
                name,
                commit,
                who: _,
            } => Element::new(Tag::Span)
                .with_text("branch ")
                .with_child(
                    Element::new(Tag::Code)
                        .with_class("branch)")
                        .with_text(name),
                )
                .with_text(", commit  ")
                .with_child(
                    Element::new(Tag::Code)
                        .with_class("commit)")
                        .with_text(commit.to_string()),
                )
                .with_child(Element::new(Tag::Br))
                .with_text("from ")
                .with_child(
                    Element::new(Tag::Span)
                        .with_class("who")
                        .with_text(whence.who().unwrap_or("<commit author not known>")),
                ),
            Whence::Patch {
                patch,
                commit,
                revision,
                who: _,
            } => Element::new(Tag::Span)
                .with_text("patch ")
                .with_child(
                    Element::new(Tag::Code)
                        .with_class("branch")
                        .with_text(patch.to_string()),
                )
                .with_child(Element::new(Tag::Br))
                .with_text("revision ")
                .with_child(Element::new(Tag::Code).with_class("revision)").with_text(&{
                    if let Some(rev) = &revision {
                        rev.to_string()
                    } else {
                        "<unknown patch revision>".to_string()
                    }
                }))
                .with_child(Element::new(Tag::Br))
                .with_text("commit ")
                .with_child(
                    Element::new(Tag::Code)
                        .with_class("commit)")
                        .with_text(commit.to_string()),
                )
                .with_child(Element::new(Tag::Br))
                .with_text("from ")
                .with_child(
                    Element::new(Tag::Span)
                        .with_class("who")
                        .with_text(whence.who().unwrap_or("<patch author not known>")),
                ),
        }
    }

    /// List the distinct (alias, repository ID) pairs that occur in
    /// the known runs, sorted by alias and then by repository ID.
    fn repos(&self) -> Vec<(String, RepoId)> {
        let mut seen: HashSet<(String, RepoId)> = HashSet::new();
        for run in self.runs.values() {
            seen.insert((run.repo_alias().to_string(), run.repo_id()));
        }

        let mut listing: Vec<(String, RepoId)> = seen.into_iter().collect();
        listing.sort();
        listing
    }

    /// Look up the alias of a repository among the known runs.
    ///
    /// Returns `None` if no run exists for `wanted`. If runs for the
    /// repository carry different aliases, the smallest alias in
    /// string order is returned. That is the same alias the previous
    /// implementation picked from the sorted repository list, but
    /// this version needs only one pass over the runs and no
    /// intermediate set or sort. This matters because the method is
    /// called once per queued event.
    fn repo_alias(&self, wanted: RepoId) -> Option<String> {
        self.runs
            .values()
            .filter(|run| run.repo_id() == wanted)
            .map(|run| run.repo_alias().to_string())
            .min()
    }

    /// Collect all known runs for the given repository, in the
    /// iteration order of the underlying hash map.
    fn runs(&self, repoid: RepoId) -> Vec<&Run> {
        let mut matching = Vec::new();
        for run in self.runs.values() {
            if run.repo_id() == repoid {
                matching.push(run);
            }
        }
        matching
    }

    /// Find the run with the greatest timestamp for the given
    /// repository, or `None` if it has no runs.
    ///
    /// When several runs share the greatest timestamp, the one
    /// encountered first is kept.
    fn latest_run(&self, repoid: RepoId) -> Option<&Run> {
        self.runs(repoid).into_iter().reduce(|latest, run| {
            if run.timestamp() > latest.timestamp() {
                run
            } else {
                latest
            }
        })
    }

    /// Build the general RSS channel from the newest items returned
    /// by `sorted_items`, limited to `MAX_RSS_ENTRIES` entries.
    fn status_as_rss(&self) -> Result<Channel, PageError> {
        let mut builder = ChannelBuilder::default();
        builder.title("Radicle CI broker run information");
        builder.description("Latest CI runs known on this instance of the Radicle CI broker.");
        builder.link("FIXME:link");

        let newest = self.sorted_items()?;
        for item in newest.into_iter().take(MAX_RSS_ENTRIES) {
            builder.item(item);
        }

        Ok(builder.build())
    }

    fn failed_as_rss(&self) -> Result<Channel, PageError> {
        let mut channel = ChannelBuilder::default();
        channel
            .title("Radicle CI broker run information")
            .description("Latest FAILED CI runs on this instance of the Radicle CI broker.")
            .link("FIXME:link");

        for item in self
            .sorted_items_for_runs(|run| {
                run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure)
            })?
            .iter()
            .take(MAX_RSS_ENTRIES)
        {
            channel.item(item.clone());
        }

        Ok(channel.build())
    }

    fn unfinished_as_rss(&self) -> Result<Channel, PageError> {
        let mut channel = ChannelBuilder::default();
        channel
            .title("Radicle CI broker run information")
            .description("Latest UNFINISHED CI runs on this instance of the Radicle CI broker.")
            .link("FIXME:link");
        for item in self
            .sorted_items_for_runs(|run| {
                run.state() == RunState::Triggered || run.state() == RunState::Running
            })?
            .iter()
            .take(MAX_RSS_ENTRIES)
        {
            channel.item(item.clone());
        }
        Ok(channel.build())
    }

    fn sorted_items(&self) -> Result<Vec<Item>, PageError> {
        let mut items = vec![];
        for (_alias, repo_id) in self.repos() {
            for run in self.runs(repo_id) {
                if run.state() == RunState::Finished && run.result() == Some(&RunResult::Failure) {
                    items.push(Self::rss_item_from_run(run)?);
                }
            }
        }
        items.sort_by_key(|(ts, _)| *ts);
        items.reverse();
        Ok(items.iter().map(|(_, item)| item.clone()).collect())
    }

    /// Build RSS items for every run accepted by `pred`, ordered from
    /// newest to oldest by the run's parsed timestamp.
    ///
    /// # Errors
    ///
    /// Fails if the timestamp of an accepted run cannot be converted
    /// into an RSS time stamp.
    fn sorted_items_for_runs<F>(&self, pred: F) -> Result<Vec<Item>, PageError>
    where
        F: Fn(&Run) -> bool,
    {
        let mut dated = Vec::new();
        for (_, repo_id) in self.repos() {
            let accepted = self.runs(repo_id).into_iter().filter(|run| pred(run));
            for run in accepted {
                dated.push(Self::rss_item_from_run(run)?);
            }
        }

        // Sort ascending by time, then walk backwards for newest first.
        dated.sort_by_key(|(when, _)| *when);
        Ok(dated.into_iter().rev().map(|(_, item)| item).collect())
    }

    /// Convert a run into an RSS item, paired with the run's parsed
    /// timestamp so callers can sort by time.
    ///
    /// The item's GUID is the broker run ID (not a permalink), its
    /// title combines the state or result with the repository alias
    /// and run ID, and its content is an HTML summary of the run. The
    /// adapter info URL, if any, becomes the item's link.
    ///
    /// # Errors
    ///
    /// Returns [`PageError::RssTimestamp`] if the run's timestamp
    /// cannot be parsed or formatted for RSS.
    fn rss_item_from_run(run: &Run) -> Result<(OffsetDateTime, Item), PageError> {
        // The GUID is not a URL, so it must not be marked as a permalink.
        let mut guid = Guid::default();
        guid.set_value(run.broker_run_id().to_string());
        guid.permalink = false;

        let state = match run.state() {
            RunState::Finished => match run.result() {
                Some(result) => result.to_string(),
                None => "unknown".to_string(),
            },
            _ => run.state().to_string(),
        };
        let title = format!("{state}: {} run {}", run.repo_alias(), run.broker_run_id());

        let started = run.timestamp().to_string();
        let parsed = match parse_timestamp(&started) {
            Ok(when) => when,
            Err(err) => return Err(PageError::RssTimestamp(started, err)),
        };
        let pub_date = match rfc822_timestamp(&parsed) {
            Ok(text) => text,
            Err(err) => return Err(PageError::RssTimestamp(started, err)),
        };

        let commit = match run.whence() {
            Whence::Branch { commit, .. } => *commit,
            Whence::Patch { commit, .. } => *commit,
        };
        let entry = RssEntry {
            repoid: run.repo_id(),
            commit,
            info_url: run.adapter_info_url().map(String::from),
            status: run.state(),
            result: run.result().cloned(),
        };

        let mut item = ItemBuilder::default()
            .title(Some(title))
            .guid(Some(guid))
            .pub_date(Some(pub_date))
            .content(entry.to_html().serialize())
            .build();

        if let Some(url) = run.adapter_info_url() {
            item.set_link(Some(url.into()));
        }

        Ok((parsed, item))
    }
}

/// An HTML page under construction, with the head and heading that
/// all report pages share already in place.
struct ReportPage {
    page: HtmlPage,
}

impl ReportPage {
    /// Start a page for the given node alias: title, embedded style
    /// sheet, automatic refresh, and a top-level heading.
    fn new(node_alias: &str) -> Self {
        let title = format!("CI for Radicle node {node_alias}");
        let refresh = Element::new(Tag::Meta)
            .with_attribute("http-equiv", "refresh")
            .with_attribute("content", REFERESH_INTERVAL);

        let mut page = HtmlPage::default();
        page.push_to_head(Element::new(Tag::Title).with_text(&title));
        page.push_to_head(Element::new(Tag::Style).with_text(CSS));
        page.push_to_head(refresh);
        page.push_to_body(Element::new(Tag::H1).with_text(&title));

        ReportPage { page }
    }

    /// Append a `<div>` containing the given HTML snippet.
    fn desc(&mut self, snippet: &str) {
        let mut container = Element::new(Tag::Div);
        container.push_html(snippet);
        self.push(container);
    }

    /// Append a second-level heading.
    fn h2(&mut self, heading: &str) {
        let element = Element::new(Tag::H2).with_text(heading);
        self.page.push_to_body(element);
    }

    /// Append an element to the page body.
    fn push(&mut self, e: Element) {
        self.page.push_to_body(e);
    }

    /// Append the "RSS feeds" section with links to the "all" and
    /// "failed" feeds.
    fn rss_feeds(&mut self) {
        self.h2("RSS feeds");

        let all = Element::new(Tag::A)
            .with_text("all")
            .with_attribute("href", BROKER_RSS);
        let failed = Element::new(Tag::A)
            .with_text("failed")
            .with_attribute("href", FAILURE_RSS);

        let links = Element::new(Tag::P)
            .with_child(all)
            .with_text(" ")
            .with_child(failed);
        self.push(links);
    }

    /// Finish building and return the underlying page.
    fn page(self) -> HtmlPage {
        let ReportPage { page } = self;
        page
    }
}

/// The facts about a CI run that go into the content of an RSS item.
struct RssEntry {
    repoid: RepoId,
    commit: Oid,
    info_url: Option<String>,
    status: RunState,
    result: Option<RunResult>,
}

impl RssEntry {
    /// Render the entry as a `<div class="ci_run">` with one
    /// `<span>` per field. A missing info URL becomes empty text and
    /// a missing result becomes "undetermined".
    fn to_html(&self) -> Element {
        let result = match &self.result {
            Some(RunResult::Success) => "success",
            Some(RunResult::Failure) => "failure",
            None => "undetermined",
        };
        let info_url = self.info_url.as_deref().unwrap_or("");

        let repoid_span = Element::new(Tag::Span)
            .with_class("repoid")
            .with_text(self.repoid.to_string());
        let commit_span = Element::new(Tag::Span)
            .with_class("commit")
            .with_text(self.commit.to_string());
        let info_span = Element::new(Tag::Span)
            .with_class("info_url")
            .with_text(info_url);
        let status_span = Element::new(Tag::Span)
            .with_class("status")
            .with_text(self.status.to_string());
        let result_span = Element::new(Tag::Span)
            .with_class("result")
            .with_text(result);

        Element::new(Tag::Div)
            .with_class("ci_run")
            .with_child(repoid_span)
            .with_child(commit_span)
            .with_child(info_span)
            .with_child(status_span)
            .with_child(result_span)
    }
}

/// Data for status pages for CI broker.
///
/// There is a "front page" with status about the broker, and a list
/// of repositories for which the broker has run CI. Then there is a
/// page per such repository, with a list of CI runs for that
/// repository.
pub struct StatusPage {
    /// Node alias; starts out empty in [`StatusPage::new`].
    node_alias: String,
    /// Output directory, set with [`StatusPage::set_output_dir`].
    /// Updating checks that this is set and that the directory
    /// exists before reading the database.
    dirname: Option<PathBuf>,
    /// Everything else needed to produce the pages.
    args: PageArgs,
}

/// Inputs and settings used when updating the status pages.
struct PageArgs {
    /// Receiver the update loop waits on between page updates.
    run_rx: NotificationReceiver,
    // Not used in the visible part of this file; presumably gives
    // access to the Radicle node -- TODO confirm.
    radicle: ergo::Radicle,
    /// Database from which runs and queued events are read.
    db: Db,
    /// If true, the update loop returns after a single update.
    once: bool,
    /// Optional description snippet, set with
    /// `StatusPage::set_description`.
    desc_snippet: Option<String>,
}

impl StatusPage {
    /// Set the directory into which all output files are written.
    pub fn set_output_dir(&mut self, dirname: &Path) {
        self.dirname = Some(dirname.to_path_buf());
    }

    /// Create a status page generator.
    ///
    /// The new value has an empty node alias, no output directory, and
    /// no description; `once` selects whether the output is generated a
    /// single time or repeatedly.
    pub fn new(run_rx: NotificationReceiver, radicle: ergo::Radicle, db: Db, once: bool) -> Self {
        let args = PageArgs {
            desc_snippet: None,
            run_rx,
            radicle,
            db,
            once,
        };

        Self {
            node_alias: String::new(),
            dirname: None,
            args,
        }
    }

    /// Set the description text that is passed on to the page data.
    pub fn set_description(&mut self, desc: &str) {
        self.args.desc_snippet = Some(String::from(desc));
    }

    /// Generate the output, then regenerate it after every notification
    /// or timeout until the notification channel is disconnected.
    ///
    /// In "once" mode this returns right after the first update.
    ///
    /// # Errors
    ///
    /// Returns the first error from [`Self::update_and_write`].
    fn update_loop(&mut self) -> Result<(), PageError> {
        loop {
            self.update_and_write()?;
            if self.args.once {
                return Ok(());
            }

            // A notification and a timeout both lead to another update;
            // only a disconnected channel ends the loop.
            let outcome = self.args.run_rx.wait_for_notification();
            if matches!(outcome, Err(RecvTimeoutError::Disconnected)) {
                logger::pages_disconnected();
                break;
            }
        }

        // Make sure we update reports and status JSON at least once.
        self.update_and_write()
    }

    fn update_and_write(&mut self) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname
            && dirname.exists()
        {
            let runs = self.args.db.get_all_runs()?;

            // Create list of events, except ones for private
            // repositories.
            let events: Result<Vec<QueuedCiEvent>, PageError> = self
                .args
                .db
                .queued_ci_events()?
                .iter()
                .filter_map(|id| match self.args.db.get_queued_ci_event(id) {
                    Ok(Some(event)) => match event.event() {
                        CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
                        CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
                        | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
                        | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
                        | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
                            if Self::is_public_repo(self.args.radicle.profile(), repo) {
                                Some(Ok(event))
                            } else {
                                None
                            }
                        }
                        _ => None,
                    },
                    Ok(None) => None, // Event is (no longer?) in database.
                    Err(_) => None,   // We ignore error here on purpose.
                })
                .collect();
            let mut events = events?;
            events.sort_by_cached_key(|e| e.timestamp().to_string());

            let data = PageData {
                timestamp: now()?,
                ci_broker_version: env!("VERSION"),
                ci_broker_git_commit: env!("GIT_HEAD"),
                node_alias: self.node_alias.clone(),
                runs: HashMap::from_iter(
                    runs.iter()
                        .map(|run| (run.broker_run_id().clone(), run.clone())),
                ),
                events,
                broker_event_counter: 0,
                latest_broker_event: None,
                latest_ci_run: None,
                desc_snippet: self.args.desc_snippet.clone(),
            };

            let nameless = String::from("nameless repo");

            // We avoid writing while keeping the lock, to reduce
            // contention.
            let (status, repos) = {
                let status = data.status_page_as_html()?.to_string();

                let mut repos = vec![];
                for (_, rid) in data.repos() {
                    let basename = rid_to_basename(rid);
                    let filename = dirname.join(format!("{basename}.html"));
                    let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
                    let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
                    repos.push((filename, repopage.to_string()));
                }

                (status, repos)
            };

            let filename = dirname.join("index.html");
            Self::write_file(&filename, &status)?;

            for (filename, repopage) in repos {
                Self::write_file(&filename, &repopage)?;
            }

            let filename = dirname.join(STATUS_JSON);
            let json = data.status_page_as_json()?;
            Self::write_file(&filename, &json)?;

            let filename = dirname.join(BROKER_RSS);
            let channel = data.status_as_rss()?;
            let rss = channel.to_string();
            Self::write_file(&filename, &rss)?;

            let filename = dirname.join(FAILURE_RSS);
            let channel = data.failed_as_rss()?;
            let rss = channel.to_string();
            Self::write_file(&filename, &rss)?;

            let filename = dirname.join(UNFINISHED_RSS);
            let channel = data.unfinished_as_rss()?;
            let rss = channel.to_string();
            Self::write_file(&filename, &rss)?;
        }
        Ok(())
    }

    fn is_public_repo(profile: &Profile, rid: &RepoId) -> bool {
        if let Ok(repo) = profile.storage.repository(*rid)
            && let Ok(id_doc) = repo.canonical_identity_doc()
            && id_doc.doc.visibility().is_public()
        {
            return true;
        }
        false
    }

    /// Write `text` to `filename` using `safely_overwrite`.
    ///
    /// # Errors
    ///
    /// Returns [`PageError::Write`], carrying the file name and the
    /// underlying error, if the write fails.
    fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
        match safely_overwrite(filename, text.as_bytes()) {
            Ok(_) => Ok(()),
            Err(err) => Err(PageError::Write(filename.to_path_buf(), err)),
        }
    }
}

impl Worker for StatusPage {
    const NAME: &str = "status-page";
    type Error = PageError;

    /// Log a message if the output directory is unset or missing, then
    /// run the update loop.
    ///
    /// The loop is run even when the directory is unset or missing.
    fn work(&mut self) -> Result<(), PageError> {
        if let Some(report_dir) = &self.dirname {
            if !report_dir.exists() {
                logger::pages_directory_does_not_exist(report_dir);
            }
        } else {
            logger::pages_directory_unset();
        }

        self.update_loop()
    }
}

/// Summary of the broker's status, built from a `PageData` and
/// serialized to JSON by [`StatusData::as_json`].
#[derive(Debug, Clone, Serialize)]
struct StatusData {
    /// Timestamp copied from the page data.
    timestamp: String,
    /// Broker event counter copied from the page data.
    broker_event_counter: usize,
    /// CI broker version copied from the page data.
    ci_broker_version: &'static str,
    /// CI broker Git commit copied from the page data.
    ci_broker_git_commit: &'static str,
    /// Latest broker event copied from the page data, if any.
    latest_broker_event: Option<CiEvent>,
    /// Latest CI run copied from the page data, if any.
    latest_ci_run: Option<Run>,
    /// Number of queued CI events in the page data.
    event_queue_length: usize,
}

impl StatusData {
    /// Serialize the status data as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns the error built by `PageError::status_to_json` if
    /// serialization fails.
    fn as_json(&self) -> Result<String, PageError> {
        let rendered = serde_json::to_string_pretty(self);
        rendered.map_err(PageError::status_to_json)
    }
}

impl From<&PageData> for StatusData {
    /// Copy the status fields out of the page data and record the
    /// number of queued events as the queue length.
    fn from(page: &PageData) -> Self {
        let event_queue_length = page.events.len();
        let timestamp = page.timestamp.clone();
        let latest_broker_event = page.latest_broker_event.clone();
        let latest_ci_run = page.latest_ci_run.clone();

        Self {
            timestamp,
            broker_event_counter: page.broker_event_counter,
            ci_broker_version: page.ci_broker_version,
            ci_broker_git_commit: page.ci_broker_git_commit,
            latest_broker_event,
            latest_ci_run,
            event_queue_length,
        }
    }
}

/// Turn a repository ID into a file basename by formatting it as a
/// string and removing the leading `rad:`.
///
/// # Panics
///
/// Panics if the formatted repository ID does not start with `rad:`.
fn rid_to_basename(repoid: RepoId) -> String {
    let mut basename = repoid.to_string();
    assert!(basename.starts_with("rad:"));
    // Keep only what follows the prefix checked above.
    basename.split_off("rad:".len())
}