Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
Make CI broker use a persistent event queue
Merged liw opened 1 year ago

This breaks everything, including names of binaries. Existing users beware.

26 files changed +2738 -808 1010100c ddbac4fd
modified Cargo.lock
@@ -357,6 +357,7 @@ dependencies = [
 "anstyle",
 "clap_lex",
 "strsim",
+
 "terminal_size",
]

[[package]]
@@ -637,6 +638,16 @@ dependencies = [
]

[[package]]
+
name = "env_filter"
+
version = "0.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea"
+
dependencies = [
+
 "log",
+
 "regex",
+
]
+

+
[[package]]
name = "env_logger"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -650,6 +661,19 @@ dependencies = [
]

[[package]]
+
name = "env_logger"
+
version = "0.11.3"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9"
+
dependencies = [
+
 "anstream",
+
 "anstyle",
+
 "env_filter",
+
 "humantime",
+
 "log",
+
]
+

+
[[package]]
name = "equivalent"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1522,7 +1546,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c"
dependencies = [
-
 "env_logger",
+
 "env_logger 0.10.2",
 "log",
]

@@ -1636,6 +1660,7 @@ version = "0.2.0"
dependencies = [
 "anyhow",
 "clap",
+
 "env_logger 0.11.3",
 "fehler",
 "html-page",
 "log",
@@ -1648,6 +1673,7 @@ dependencies = [
 "serde_json",
 "serde_yaml 0.9.34+deprecated",
 "sqlite",
+
 "sqlite3-sys",
 "subplot-build",
 "subplotlib",
 "tempfile",
@@ -2235,7 +2261,7 @@ dependencies = [
 "anyhow",
 "base64 0.21.7",
 "clap",
-
 "env_logger",
+
 "env_logger 0.10.2",
 "file_diff",
 "git-testament",
 "html-escape",
@@ -2396,6 +2422,16 @@ dependencies = [
]

[[package]]
+
name = "terminal_size"
+
version = "0.3.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7"
+
dependencies = [
+
 "rustix",
+
 "windows-sys 0.48.0",
+
]
+

+
[[package]]
name = "textwrap"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified Cargo.toml
@@ -12,7 +12,7 @@ categories = ["development-tools::build-utils"]

[dependencies]
anyhow = "1.0.82"
-
clap = { version = "4.5.4", features = ["derive"] }
+
clap = { version = "4.5.4", features = ["derive", "wrap_help"] }
html-page = "0.2.0"
log = "0.4.20"
pretty_env_logger = "0.5.0"
@@ -27,6 +27,8 @@ thiserror = "1.0.50"
time = { version = "0.3.34", features = ["formatting", "macros"] }
uuid = { version = "1.7.0", features = ["v4"] }
regex = "1.10.4"
+
sqlite3-sys = "0.15.0"
+
env_logger = "0.11.3"

[dependencies.radicle]
version = "0.11.0"
modified README.md
@@ -41,7 +41,6 @@ The crate contains several binaries:
  between the broker and adapters
* `filter-events` --- a helper program to see what events a CI broker
  config allows
-
* `list_runs` --- list all CI runs in a broker database
* `pagegen` -- helper program to produce sample report pages
  - mostly only useful for testing changes to the page generating code

modified ci-broker.md
@@ -26,6 +26,20 @@ filters:
  - !Branch "main"
~~~

+
~~~{#broker-allow-nothing.yaml .file .yaml}
+
db: ci-broker.db
+
report_dir: reports
+
default_adapter: mcadapterface
+
adapters:
+
  mcadapterface:
+
    command: ./adapter.sh
+
    env:
+
      RADICLE_NATIVE_CI: native-ci.yaml
+
    sensitive_env: {}
+
filters:
+
  - !Branch "this-branch-does-not-exist"
+
~~~
+

## A dummy adapter

This adapter does nothing, just reports a run ID and a successful run.
@@ -37,12 +51,12 @@ echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
echo '{"response":"finished","result":"success"}'
~~~

-
## A trigger message
+
## A `refsFetched` node event

-
This is a request message from the CI broker to the adapter to trigger
-
a run on a repository. The repository is imaginary as is the commit.
+
This is a node event from the node to signal that some git refs have
+
changed in a repository.

-
~~~{#trigger.json .file .json}
+
~~~{#refsfetched.json .file .json}
{
  "type": "refsFetched",
  "remote": "z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV",
@@ -50,7 +64,7 @@ a run on a repository. The repository is imaginary as is the commit.
  "updated": [
    {
      "updated": {
-
        "name": "refs/heads/main",
+
        "name": "refs/namespaces/DUMMYNID/refs/heads/main",
        "old": "0000000000000000000000000000000000000000",
        "new": "0000000000000000000000000000000000000000"
      }
@@ -59,7 +73,7 @@ a run on a repository. The repository is imaginary as is the commit.
}
~~~

-
## A shutdown message
+
## A shutdown event

This asks the CI broker to shut down cleanly.

@@ -79,10 +93,11 @@ This asks the CI broker to shut down cleanly.
}
~~~

-
## Set rid in trigger message

-
This is a helper script that reads a trigger message and changes its
-
`rid` field to be the id of the given repository. It also sets the
+
## Set rid in `refsFetched` event
+

+
This is a helper script that reads a `refsFetched` event and changes
+
its `rid` field to be the id of the given repository. It also sets the
`name` field for updated refs to include the repository ID. It writes
the message back to its file.

@@ -112,7 +127,7 @@ if "updated" in o:
    x = o["updated"]
    for oo in x:
        name = oo["updated"]["name"]
-
        oo["updated"]["name"] = f"refs/namespaces/{nid}/{name}"
+
        oo["updated"]["name"] = nid.join(name.split("DUMMYNID"))
        oo["updated"]["new"] = oid
    o["updated"] = x

@@ -120,8 +135,87 @@ with open(filename, "w") as f:
    json.dump(o, fp=f, indent=4)
~~~

+
## Set environment variables and run command
+

+
To avoid having to repeat environment variables to set up and use a
+
Radicle node for verification scenarios, we provide a script that
+
sets them and runs a command.
+

+
~~~{#radenv.sh .file .sh}
+
#!/bin/bash
+

+
set -euo pipefail
+

+
homedir="$(pwd)/homedir"
+

+
env \
+
	HOME="$homedir" \
+
	RAD_PASSPHRASE=secret \
+
	RAD_HOME="$homedir/.radicle" \
+
	RAD_SOCKET=synt.sock \
+
	RUST_LOG=trace \
+
	RADICLE_CI_BROKER_LOG=trace \
+
	"$@"
+
~~~
+

+

+
## Set up a node with repository
+

+
Most of our verification scenarios will need to set up a Radicle node
+
and a test repository there. Rather than repeat it in every scenario,
+
we use this helper script.
+

+
~~~{#setup-node.sh .file .sh}
+
#!/bin/bash
+

+
set -xeuo pipefail
+

+
mkdir -p "$HOME"
+

+
env | grep RAD
+
rad auth --alias brokertest
+

+
git config --global user.email radicle@example.com
+
git config --global user.name TestyMcTestFace
+
git init -b main testy
+

+
cd testy
+
echo "test file" > test.txt
+

+
git add .
+
git commit -am test
+
git status
+
git show
+
rad init --name testy --description test --default-branch main --private --no-confirm --no-seed
+
rad inspect --identity
+
rad id list
+
~~~
+

# Acceptance criteria

+
## Shows config as JSON
+

+
_Requirement:_ The CI broker can write out the configuration it uses
+
at run time as JSON.
+

+
_Justification:_ This is helpful for the node operator to verify that
+
they have configured the program correctly.
+

+
_Stakeholder:_ Lars
+

+
Our verification here is quite simplistic, and only checks that the
+
output is in the JSON format. It does not try to make sure the JSON
+
matches the YAML semantically.
+

+
~~~scenario
+
given an installed cib
+
given file broker.yaml
+
when I run cib --config broker.yaml config --output actual.json
+
when I run jq . actual.json
+
then command is successful
+
~~~
+

+

## Smoke test: Runs adapter

_Requirement:_ CI broker can run its adapter.
@@ -132,36 +226,30 @@ nothing else has a hope of working.
_Stakeholder:_ Lars.

~~~scenario
-
given a directory homedir
-
when I run env HOME=homedir RAD_PASSPHRASE= RAD_HOME=homedir/.radicle rad auth --alias brokertest
-

-
when I run env HOME=homedir git config --global user.email radicle@example.com
-
when I run env HOME=homedir git config --global user.name TestyMcTestFace
-
when I run env HOME=homedir git init testy
-
given file testy/test.txt from dummy.sh
-

-
when I run, in testy, env HOME=../homedir git add .
-
when I run, in testy, env HOME=../homedir git commit -am test
-
when I run, in testy, env HOME=../homedir git status
-
when I run, in testy, env HOME=../homedir RAD_HOME=../homedir/.radicle RAD_PASSPHRASE= rad init --name testy --description test --default-branch master --private --no-confirm --no-seed
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh

given an installed synthetic-events
-
given file trigger.json
-
given file shutdown.json
+
given file refsfetched.json
given file set-rid
-
when I run env HOME=../homedir python3 set-rid trigger.json testy
-
when I run synthetic-events synt.sock trigger.json shutdown.json --log log.txt
+
when I run env HOME=../homedir python3 set-rid refsfetched.json testy
+
when I run synthetic-events synt.sock refsfetched.json --log log.txt

-
given an installed ci-broker
+
given an installed cib
given a directory reports
given file broker.yaml
given file adapter.sh from dummy.sh
when I run chmod +x adapter.sh

-
when I run sed -i 's/"auto"/false/' homedir/.radicle/config.json
-
when I try to run env HOME=homedir RAD_HOME=homedir/.radicle RAD_SOCKET=synt.sock RUST_LOG=debug ci-broker broker.yaml
-
then command is successful
-
then file reports/index.html contains "xyzzy"
+
when I run bash radenv.sh RAD_SOCKET=synt.sock cib --config broker.yaml process-events
+

+
given an installed cibtool
+
when I run cibtool --db ci-broker.db event list
+
then stdout is exactly ""
+

+
when I run cibtool --db ci-broker.db run list
+
then stdout contains "id: "xyzzy""
~~~


@@ -181,41 +269,27 @@ user can access individual logs.
_Stakeholder:_ Lars.

~~~scenario
-
given a directory homedir
-
when I run env HOME=homedir RAD_PASSPHRASE= RAD_HOME=homedir/.radicle rad auth --alias brokertest
-

-
when I run env HOME=homedir git config --global user.email radicle@example.com
-
when I run env HOME=homedir git config --global user.name TestyMcTestFace
-
when I run env HOME=homedir git init testy
-
given file testy/test.txt from adapter-with-url.sh
-

-
when I run, in testy, env HOME=../homedir git add .
-
when I run, in testy, env HOME=../homedir git commit -am test
-
when I run, in testy, env HOME=../homedir git status
-
when I run, in testy, env HOME=../homedir RAD_HOME=../homedir/.radicle RAD_PASSPHRASE= rad init --name testy --description test --default-branch master --private --no-confirm --no-seed
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh

given an installed synthetic-events
-
given file trigger.json
-
given file shutdown.json
+
given file refsfetched.json
given file set-rid
-
when I run env HOME=../homedir python3 set-rid trigger.json testy
-
when I run synthetic-events synt.sock trigger.json shutdown.json --log log.txt
+
when I run env HOME=../homedir python3 set-rid refsfetched.json testy
+
when I run synthetic-events synt.sock refsfetched.json --log log.txt

-
given an installed ci-broker
-
given a directory reports
-
given file broker.yaml
given file adapter.sh from adapter-with-url.sh
when I run chmod +x adapter.sh

-
when I run sed -i 's/"auto"/false/' homedir/.radicle/config.json
-
when I try to run env HOME=homedir RAD_HOME=homedir/.radicle RAD_SOCKET=synt.sock RUST_LOG=debug ci-broker broker.yaml
-
then command is successful
+
given an installed cib
+
given file broker.yaml
+
given a directory reports

-
when I run cat reports/index.html
-
then file reports/index.html contains "https://ci.example.com/xyzzy"
+
when I try to run bash radenv.sh RAD_SOCKET=synt.sock cib --config broker.yaml insert
+
then command is successful
~~~

-

~~~{#adapter-with-url.sh .file .sh}
#!/bin/bash
set -euo pipefail
@@ -235,17 +309,18 @@ reduces the support burden on the Radicle project.
_Stakeholder:_ Lars.

~~~scenario
-
given a directory homedir
-
when I run env HOME=homedir RAD_HOME=homedir/.radicle RAD_PASSPHRASE= rad auth --alias brokertest
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh

-
given an installed ci-broker
+
given an installed cib
given file broker.yaml
-
when I run sed -i 's/"auto"/false/' homedir/.radicle/config.json
-
when I try to run env HOME=homedir RAD_HOME=homedir/.radicle RAD_SOCKET=xyzzy.sock ci-broker broker.yaml
+
when I try to run bash radenv.sh RAD_SOCKET=xyzzy.sock cib --config broker.yaml insert
then command fails
-
then stderr contains "ERROR: node control socket does not exist: xyzzy.sock"
+
then stderr contains "node control socket does not exist: xyzzy.sock"
~~~

+

## Gives helpful error message if it doesn't understand its configuration file

_Requirement:_ If the CI broker is given a configuration file that it
@@ -262,16 +337,16 @@ a thing that can always be made better. We can later add more
scenarios if we tighten the acceptance criteria.

~~~scenario
-
given a directory homedir
-
when I run env HOME=homedir RAD_HOME=homedir/.radicle RAD_PASSPHRASE= rad auth --alias brokertest
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh

-
given an installed ci-broker
+
given an installed cib
given file broker.yaml
given file not-yaml.yaml
-
when I run sed -i 's/"auto"/false/' homedir/.radicle/config.json
-
when I try to run env HOME=homedir ci-broker not-yaml.yaml
+
when I try to run env HOME=homedir cib --config not-yaml.yaml config
then command fails
-
then stderr contains "ERROR: failed to parse configuration file as YAML: not-yaml.yaml"
+
then stderr contains "failed to parse configuration file as YAML: not-yaml.yaml"
~~~


@@ -292,20 +367,19 @@ handled by a dedicated system, such as `systemd`.
_Stakeholder:_ Lars.

~~~scenario
-
given a directory homedir
-
when I run env HOME=homedir RAD_HOME=homedir/.radicle RAD_PASSPHRASE= rad auth --alias brokertest
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh

-
given an installed ci-broker
+
given an installed cib
given an installed synthetic-events
when I run synthetic-events synt.sock --log log.txt
given file broker.yaml
-
when I run sed -i 's/"auto"/false/' homedir/.radicle/config.json
-
when I try to run env HOME=homedir RAD_HOME=homedir/.radicle RAD_SOCKET=synt.sock ci-broker broker.yaml
-
then stderr contains "connection to the node control socket broke"
+
when I try to run bash radenv.sh RAD_SOCKET=synt.sock cib --config broker.yaml insert
+
then command is successful
~~~


-

## Shuts down when requested

_Requirement:_ The test suite can request the CI broker to shut down
@@ -322,17 +396,47 @@ an object id of all zeros. This should be sufficiently impossible to
happen in real life.

~~~scenario
-
given a directory homedir
-
when I run env HOME=homedir RAD_HOME=homedir/.radicle RAD_PASSPHRASE= rad auth --alias brokertest
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh

-
given an installed ci-broker
+
given an installed cib
given an installed synthetic-events
-
given file shutdown.json
given file broker.yaml
-
when I run synthetic-events synt.sock shutdown.json --log synt.log
-
when I run sed -i 's/"auto"/false/' homedir/.radicle/config.json
-
when I try to run env RUST_LOG=trace HOME=homedir RAD_HOME=homedir/.radicle RAD_SOCKET=synt.sock ci-broker broker.yaml
+
when I run synthetic-events synt.sock --log synt.log
+
when I try to run bash radenv.sh RAD_SOCKET=synt.sock cib --config broker.yaml insert
+
when I run cat synt.log
then command is successful
+
when I run cibtool --db ci-broker.db run list
+
then stdout is exactly ""
+
~~~
+

+

+
## Produces a report page upon request
+

+
_Requirement:_ The node operator can run a command to produce a report
+
of all CI runs a CI broker instance has performed.
+

+
_Justification:_ This is useful for diagnosis, if nothing else.
+

+
_Stakeholder:_ Lars.
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given an installed cibtool
+
when I run cibtool --db x.db run add --id x --repo rad:zwTxygwuz5LDGBq255RA2CbNGrz8 --alias x --url https://x/1 --branch main --commit f1815dde6ae406d8fe3cec0b96c4486766342716 --who x --finished --failure --timestamp 2024-07-09T02:00:00
+

+
given a directory reports
+
when I run bash radenv.sh cibtool --db x.db report --output-dir reports
+

+
then file reports/index.html exists
+
then file reports/index.html contains "zwTxygwuz5LDGBq255RA2CbNGrz8"
+

+
then file reports/zwTxygwuz5LDGBq255RA2CbNGrz8.html exists
+
then file reports/zwTxygwuz5LDGBq255RA2CbNGrz8.html contains "success"
~~~


@@ -408,3 +512,332 @@ when I run nc -U synt.sock
when I run sleep 0.1
then file synt.sock does not exist
~~~
+

+

+
# Acceptance criteria for persistent database
+

+
The CI broker uses an SQLite database for persistent data. Many
+
processes may need to access or modify the database at the same time.
+
While SQLite is good at managing that, it needs to be used in the
+
right way for everything to work correctly. The acceptance criteria in
+
this chapter address that.
+

+
To enable the verification of these acceptance criteria, the CI broker
+
database allows for a "counter", as a single row in a dedicated table.
+
Concurrency is tested by having multiple processes update the counter
+
at the same time and verifying the end result is as intended and that
+
every value is set exactly once.
+

+
## Count in a single process
+

+
_Requirement:_ A single process can increment the test counter
+
correctly.
+

+
_Justification:_ If this doesn't work with a single process, it won't
+
work with multiple processes, either.
+

+
_Stakeholder:_ Lars.
+

+
~~~scenario
+
given an installed cibtool
+
then file count.db does not exist
+
when I run cibtool --db count.db counter show
+
then stdout is exactly "0\n"
+
when I run cibtool --db count.db counter count --goal 1000
+
when I run cibtool --db count.db counter show
+
then stdout is exactly "1000\n"
+
~~~
+

+

+
## Insert events into queue
+

+
_Requirement:_ Insert broker events generated from node events into
+
persistent event queue in the database, when allowed by the CI broker
+
event filter.
+

+
_Justification:_ This is fundamental for running CI when repositories
+
in a node change.
+

+
_Stakeholder:_ Lars.
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given an installed cib
+
given an installed synthetic-events
+

+
given file refsfetched.json
+
when I run synthetic-events synt.sock refsfetched.json --log synt.log
+

+
given file broker.yaml
+
when I try to run bash radenv.sh env RAD_SOCKET=synt.sock cib --config broker.yaml insert
+
then command is successful
+

+
when I run cibtool --db ci-broker.db event list --verbose
+
then stdout contains "RefChanged"
+
then stdout contains "oid: Oid(0000000000000000000000000000000000000000)"
+
then stdout contains "old: Some(Oid(0000000000000000000000000000000000000000))"
+
~~~
+

+
## Insert many events into queue
+

+
_Requirement:_ Insert many events that arrive quickly.
+

+
_Justification:_ We need at least some rudimentary performance testing.
+

+
_Stakeholder:_ Lars.
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given an installed cib
+
given an installed synthetic-events
+

+
given file refsfetched.json
+
when I run synthetic-events synt.sock refsfetched.json --log synt.log --repeat 1000
+

+
given file broker.yaml
+
when I try to run bash radenv.sh env RAD_SOCKET=synt.sock cib --config broker.yaml insert
+
then command is successful
+

+

+
when I run cibtool --db ci-broker.db event count
+
then stdout is exactly "1000\n"
+
~~~
+

+
## Don't insert events into queue when not allowed by filter
+

+
_Requirement:_ Nothing is inserted into the persistent event queue
+
when the CI broker's filter does not allow any events.
+

+
_Justification:_ This is fundamental for running CI when repositories
+
in a node change.
+

+
_Stakeholder:_ Lars.
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given an installed cib
+
given an installed synthetic-events
+

+
given file refsfetched.json
+
when I run synthetic-events synt.sock refsfetched.json --log synt.log
+

+
given file broker.yaml from broker-allow-nothing.yaml
+
when I try to run bash radenv.sh env RAD_SOCKET=synt.sock cib --config broker.yaml insert
+
then command is successful
+

+
when I run cibtool --db ci-broker.db event count
+
then stdout is exactly "0\n"
+
~~~
+

+
## Process queued events
+

+
_Requirement:_ It's possible to run the CI broker in a mode where it
+
only processes events from its persistent event queue.
+

+
_Justification:_ This is primarily useful for testing the CI broker
+
queuing implementation.
+

+
_Stakeholders:_ Lars.
+

+
We verify this by adding events to the queue with `cibtool`, and then
+
running the CI broker and verifying it terminates after processing the
+
events. We carefully add a shutdown event so that the CI broker shuts
+
down.
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given file adapter.sh from dummy.sh
+
when I run chmod +x adapter.sh
+

+
given an installed cib
+
given an installed cibtool
+

+
when I run bash radenv.sh cibtool --db ci-broker.db event add --repo testy --ref main --commit HEAD
+
when I run cibtool --db ci-broker.db event shutdown
+

+
given file broker.yaml
+
when I run ls -l adapter.sh
+
when I run bash radenv.sh cib --config broker.yaml queued
+
then stderr contains "Action: run:"
+
then stderr contains "Action: shutdown"
+

+
when I run cibtool --db ci-broker.db event list
+
then stdout is exactly ""
+

+
when I run cibtool --db ci-broker.db run list
+
then stdout contains "Success"
+
~~~
+

+

+
## Count in concurrent processes
+

+
_Requirement:_ Two processes can concurrently increment the test counter
+
correctly.
+

+
_Justification:_ This is necessary, if not necessarily sufficient, for
+
concurrent database use to work correctly.
+

+
_Stakeholder:_ Lars.
+

+
Due to limitations in Subplot we manage the concurrent processes using
+
a helper shell script, `count.sh`, found below. It runs two
+
concurrent `cibtool` processes that update the same database file, and
+
count to a desired goal. The script then verifies that everything went
+
correctly.
+

+
~~~scenario
+
given an installed cibtool
+
given file count.sh
+
when I run env RUST_LOG=debug bash -x count.sh 100 10
+
then stdout contains "OK\n"
+
~~~
+

+
~~~{#count.sh .file .sh}
+
#!/bin/bash
+

+
set -euo pipefail
+

+
run() {
+
	cibtool --db "$DB" counter count --goal "$goal"
+
}
+

+
DB=count.db
+

+
goal="$1"
+
reps="$2"
+

+
for x in $(seq "$reps"); do
+
	echo "Repetition $x"
+

+
	rm -f "$DB" ./?.out
+

+
	run >1.out 2>&1 &
+
	one=$!
+

+
	run >2.out 2>&1 &
+
	two=$!
+

+
	if ! wait "$one"; then
+
		echo "first run failed"
+
		cat 1.out
+
		exit 1
+
	fi
+

+
	if ! wait "$two"; then
+
		echo "second run failed"
+
		cat 2.out
+
		exit 1
+
	fi
+

+
	if grep ERROR ./?.out; then
+
		echo found ERRORs
+
		exit 1
+
	fi
+

+
	n="$(sqlite3 "$DB" 'select counter from counter_test')"
+
	[ "$n" == "$goal" ] || (
+
		echo "wrong count $n"
+
		exit 1
+
	)
+

+
	if awk '/increment to/ { print $NF }' ./?.out | sort -n | uniq -d | grep .; then
+
		echo "duplicate increments"
+
		exit 1
+
	fi
+
done
+

+
echo OK
+
~~~
+
# Acceptance criteria for event queue management
+

+
The CI builder database contains a queue of broker events. The
+
`cibtool` management tool can be used to examine and manipulate the
+
queue.
+

+
## Events can be queued and removed from queue
+

+
_Requirement:_ `cibtool` can show the queued events, can inject an
+
event, and remove an event.
+

+
_Justification:_ This is the minimum functionality needed to manage
+
the event queue.
+

+
_Stakeholder:_ Lars.
+

+
We verify that this works by adding a new broker event, and then
+
removing it. We randomly choose the repository id for the CI broker
+
itself for this test, but the id shouldn't matter, it just needs to
+
be of the correct form.
+

+
~~~scenario
+
given file radenv.sh
+
given file setup-node.sh
+
when I run bash radenv.sh bash setup-node.sh
+

+
given an installed cibtool
+
when I run cibtool --db x.db event list
+
then stdout is exactly ""
+

+
when I run bash radenv.sh cibtool --db x.db event add --repo testy --ref main --commit HEAD --base c0ffee --id-file id.txt
+

+
when I run cibtool --db x.db event show --id-file id.txt
+
then stdout contains "rad:"
+
then stdout contains "main"
+
then stdout contains "c0ffee"
+

+
when I run cibtool --db x.db event remove --id-file id.txt
+

+
when I run cibtool --db x.db event list
+
then stdout is exactly ""
+
~~~
+

+
## Can add shutdown event to queue
+

+
_Requirement:_ `cibtool` can add a shutdown event to the queued
+
events.
+

+
_Justification:_ This is needed for testing, and for the node operator
+
to be able to do this cleanly.
+

+
_Stakeholder:_ Lars.
+

+
~~~scenario
+
given an installed cibtool
+
when I run cibtool --db x.db event list
+
then stdout is exactly ""
+

+
when I run cibtool --db x.db event shutdown --id-file id.txt
+

+
when I run cibtool --db x.db event show --id-file id.txt
+
then stdout contains "Shutdown"
+
~~~
+
## Add CI run information to database
+

+
_Requirement:_ `cibtool` can add information about a CI run, possibly
+
one that is imaginary.
+

+
_Justification:_ This is primarily needed for testing.
+

+
_Stakeholder:_ Lars.
+

+
~~~scenario
+
given an installed cibtool
+
when I run cibtool --db x.db run list
+
then stdout is exactly ""
+

+
when I run cibtool --db x.db run add --id x --repo rad:zwTxygwuz5LDGBq255RA2CbNGrz8 --alias x --url https://x/1 --branch main --commit f1815dde6ae406d8fe3cec0b96c4486766342716 --who x --finished --failure --timestamp 2024-07-09T02:00:00
+
when I run cibtool --db x.db run list
+
then stdout contains "rad:zwTxygwuz5LDGBq255RA2CbNGrz8"
+
~~~
modified ci-broker.yaml
@@ -3,6 +3,16 @@
    rust:
      function: install_ci_broker

+
- given: "an installed cib"
+
  impl:
+
    rust:
+
      function: install_cib
+

+
- given: "an installed cibtool"
+
  impl:
+
    rust:
+
      function: install_cibtool
+

- given: "an installed synthetic-events"
  impl:
    rust:
modified src/adapter.rs
@@ -19,7 +19,6 @@ use log::{debug, error};

use crate::{
    msg::{MessageError, Request, Response},
-
    pages::StatusPage,
    run::{Run, RunState},
};

@@ -51,31 +50,19 @@ impl Adapter {
        self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
    }

-
    pub fn run(
-
        &self,
-
        trigger: &Request,
-
        run: &mut Run,
-
        status: &mut StatusPage,
-
    ) -> Result<(), AdapterError> {
+
    pub fn run(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
        debug!("running adapter");
        run.set_state(RunState::Triggered);
-
        status.push_run(run.clone());
-
        let x = self.run_helper(trigger, run, status);
+
        let x = self.run_helper(trigger, run);
        run.set_state(RunState::Finished);
-
        status.push_run(run.clone());
        x
    }

-
    fn run_helper(
-
        &self,
-
        trigger: &Request,
-
        run: &mut Run,
-
        status: &mut StatusPage,
-
    ) -> Result<(), AdapterError> {
+
    fn run_helper(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
-
        debug!("spawn adapter sub-process");
+
        debug!("spawn adapter sub-process: {:?}", self.bin);
        let mut child = Command::new(&self.bin)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
@@ -112,7 +99,6 @@ impl Adapter {
                    if let Some(url) = info_url {
                        run.set_adapter_info_url(&url);
                    }
-
                    status.push_run(run.clone());
                }
                _ => return Err(AdapterError::NotTriggered(resp)),
            }
@@ -127,7 +113,6 @@ impl Adapter {
            match resp {
                Response::Finished { result } => {
                    run.set_result(result);
-
                    status.push_run(run.clone());
                }
                _ => return Err(AdapterError::NotFinished(resp)),
            }
@@ -230,11 +215,10 @@ mod test {
    use radicle::git::Oid;
    use radicle::prelude::RepoId;

-
    use super::{Adapter, Run, StatusPage};
+
    use super::{Adapter, Run};
    use crate::{
        adapter::AdapterError,
        msg::{MessageError, Response, RunResult},
-
        pages::PageBuilder,
        run::Whence,
        test::{log_in_tests, mock_adapter, trigger_request, TestResult},
    };
@@ -251,12 +235,6 @@ mod test {
            "2024-02-29T12:58:12+02:00".into(),
        )
    }
-
    fn status_page() -> StatusPage {
-
        PageBuilder::default()
-
            .node_alias("test.alias")
-
            .build()
-
            .unwrap()
-
    }

    #[test]
    fn adapter_reports_success() -> TestResult<()> {
@@ -273,8 +251,7 @@ echo '{"response":"finished","result":"success"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?;
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -295,8 +272,7 @@ echo '{"response":"finished","result":"failure"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);

        match x {
            Ok(_) => (),
@@ -324,8 +300,7 @@ exit 1
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));
@@ -346,8 +321,7 @@ kill -9 $BASHPID
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -372,8 +346,7 @@ kill -9 $BASHPID
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -396,8 +369,7 @@ kill -9 $BASHPID
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Failed(_))));

@@ -419,8 +391,7 @@ echo '{"response":"finished","result":"success","bad":"field"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
@@ -444,8 +415,7 @@ echo '{"response":"finished","result":"success"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -473,8 +443,7 @@ echo '{"response":"finished","result":"success"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -494,8 +463,7 @@ echo '{"response":"finished","result":"success"}'
        let bin = tmp.path().join("adapter.sh");

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -523,8 +491,7 @@ echo '{"response":"finished","result":"success"}'
        write(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -556,8 +523,7 @@ echo '{"response":"finished","result":"success"}'
        mock_adapter(&bin, ADAPTER)?;

        let mut run = run();
-
        let mut status = status_page();
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
deleted src/bin/broker-messages.rs
@@ -1,53 +0,0 @@
-
use radicle::git::RefString;
-
use radicle::Profile;
-
use radicle_ci_broker::{
-
    event::BrokerEvent,
-
    msg::{Oid, RepoId, RequestBuilder, Response, RunId, RunResult},
-
};
-

-
fn main() {
-
    let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8").expect("create rid");
-
    let commit =
-
        Oid::try_from("b4fb1e347be7db19f0859717062f94116b5bec9f").expect("create commit id");
-
    let name = RefString::try_from("refs/namespaces/z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV/refs/heads/patches/8d8232ddcb217fa1402eec4d955e227ef3bb5881").expect("create name");
-
    let be = BrokerEvent::RefChanged {
-
        rid,
-
        name,
-
        oid: commit,
-
        old: None,
-
    };
-
    let profile = Profile::load().expect("create profile");
-
    let trigger = RequestBuilder::default()
-
        .profile(&profile)
-
        .broker_event(&be)
-
        .build_trigger()
-
        .expect("create trigger");
-
    println!("Trigger request:\n{}\n", trigger);
-

-
    let triggered = Response::triggered(RunId::from("any-string-works-as-run-id"));
-
    println!(
-
        "Triggered response:\n{}\n",
-
        serde_json::to_string(&triggered).expect("serialize message")
-
    );
-

-
    let triggered = Response::triggered_with_url(
-
        RunId::from("any-string-works-as-run-id"),
-
        "https://ci.example.com/xyzzy",
-
    );
-
    println!(
-
        "Triggered response with URL:\n{}\n",
-
        serde_json::to_string(&triggered).expect("serialize message")
-
    );
-

-
    let finished = Response::finished(RunResult::Success);
-
    println!(
-
        "Successful response:\n{}\n",
-
        serde_json::to_string(&finished).expect("serialize message")
-
    );
-

-
    let finished = Response::finished(RunResult::Failure);
-
    println!(
-
        "Failure response:\n{}\n",
-
        serde_json::to_string(&finished).expect("serialize message")
-
    );
-
}
deleted src/bin/ci-broker.rs
@@ -1,170 +0,0 @@
-
use std::{
-
    error::Error,
-
    path::{Path, PathBuf},
-
    process::exit,
-
    thread::{sleep, spawn},
-
    time::Duration,
-
};
-

-
use log::{debug, error, info};
-

-
use radicle::prelude::Profile;
-
use radicle_ci_broker::msg::MessageError;
-
use radicle_ci_broker::{
-
    adapter::Adapter,
-
    broker::Broker,
-
    config::Config,
-
    error::BrokerError,
-
    event::{BrokerEvent, NodeEventSource},
-
    msg::RequestBuilder,
-
    pages::{PageBuilder, PageError, StatusPage},
-
};
-

-
fn main() {
-
    if let Err(e) = fallible_main() {
-
        eprintln!("ERROR: {}", e);
-
        let mut e = e.source();
-
        while let Some(source) = e {
-
            eprintln!("caused by: {}", source);
-
            e = source.source();
-
        }
-
        exit(1);
-
    }
-
}
-

-
fn fallible_main() -> Result<(), BrokerError> {
-
    pretty_env_logger::init();
-
    info!("Radicle CI broker starts");
-

-
    let mut args = std::env::args().skip(1);
-
    let filename: PathBuf = if let Some(filename) = args.next() {
-
        PathBuf::from(filename)
-
    } else {
-
        return Err(BrokerError::Usage);
-
    };
-

-
    let config = Config::load(&filename)?;
-
    debug!("loaded configuration: {:#?}", config);
-

-
    let mut broker = Broker::new(config.db())?;
-
    debug!(
-
        "created broker, db has {} CI runs",
-
        broker.all_runs()?.len()
-
    );
-

-
    // FIXME: this is broken. the config file only lists how to invoke
-
    // each adapter, not what adapter to use for each repo.
-
    // for (rid, spec) in config.adapters.iter() {
-
    //     debug!("setting adapter for {rid:?} to {spec:#?}");
-
    //     let rid = RepoId::from_urn(rid).map_err(|e| BrokerError::BadRepoId(rid.into(), e))?;
-
    //     let adapter = Adapter::new(&spec.command).with_environment(spec.envs());
-
    //     broker.set_repository_adapter(&rid, &adapter);
-
    // }
-
    // debug!("set per-repository adapters");
-

-
    let spec =
-
        config
-
            .adapter(&config.default_adapter)
-
            .ok_or(BrokerError::UnknownDefaultAdapter(
-
                config.default_adapter.clone(),
-
            ))?;
-
    let adapter = Adapter::new(&spec.command)
-
        .with_environment(spec.envs())
-
        .with_environment(spec.sensitive_envs());
-
    broker.set_default_adapter(&adapter);
-
    debug!("set default adapter");
-

-
    let profile = Profile::load()?;
-
    debug!("loaded profile {profile:#?}");
-

-
    let mut source = NodeEventSource::new(&profile)?;
-
    debug!("created node event source");
-

-
    for filter in config.filters.iter() {
-
        source.allow(filter.clone());
-
    }
-
    debug!("added filters to node event source");
-

-
    // Spawn a thread that updates the status pages.
-
    let mut page = PageBuilder::default()
-
        .node_alias(&profile.config.node.alias)
-
        .runs(broker.all_runs()?)
-
        .build()?;
-
    let page2 = page.clone();
-
    let report_dir = if let Some(dir) = &config.report_dir {
-
        dir.to_path_buf()
-
    } else {
-
        PathBuf::from(".")
-
    };
-
    let report_dir2 = report_dir.clone();
-
    let interval = Duration::from_secs(config.status_page_update_interval());
-
    let status_thread = spawn(move || status_updater(report_dir2, page2, interval));
-
    debug!(
-
        "started thread to update status pages in the background: {:?}",
-
        status_thread.thread().id()
-
    );
-

-
    // This loop ends when there's an error, e.g., failure to read an
-
    // event from the node.
-
    'event_loop: loop {
-
        debug!("waiting for event from node");
-
        for e in source.event()? {
-
            match e {
-
                BrokerEvent::Shutdown => break 'event_loop,
-
                BrokerEvent::RefChanged { .. } => {
-
                    info!("broker event {e:#?}");
-
                    let result = RequestBuilder::default()
-
                        .profile(&profile)
-
                        .broker_event(&e)
-
                        .build_trigger();
-
                    match result {
-
                        Ok(req) => {
-
                            if let Err(e) = broker.execute_ci(&req, &mut page) {
-
                                error!("failed to run adapter, or adapter failed to run CI: {e}");
-
                            }
-
                        }
-
                        Err(MessageError::NoEventHandler) => {
-
                            debug!("no handler found for the specific event");
-
                            continue;
-
                        }
-
                        Err(e) => {
-
                            return Err(e.into());
-
                        }
-
                    }
-
                }
-
            }
-
        }
-
    }
-

-
    update_report_page(&report_dir, &mut page)?;
-

-
    Ok(())
-
}
-

-
fn update_report_page(dirname: &Path, page: &mut StatusPage) -> Result<(), PageError> {
-
    if dirname.exists() {
-
        page.update_timestamp();
-
        if let Err(e) = page.write(dirname) {
-
            eprintln!(
-
                "ERROR: failed to update report pages in {}: {e}",
-
                dirname.display()
-
            );
-
            return Err(e);
-
        }
-
    }
-
    Ok(())
-
}
-

-
fn status_updater(dirname: PathBuf, mut page: StatusPage, interval: Duration) {
-
    let filename = dirname.join("status.json");
-
    loop {
-
        page.update_timestamp();
-
        if dirname.exists() {
-
            if let Err(e) = page.write_json(&filename) {
-
                eprintln!("ERROR: failed to update {}: {e}", filename.display());
-
            }
-
            update_report_page(&dirname, &mut page).ok();
-
        }
-
        sleep(interval);
-
    }
-
}
added src/bin/cib.rs
@@ -0,0 +1,290 @@
+
//! The CI broker.
+

+
#![allow(clippy::result_large_err)]
+

+
use std::{
+
    error::Error,
+
    fs::write,
+
    path::{Path, PathBuf},
+
    process::exit,
+
};
+

+
use clap::Parser;
+
use log::{debug, error, info};
+

+
use radicle::Profile;
+

+
use radicle_ci_broker::{
+
    adapter::Adapter,
+
    broker::{Broker, BrokerError},
+
    config::{Config, ConfigError},
+
    db::{Db, DbError},
+
    pages::{PageBuilder, PageError},
+
    queueadd::{AdderError, QueueAdderBuilder},
+
    queueproc::{QueueError, QueueProcessorBuilder},
+
};
+

+
fn main() {
+
    if let Err(e) = fallible_main() {
+
        error!("ERROR: {}", e);
+
        let mut e = e.source();
+
        while let Some(source) = e {
+
            error!("caused by: {}", source);
+
            e = source.source();
+
        }
+
        exit(1);
+
    }
+
}
+

+
fn fallible_main() -> Result<(), CibError> {
+
    let args = Args::parse();
+

+
    pretty_env_logger::init_custom_env("RADICLE_CI_BROKER_LOG");
+
    info!("Radicle CI broker starts");
+

+
    let config = Config::load(&args.config).map_err(|e| CibError::read_config(&args.config, e))?;
+
    debug!("loaded configuration: {:#?}", config);
+

+
    args.run(&config)?;
+

+
    Ok(())
+
}
+

+
#[derive(Debug, Parser)]
+
struct Args {
+
    #[clap(long)]
+
    config: PathBuf,
+

+
    #[clap(subcommand)]
+
    cmd: Cmd,
+
}
+

+
impl Args {
+
    fn run(&self, config: &Config) -> Result<(), CibError> {
+
        match &self.cmd {
+
            Cmd::Config(x) => x.run(self, config)?,
+
            Cmd::Insert(x) => x.run(self, config)?,
+
            Cmd::Queued(x) => x.run(self, config)?,
+
            Cmd::ProcessEvents(x) => x.run(self, config)?,
+
        }
+
        Ok(())
+
    }
+

+
    fn open_db(&self, config: &Config) -> Result<Db, CibError> {
+
        Db::new(&config.db).map_err(CibError::db)
+
    }
+
}
+

+
#[derive(Debug, Parser)]
+
enum Cmd {
+
    Config(ConfigCmd),
+
    Insert(InsertCmd),
+
    Queued(QueuedCmd),
+
    ProcessEvents(ProcessEventsCmd),
+
}
+

+
#[derive(Debug, Parser)]
+
struct ConfigCmd {
+
    #[clap(long)]
+
    output: Option<PathBuf>,
+
}
+

+
impl ConfigCmd {
+
    fn run(&self, _args: &Args, config: &Config) -> Result<(), CibError> {
+
        let json = config.to_json();
+

+
        if let Some(output) = &self.output {
+
            write(output, json.as_bytes()).map_err(|e| CibError::write_config(output, e))?;
+
        } else {
+
            println!("{json}");
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, Parser)]
+
struct InsertCmd {}
+

+
impl InsertCmd {
+
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let adder = QueueAdderBuilder::default()
+
            .db(args.open_db(config)?)
+
            .filters(&config.filters)
+
            .build()
+
            .map_err(CibError::QueueAdder)?;
+
        let thread = adder.add_events_in_thread();
+
        thread
+
            .join()
+
            .expect("wait for thread to finish")
+
            .map_err(CibError::add_events)?;
+
        debug!("cib ends");
+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, Parser)]
+
struct QueuedCmd {}
+

+
impl QueuedCmd {
+
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let db = args.open_db(config)?;
+

+
        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
+
        let spec =
+
            config
+
                .adapter(&config.default_adapter)
+
                .ok_or(CibError::UnknownDefaultAdapter(
+
                    config.default_adapter.clone(),
+
                ))?;
+
        let adapter = Adapter::new(&spec.command)
+
            .with_environment(spec.envs())
+
            .with_environment(spec.sensitive_envs());
+
        debug!("default adapter: {adapter:?}");
+
        broker.set_default_adapter(&adapter);
+

+
        let processor = QueueProcessorBuilder::default()
+
            .db(db)
+
            .broker(broker)
+
            .build()
+
            .map_err(CibError::process_queue)?;
+
        let thread = processor.process_in_thread();
+
        thread
+
            .join()
+
            .expect("wait for thread to finish")
+
            .map_err(CibError::process_queue)?;
+

+
        debug!("cib ends");
+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, Parser)]
+
struct ProcessEventsCmd {}
+

+
impl ProcessEventsCmd {
+
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
+
        let adder = QueueAdderBuilder::default()
+
            .db(args.open_db(config)?)
+
            .filters(&config.filters)
+
            .push_shutdown()
+
            .build()
+
            .map_err(CibError::QueueAdder)?;
+
        let adder = adder.add_events_in_thread();
+

+
        let profile = Profile::load().map_err(CibError::profile)?;
+

+
        let db = args.open_db(config)?;
+

+
        let mut page = PageBuilder::default()
+
            .node_alias(&profile.config.node.alias)
+
            .runs(db.get_all_runs().map_err(CibError::db)?)
+
            .build()
+
            .map_err(CibError::page_updater)?;
+

+
        if let Some(dirname) = &config.report_dir {
+
            page.set_output_dir(dirname);
+
        }
+
        let thread = page.update_in_thread(db, &profile.config.node.alias, true);
+
        thread.join().unwrap().map_err(CibError::page_updater)?;
+

+
        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
+
        let spec =
+
            config
+
                .adapter(&config.default_adapter)
+
                .ok_or(CibError::UnknownDefaultAdapter(
+
                    config.default_adapter.clone(),
+
                ))?;
+
        let adapter = Adapter::new(&spec.command)
+
            .with_environment(spec.envs())
+
            .with_environment(spec.sensitive_envs());
+
        debug!("default adapter: {adapter:?}");
+
        broker.set_default_adapter(&adapter);
+

+
        let processor = QueueProcessorBuilder::default()
+
            .db(args.open_db(config)?)
+
            .broker(broker)
+
            .build()
+
            .map_err(CibError::process_queue)?;
+
        let processor = processor.process_in_thread();
+
        processor
+
            .join()
+
            .expect("wait for processor thread to finish")
+
            .map_err(CibError::process_queue)?;
+

+
        adder
+
            .join()
+
            .expect("wait for adder thread to finish")
+
            .map_err(CibError::add_events)?;
+

+
        debug!("cib ends");
+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
enum CibError {
+
    #[error("failed to read configuration file {0}")]
+
    ReadConfig(PathBuf, #[source] ConfigError),
+

+
    #[error("failed to write config as JSON to file {0}")]
+
    WriteConfig(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to look up node profile")]
+
    Profile(#[source] radicle::profile::Error),
+

+
    #[error("failed to use SQLite database")]
+
    Db(#[source] DbError),
+

+
    #[error("failed to create report page")]
+
    PageUpdater(#[source] PageError),
+

+
    #[error("failed create broker data type")]
+
    NewBroker(#[source] BrokerError),
+

+
    #[error("failed to add node events into queue")]
+
    QueueAdder(#[source] AdderError),
+

+
    #[error("failed to process events from queue")]
+
    ProcessQueue(#[source] QueueError),
+

+
    #[error("failed to add events to queue")]
+
    AddEvents(#[source] AdderError),
+

+
    #[error("default adapter is not in list of adapters")]
+
    UnknownDefaultAdapter(String),
+
}
+

+
impl CibError {
+
    fn read_config(filename: &Path, e: ConfigError) -> Self {
+
        Self::ReadConfig(filename.into(), e)
+
    }
+

+
    fn write_config(filename: &Path, e: std::io::Error) -> Self {
+
        Self::WriteConfig(filename.into(), e)
+
    }
+

+
    fn profile(e: radicle::profile::Error) -> Self {
+
        Self::Profile(e)
+
    }
+

+
    fn db(e: DbError) -> Self {
+
        Self::Db(e)
+
    }
+

+
    fn page_updater(e: PageError) -> Self {
+
        Self::PageUpdater(e)
+
    }
+

+
    fn new_broker(e: BrokerError) -> Self {
+
        Self::NewBroker(e)
+
    }
+

+
    fn process_queue(e: QueueError) -> Self {
+
        Self::ProcessQueue(e)
+
    }
+

+
    fn add_events(e: AdderError) -> Self {
+
        Self::AddEvents(e)
+
    }
+
}
added src/bin/cibtool.rs
@@ -0,0 +1,713 @@
+
//! A management tool for the CI broker.
+
//!
+
//! This tool lets the node operator query and manage the CI broker's
+
//! database: the persistent event queue, the list of CI runs, etc.
+
//!
+
//! The tool is also used as part of the CI broker's acceptance test
+
//! suite (see the `ci-broker.subplot` document).
+

+
use std::{
+
    error::Error,
+
    fs::{read, write},
+
    path::PathBuf,
+
    process::exit,
+
    str::FromStr,
+
};
+

+
use clap::Parser;
+

+
use radicle::{
+
    git::RefString,
+
    prelude::{NodeId, RepoId},
+
    storage::ReadStorage,
+
    Profile, Storage,
+
};
+
use radicle_git_ext::Oid;
+

+
use radicle_ci_broker::{
+
    broker::BrokerError,
+
    db::{Db, DbError, QueueId},
+
    event::BrokerEvent,
+
    msg::{RunId, RunResult},
+
    pages::{PageBuilder, PageError},
+
    run::{Run, Whence},
+
};
+

+
fn main() {
+
    if let Err(e) = fallible_main() {
+
        eprintln!("ERROR: {}", e);
+
        let mut e = e.source();
+
        while let Some(source) = e {
+
            eprintln!("caused by: {}", source);
+
            e = source.source();
+
        }
+
        exit(1);
+
    }
+
}
+

+
#[allow(clippy::result_large_err)]
+
fn fallible_main() -> Result<(), CibToolError> {
+
    pretty_env_logger::init();
+

+
    let args = Args::parse();
+
    args.run()?;
+

+
    Ok(())
+
}
+

+
/// Radicle CI broker management tool for node operators.
+
///
+
/// Query and update the CI broker database file: the queue of events
+
/// waiting to be processed, the list of CI runs. Also, generate HTML
+
/// report pages from the database.
+
///
+
/// This tool can be used whether the CI broker is running or not.
+
#[derive(Parser)]
+
struct Args {
+
    /// Name of the SQLite database file. The file will be created if
+
    /// it does not already exist. Locking is handled automatically.
+
    #[clap(long)]
+
    db: PathBuf,
+

+
    #[clap(subcommand)]
+
    cmd: Cmd,
+
}
+

+
impl Args {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self) -> Result<(), CibToolError> {
+
        match &self.cmd {
+
            Cmd::Counter(x) => x.run(self)?,
+
            Cmd::Event(x) => x.run(self)?,
+
            Cmd::Run(x) => x.run(self)?,
+
            Cmd::Report(x) => x.run(self)?,
+
        }
+
        Ok(())
+
    }
+

+
    #[allow(clippy::result_large_err)]
+
    fn open_db(&self) -> Result<Db, CibToolError> {
+
        Ok(Db::new(&self.db)?)
+
    }
+
}
+

+
#[derive(Parser)]
+
enum Cmd {
+
    /// Manage a counter in the database. This is meant to be used
+
    /// only by the CI broker test suite, not by people.
+
    #[clap(hide = true)]
+
    Counter(CounterCmd),
+

+
    /// Manage the event queue. The events are for Git refs having
+
    /// changed.
+
    Event(EventCmd),
+

+
    /// Manage the list of CI runs.
+
    Run(RunCmd),
+

+
    /// Produce HTML reports based on database contents.
+
    Report(ReportCmd),
+
}
+

+
#[derive(Parser)]
+
struct CounterCmd {
+
    #[clap(subcommand)]
+
    cmd: CounterSubCmd,
+
}
+

+
impl CounterCmd {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        match &self.cmd {
+
            CounterSubCmd::Show(x) => x.run(args)?,
+
            CounterSubCmd::Count(x) => x.run(args)?,
+
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
enum CounterSubCmd {
+
    /// Show the current value of the counter.
+
    Show(ShowCounter),
+

+
    /// Count until the counter reaches a minimum value.
+
    Count(CountCounter),
+
}
+

+
#[derive(Parser)]
+
struct ShowCounter {}
+

+
impl ShowCounter {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        let counter = db.get_counter()?;
+
        println!("{}", counter.unwrap_or(0));
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct CountCounter {
+
    /// The minimum value which counting aims at.
+
    #[clap(long)]
+
    goal: i64,
+
}
+

+
impl CountCounter {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        Self::inc(&db, self.goal)?;
+

+
        Ok(())
+
    }
+

+
    fn inc(db: &Db, goal: i64) -> Result<(), DbError> {
+
        let mut prev: i64 = -1;
+
        loop {
+
            db.begin()?;
+
            println!("BEGIN");
+

+
            let current = db.get_counter()?;
+
            println!("  current as read={current:?}");
+
            let current = current.unwrap_or(0);
+
            println!("  current: {current}; prev={prev}");
+
            if current < prev {
+
                panic!("current < prev");
+
            }
+
            if current >= goal {
+
                println!("GOAL");
+
                db.rollback()?;
+
                println!("ROLLBACK");
+
                break;
+
            }
+

+
            let new = current + 1;
+
            if (new == 1 && db.create_counter(new).is_err()) || db.update_counter(new).is_err() {
+
                db.rollback()?;
+
                println!("ROLLBACK");
+
            } else {
+
                println!("  increment to {new}");
+
                db.commit()?;
+
                println!("COMMIT");
+
                prev = new;
+
            }
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct EventCmd {
+
    #[clap(subcommand)]
+
    cmd: EventSubCmd,
+
}
+

+
impl EventCmd {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        match &self.cmd {
+
            EventSubCmd::Add(x) => x.run(args)?,
+
            EventSubCmd::Shutdown(x) => x.run(args)?,
+
            EventSubCmd::List(x) => x.run(args)?,
+
            EventSubCmd::Count(x) => x.run(args)?,
+
            EventSubCmd::Show(x) => x.run(args)?,
+
            EventSubCmd::Remove(x) => x.run(args)?,
+
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
enum EventSubCmd {
+
    /// List events in the queue, waiting to be processed.
+
    List(ListEvents),
+

+
    /// Show the number of events in the queue.
+
    Count(CountEvents),
+

+
    /// Add an event to the queue.
+
    Add(AddEvent),
+

+
    /// Add a shutdown event to the queue.
+
    ///
+
    /// The shutdown event causes the CI broker to terminate.
+
    Shutdown(Shutdown),
+

+
    /// Show an event in the queue.
+
    Show(ShowEvent),
+

+
    /// Remove an event from the queue.
+
    Remove(RemoveEvent),
+
}
+

+
#[derive(Parser)]
+
struct ListEvents {
+
    /// Show more details about the event.
+
    #[clap(long)]
+
    verbose: bool,
+
}
+

+
impl ListEvents {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        for id in db.queued_events()? {
+
            if self.verbose {
+
                if let Some(e) = db.get_queued_event(&id)? {
+
                    println!("{id}: {:?}", e);
+
                } else {
+
                    println!("{id}: No such event");
+
                }
+
            } else {
+
                println!("{id}");
+
            }
+
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct CountEvents {}
+

+
impl CountEvents {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        println!("{}", db.queued_events()?.len());
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct AddEvent {
+
    /// Set the repository the event refers to. Can be a RID, or the
+
    /// repository name.
+
    #[clap(long)]
+
    repo: String,
+

+
    /// Set the name of the ref the event refers to.
+
    #[clap(long, alias = "ref")]
+
    name: String,
+

+
    /// Set the commit the event refers to. Can be the SHA1 commit id,
+
    /// or a symbolic Git revision, as understood by `git rev-parse`.
+
    /// For example, `HEAD`.
+
    #[clap(long)]
+
    commit: String,
+

+
    /// The base commit referred to by the event. Optional, but must
+
    /// be a SHA commit id.
+
    #[clap(long)]
+
    base: Option<Oid>,
+

+
    /// Write the event to this file, as JSON, instead of adding it to
+
    /// the queue.
+
    #[clap(long)]
+
    output: Option<PathBuf>,
+

+
    /// Write the event ID to this file, after adding the event to the
+
    /// queue.
+
    #[clap(long)]
+
    id_file: Option<PathBuf>,
+
}
+

+
impl AddEvent {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let rid = if let Ok(rid) = RepoId::from_urn(&self.repo) {
+
            rid
+
        } else {
+
            self.lookup_rid(&self.repo)?
+
        };
+

+
        let oid = if let Ok(rid) = Oid::from_str(&self.commit) {
+
            rid
+
        } else {
+
            self.lookup_commit(rid, &self.commit)?
+
        };
+

+
        let name = format!(
+
            "refs/namespaces/{}/refs/heads/{}",
+
            self.lookup_nid()?,
+
            self.name.as_str()
+
        );
+
        let name = RefString::try_from(name).expect("RefString");
+

+
        let event = BrokerEvent::new(&rid, &name, &oid, self.base);
+

+
        if let Some(output) = &self.output {
+
            let json = serde_json::to_string_pretty(&event)
+
                .map_err(|e| CibToolError::EventToJson(event.clone(), e))?;
+
            std::fs::write(output, json.as_bytes())
+
                .map_err(|e| CibToolError::Write(output.into(), e))?;
+
        } else {
+
            let db = args.open_db()?;
+
            let id = db.push_queued_event(event)?;
+
            println!("{id}");
+

+
            if let Some(filename) = &self.id_file {
+
                write(filename, id.to_string().as_bytes()).expect("write id file");
+
            }
+
        }
+
        Ok(())
+
    }
+

+
    #[allow(clippy::result_large_err)]
+
    fn lookup_nid(&self) -> Result<NodeId, CibToolError> {
+
        let profile = Profile::load().map_err(CibToolError::Profile)?;
+
        Ok(*profile.id())
+
    }
+

+
    #[allow(clippy::result_large_err)]
+
    fn lookup_rid(&self, wanted: &str) -> Result<RepoId, CibToolError> {
+
        let profile = Profile::load().map_err(CibToolError::Profile)?;
+
        let storage =
+
            Storage::open(profile.storage(), profile.info()).map_err(CibToolError::Storage)?;
+

+
        let mut rid = None;
+
        let repo_infos = storage.repositories().map_err(CibToolError::Repositories)?;
+
        for ri in repo_infos {
+
            let project = ri
+
                .doc
+
                .project()
+
                .map_err(|e| CibToolError::Project(ri.rid, e))?;
+

+
            if project.name() == wanted {
+
                if rid.is_some() {
+
                    return Err(CibToolError::DuplicateRepositories(wanted.into()));
+
                }
+
                rid = Some(ri.rid);
+
            }
+
        }
+

+
        if let Some(rid) = rid {
+
            Ok(rid)
+
        } else {
+
            Err(CibToolError::NotFound(wanted.into()))
+
        }
+
    }
+

+
    #[allow(clippy::result_large_err)]
+
    fn lookup_commit(&self, rid: RepoId, gitref: &str) -> Result<Oid, CibToolError> {
+
        let profile = Profile::load().map_err(CibToolError::Profile)?;
+
        let storage =
+
            Storage::open(profile.storage(), profile.info()).map_err(CibToolError::Storage)?;
+
        let repo = storage
+
            .repository(rid)
+
            .map_err(|e| CibToolError::RepoOpen(rid, e))?;
+
        let object = repo
+
            .backend
+
            .revparse_single(gitref)
+
            .map_err(|e| CibToolError::RevParse(gitref.into(), e))?;
+

+
        Ok(object.id().into())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct ShowEvent {
+
    /// ID of event to show.
+
    #[clap(long, required_unless_present = "id_file")]
+
    id: Option<QueueId>,
+

+
    /// Show event as JSON? Default is a debugging format useful for
+
    /// programmers.
+
    #[clap(long)]
+
    json: bool,
+

+
    /// Write output to this file.
+
    #[clap(long)]
+
    output: Option<PathBuf>,
+

+
    /// Read ID of event to show from this file.
+
    #[clap(long)]
+
    id_file: Option<PathBuf>,
+
}
+

+
impl ShowEvent {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+

+
        let id = if let Some(id) = &self.id {
+
            id.clone()
+
        } else {
+
            assert!(self.id_file.is_some());
+
            let file = self.id_file.as_ref().unwrap();
+
            let id = read(file).expect("read id file");
+
            let id = String::from_utf8_lossy(&id).to_string();
+
            QueueId::from(&id)
+
        };
+

+
        if let Some(event) = db.get_queued_event(&id)? {
+
            if self.json {
+
                let json = serde_json::to_string_pretty(&event.event())
+
                    .map_err(|e| CibToolError::EventToJson(event.event().clone(), e))?;
+
                if let Some(filename) = &self.output {
+
                    std::fs::write(filename, json.as_bytes())
+
                        .map_err(|e| CibToolError::Write(filename.into(), e))?;
+
                } else {
+
                    println!("{json}");
+
                }
+
            } else {
+
                println!("{event:#?}");
+
            }
+
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct RemoveEvent {
+
    /// ID of event to remove.
+
    #[clap(long, required_unless_present = "id_file")]
+
    id: Option<QueueId>,
+

+
    /// Read ID of event to remove from this file.
+
    #[clap(long)]
+
    id_file: Option<PathBuf>,
+
}
+

+
impl RemoveEvent {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+

+
        let id = if let Some(id) = &self.id {
+
            id.clone()
+
        } else {
+
            assert!(self.id_file.is_some());
+
            let file = self.id_file.as_ref().unwrap();
+
            let id = read(file).expect("read id file");
+
            let id = String::from_utf8_lossy(&id).to_string();
+
            QueueId::from(&id)
+
        };
+

+
        db.remove_queued_event(&id)?;
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct Shutdown {
+
    /// Write ID of the event to this file, after adding the event to
+
    /// the queue.
+
    #[clap(long)]
+
    id_file: Option<PathBuf>,
+
}
+

+
impl Shutdown {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+
        let id = db.push_queued_event(BrokerEvent::Shutdown)?;
+

+
        if let Some(filename) = &self.id_file {
+
            write(filename, id.to_string().as_bytes()).expect("write id file");
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct RunCmd {
+
    #[clap(subcommand)]
+
    cmd: RunSubCmd,
+
}
+

+
impl RunCmd {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        match &self.cmd {
+
            RunSubCmd::Add(x) => x.run(args)?,
+
            RunSubCmd::List(x) => x.run(args)?,
+
        }
+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
enum RunSubCmd {
+
    /// Add information about a CI run to the database.
+
    Add(AddRun),
+

+
    /// List known CI runs on this node to the database.
+
    List(ListRuns),
+
}
+

+
#[derive(Parser)]
+
struct AddRun {
+
    /// Set the run ID.
+
    #[clap(long)]
+
    id: RunId,
+

+
    /// Set alias of node that performed the CI run.
+
    #[clap(long)]
+
    alias: String,
+

+
    /// Set optional URL to information about the CI run.
+
    #[clap(long)]
+
    url: Option<String>,
+

+
    /// Set the repository ID that the CI run for.
+
    #[clap(long)]
+
    repo: RepoId,
+

+
    /// Set timestamp of the CI run.
+
    #[clap(long)]
+
    timestamp: String,
+

+
    /// Set the Git branch used by the CI run.
+
    #[clap(long)]
+
    branch: String,
+

+
    /// Set the commit SHA ID used by the CI run.
+
    #[clap(long)]
+
    commit: Oid,
+

+
    /// Set the author of the commit used by the CI run.
+
    #[clap(long)]
+
    who: Option<String>,
+

+
    /// Set the state of the CI run to "triggered".
+
    #[clap(long, required_unless_present_any = ["running", "finished"])]
+
    triggered: bool,
+

+
    /// Set the state of the CI run to "running".
+
    #[clap(long)]
+
    #[clap(long, required_unless_present_any = ["triggered", "finished"])]
+
    running: bool,
+

+
    /// Set the state of the CI run to "finished".
+
    #[clap(long)]
+
    #[clap(long, required_unless_present_any = ["triggered", "running"])]
+
    finished: bool,
+

+
    /// Mark the finished CI run as successful.
+
    #[clap(long, required_unless_present = "failure")]
+
    success: bool,
+
    /// Mark the finished CI run as having failed.
+

+
    #[clap(long, required_unless_present = "success")]
+
    failure: bool,
+
}
+

+
impl AddRun {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+

+
        let whence = Whence::Branch {
+
            name: self.branch.clone(),
+
            commit: self.commit,
+
            who: self.who.clone(),
+
        };
+
        let mut run = Run::new(self.repo, &self.alias, whence, self.timestamp.clone());
+
        run.set_adapter_run_id(RunId::default());
+
        if let Some(url) = &self.url {
+
            run.set_adapter_info_url(url);
+
        }
+

+
        run.set_result(if self.success {
+
            RunResult::Success
+
        } else {
+
            RunResult::Failure
+
        });
+

+
        db.push_run(run).unwrap();
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct ListRuns {}
+

+
impl ListRuns {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let db = args.open_db()?;
+

+
        for run in db.get_all_runs()? {
+
            println!("{} {run:#?}", run.adapter_run_id().unwrap());
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Parser)]
+
struct ReportCmd {
+
    /// Write HTML files to this directory. The directory must exist:
+
    /// it is not created automatically.
+
    #[clap(long)]
+
    output_dir: PathBuf,
+
}
+

+
impl ReportCmd {
+
    #[allow(clippy::result_large_err)]
+
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let profile = Profile::load().map_err(CibToolError::Profile)?;
+

+
        let db = args.open_db()?;
+

+
        let mut page = PageBuilder::default()
+
            .node_alias(&profile.config.node.alias)
+
            .runs(db.get_all_runs()?)
+
            .build()?;
+

+
        page.set_output_dir(&self.output_dir);
+
        let thread = page.update_in_thread(db, &profile.config.node.alias, true);
+
        thread.join().unwrap()?;
+

+
        Ok(())
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
#[allow(clippy::large_enum_variant)]
+
enum CibToolError {
+
    #[error("failed to look up node profile")]
+
    Profile(#[source] radicle::profile::Error),
+

+
    #[error("failed to look up open node storage")]
+
    Storage(#[source] radicle::storage::Error),
+

+
    #[error("failed to list repositories in node storage")]
+
    Repositories(#[source] radicle::storage::Error),
+

+
    #[error("failed to look up project info for repository {0}")]
+
    Project(RepoId, #[source] radicle::identity::doc::PayloadError),
+

+
    #[error("node has more than one repository called {0}")]
+
    DuplicateRepositories(String),
+

+
    #[error("node has no repository called: {0}")]
+
    NotFound(String),
+

+
    #[error("failed to open git repository in node storage: {0}")]
+
    RepoOpen(RepoId, #[source] radicle::storage::RepositoryError),
+

+
    #[error("failed to parse git ref as a commit id: {0}")]
+
    RevParse(String, #[source] radicle::git::raw::Error),
+

+
    #[error(transparent)]
+
    Broker(#[from] BrokerError),
+

+
    #[error(transparent)]
+
    Db(#[from] DbError),
+

+
    #[error("failed to serialize broker event to JSON: {0:#?}")]
+
    EventToJson(BrokerEvent, #[source] serde_json::Error),
+

+
    #[error("failed to write file: {0}")]
+
    Write(PathBuf, #[source] std::io::Error),
+

+
    #[error(transparent)]
+
    Page(#[from] PageError),
+
}
deleted src/bin/filter-events.rs
@@ -1,48 +0,0 @@
-
//! Show broker events that are allowed by a filter. This is meant to
-
//! be helpful for testing a filter configuration.
-

-
use log::info;
-
use std::{error::Error, path::PathBuf};
-

-
use radicle::prelude::Profile;
-
use radicle_ci_broker::{config::Config, error::BrokerError, event::NodeEventSource};
-

-
fn main() {
-
    if let Err(e) = fallible_main() {
-
        eprintln!("ERROR: {}", e);
-
        let mut e = e.source();
-
        while let Some(source) = e {
-
            eprintln!("caused by: {}", source);
-
            e = source.source();
-
        }
-
    }
-
}
-

-
fn fallible_main() -> Result<(), BrokerError> {
-
    pretty_env_logger::init();
-

-
    let mut args = std::env::args().skip(1);
-
    let filename: PathBuf = if let Some(filename) = args.next() {
-
        PathBuf::from(filename)
-
    } else {
-
        return Err(BrokerError::Usage);
-
    };
-

-
    info!("using file: {:?}", filename);
-

-
    let config = Config::load(&filename)?;
-

-
    let profile = Profile::load()?;
-
    let mut source = NodeEventSource::new(&profile)?;
-
    for filter in config.filters.iter() {
-
        source.allow(filter.clone());
-
    }
-

-
    // This loop ends when there's an error, e.g., failure to read an
-
    // event from the node.
-
    loop {
-
        for e in source.event()? {
-
            println!("{:#?}", e);
-
        }
-
    }
-
}
deleted src/bin/list_runs.rs
@@ -1,30 +0,0 @@
-
use std::{error::Error, path::PathBuf};
-

-
use radicle_ci_broker::{db::Db, error::BrokerError};
-

-
fn main() {
-
    if let Err(e) = fallible_main() {
-
        eprintln!("ERROR: {}", e);
-
        let mut e = e.source();
-
        while let Some(source) = e {
-
            eprintln!("caused by: {}", source);
-
            e = source.source();
-
        }
-
    }
-
}
-

-
fn fallible_main() -> Result<(), BrokerError> {
-
    let mut args = std::env::args().skip(1);
-
    let filename: PathBuf = if let Some(filename) = args.next() {
-
        PathBuf::from(filename)
-
    } else {
-
        return Err(BrokerError::Usage);
-
    };
-

-
    let mut db = Db::new(&filename)?;
-
    for run in db.all_runs()? {
-
        println!("{run:#?}");
-
    }
-

-
    Ok(())
-
}
deleted src/bin/node-events.rs
@@ -1,33 +0,0 @@
-
use std::str::FromStr;
-

-
use radicle::git::{Oid, RefString};
-
use radicle::{
-
    node::Event,
-
    prelude::{NodeId, RepoId},
-
    storage::RefUpdate,
-
};
-

-
fn main() {
-
    let event = Event::RefsFetched {
-
        remote: NodeId::from_str("z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV").unwrap(),
-
        rid: RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8").unwrap(),
-
        updated: vec![RefUpdate::Skipped {
-
            name: RefString::try_from("shutdown").expect("create name"),
-
            oid: Oid::try_from("0").unwrap(),
-
        }],
-
    };
-
    println!("{}", serde_json::to_string(&event).unwrap());
-

-
    println!();
-

-
    let event = Event::RefsFetched {
-
        remote: NodeId::from_str("z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV").unwrap(),
-
        rid: RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8").unwrap(),
-
        updated: vec![RefUpdate::Updated {
-
            name: RefString::try_from("shutdown").expect("create name"),
-
            old: Oid::try_from("0").unwrap(),
-
            new: Oid::try_from("0").unwrap(),
-
        }],
-
    };
-
    println!("{}", serde_json::to_string(&event).unwrap());
-
}
deleted src/bin/pagegen.rs
@@ -1,102 +0,0 @@
-
use std::{path::Path, str::FromStr};
-

-
use radicle::git::Oid;
-
use radicle::identity::Did;
-
use radicle::prelude::RepoId;
-

-
use radicle_ci_broker::{
-
    msg::{Author, Revision, RunId, RunResult},
-
    pages::{PageBuilder, PageError},
-
    run::{Run, RunState, Whence},
-
};
-

-
const DOMAIN: &str = "radicle.liw.fi";
-
const RID_1: &str = "rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5";
-
const COMMIT_1: &str = "a48081f2717f069d456ec09f31d9e639b232dbed";
-
const USERID_1: &str = "J. Random Hacker <jrh@example.com>";
-
const AUTHOR_DID_3: &str = "did:key:z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV";
-

-
const DIR: &str = "html";
-

-
fn main() -> Result<(), PageError> {
-
    let mut page = PageBuilder::default().node_alias(DOMAIN).build()?;
-

-
    let rid1 = rid(RID_1);
-
    let mut run1 = Run::new(
-
        rid1,
-
        "heartwood",
-
        Whence::branch("master", oid(COMMIT_1), Some(USERID_1)),
-
        "2024-02-27T18:29:25+02:00".into(),
-
    );
-
    run1.set_adapter_run_id(RunId::default());
-
    run1.set_state(RunState::Running);
-
    run1.set_result(RunResult::Success);
-
    page.push_run(run1.clone());
-

-
    let mut run2 = run1.clone();
-
    run2.set_state(RunState::Finished);
-
    page.push_run(run2);
-

-
    let mut run3 = Run::new(
-
        rid1,
-
        "heartwood",
-
        Whence::patch(
-
            oid(COMMIT_1),
-
            oid(COMMIT_1),
-
            Revision {
-
                id: oid(COMMIT_1),
-
                author: author(AUTHOR_DID_3),
-
                description: "Dummy patch description".into(),
-
                base: oid(COMMIT_1),
-
                oid: oid(COMMIT_1),
-
                timestamp: 0,
-
            },
-
            Some("Helpful Person <helpful@example.com>"),
-
        ),
-
        "2024-02-27T18:29:09+02:00".into(),
-
    );
-
    run3.set_adapter_run_id(RunId::default());
-
    run3.set_state(RunState::Finished);
-
    run3.set_result(RunResult::Failure);
-
    page.push_run(run3);
-

-
    let rid2 = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8").unwrap();
-
    let alias2 = "radicle-ci-broker";
-
    let mut run4 = Run::new(
-
        rid2,
-
        alias2,
-
        Whence::branch(
-
            "master",
-
            oid(COMMIT_1),
-
            Some("J. Random Hacker <random@example.com>"),
-
        ),
-
        "2024-02-27T18:29:25+02:00".into(),
-
    );
-
    run4.set_adapter_run_id(RunId::default());
-
    run4.set_state(RunState::Finished);
-
    run4.set_result(RunResult::Success);
-
    page.push_run(run4);
-

-
    page.write(Path::new(DIR)).unwrap();
-

-
    Ok(())
-
}
-

-
fn rid(urn: &str) -> RepoId {
-
    RepoId::from_urn(urn).unwrap()
-
}
-

-
fn oid(commit_id: &str) -> Oid {
-
    Oid::from_str(commit_id).unwrap()
-
}
-

-
fn did(did: &str) -> Did {
-
    Did::from_str(did).unwrap()
-
}
-

-
fn author(id: &str) -> Author {
-
    Author {
-
        id: did(id),
-
        alias: None,
-
    }
-
}
deleted src/bin/parse-broker-messages.rs
@@ -1,11 +0,0 @@
-
use radicle_ci_broker::msg::Response;
-

-
fn main() {
-
    for line in std::io::stdin().lines() {
-
        let line = line.unwrap();
-
        let resp = Response::from_str(&line).unwrap();
-
        println!("{line}");
-
        println!("{resp:#?}");
-
        println!();
-
    }
-
}
modified src/bin/synthetic-events.rs
@@ -1,9 +1,9 @@
-
// Read node events, expressed as JSON, from files, and output them as
-
// single-line JSON objects to a Unix domain socket.
-
//
-
// This is a helper tool for testing of the CI broker as a whole.
-
//
-
// This program forks and runs itself in the background.
+
//! Read node events, expressed as JSON, from files, and output them as
+
//! single-line JSON objects to a Unix domain socket.
+
//!
+
//! This is a helper tool for testing of the CI broker as a whole.
+
//!
+
//! This program forks and runs itself in the background.

use std::{
    env::current_exe,
@@ -37,6 +37,9 @@ fn main() -> anyhow::Result<()> {

        log(&mut f, "daemon starts\n".into());

+
        let repeat = args.repeat.unwrap_or(1);
+
        log(&mut f, format!("repeat={repeat}\n"));
+

        let mut events = vec![];
        for filename in args.events.iter() {
            log(
@@ -61,7 +64,9 @@ fn main() -> anyhow::Result<()> {
            log(&mut f, "daemon accepted connection\n".into());
            let mut stream = stream?;
            for e in events.iter() {
-
                stream.write_all(e.as_bytes())?;
+
                for _ in 0..repeat {
+
                    stream.write_all(e.as_bytes())?;
+
                }
                log(&mut f, "daemon sent a message\n".into());
            }
            log(&mut f, "daemon sent all messages\n".into());
@@ -83,10 +88,12 @@ fn main() -> anyhow::Result<()> {
            remove_file(&args.socket)?;
        }
        eprintln!("launching daemon");
+
        let repeat = args.repeat.unwrap_or(1);
        Command::new(exe)
            .arg("--daemon")
            .arg("--log")
            .arg(args.log.unwrap_or(PathBuf::from("/dev/null")))
+
            .arg(&format!("--repeat={repeat}"))
            .arg(&args.socket)
            .args(&args.events)
            .stdin(Stdio::null())
@@ -110,6 +117,8 @@ struct Args {
    daemon: bool,
    #[clap(long)]
    log: Option<PathBuf>,
+
    #[clap(long)]
+
    repeat: Option<usize>,
    socket: PathBuf,
    events: Vec<PathBuf>,
}
modified src/broker.rs
@@ -3,7 +3,11 @@
//! This is type and module of its own to facilitate automated
//! testing.

-
use std::{collections::HashMap, error::Error, path::Path};
+
use std::{
+
    collections::HashMap,
+
    error::Error,
+
    path::{Path, PathBuf},
+
};

use log::{debug, error, info};
use time::{macros::format_description, OffsetDateTime};
@@ -12,10 +16,8 @@ use radicle::prelude::RepoId;

use crate::{
    adapter::Adapter,
-
    db::Db,
-
    error::BrokerError,
+
    db::{Db, DbError},
    msg::{PatchEvent, PushEvent, Request},
-
    pages::StatusPage,
    run::{Run, Whence},
};

@@ -24,7 +26,6 @@ use crate::{
/// The broker gets repository change events from the local Radicle
/// node, and executes the appropriate adapter to run CI on the
/// repository.
-
#[derive(Debug)]
pub struct Broker {
    default_adapter: Option<Adapter>,
    adapters: HashMap<RepoId, Adapter>,
@@ -44,7 +45,7 @@ impl Broker {

    #[allow(clippy::result_large_err)]
    pub fn all_runs(&mut self) -> Result<Vec<Run>, BrokerError> {
-
        self.db.all_runs().map_err(BrokerError::Db)
+
        Ok(self.db.get_all_runs()?)
    }

    pub fn set_default_adapter(&mut self, adapter: &Adapter) {
@@ -64,11 +65,7 @@ impl Broker {
    }

    #[allow(clippy::result_large_err)]
-
    pub fn execute_ci(
-
        &mut self,
-
        trigger: &Request,
-
        status: &mut StatusPage,
-
    ) -> Result<Run, BrokerError> {
+
    pub fn execute_ci(&mut self, trigger: &Request) -> Result<Run, BrokerError> {
        info!("Start CI run on {trigger:#?}");
        let run = match trigger {
            Request::Trigger {
@@ -106,7 +103,7 @@ impl Broker {
                    // log the error. The `Run` value records the
                    // result of the run.
                    debug!("broker runs adapter");
-
                    if let Err(e) = adapter.run(trigger, &mut run, status) {
+
                    if let Err(e) = adapter.run(trigger, &mut run) {
                        error!("failed to run adapter or it failed to run CI: {e}");
                        let mut e = e.source();
                        while let Some(source) = e {
@@ -122,8 +119,8 @@ impl Broker {
                }
            }
        };
-
        self.db.push_run(&run)?;
        debug!("Finish CI run: {run:#?}");
+
        self.db.push_run(run.clone())?;

        Ok(run)
    }
@@ -134,6 +131,47 @@ fn now() -> String {
    OffsetDateTime::now_utc().format(fmt).expect("format time")
}

+
/// All possible errors from this module.
+
#[derive(Debug, thiserror::Error)]
+
#[allow(clippy::large_enum_variant)]
+
pub enum BrokerError {
+
    /// Error from an node event subscriber.
+
    #[error(transparent)]
+
    NodeEvent(#[from] crate::event::NodeEventError),
+

+
    /// Error from Radicle.
+
    #[error(transparent)]
+
    RadicleProfile(#[from] radicle::profile::Error),
+

+
    /// Error from spawning a sub-process.
+
    #[error("failed to spawn a CI adapter sub-process: {0}")]
+
    SpawnAdapter(PathBuf, #[source] std::io::Error),
+

+
    /// Default adapter is not in list of adapters.
+
    #[error("default adapter is not in list of adapters")]
+
    UnknownDefaultAdapter(String),
+

+
    /// No adapter set for repository and no default adapter set.
+
    #[error("could not determine what adapter to use for repository {0}")]
+
    NoAdapter(RepoId),
+

+
    /// Request is not a trigger message.
+
    #[error("tried to execute CI based on a message that is not a trigger one: {0:#?}")]
+
    NotTrigger(Request),
+

+
    /// Could not convert repository ID from string.
+
    #[error("failed to understand repository id {0:?}")]
+
    BadRepoId(String, #[source] radicle::identity::IdError),
+

+
    /// Patch event doesn't have any revisions.
+
    #[error("expected at least one revision in a patch event")]
+
    NoRevisions,
+

+
    /// Database error.
+
    #[error(transparent)]
+
    Db(#[from] DbError),
+
}
+

#[cfg(test)]
mod test {
    use std::path::Path;
@@ -142,7 +180,6 @@ mod test {
    use super::{Adapter, Broker, RepoId};
    use crate::{
        msg::{RunId, RunResult},
-
        pages::{PageBuilder, StatusPage},
        run::RunState,
        test::{log_in_tests, mock_adapter, trigger_request, TestResult},
    };
@@ -161,13 +198,6 @@ mod test {
        RepoId::from_urn(RID).unwrap()
    }

-
    fn status_page() -> StatusPage {
-
        PageBuilder::default()
-
            .node_alias("test.alias")
-
            .build()
-
            .unwrap()
-
    }
-

    #[test]
    fn has_no_adapters_initially() -> TestResult<()> {
        let tmp = tempdir().unwrap();
@@ -260,8 +290,7 @@ echo '{"response":"finished","result":"success"}'

        let trigger = trigger_request()?;

-
        let mut status = status_page();
-
        let x = broker.execute_ci(&trigger, &mut status);
+
        let x = broker.execute_ci(&trigger);
        assert!(x.is_ok());
        let run = x.unwrap();
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
@@ -294,8 +323,7 @@ exit 1

        let trigger = trigger_request()?;

-
        let mut status = status_page();
-
        let x = broker.execute_ci(&trigger, &mut status);
+
        let x = broker.execute_ci(&trigger);
        assert!(x.is_ok());
        let run = x.unwrap();
        assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
modified src/config.rs
@@ -41,7 +41,12 @@ impl Config {
    pub fn db(&self) -> &Path {
        &self.db
    }
+

+
    pub fn to_json(&self) -> String {
+
        serde_json::to_string_pretty(self).expect("serialize config to JSON")
+
    }
}
+

impl fmt::Debug for Adapter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
modified src/db.rs
@@ -1,150 +1,648 @@
-
//! Persistent database for CI run information.
+
//! Database abstraction for the Radicle CI broker.
+
//!
+
//! This module is a wrapper around the [SQLite](https://sqlite.org/)
+
//! database. It is meant to suffice for the Radicle CI broker, and
+
//! does not try to be a more generic wrapper.
+
//!
+
//! The database stores the following kinds of data:
+
//!
+
//! - "counter": This is used for testing the implementation of
+
//!   concurrent access to the database. It is not useful for anything
+
//!   else.

use std::{
    fmt,
    path::{Path, PathBuf},
+
    time::Duration,
};

-
use log::info;
-
use sqlite::{Connection, State};
+
use log::{debug, trace};
+
use sqlite::{Connection, State, Statement};
+
use time::{macros::format_description, OffsetDateTime};
+
use uuid::Uuid;

-
use crate::run::Run;
+
use crate::{event::BrokerEvent, msg::RunId, run::Run};

-
const CREATE_TABLES: &str =
-
    "CREATE TABLE IF NOT EXISTS ci_runs (run_id TEXT PRIMARY KEY, json TEXT)";
-

-
const INSERT_ROW: &str = "INSERT OR REPLACE INTO ci_runs (run_id, json) VALUES (:id, :json)";
-

-
const ALL_RUNS: &str = "SELECT json FROM ci_runs";
+
// how long to retry when SQL fails for busy database
+
const MAX_WAIT: Duration = Duration::from_millis(5_000);

+
/// The CI broker database. It stores the data that needs to be
+
/// persistent, even if the process terminates.
pub struct Db {
    filename: PathBuf,
    conn: Connection,
}

-
impl fmt::Debug for Db {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
-
        write!(f, "<Db:{}>", self.filename.display())
+
impl Db {
+
    /// Open or create a database. It is created if it doesn't exist.
+
    /// If it is created, tables are created.
+
    pub fn new<P: AsRef<Path>>(filename: P) -> Result<Self, DbError> {
+
        let filename = filename.as_ref();
+
        debug!("open database {}", filename.display());
+
        let mut db = Db {
+
            filename: filename.into(),
+
            conn: sqlite::open(filename).map_err(|e| DbError::open(filename, e))?,
+
        };
+

+
        let ms = MAX_WAIT.as_millis().try_into().unwrap();
+
        trace!("set busy timeout to {ms} milliseconds");
+
        db.conn
+
            .set_busy_timeout(ms)
+
            .map_err(|e| DbError::busy_timer(filename, e))?;
+

+
        db.create_tables()?;
+

+
        Ok(db)
    }
-
}

-
impl Db {
-
    pub fn new(filename: &Path) -> Result<Self, DbError> {
-
        info!("open database {}", filename.display());
-
        let conn = sqlite::open(filename).map_err(|e| DbError::open(filename, e))?;
+
    fn create_tables(&self) -> Result<(), DbError> {
+
        const TABLES: &[&str] = &[
+
            "CREATE TABLE IF NOT EXISTS counter_test (counter INT)",
+
            "CREATE TABLE IF NOT EXISTS event_queue (id TEXT PRIMARY KEY, timestamp TEXT, event TEXT)",
+
            "CREATE TABLE IF NOT EXISTS ci_runs (run_id TEXT PRIMARY KEY, json TEXT)",
+
        ];

-
        info!("create tables");
-
        conn.execute(CREATE_TABLES)
-
            .map_err(|e| DbError::create_tables(CREATE_TABLES, e))?;
+
        for table in TABLES.iter() {
+
            let mut stmt = self.prepare(table)?;
+
            Self::execute_valueless(&mut stmt)?;
+
        }

-
        info!("database setup OK");
-
        Ok(Db {
-
            conn,
-
            filename: filename.into(),
-
        })
+
        Ok(())
+
    }
+

+
    /// Name of database file.
+
    pub fn filename(&self) -> &Path {
+
        &self.filename
+
    }
+

+
    /// Start a transaction.
+
    pub fn begin(&self) -> Result<(), DbError> {
+
        let mut stmt = self.prepare("BEGIN TRANSACTION")?;
+
        Self::execute_valueless(&mut stmt)
+
    }
+

+
    /// Commit a transaction.
+
    pub fn commit(&self) -> Result<(), DbError> {
+
        let mut stmt = self.prepare("COMMIT")?;
+
        Self::execute_valueless(&mut stmt)
    }

-
    pub fn push_run(&mut self, run: &Run) -> Result<(), DbError> {
-
        let json = serde_json::to_string(&run).map_err(DbError::to_json)?;
+
    /// Roll back a transaction.
+
    pub fn rollback(&self) -> Result<(), DbError> {
+
        let mut stmt = self.prepare("ROLLBACK")?;
+
        Self::execute_valueless(&mut stmt)
+
    }

-
        let mut stmt = self
-
            .conn
-
            .prepare(INSERT_ROW)
-
            .map_err(|e| DbError::prepare(INSERT_ROW, e))?;
+
    // Prepare a statement for execution.
+
    fn prepare<'a>(&'a self, sql: &str) -> Result<Stmt<'a>, DbError> {
+
        trace!("prepare {}", sql);
+
        match self.conn.prepare(sql) {
+
            Ok(stmt) => Ok(Stmt::new(sql, stmt)),
+
            Err(e) => Err(DbError::preapre(sql, &self.filename, e)),
+
        }
+
    }

-
        let run_id = format!("{}", run.adapter_run_id().unwrap());
-
        stmt.bind((":id", run_id.as_str()))
-
            .map_err(|e| DbError::bind(":id", e))?;
-
        stmt.bind((":json", json.as_str()))
-
            .map_err(|e| DbError::bind(":json", e))?;
-
        stmt.next().map_err(DbError::insert_run)?;
+
    // Execute a statement that doesn't return any rows with values.
+
    // This means basically any statement except SELECT.
+
    fn execute_valueless(stmt: &mut Stmt) -> Result<(), DbError> {
+
        trace!("execute {}", stmt.sql);
+
        stmt.stmt.reset().map_err(DbError::reset)?;
+
        match stmt.stmt.next() {
+
            Ok(_) => Ok(()),
+
            Err(e) => Err(DbError::execute(&stmt.sql, e)),
+
        }
+
    }
+

+
    /// Create the counter with an initial value. Only use this if
+
    /// there isn't a counter row already.
+
    pub fn create_counter(&self, counter: i64) -> Result<(), DbError> {
+
        let mut insert = self.prepare("INSERT INTO counter_test (counter) VALUES (:1)")?;
+
        insert.stmt.bind((1, counter)).unwrap();
+
        match insert.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::insert_counter(&insert.sql, e)),
+
        }
+
        Ok(())
+
    }

+
    /// Update the counter to have a new value.
+
    pub fn update_counter(&self, counter: i64) -> Result<(), DbError> {
+
        let mut update = self.prepare("UPDATE counter_test SET counter = :1")?;
+
        update.stmt.bind((1, counter)).unwrap();
+
        match update.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::update_counter(&update.sql, e)),
+
        }
        Ok(())
    }

-
    pub fn all_runs(&mut self) -> Result<Vec<Run>, DbError> {
-
        let mut stmt = self
-
            .conn
-
            .prepare(ALL_RUNS)
-
            .map_err(|e| DbError::prepare(ALL_RUNS, e))?;
+
    /// Return the current value of the counter, if any.
+
    pub fn get_counter(&self) -> Result<Option<i64>, DbError> {
+
        let mut select = self.prepare("SELECT counter FROM counter_test")?;
+
        let mut counter = None;
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    counter = Some(select.stmt.read("counter").unwrap());
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) => {
+
                    return Err(DbError::get_counter(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(counter)
+
    }
+

+
    /// Return list of broker events currently in the queue.
+
    pub fn queued_events(&self) -> Result<Vec<QueueId>, DbError> {
+
        let mut select = self.prepare("SELECT id FROM event_queue")?;
+

+
        let mut ids = vec![];
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    let id: String = select
+
                        .stmt
+
                        .read("id")
+
                        .map_err(|e| DbError::list_events(&select.sql, e))?;
+
                    ids.push(QueueId::from(&id));
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) => {
+
                    return Err(DbError::list_events(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(ids)
+
    }
+

+
    /// Return a specific event, given is id, if one exists.
+
    pub fn get_queued_event(&self, id: &QueueId) -> Result<Option<QueuedEvent>, DbError> {
+
        let mut select = self.prepare("SELECT timestamp, event FROM event_queue WHERE id = :id")?;
+
        select
+
            .stmt
+
            .bind((":id", id.as_str()))
+
            .map_err(|e| DbError::bind(&select.sql, e))?;
+

+
        let mut timestamp = None;
+
        let mut event = None;
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    timestamp = Some(
+
                        select
+
                            .stmt
+
                            .read("timestamp")
+
                            .map_err(|e| DbError::get_event(&select.sql, e))?,
+
                    );
+
                    let json: String = select
+
                        .stmt
+
                        .read("event")
+
                        .map_err(|e| DbError::get_event(&select.sql, e))?;
+
                    event = Some(
+
                        serde_json::from_str(&json)
+
                            .map_err(|e| DbError::event_from_json(&json, e))?,
+
                    );
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) => {
+
                    return Err(DbError::get_event(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        if timestamp.is_some() && event.is_some() {
+
            let qe = QueuedEvent::new(id.clone(), timestamp.unwrap(), event.unwrap());
+
            Ok(Some(qe))
+
        } else {
+
            Ok(None)
+
        }
+
    }
+

+
    /// Add a new event to the event queue, returning its id.
+
    pub fn push_queued_event(&self, event: BrokerEvent) -> Result<QueueId, DbError> {
+
        let json = serde_json::to_string(&event).expect("serialize BrokerEvent to JSON");
+

+
        let id = QueueId::default();
+
        let ts = now();
+

+
        let mut insert =
+
            self.prepare("INSERT INTO event_queue (id, timestamp, event) VALUES (:id, :ts, :e)")?;
+
        insert
+
            .stmt
+
            .bind((":id", id.as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+
        insert
+
            .stmt
+
            .bind((":ts", ts.as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+
        insert
+
            .stmt
+
            .bind((":e", json.as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+
        match insert.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::push_event(&insert.sql, e)),
+
        }
+

+
        Ok(id)
+
    }
+

+
    /// Remove event from queue, given its id. It's OK if the event is
+
    /// not in the queue, that is just silently ignored.
+
    pub fn remove_queued_event(&self, id: &QueueId) -> Result<(), DbError> {
+
        let mut remove = self.prepare("DELETE FROM event_queue WHERE id = :id")?;
+
        remove
+
            .stmt
+
            .bind((":id", id.as_str()))
+
            .map_err(|e| DbError::bind(&remove.sql, e))?;
+

+
        match remove.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::remove_event(&remove.sql, e)),
+
        }
+

+
        Ok(())
+
    }
+

+
    /// Return list of CI runs currently in the database.
+
    pub fn list_runs(&self) -> Result<Vec<RunId>, DbError> {
+
        let mut select = self.prepare("SELECT run_id FROM ci_runs")?;
+

+
        let mut run_ids = vec![];
+

+
        loop {
+
            let next = select.stmt.next();
+
            match next {
+
                Ok(State::Row) => {
+
                    let run_id: String = select
+
                        .stmt
+
                        .read("run_id")
+
                        .map_err(|e| DbError::get_run(&select.sql, e))?;
+
                    let run_id = RunId::from(run_id.as_str());
+
                    run_ids.push(run_id);
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) => {
+
                    return Err(DbError::list_runs(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(run_ids)
+
    }
+

+
    /// Return all CI runs currently in the database.
+
    pub fn get_all_runs(&self) -> Result<Vec<Run>, DbError> {
+
        let mut select = self.prepare("SELECT json FROM ci_runs")?;

        let mut runs = vec![];
-
        while let Ok(State::Row) = stmt.next() {
-
            let json: String = stmt.read("json").map_err(DbError::get_run)?;
-
            let run: Run = serde_json::from_str(&json).map_err(DbError::from_json)?;
-
            runs.push(run);
+

+
        loop {
+
            let next = select.stmt.next();
+
            match next {
+
                Ok(State::Row) => {
+
                    let json: String = select
+
                        .stmt
+
                        .read("json")
+
                        .map_err(|e| DbError::get_run(&select.sql, e))?;
+
                    let run = serde_json::from_str(&json)
+
                        .map_err(|e| DbError::run_from_json(&json, e))?;
+
                    runs.push(run);
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) => {
+
                    return Err(DbError::get_all_runs(&select.sql, e));
+
                }
+
            }
        }

        Ok(runs)
    }
+

+
    /// Return a specific CI run, given is id, if one exists.
+
    pub fn get_run(&self, id: &RunId) -> Result<Option<Run>, DbError> {
+
        let mut select = self.prepare("SELECT json FROM ci_runs WHERE run_id = :id")?;
+
        select
+
            .stmt
+
            .bind((":id", id.to_string().as_str()))
+
            .map_err(|e| DbError::bind(&select.sql, e))?;
+

+
        let mut run = None;
+

+
        select.stmt.reset().unwrap();
+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    let json: String = select
+
                        .stmt
+
                        .read("json")
+
                        .map_err(|e| DbError::get_run(&select.sql, e))?;
+
                    run = Some(
+
                        serde_json::from_str(&json)
+
                            .map_err(|e| DbError::run_from_json(&json, e))?,
+
                    );
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) => {
+
                    return Err(DbError::get_run(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(run)
+
    }
+

+
    /// Add a new CI run to the database, returning its id.
+
    pub fn push_run(&self, run: Run) -> Result<RunId, DbError> {
+
        assert!(run.adapter_run_id().is_some());
+
        let id = run.adapter_run_id().unwrap().clone();
+

+
        let json = serde_json::to_string(&run).expect("serialize BrokerEvent to JSON");
+

+
        let mut insert = self.prepare("INSERT INTO ci_runs (run_id, json) VALUES (:id, :json)")?;
+
        insert
+
            .stmt
+
            .bind((":id", id.to_string().as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+
        insert
+
            .stmt
+
            .bind((":json", json.as_str()))
+
            .map_err(|e| DbError::bind(&insert.sql, e))?;
+

+
        match insert.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::push_run(&insert.sql, e)),
+
        }
+

+
        Ok(id)
+
    }
+

+
    /// Remove a CI run from database, given its id. It's OK if the run is
+
    /// not in the database, that is just silently ignored.
+
    pub fn remove_run(&self, id: &RunId) -> Result<(), DbError> {
+
        let mut remove = self.prepare("DELETE FROM ci_runs WHERE id = :id")?;
+
        remove
+
            .stmt
+
            .bind((":id", id.to_string().as_str()))
+
            .map_err(|e| DbError::bind(&remove.sql, e))?;
+

+
        match remove.stmt.next() {
+
            Ok(_) => (),
+
            Err(e) => return Err(DbError::remove_run(&remove.sql, e)),
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
/// Return the current UTC time formatted as
/// `YYYY-MM-DD HH:MM:SS.ssssssZ` — the timestamp format stored with
/// queued events.
fn now() -> String {
    let fmt =
        format_description!("[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:6]Z");
    // Formatting the current time with a static format cannot fail.
    OffsetDateTime::now_utc().format(fmt).expect("format time")
}
+

+
// A wrapper around a statement that remembers its text form, so that
// error values constructed from failures can include the SQL that
// failed.
struct Stmt<'a> {
    sql: String,
    stmt: Statement<'a>,
}

impl<'a> Stmt<'a> {
    // Pair a prepared statement with the SQL text it was prepared from.
    fn new(sql: &str, stmt: Statement<'a>) -> Self {
        Self {
            sql: sql.into(),
            stmt,
        }
    }
}
+

+
/// An identifier for an event in the event queue in the database.
+
#[derive(Clone, Debug)]
+
pub struct QueueId {
+
    id: String,
+
}
+

+
impl fmt::Display for QueueId {
+
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+
        write!(f, "{}", self.id)
+
    }
+
}
+

+
impl Default for QueueId {
+
    fn default() -> Self {
+
        Self {
+
            id: Uuid::new_v4().to_string(),
+
        }
+
    }
+
}
+

+
impl From<&str> for QueueId {
+
    fn from(id: &str) -> Self {
+
        Self { id: id.into() }
+
    }
+
}
+

+
impl From<&String> for QueueId {
+
    fn from(id: &String) -> Self {
+
        Self { id: id.into() }
+
    }
+
}
+

+
impl QueueId {
+
    fn as_str(&self) -> &str {
+
        self.id.as_str()
+
    }
+
}
+

+
/// An event held in the persistent event queue, together with its queue
/// id and the timestamp recorded when it was inserted.
#[derive(Clone, Debug)]
pub struct QueuedEvent {
    id: QueueId,
    // Timestamp string in the format produced by `now`.
    ts: String,
    event: BrokerEvent,
}

impl QueuedEvent {
    // Only this module constructs these, from event_queue table rows.
    fn new(id: QueueId, ts: String, event: BrokerEvent) -> Self {
        Self { id, ts, event }
    }

    /// The id of the event in the queue.
    pub fn id(&self) -> &QueueId {
        &self.id
    }

    /// When the event was added to the queue.
    pub fn timestamp(&self) -> &str {
        &self.ts
    }

    /// The queued broker event itself.
    pub fn event(&self) -> &BrokerEvent {
        &self.event
    }
}

/// All errors from this module.
#[derive(Debug, thiserror::Error)]
pub enum DbError {
-
    /// Error opening a database file.
+
    #[error("failed to set a busy timer one SQLite database {0}")]
+
    BusyTimer(PathBuf, #[source] sqlite::Error),
+

    #[error("failed to open SQLite database {0}")]
    Open(PathBuf, #[source] sqlite::Error),

-
    /// Error creating tables.
-
    #[error("failed to create tables: {0}")]
-
    CreateTables(&'static str, #[source] sqlite::Error),
+
    #[error("failed to prepare SQL statement SQLite database {0}: {1}")]
+
    Prepare(String, PathBuf, #[source] sqlite::Error),
+

+
    #[error("failed to reset connection to SQLite")]
+
    Reset(#[source] sqlite::Error),

-
    /// Error preparing an SQL statement.
-
    #[error("failed to prepare SQL statement {0}")]
-
    Prepare(&'static str, #[source] sqlite::Error),
+
    #[error("failed to execute SQL statement in SQLite: {0}")]
+
    Execute(String, #[source] sqlite::Error),

-
    /// Error binding a value to an SQL statement placeholder.
-
    #[error("failed to bind a value to SQL statement placeholder {0}")]
-
    Bind(&'static str, #[source] sqlite::Error),
+
    #[error("failed to bind a value in SQL statement in SQLite: {0}")]
+
    Bind(String, #[source] sqlite::Error),

-
    /// Error inserting or updating a run in SQL database.
-
    #[error("failed to insert or update a run in SQL database")]
-
    InsertRun(#[source] sqlite::Error),
+
    #[error("failed to insert a counter into database")]
+
    InsertCounter(String, #[source] sqlite::Error),

-
    /// Error getting a run from SQL query.
-
    #[error("failed to get CI run from SQL query result")]
-
    GetRun(#[source] sqlite::Error),
+
    #[error("failed to update a counter in database")]
+
    UpdateCounter(String, #[source] sqlite::Error),

-
    /// Error serializing a [`Run`]` into a string.
-
    #[error("failed to serialize a CI run into JSON")]
-
    ToJson(#[source] serde_json::Error),
+
    #[error("failed to retrieve a counter from database")]
+
    GetCounter(String, #[source] sqlite::Error),

-
    /// Error deserializing a [`Run`]` from a string.
-
    #[error("failed to parse JSON as a CI run")]
-
    FromJson(#[source] serde_json::Error),
+
    #[error("failed to insert an event into database")]
+
    InsertEvent(String, #[source] sqlite::Error),
+

+
    #[error("failed to list queued events in database")]
+
    ListEvents(String, #[source] sqlite::Error),
+

+
    #[error("failed to retrieve a queued event in database")]
+
    GetEvent(String, #[source] sqlite::Error),
+

+
    #[error("failed to parse queued event as JSON: {0}")]
+
    EventFromJson(String, #[source] serde_json::Error),
+

+
    #[error("failed to insert an event into queue")]
+
    PushEvent(String, #[source] sqlite::Error),
+

+
    #[error("failed to remove an event from queue")]
+
    RemoveEvent(String, #[source] sqlite::Error),
+

+
    #[error("failed to list CI runs in database")]
+
    ListRuns(String, #[source] sqlite::Error),
+

+
    #[error("failed to get all CI runs from database")]
+
    GetAllRuns(String, #[source] sqlite::Error),
+

+
    #[error("failed to parse CI run as JSON: {0}")]
+
    RunFromJson(String, #[source] serde_json::Error),
+

+
    #[error("failed to retrieve a CI run from database")]
+
    GetRun(String, #[source] sqlite::Error),
+

+
    #[error("failed to insert a CI run into database")]
+
    PushRun(String, #[source] sqlite::Error),
+

+
    #[error("failed to remove a CI run from database")]
+
    RemoveRun(String, #[source] sqlite::Error),
}

impl DbError {
+
    fn busy_timer(filename: &Path, e: sqlite::Error) -> Self {
+
        Self::BusyTimer(filename.into(), e)
+
    }
+

    fn open(filename: &Path, e: sqlite::Error) -> Self {
        Self::Open(filename.into(), e)
    }

-
    fn create_tables(query: &'static str, e: sqlite::Error) -> Self {
-
        Self::CreateTables(query, e)
+
    fn preapre(sql: &str, filename: &Path, e: sqlite::Error) -> Self {
+
        Self::Prepare(sql.into(), filename.into(), e)
+
    }
+

+
    fn reset(e: sqlite::Error) -> Self {
+
        Self::Reset(e)
+
    }
+

+
    fn execute(sql: &str, e: sqlite::Error) -> Self {
+
        Self::Execute(sql.into(), e)
+
    }
+

+
    fn bind(sql: &str, e: sqlite::Error) -> Self {
+
        Self::Bind(sql.into(), e)
+
    }
+

+
    fn insert_counter(sql: &str, e: sqlite::Error) -> Self {
+
        Self::InsertCounter(sql.into(), e)
+
    }
+

+
    fn update_counter(sql: &str, e: sqlite::Error) -> Self {
+
        Self::UpdateCounter(sql.into(), e)
+
    }
+

+
    fn get_counter(sql: &str, e: sqlite::Error) -> Self {
+
        Self::GetCounter(sql.into(), e)
+
    }
+

+
    fn list_events(sql: &str, e: sqlite::Error) -> Self {
+
        Self::ListEvents(sql.into(), e)
+
    }
+

+
    fn get_event(sql: &str, e: sqlite::Error) -> Self {
+
        Self::GetEvent(sql.into(), e)
+
    }
+

+
    fn event_from_json(json: &str, e: serde_json::Error) -> Self {
+
        Self::EventFromJson(json.into(), e)
+
    }
+

+
    fn push_event(sql: &str, e: sqlite::Error) -> Self {
+
        Self::PushEvent(sql.into(), e)
+
    }
+

+
    fn remove_event(sql: &str, e: sqlite::Error) -> Self {
+
        Self::RemoveEvent(sql.into(), e)
    }

-
    fn prepare(stmt: &'static str, e: sqlite::Error) -> Self {
-
        Self::Prepare(stmt, e)
+
    fn list_runs(sql: &str, e: sqlite::Error) -> Self {
+
        Self::ListRuns(sql.into(), e)
    }

-
    fn bind(placeholder: &'static str, e: sqlite::Error) -> Self {
-
        Self::Bind(placeholder, e)
+
    fn get_all_runs(sql: &str, e: sqlite::Error) -> Self {
+
        Self::GetAllRuns(sql.into(), e)
    }

-
    fn insert_run(e: sqlite::Error) -> Self {
-
        Self::InsertRun(e)
+
    fn run_from_json(json: &str, e: serde_json::Error) -> Self {
+
        Self::RunFromJson(json.into(), e)
    }

-
    fn to_json(e: serde_json::Error) -> Self {
-
        Self::ToJson(e)
+
    fn get_run(sql: &str, e: sqlite::Error) -> Self {
+
        Self::GetRun(sql.into(), e)
    }

-
    fn from_json(e: serde_json::Error) -> Self {
-
        Self::FromJson(e)
+
    fn push_run(sql: &str, e: sqlite::Error) -> Self {
+
        Self::PushRun(sql.into(), e)
    }

-
    fn get_run(e: sqlite::Error) -> Self {
-
        Self::GetRun(e)
+
    fn remove_run(sql: &str, e: sqlite::Error) -> Self {
+
        Self::RemoveRun(sql.into(), e)
    }
}
deleted src/error.rs
@@ -1,78 +0,0 @@
-
//! Possible errors returned by the CI broker library.
-
//!
-
//! Each module has its own error type, this module collects them
-
//! together into a unified type, for callers who don't care to handle
-
//! module errors in specific ways.
-

-
use std::path::PathBuf;
-

-
use radicle::prelude::RepoId;
-

-
use crate::{
-
    adapter::AdapterError,
-
    config::ConfigError,
-
    db::DbError,
-
    msg::{MessageError, Request},
-
    pages::PageError,
-
};
-

-
/// All possible errors from the CI broker messages.
-
#[derive(Debug, thiserror::Error)]
-
#[allow(clippy::large_enum_variant)]
-
pub enum BrokerError {
-
    /// A configuration related error.
-
    #[error(transparent)]
-
    Config(#[from] ConfigError),
-

-
    /// A message related error.
-
    #[error(transparent)]
-
    Message(#[from] MessageError),
-

-
    /// An adapter related error.
-
    #[error(transparent)]
-
    Adapter(#[from] AdapterError),
-

-
    /// Error from an node event subscriber.
-
    #[error(transparent)]
-
    NodeEvent(#[from] crate::event::NodeEventError),
-

-
    /// Error from Radicle.
-
    #[error(transparent)]
-
    RadicleProfile(#[from] radicle::profile::Error),
-

-
    /// Error from spawning a sub-process.
-
    #[error("failed to spawn a CI adapter sub-process: {0}")]
-
    SpawnAdapter(PathBuf, #[source] std::io::Error),
-

-
    /// Usage error.
-
    #[error("usage: radicle-ci-broker CONFIG")]
-
    Usage,
-

-
    /// Default adapter is not in list of adapters.
-
    #[error("default adapter is not in list of adapters")]
-
    UnknownDefaultAdapter(String),
-

-
    /// No adapter set for repository and no default adapter set.
-
    #[error("could not determine what adapter to use for repository {0}")]
-
    NoAdapter(RepoId),
-

-
    /// Request is not a trigger message.
-
    #[error("tried to execute CI based on a message that is not a trigger one: {0:#?}")]
-
    NotTrigger(Request),
-

-
    /// Could not convert repository ID from string.
-
    #[error("failed to understand repository id {0:?}")]
-
    BadRepoId(String, #[source] radicle::identity::IdError),
-

-
    /// Status page error.
-
    #[error(transparent)]
-
    StatusPage(#[from] PageError),
-

-
    /// Database error.
-
    #[error(transparent)]
-
    Db(#[from] DbError),
-

-
    /// Patch event doesn't have any revisions.
-
    #[error("expected at least one revision in a patch event")]
-
    NoRevisions,
-
}
modified src/event.rs
@@ -111,10 +111,6 @@ impl NodeEventSource {
            if let Some(event) = self.events.next() {
                trace!("got event from local node: {event:#?}");
                match event {
-
                    Ok(ref event) if Self::is_shutdown_request(event) => {
-
                        info!("got shutdown request from node control socket");
-
                        return Ok(vec![BrokerEvent::Shutdown]);
-
                    }
                    Ok(event) => {
                        trace!("got node event {:#?}", event);
                        let mut result = vec![];
@@ -140,21 +136,10 @@ impl NodeEventSource {
                }
            } else {
                trace!("no more node events from control socket: iterator ended");
-
                return Err(NodeEventError::BrokenConnection);
+
                return Ok(vec![]);
            }
        }
    }
-

-
    fn is_shutdown_request(event: &Event) -> bool {
-
        if let Event::RefsFetched { updated, .. } = event {
-
            if let Some(RefUpdate::Skipped { name, .. }) = updated.first() {
-
                if name == &RefString::try_from("shutdown").expect("create name") {
-
                    return true;
-
                }
-
            }
-
        }
-
        false
-
    }
}

/// Possible errors from accessing the local Radicle node.
@@ -298,7 +283,7 @@ impl TryFrom<&str> for Filters {
/// A single node event can represent many git refs having changed,
/// but that's hard to process or filter. The broker breaks up such
/// complex events to simpler ones that only affect one ref at a time.
-
#[derive(Debug, Clone, Serialize)]
+
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BrokerEvent {
    /// Request the CI broker shuts down in an orderly fashion.
    Shutdown,
@@ -322,7 +307,7 @@ pub enum BrokerEvent {
}

impl BrokerEvent {
-
    fn new(rid: &RepoId, name: &RefString, oid: &Oid, old: Option<Oid>) -> Self {
+
    pub fn new(rid: &RepoId, name: &RefString, oid: &Oid, old: Option<Oid>) -> Self {
        Self::RefChanged {
            rid: *rid,
            name: name.clone(),
@@ -331,6 +316,10 @@ impl BrokerEvent {
        }
    }

+
    pub fn shutdown() -> Self {
+
        Self::Shutdown
+
    }
+

    /// Break up a potentially complex node event into a vector of
    /// simpler broker events.
    pub fn from_event(e: &Event) -> Option<Vec<Self>> {
@@ -346,7 +335,7 @@ impl BrokerEvent {
                    RefUpdate::Skipped { name, oid }
                        if name.as_str() == "shutdown" && oid.is_zero() =>
                    {
-
                        events.push(Self::Shutdown);
+
                        events.push(Self::shutdown());
                    }
                    RefUpdate::Created { name, oid } => {
                        events.push(Self::new(rid, name, oid, None));
@@ -364,7 +353,7 @@ impl BrokerEvent {
    }

    /// Is this broker event allowed by a filter?
-
    pub fn is_allowed(&self, filter: &EventFilter) -> bool {
+
    fn is_allowed(&self, filter: &EventFilter) -> bool {
        debug!("is_allowed called: filter={filter:?}");
        let res = self.is_allowed_helper(filter, 0);
        debug!("is_allowed: res={res}");
@@ -434,15 +423,10 @@ impl BrokerEvent {
    /// The RefString will start with `refs/namespaces/<nid>/...`
    pub fn nid(&self) -> Option<NodeId> {
        if let Some(name) = self.name() {
-
            let mut parts = name.split('/');
-
            if let Some(nid) = parts.nth(2) {
-
                let parsed = nid.parse();
-
                if parsed.is_ok() {
-
                    return parsed.ok();
-
                }
-
            }
+
            parse_nid_from_refstring(name)
+
        } else {
+
            None
        }
-
        None
    }

    pub fn patch_id(&self) -> Option<Oid> {
@@ -455,6 +439,74 @@ impl BrokerEvent {
    }
}

+
/// Extract the NID from a the ref string in a repository.
+
/// The RefString should start with `refs/namespaces/<nid>/...`
+
pub fn parse_nid_from_refstring(name: &RefString) -> Option<NodeId> {
+
    let pat = Regex::new(r"^refs/namespaces/(?P<nid>[^/]+)/").unwrap();
+
    if let Some(captures) = pat.captures(name.as_str()) {
+
        if let Some(m) = captures.name("nid") {
+
            if let Ok(parsed) = m.as_str().parse() {
+
                return Some(parsed);
+
            }
+
        }
+
    }
+
    None
+
}
+

+
#[cfg(test)]
+
mod test_broker_event {
+
    use std::str::FromStr;
+

+
    use super::{BrokerEvent, NodeId, Oid, RefString, RepoId};
+

+
    #[test]
+
    fn name_for_branch() {
+
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8").expect("RepoId");
+
        let name = RefString::try_from("main").expect("RefString");
+
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a").expect("Oid");
+
        let be = BrokerEvent::new(&rid, &name, &oid, None);
+
        assert_eq!(be.name(), Some(&name));
+
    }
+

+
    #[test]
+
    fn name_for_shutdown() {
+
        let be = BrokerEvent::shutdown();
+
        assert_eq!(be.name(), None);
+
    }
+

+
    #[test]
+
    fn nid_for_plain_branch_name() {
+
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8").expect("RepoId");
+
        let name = RefString::try_from("main").expect("RefString");
+
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a").expect("Oid");
+
        let be = BrokerEvent::new(&rid, &name, &oid, None);
+
        assert_eq!(be.nid(), None);
+
    }
+

+
    #[test]
+
    fn nid_for_ref_without_namespace() {
+
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8").expect("RepoId");
+
        let name = RefString::try_from("something/else/main").expect("RefString");
+
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a").expect("Oid");
+
        let be = BrokerEvent::new(&rid, &name, &oid, None);
+
        assert_eq!(be.nid(), None);
+
    }
+

+
    #[test]
+
    fn nid_for_ref_with_namespace() {
+
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8").expect("RepoId");
+
        let name = RefString::try_from(
+
            "refs/namespaces/z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV/main",
+
        )
+
        .expect("RefString");
+
        let nid =
+
            NodeId::from_str("z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV").expect("NodeId");
+
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a").expect("Oid");
+
        let be = BrokerEvent::new(&rid, &name, &oid, None);
+
        assert_eq!(be.nid(), Some(nid));
+
    }
+
}
+

/// Parsed reference to one of the supported types
/// Patch with patch ID
/// Push with branch name
@@ -480,25 +532,85 @@ pub enum ParsedRef {
/// }
/// ```
pub fn parse_ref(s: &str) -> Option<ParsedRef> {
+
    trace!("parse_ref: s={s:?}");
+

    let patch_re = Regex::new(r"^refs/namespaces/[^/]+/refs/heads/patches/([^/]+)$").unwrap();
    if let Some(patch_captures) = patch_re.captures(s) {
+
        trace!("parse_ref: patch_captures={patch_captures:?}");
        if let Some(patch_id) = patch_captures.get(1) {
+
            trace!("parse_ref: patch_id={patch_id:?}");
            let patch_id_str = patch_id.as_str();
            let oid = Oid::try_from(patch_id_str).unwrap();
+
            trace!("parse_ref: patch oid={oid:?}");
            return Some(ParsedRef::Patch(oid));
        }
    }

-
    let push_re = Regex::new(r"^refs/namespaces/[^/]+/refs/heads/([^/]+)$").unwrap();
+
    let push_re = Regex::new(r"^refs/namespaces/[^/]+/refs/heads/(.+)$").unwrap();
    if let Some(push_captures) = push_re.captures(s) {
+
        trace!("parse_ref: push_captures={push_captures:?}");
        if let Some(branch) = push_captures.get(1) {
+
            trace!("parse_ref: branch={branch:?}");
            return Some(ParsedRef::Push(branch.as_str().to_string()));
        }
    }

+
    trace!("parse_ref: neither push nor patch");
    None
}

+
// Initialize env_logger for tests at trace level. try_init is used and
// its result ignored because multiple tests may race to initialize the
// global logger; only the first one wins and that is fine.
#[cfg(test)]
fn log_init() {
    let _ = env_logger::builder()
        .is_test(true)
        .format_timestamp(None)
        .filter_level(log::LevelFilter::Trace)
        .try_init();
}
+

+
#[cfg(test)]
+
mod test_parse_ref {
+
    use super::{log_init, parse_ref, Oid, ParsedRef};
+

+
    #[test]
+
    fn plain_branch_name_is_none() {
+
        log_init();
+
        assert_eq!(parse_ref("main"), None);
+
    }
+

+
    #[test]
+
    fn namespaced_branch() {
+
        log_init();
+
        assert_eq!(
+
            parse_ref(
+
                "refs/namespaces/z6MkfBU2cwcZfaE6Z1dLqb7Ve7u4pdgbSo9tP6qUVsqFn2xv/refs/heads/main"
+
            ),
+
            Some(ParsedRef::Push("main".into()))
+
        );
+
    }
+

+
    #[test]
+
    fn namespaced_branch_with_slashes() {
+
        log_init();
+
        assert_eq!(
+
            parse_ref(
+
                "refs/namespaces/z6MkfBU2cwcZfaE6Z1dLqb7Ve7u4pdgbSo9tP6qUVsqFn2xv/refs/heads/liw/cob/draft/v2"
+
            ),
+
            Some(ParsedRef::Push("liw/cob/draft/v2".into()))
+
        );
+
    }
+

+
    #[test]
+
    fn namespaced_patch() {
+
        log_init();
+
        const SHA: &str = "0a4c69183fc8b8d849f5ab977d70f2a1f4788bca";
+
        assert_eq!(
+
            parse_ref(&format!("refs/namespaces/NID/refs/heads/patches/{SHA}")),
+
            Some(ParsedRef::Patch(Oid::try_from(SHA).unwrap()))
+
        );
+
    }
+
}
+

pub fn is_patch_update(name: &str) -> Option<&str> {
    let mut parts = name.split("/refs/cobs/xyz.radicle.patch/");
    if let Some(suffix) = parts.nth(1) {
@@ -521,10 +633,11 @@ pub fn push_branch(name: &str) -> String {

#[cfg(test)]
mod test {
-
    use super::{is_patch_update, parse_ref, push_branch, Oid, ParsedRef};
+
    use super::{is_patch_update, log_init, parse_ref, push_branch, Oid, ParsedRef};

    #[test]
    fn test_parse_patch_ref() {
+
        log_init();
        let patch_ref =
            "refs/namespaces/NID/refs/heads/patches/9183ed6232687d3105482960cecb01a53018b80a";

@@ -538,9 +651,11 @@ mod test {

    #[test]
    fn test_parse_push_ref() {
+
        log_init();
        let push_ref =
            "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/main";
        let parsed_ref = parse_ref(push_ref);
+
        eprintln!("parsed_ref={parsed_ref:#?}");
        assert!(parsed_ref.is_some());
        if let Some(ref parsed) = parsed_ref {
            match parsed {
@@ -552,6 +667,7 @@ mod test {

    #[test]
    fn test_parse_invalid_ref() {
+
        log_init();
        let invalid_ref = "invalid_ref";
        let parsed_ref = parse_ref(invalid_ref);
        assert!(parsed_ref.is_none());
@@ -559,6 +675,7 @@ mod test {

    #[test]
    fn branch_is_not_patch_update() {
+
        log_init();
        assert_eq!(
            is_patch_update(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/main"
@@ -569,6 +686,7 @@ mod test {

    #[test]
    fn patch_branch_is_not_patch_update() {
+
        log_init();
        assert_eq!(
            is_patch_update(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/patches/bbb54a2c9314a528a4fff9d6c2aae874ed098433"
@@ -579,6 +697,7 @@ mod test {

    #[test]
    fn patch_update() {
+
        log_init();
        assert_eq!(
            is_patch_update(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/cobs/xyz.radicle.patch/bbb54a2c9314a528a4fff9d6c2aae874ed098433"
@@ -589,6 +708,7 @@ mod test {

    #[test]
    fn get_push_branch() {
+
        log_init();
        assert_eq!(
            push_branch(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/branch_name"
@@ -599,6 +719,7 @@ mod test {

    #[test]
    fn get_no_push_branch() {
+
        log_init();
        assert_eq!(
            push_branch(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/rad/sigrefs"
modified src/lib.rs
@@ -9,10 +9,11 @@ pub mod adapter;
pub mod broker;
pub mod config;
pub mod db;
-
pub mod error;
pub mod event;
pub mod msg;
pub mod pages;
+
pub mod queueadd;
+
pub mod queueproc;
pub mod run;
#[cfg(test)]
pub mod test;
modified src/pages.rs
@@ -12,6 +12,8 @@ use std::{
    fs::write,
    path::{Path, PathBuf},
    sync::{Arc, Mutex, MutexGuard},
+
    thread::{sleep, spawn, JoinHandle},
+
    time::Duration,
};

use html_page::{Document, Element, Tag};
@@ -22,6 +24,7 @@ use time::{macros::format_description, OffsetDateTime};
use radicle::prelude::RepoId;

use crate::{
+
    db::{Db, DbError},
    event::BrokerEvent,
    msg::{RunId, RunResult},
    run::{Run, RunState, Whence},
@@ -29,6 +32,7 @@ use crate::{

const CSS: &str = include_str!("radicle-ci.css");
const REFERESH_INTERVAL: &str = "300";
+
const UPDATE_INTERVAL: u64 = 300;

/// All possible errors returned from the status page module.
#[derive(Debug, thiserror::Error)]
@@ -41,6 +45,9 @@ pub enum PageError {

    #[error("no status data has been set for builder")]
    NoStatusData,
+

+
    #[error(transparent)]
+
    Db(#[from] DbError),
}

/// A builder for constructing a [`StatusPage`] value. It will only
@@ -439,12 +446,16 @@ impl PageData {
/// repository.
pub struct StatusPage {
    data: Arc<Mutex<PageData>>,
+
    node_alias: String,
+
    dirname: Option<PathBuf>,
}

impl StatusPage {
    fn new(data: PageData) -> Self {
        Self {
            data: Arc::new(Mutex::new(data)),
+
            node_alias: String::new(),
+
            dirname: None,
        }
    }

@@ -452,6 +463,10 @@ impl StatusPage {
        self.data.lock().expect("lock StatusPage::data")
    }

+
    pub fn set_output_dir(&mut self, dirname: &Path) {
+
        self.dirname = Some(dirname.into());
+
    }
+

    pub fn update_timestamp(&mut self) {
        let mut data = self.lock();
        data.timestamp = now();
@@ -473,6 +488,34 @@ impl StatusPage {
        }
    }

+
    /// Periodically rebuild and write the status pages in a background
    /// thread, reading runs from the database.
    ///
    /// Consumes `self`. With `once` set, the thread performs a single
    /// update and returns; otherwise it loops forever, sleeping
    /// `UPDATE_INTERVAL` seconds between updates, and the returned
    /// handle only resolves if an update fails.
    pub fn update_in_thread(
        mut self,
        db: Db,
        node_alias: &str,
        once: bool,
    ) -> JoinHandle<Result<(), PageError>> {
        self.node_alias = node_alias.into();
        spawn(move || loop {
            self.update_and_write(&db)?;
            if once {
                return Ok(());
            }
            sleep(Duration::from_secs(UPDATE_INTERVAL));
        })
    }
+

+
    // Rebuild the status page from all runs in the database and write it
    // to the configured output directory. A no-op when no output
    // directory has been set (see set_output_dir).
    fn update_and_write(&mut self, db: &Db) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
            let mut page = PageBuilder::default()
                .node_alias(&self.node_alias)
                .runs(db.get_all_runs()?)
                .build()?;

            page.write(dirname)?;
        }
        Ok(())
    }
+

    /// Write the status page (as index.html) and per-repository pages
    /// (`<RID>.html`) into the directory given as an argument. The directory must exist.
    pub fn write(&mut self, dirname: &Path) -> Result<(), PageError> {
@@ -525,6 +568,8 @@ impl Clone for StatusPage {
    /// Clone the handle. The page data is shared (only the `Arc` is
    /// cloned, not the `PageData`); the alias and output directory are
    /// copied.
    fn clone(&self) -> Self {
        let data = Arc::clone(&self.data);
        Self {
            data,
            node_alias: self.node_alias.clone(),
            dirname: self.dirname.clone(),
        }
    }
}
added src/queueadd.rs
@@ -0,0 +1,106 @@
+
use std::thread::{spawn, JoinHandle};
+

+
use radicle::Profile;
+

+
use log::{debug, info};
+

+
use crate::{
+
    db::{Db, DbError},
+
    event::{BrokerEvent, EventFilter, NodeEventError, NodeEventSource},
+
};
+

+
/// Builder for [`QueueAdder`]. `db` and `filters` must be set before
/// calling `build`; `push_shutdown` is optional and defaults to off.
#[derive(Default)]
pub struct QueueAdderBuilder {
    // Persistent event queue database; required.
    db: Option<Db>,
    // Filters selecting which node events are queued; required.
    filters: Option<Vec<EventFilter>>,
    // If set, push a `BrokerEvent::Shutdown` onto the queue when the
    // node's event stream ends.
    push_shutdown: bool,
}
+

+
impl QueueAdderBuilder {
+
    pub fn build(self) -> Result<QueueAdder, AdderError> {
+
        Ok(QueueAdder {
+
            db: self.db.ok_or(AdderError::Missing("db"))?,
+
            filters: self.filters.ok_or(AdderError::Missing("filters"))?,
+
            push_shutdown: self.push_shutdown,
+
        })
+
    }
+

+
    pub fn db(mut self, db: Db) -> Self {
+
        self.db = Some(db);
+
        self
+
    }
+

+
    pub fn filters(mut self, filters: &[EventFilter]) -> Self {
+
        self.filters = Some(filters.to_vec());
+
        self
+
    }
+

+
    pub fn push_shutdown(mut self) -> Self {
+
        self.push_shutdown = true;
+
        self
+
    }
+
}
+

+
/// Reads events from the local Radicle node and appends those that
/// pass the configured filters to the persistent event queue.
/// Construct with [`QueueAdderBuilder`].
pub struct QueueAdder {
    // Filters selecting which node events are queued.
    filters: Vec<EventFilter>,
    // Persistent queue database the events are pushed to.
    db: Db,
    // Whether to queue a shutdown event when the event stream ends.
    push_shutdown: bool,
}
+

+
impl QueueAdder {
+
    pub fn add_events_in_thread(self) -> JoinHandle<Result<(), AdderError>> {
+
        spawn(move || self.add_events())
+
    }
+

+
    pub fn add_events(&self) -> Result<(), AdderError> {
+
        let profile = Profile::load()?;
+
        debug!("loaded profile {profile:#?}");
+

+
        let mut source = NodeEventSource::new(&profile)?;
+
        debug!("created node event source");
+

+
        for filter in self.filters.iter() {
+
            source.allow(filter.clone());
+
        }
+
        debug!("added filters to node event source");
+

+
        // This loop ends when there's an error, e.g., failure to read an
+
        // event from the node.
+
        'event_loop: loop {
+
            debug!("waiting for event from node");
+
            let events = source.event()?;
+
            if events.is_empty() {
+
                break 'event_loop;
+
            } else {
+
                for e in events {
+
                    debug!("got event {e:#?}");
+
                    info!("insert broker event into queue: {e:#?}");
+
                    self.db.push_queued_event(e)?;
+
                }
+
            }
+
        }
+

+
        // Add a shutdown event to the queue so that the queue
+
        // processing thread knows to stop.
+
        if self.push_shutdown {
+
            self.db.push_queued_event(BrokerEvent::Shutdown)?;
+
        }
+

+
        Ok(())
+
    }
+
}
+

+
/// Errors from building or running a [`QueueAdder`].
#[derive(Debug, thiserror::Error)]
pub enum AdderError {
    /// `QueueAdderBuilder::build` was called before a required field
    /// was set; the payload names the field.
    #[error("programming error: QueueAdderBuilder field {0} was not set")]
    Missing(&'static str),

    /// Loading the Radicle profile failed.
    #[error(transparent)]
    Profile(#[from] radicle::profile::Error),

    /// Reading events from the local node failed.
    #[error(transparent)]
    NodeEvent(#[from] NodeEventError),

    /// Accessing the persistent queue database failed.
    #[error(transparent)]
    Db(#[from] DbError),
}
added src/queueproc.rs
@@ -0,0 +1,163 @@
+
//! Process events in the persistent event queue.
+

+
#![allow(clippy::result_large_err)]
+

+
use std::{
+
    thread::{sleep, spawn, JoinHandle},
+
    time::Duration,
+
};
+

+
use log::debug;
+
use radicle::Profile;
+

+
use crate::{
+
    broker::Broker,
+
    broker::BrokerError,
+
    db::{Db, DbError, QueueId, QueuedEvent},
+
    event::BrokerEvent,
+
    msg::{MessageError, RequestBuilder},
+
};
+

+
/// How long the queue processor sleeps when the queue is empty before
/// polling it again.
const WAIT_FOR_EVENTS_DURATION: Duration = Duration::from_millis(10_000);
+

+
/// Builder for [`QueueProcessor`]. Both fields must be set before
/// calling `build`.
#[derive(Default)]
pub struct QueueProcessorBuilder {
    // Persistent event queue database; required.
    db: Option<Db>,
    // Broker used to run CI for queued events; required.
    broker: Option<Broker>,
}
+

+
impl QueueProcessorBuilder {
+
    pub fn build(self) -> Result<QueueProcessor, QueueError> {
+
        Ok(QueueProcessor {
+
            db: self.db.ok_or(QueueError::Missing("db"))?,
+
            profile: Profile::load().map_err(QueueError::Profile)?,
+
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
+
        })
+
    }
+

+
    pub fn db(mut self, db: Db) -> Self {
+
        self.db = Some(db);
+
        self
+
    }
+

+
    pub fn broker(mut self, broker: Broker) -> Self {
+
        self.broker = Some(broker);
+
        self
+
    }
+
}
+

+
/// Processes events from the persistent queue: each queued ref-change
/// event triggers a CI run via the broker, and processing stops when a
/// shutdown event is handled. Construct with
/// [`QueueProcessorBuilder`].
pub struct QueueProcessor {
    // Persistent event queue database.
    db: Db,
    // Radicle profile, used when building CI trigger requests.
    profile: Profile,
    // Broker that executes the CI runs.
    broker: Broker,
}
+

+
impl QueueProcessor {
+
    pub fn process_in_thread(mut self) -> JoinHandle<Result<(), QueueError>> {
+
        spawn(move || self.process_until_shutdown())
+
    }
+

+
    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
+
        let mut done = false;
+
        while !done {
+
            if let Some(qe) = self.pick_event()? {
+
                debug!("picked event from queue: {}: {:#?}", qe.id(), qe.event());
+
                done = self.process_event(qe.event())?;
+
                self.drop_event(qe.id())?;
+
            } else {
+
                self.wait_for_events();
+
            }
+
        }
+

+
        Ok(())
+
    }
+

+
    fn pick_event(&self) -> Result<Option<QueuedEvent>, QueueError> {
+
        let ids = self.db.queued_events().map_err(QueueError::db)?;
+
        debug!("event queue: {ids:?}");
+

+
        let mut queue = vec![];
+
        for id in ids.iter() {
+
            if let Some(qe) = self.db.get_queued_event(id).map_err(QueueError::db)? {
+
                queue.push(qe);
+
            }
+
        }
+
        queue.sort_by_cached_key(|qe| qe.timestamp().to_string());
+

+
        if let Some(qe) = queue.first() {
+
            Ok(Some(qe.clone()))
+
        } else {
+
            Ok(None)
+
        }
+
    }
+

+
    fn process_event(&mut self, event: &BrokerEvent) -> Result<bool, QueueError> {
+
        match event {
+
            BrokerEvent::RefChanged {
+
                rid,
+
                name: _,
+
                oid,
+
                old: _,
+
            } => {
+
                debug!("Action: run: {rid} {oid}");
+

+
                let trigger = RequestBuilder::default()
+
                    .profile(&self.profile)
+
                    .broker_event(event)
+
                    .build_trigger()
+
                    .map_err(|e| QueueError::build_trigger(event, e))?;
+
                self.broker
+
                    .execute_ci(&trigger)
+
                    .map_err(QueueError::execute_ci)?;
+
                Ok(false)
+
            }
+
            BrokerEvent::Shutdown => {
+
                debug!("Action: shutdown");
+
                Ok(true)
+
            }
+
        }
+
    }
+

+
    fn drop_event(&mut self, id: &QueueId) -> Result<(), QueueError> {
+
        debug!("remove event {id}");
+
        self.db.remove_queued_event(id).map_err(QueueError::db)?;
+
        Ok(())
+
    }
+

+
    fn wait_for_events(&self) {
+
        sleep(WAIT_FOR_EVENTS_DURATION);
+
    }
+
}
+

+
/// Errors from building or running a [`QueueProcessor`].
#[derive(Debug, thiserror::Error)]
// NOTE(review): some variants carry large payloads (e.g. the
// `BrokerEvent` in `BuildTrigger`); boxing them would shrink the enum,
// but the lint is allowed instead — presumably to keep the payloads
// directly matchable. Confirm before changing.
#[allow(clippy::large_enum_variant)]
pub enum QueueError {
    /// Loading the Radicle profile failed.
    #[error("failed to load node profile")]
    Profile(#[source] radicle::profile::Error),

    /// `QueueProcessorBuilder::build` was called before a required
    /// field was set; the payload names the field.
    #[error("programming error: QueueProcessorBuilder field {0} was not set")]
    Missing(&'static str),

    /// Accessing the persistent queue database failed.
    #[error("failed to use SQLite database")]
    Db(#[source] DbError),

    /// Building a CI trigger message from a broker event failed; the
    /// payload carries the offending event.
    #[error("failed to create a trigger message from broker event {0:?}")]
    BuildTrigger(BrokerEvent, #[source] MessageError),

    /// The broker failed to execute the CI run.
    #[error("failed to run CI")]
    ExecuteCi(#[source] BrokerError),
}
+

+
impl QueueError {
+
    fn db(e: DbError) -> Self {
+
        Self::Db(e)
+
    }
+

+
    fn build_trigger(event: &BrokerEvent, err: MessageError) -> Self {
+
        Self::BuildTrigger(event.clone(), err)
+
    }
+

+
    fn execute_ci(e: BrokerError) -> Self {
+
        Self::ExecuteCi(e)
+
    }
+
}
modified src/subplot.rs
@@ -27,6 +27,36 @@ fn install_ci_broker(context: &ScenarioContext) {
#[step]
#[context(SubplotContext)]
#[context(Runcmd)]
+
fn install_cib(context: &ScenarioContext) {
+
    let target_path = bindir();
+
    assert!(target_path.join("cib").exists());
+
    context.with_mut(
+
        |context: &mut Runcmd| {
+
            context.prepend_to_path(target_path);
+
            Ok(())
+
        },
+
        false,
+
    )?;
+
}
+

+
#[step]
+
#[context(SubplotContext)]
+
#[context(Runcmd)]
+
fn install_cibtool(context: &ScenarioContext) {
+
    let target_path = bindir();
+
    assert!(target_path.join("cibtool").exists());
+
    context.with_mut(
+
        |context: &mut Runcmd| {
+
            context.prepend_to_path(target_path);
+
            Ok(())
+
        },
+
        false,
+
    )?;
+
}
+

+
#[step]
+
#[context(SubplotContext)]
+
#[context(Runcmd)]
fn install_synthetic_events(context: &ScenarioContext) {
    let target_path = bindir();
    assert!(target_path.join("synthetic-events").exists());