Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
fixes for when report dir is set but doesn't exist
Merged liw opened 1 year ago
4 files changed +161 -101 bcbd2ffc 33b4b65b
modified ci-broker.md
@@ -378,6 +378,34 @@ then stdout contains ""id": "xyzzy""
~~~


+## Runs adapter without a report directory
+
+_Want:_ CI broker can run without a report directory.
+
+_Why:_ We don't require the report directory to be specified, or
+exist, but we do require `cib` to handle this.
+
+_Who:_ `cib-devs`
+
+~~~scenario
+given a Radicle node, with CI configured with broker-with-triggers.yaml and adapter dummy.sh
+given a Git repository xyzzy in the Radicle node
+given the Radicle node emits a refsUpdated event for xyzzy
+when I run ./env.sh synthetic-events synt.sock event.json --log log.txt
+when I run ./env.sh cib --config broker-with-triggers.yaml process-events
+
+then stderr contains "CibStart"
+then stderr contains "CibConfig"
+then stderr contains "CibEndSuccess"
+
+when I run cibtool --db ci-broker.db event list
+then stdout is empty
+
+when I run cibtool --db ci-broker.db run list --json
+then stdout contains ""id": "xyzzy""
+~~~
+
+
## Runs adapters for all matching triggers

_Want:_ CI broker can run its adapter.
modified src/bin/cib.rs
@@ -19,7 +19,7 @@ use radicle_ci_broker::{
    db::{Db, DbError},
    logger::{self, LogLevel},
    notif::{NotificationChannel, NotificationError},
-    pages::StatusPage,
+    pages::{PageError, StatusPage},
    queueadd::{AdderError, QueueAdderBuilder},
    queueproc::{QueueError, QueueProcessorBuilder},
};
@@ -214,7 +214,7 @@ impl QueuedCmd {
        page_updater
            .join()
            .expect("wait for page updater thread to finish")
-            .expect("page updater thread succeeded");
+            .map_err(CibError::PageUpdater)?;

        Ok(())
    }
@@ -284,7 +284,7 @@ impl ProcessEventsCmd {
        page_updater
            .join()
            .expect("wait for page updater thread to finish")
-            .expect("page updater thread succeeded");
+            .map_err(CibError::PageUpdater)?;

        Ok(())
    }
@@ -316,6 +316,9 @@ enum CibError {
    #[error("failed to process events from queue")]
    ProcessQueue(#[source] QueueError),

+    #[error("thread to update report pages failed")]
+    PageUpdater(#[source] PageError),
+
    #[error("failed to add events to queue")]
    AddEvents(#[source] AdderError),

modified src/logger.rs
@@ -401,7 +401,7 @@ pub fn adapter_config(config: &Config) {
}

pub fn queueproc_start() {
-    debug!(
+    info!(
        msg_id = ?Id::QueueProcStart,
        kind = %Kind::Startup,
        "start thread to process events until a shutdown event"
@@ -409,12 +409,21 @@ pub fn queueproc_start() {
}

pub fn queueproc_end(result: &Result<(), QueueError>) {
-    debug!(
-        msg_id = ?Id::QueueProcEnd,
-        kind = %Kind::Debug,
-        ?result,
-        "thread to process events ends"
-    );
+    if result.is_err() {
+        error!(
+            msg_id = ?Id::QueueProcEnd,
+            kind = %Kind::Debug,
+            ?result,
+            "thread to process events ends"
+        );
+    } else {
+        info!(
+            msg_id = ?Id::QueueProcEnd,
+            kind = %Kind::Debug,
+            ?result,
+            "thread to process events ends"
+        );
+    }
}

pub fn queueproc_channel_disconnect() {
@@ -522,7 +531,7 @@ pub fn queueproc_action_shutdown() {
}

pub fn queueadd_start() {
-    debug!(
+    info!(
        msg_id = ?Id::QueueAddStart,
        kind = %Kind::Debug,
        "start thread to add events from node to event queue"
@@ -549,12 +558,21 @@ pub fn queueadd_push_event(event: &CiEvent, id: &QueueId) {
}

pub fn queueadd_end(result: &Result<(), AdderError>) {
-    debug!(
-        msg_id = ?Id::QueueAddEnd,
-        kind = %Kind::Debug,
-        ?result,
-        "thread to process events ends"
-    );
+    if result.is_err() {
+        error!(
+            msg_id = ?Id::QueueAddEnd,
+            kind = %Kind::Debug,
+            ?result,
+            "thread to process events ends"
+        );
+    } else {
+        info!(
+            msg_id = ?Id::QueueAddEnd,
+            kind = %Kind::Debug,
+            ?result,
+            "thread to process events ends"
+        );
+    }
}

pub fn pages_directory_unset() {
@@ -583,7 +601,7 @@ pub fn pages_disconnected() {
}

pub fn pages_start() {
-    debug!(
+    info!(
        msg_id = ?Id::PagesStart,
        kind = %Kind::Debug,
        "start page updater thread"
@@ -591,12 +609,21 @@ pub fn pages_start() {
}

pub fn pages_end(result: &Result<(), PageError>) {
-    debug!(
-        msg_id = ?Id::PagesEnd,
-        kind = %Kind::Debug,
-        ?result,
-        "end page updater thread"
-    );
+    if result.is_err() {
+        error!(
+            msg_id = ?Id::PagesEnd,
+            kind = %Kind::Debug,
+            ?result,
+            "end page updater thread"
+        );
+    } else {
+        info!(
+            msg_id = ?Id::PagesEnd,
+            kind = %Kind::Debug,
+            ?result,
+            "end page updater thread"
+        );
+    }
}

pub fn event_disconnected() {
modified src/pages.rs
@@ -808,94 +808,96 @@ impl StatusPage {

    fn update_and_write(&mut self, profile: &Profile, db: &Db) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
-            let runs = db.get_all_runs()?;
-
-            // Create list of events, except ones for private
-            // repositories.
-            let events: Result<Vec<QueuedCiEvent>, PageError> = db
-                .queued_ci_events()?
-                .iter()
-                .filter_map(|id| match db.get_queued_ci_event(id) {
-                    Ok(Some(event)) => match event.event() {
-                        CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
-                        CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
-                        | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
-                        | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
-                        | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
-                            if Self::is_public_repo(profile, repo) {
-                                Some(Ok(event))
-                            } else {
-                                None
+            if dirname.exists() {
+                let runs = db.get_all_runs()?;
+
+                // Create list of events, except ones for private
+                // repositories.
+                let events: Result<Vec<QueuedCiEvent>, PageError> = db
+                    .queued_ci_events()?
+                    .iter()
+                    .filter_map(|id| match db.get_queued_ci_event(id) {
+                        Ok(Some(event)) => match event.event() {
+                            CiEvent::V1(CiEventV1::Shutdown) => Some(Ok(event)),
+                            CiEvent::V1(CiEventV1::BranchCreated { repo, .. })
+                            | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
+                            | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
+                            | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
+                                if Self::is_public_repo(profile, repo) {
+                                    Some(Ok(event))
+                                } else {
+                                    None
+                                }
                            }
-                        }
-                        _ => None,
-                    },
-                    Ok(None) => None, // Event is (no longer?) in database.
-                    Err(_) => None,   // We ignore error here on purpose.
-                })
-                .collect();
-            let mut events = events?;
-            events.sort_by_cached_key(|e| e.timestamp().to_string());
-
-            let data = PageData {
-                timestamp: now()?,
-                ci_broker_version: env!("CARGO_PKG_VERSION"),
-                ci_broker_git_commit: env!("GIT_HEAD"),
-                node_alias: self.node_alias.clone(),
-                runs: HashMap::from_iter(
-                    runs.iter()
-                        .map(|run| (run.broker_run_id().clone(), run.clone())),
-                ),
-                events,
-                broker_event_counter: 0,
-                latest_broker_event: None,
-                latest_ci_run: None,
-            };
+                            _ => None,
+                        },
+                        Ok(None) => None, // Event is (no longer?) in database.
+                        Err(_) => None,   // We ignore error here on purpose.
+                    })
+                    .collect();
+                let mut events = events?;
+                events.sort_by_cached_key(|e| e.timestamp().to_string());
+
+                let data = PageData {
+                    timestamp: now()?,
+                    ci_broker_version: env!("CARGO_PKG_VERSION"),
+                    ci_broker_git_commit: env!("GIT_HEAD"),
+                    node_alias: self.node_alias.clone(),
+                    runs: HashMap::from_iter(
+                        runs.iter()
+                            .map(|run| (run.broker_run_id().clone(), run.clone())),
+                    ),
+                    events,
+                    broker_event_counter: 0,
+                    latest_broker_event: None,
+                    latest_ci_run: None,
+                };

-            let nameless = String::from("nameless repo");
+                let nameless = String::from("nameless repo");

-            // We avoid writing while keeping the lock, to reduce
-            // contention.
-            let (status, repos) = {
-                let status = data.status_page_as_html()?.to_string();
+                // We avoid writing while keeping the lock, to reduce
+                // contention.
+                let (status, repos) = {
+                    let status = data.status_page_as_html()?.to_string();

-                let mut repos = vec![];
-                for (_, rid) in data.repos() {
-                    let basename = rid_to_basename(rid);
-                    let filename = dirname.join(format!("{basename}.html"));
-                    let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
-                    let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
-                    repos.push((filename, repopage.to_string()));
-                }
+                    let mut repos = vec![];
+                    for (_, rid) in data.repos() {
+                        let basename = rid_to_basename(rid);
+                        let filename = dirname.join(format!("{basename}.html"));
+                        let alias = data.repo_alias(rid).unwrap_or(nameless.clone());
+                        let repopage = data.per_repo_page_as_html(rid, &alias, &data.timestamp);
+                        repos.push((filename, repopage.to_string()));
+                    }

-                (status, repos)
-            };
+                    (status, repos)
+                };

-            let filename = dirname.join("index.html");
-            Self::write_file(&filename, &status)?;
+                let filename = dirname.join("index.html");
+                Self::write_file(&filename, &status)?;

-            for (filename, repopage) in repos {
-                Self::write_file(&filename, &repopage)?;
-            }
+                for (filename, repopage) in repos {
+                    Self::write_file(&filename, &repopage)?;
+                }

-            let filename = dirname.join(STATUS_JSON);
-            let json = data.status_page_as_json()?;
-            Self::write_file(&filename, &json)?;
+                let filename = dirname.join(STATUS_JSON);
+                let json = data.status_page_as_json()?;
+                Self::write_file(&filename, &json)?;

-            let filename = dirname.join(BROKER_RSS);
-            let channel = data.status_as_rss()?;
-            let rss = channel.to_string();
-            Self::write_file(&filename, &rss)?;
+                let filename = dirname.join(BROKER_RSS);
+                let channel = data.status_as_rss()?;
+                let rss = channel.to_string();
+                Self::write_file(&filename, &rss)?;

-            let filename = dirname.join(FAILURE_RSS);
-            let channel = data.failed_as_rss()?;
-            let rss = channel.to_string();
-            Self::write_file(&filename, &rss)?;
+                let filename = dirname.join(FAILURE_RSS);
+                let channel = data.failed_as_rss()?;
+                let rss = channel.to_string();
+                Self::write_file(&filename, &rss)?;

-            let filename = dirname.join(UNFINISHED_RSS);
-            let channel = data.unfinished_as_rss()?;
-            let rss = channel.to_string();
-            Self::write_file(&filename, &rss)?;
+                let filename = dirname.join(UNFINISHED_RSS);
+                let channel = data.unfinished_as_rss()?;
+                let rss = channel.to_string();
+                Self::write_file(&filename, &rss)?;
+            }
        }
        Ok(())
    }