Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
CI events in the CI broker
Merged liw opened 1 year ago

This patch is not yet ready for merging, but as I’m going away for a few days, I’ll post it for review. The changes touch so many parts of the CI broker that it’s been much too much work to make them. I will try to split changes into smaller patches in the future.

The purpose of these changes it to change the way the CI broker deals with changes in the repositories in its local nodes. The node sends “node events”, via its control socket, which basically say “we got a change from this node, in this repo, which changes these refs to point at these Git objects”. This is simple, but its at a level of abstraction that makes it harder than necessary to think about when combined with continuous integration tasks. Those tasks would need things like “a branch was created”, “a patch was updated”, and so on.

Previously, the CI broker would split the node events (each of which can have any number of ref updates) into simpler “broker events” (which have exactly one ref update).

This patch changes the way the CI broker deals with node events. After these changes, the CI broker turns node events into “CI events”, which are meant to more easily used for CI tasks. https://files.liw.fi/radicle/cub-userguide-draft.html has an initial version of a user guide for the CI broker, also included in this patch, which explains the new CI events.

The fundamental change in the implementation is that CI events are constructed earlier in the process. The updated refs are parsed to determine if a change is for a branch of a patch. This is sub-optimal, but not a change from before. The better way would be to look at COBs, but that’s a bigger change than I’m willing to include in this patch.

(A future patch will add parsing of COBs to allow CI events like “a comment was added to a patch”. Those changes will require the new mechanisms in this patch.)

The broker events are dropped from the code by this patch. To use this new version will require that the queue of broker events in the CI broker database is empty: the new CI broke does not process them.

I’ve not yet tested upgrades, but I expect the process go something like this:

  • stop cib
  • run cib queued to empty the broker event queue
  • run cibtool event list to check the queue is empty
  • upgrade
  • update the cib config to use new CI event filter syntax
  • start cib

Existing CI adapters will work as-is with this patch. The CI broker/adapter interface has not changed in this patch.

Don’t upgrade production instances of the CI broker to this patch.

I hope to merge this next week, and then resume weekly releases of the CI broker. This big change has prevented me from making smaller, unrelated changes, to avoid merge conflicts all over the place.

23 files changed +2198 -1101 975ce518 5483acf4
modified ci-broker.md
@@ -670,9 +670,10 @@ when I try to run bash radenv.sh env RAD_SOCKET=synt.sock cib --config broker.ya
then command is successful

when I run cibtool --db ci-broker.db event list --json
-
then stdout contains "RefChanged"
-
then stdout contains ""oid": "0000000000000000000000000000000000000000""
-
then stdout contains ""old": "0000000000000000000000000000000000000000""
+
then stdout contains "BranchUpdated"
+
then stdout contains ""branch": "main""
+
then stdout contains ""tip": "0000000000000000000000000000000000000000""
+
then stdout contains ""old_tip": "0000000000000000000000000000000000000000""
~~~

## Insert many events into queue
@@ -1151,15 +1152,14 @@ when I run bash radenv.sh cibtool event record --output events.json
then file events.json contains ""type":"refsFetched""
~~~

-
## Convert recorded node events into broker events
+
## Convert recorded node events into CI events

-
_What:_ Node operator can see what broke events are created from node
+
_What:_ Node operator can see what CI events are created from node
events.

-
_Why:_ This is helpful so that node operators can see what broker
-
events are created from node events, which may have been previously
-
recorded. It's also helpful for CI broker developers as a development
-
tool.
+
_Why:_ This is helpful so that node operators can see what CI events
+
are created from node events, which may have been previously recorded.
+
It's also helpful for CI broker developers as a development tool.

_Who:_ `cib-dev`, `node-ops`

@@ -1176,18 +1176,18 @@ when I run synthetic-events synt.sock refsfetched.json --log log.txt

given an installed cibtool
when I run bash radenv.sh cibtool event record --output node-events.json
-
when I run bash radenv.sh cibtool event broker --output broker-events.json node-events.json
-
then file broker-events.json contains "RefChanged""
+
when I run bash radenv.sh cibtool event ci --output ci-events.json node-events.json
+
when I run cat ci-events.json
+
then file ci-events.json contains "BranchUpdated""
~~~


-
## Filter recorded broker events
+
## Filter recorded CI events

-
_What:_ Node operator can see what broker events an event filter
-
allow.
+
_What:_ Node operator can see what CI events an event filter allows.

_Why:_ This is helpful so that node operators can see verify their
-
event filter works as they expect.
+
event filters work as they expect.

_Who:_ `cib-dev`, `node-ops`

@@ -1204,14 +1204,14 @@ when I run synthetic-events synt.sock refsfetched.json --log log.txt

given an installed cibtool
when I run bash radenv.sh cibtool event record --output node-events.json
-
when I run bash radenv.sh cibtool event broker --output broker-events.json node-events.json
+
when I run bash radenv.sh cibtool event ci --output ci-events.json node-events.json

given file allow.yaml
-
when I run cibtool event filter  allow.yaml broker-events.json
-
then stdout contains "RefChanged"
+
when I run cibtool event filter  allow.yaml ci-events.json
+
then stdout contains "BranchUpdated"

given file deny.yaml
-
when I run cibtool event filter deny.yaml broker-events.json
+
when I run cibtool event filter deny.yaml ci-events.json
then stdout is exactly ""
~~~

modified doc/Makefile
@@ -14,9 +14,12 @@
	pikchr-cli $< > $@.tmp
	mv $@.tmp $@

-
all: architecture.html
+
all: architecture.html userguide.html

publish: all
	bash publish.sh

architecture.html: architecture.svg architecture-ext.svg comp.svg comp-ext.svg Makefile
+

+
userguide.html: userguide.subplot userguide.md Makefile
+
	subplot docgen $< --output $@
added doc/userguide.md
@@ -0,0 +1,147 @@
+
# Introduction
+

+
The Radicle CI broker runs CI for repositories in the local Radicle
+
node. This is the user guide for the CI broker.
+

+
The CI broker helps users run validation on changes to their software
+
project, by automating the building and testing of the projects when
+
anything in the repository changes. This is often called "continuous
+
integration".
+

+
(Technically, "continuous integration" is the software development
+
practice to merge changes into the main line of development
+
frequently, at least daily, to avoid painful merge conflict
+
resolutions. However, for this guide we say "CI" to mean "when
+
repository changes, perform these actions", which is a more generic,
+
and quite popular definition, if not very purist.)
+

+
# Overview
+

+
The Radicle node stores Git repositories and synchronizes them with
+
other Radicle nodes. The CI broker connects to its local node and gets
+
"node events" whenever anything changes in the node. The relevant
+
change for the CI broker is that a Git references ("refs") in a
+
repository have been created, updated, or deleted. For now, these are
+
branches. Later, Radicle and the CI broker will support other
+
references, such as tags.
+

+
There are no node events for Git repositories being created or
+
deleted. It's not possible to create a Radicle repository without
+
creating a branch, so just looking at references is enough.
+

+
The CI broker looks at the reference changes and refines them into "CI
+
events", which are more suitable for the kind CI use that the CI
+
broker is meant to enable, than "this ref changed", which is quite low
+
level.
+

+
# CI events
+

+
The CI broker currently supports a small set of CI events. There will
+
be more.
+

+
In the tables below, the fields have the following meanings:
+

+
* `from_node` -- the node from which the event originated
+
* `repo` -- the ID of the repository concerned
+
* `branch` -- the name of the branch created of updated
+
* `tip` -- the newest commit in the branch or patch
+
* `old_tip` -- the previous newest tip, before the change
+

+

+
## `BranchCreated`
+

+
A branch has been created. This may mean the repository has also been
+
created, but that is not certain.
+

+
| Event           | fields              | field types |
+
|:----------------|:--------------------|:------------|
+
| `BranchCreated` | `from_node`         | `NodeId`    |
+
|                 | `repo`              | `RepoId`    |
+
|                 | `branch`            | `RefSting`  |
+
|                 | `tip`               | `Oid`       |
+

+
## `BranchUpdated`
+

+
A branch has been updated.
+

+
| Event           | fields      | field types |
+
|:----------------|:------------|:------------|
+
| `BranchUpdated` | `from_node` | `NodeId`    |
+
|                 | `repo`      | `RepoId`    |
+
|                 | `branch`    | `RefSting`  |
+
|                 | `tip`       | `Oid`       |
+
|                 | `old_tip`   | `Oid`       |
+

+
## `BranchDeleted`
+

+
A branch has been deleted.
+

+
| Event           | fields              | field types |
+
|:----------------|:--------------------|:------------|
+
| `BranchDeleted` | `repo`              | `RepoId`    |
+
|                 | `branch`            | `RefString` |
+
|                 | `tip`               | ` Oid`      |
+

+
## `PatchCreated`
+

+
A patch has been created.
+

+
| Event          | fields      | field types |
+
|:---------------|:------------|:------------|
+
| `PatchCreated` | `from_node` | `NodeId`    |
+
|                | `repo`      | `RepoId`    |
+
|                | `patch`     | `PatchId`   |
+
|                | `new_tip`   | `Oid`       |
+

+
## `PatchUpdated`
+

+
A patch has been updated.
+

+
| Event          | fields      | field types |
+
|:---------------|:------------|:------------|
+
| `PatchUpdated` | `from_node` | `NodeId`    |
+
|                | `repo`      | `RepoId`    |
+
|                | `patch`     | `PatchId`   |
+
|                | `new_tip`   | `Oid`       |
+
|                |             |             |
+

+
# Event filters
+

+
The CI broker configuration can use the following conditions, and
+
AND/OR/NOT operators to build a filter expression: if the expression
+
evaluates as "true", the event is allowed and will trigger a CI run.
+
Otherwise it is discarded and does not trigger a CI run.
+

+
| Condition       | Meaning                                                   |
+
|:----------------|:----------------------------------------------------------|
+
| `Repository`    | Event refers to a specific repository, identified by ID   |
+
| `Branch`        | Event refers to a specific Git branch                     |
+
| `BranchCreated` | Branch was created                                        |
+
| `BranchUpdated` | Branch was updated                                        |
+
| `BranchDeleted` | Branch was deleted                                        |
+
| `Patch`         | Event refers to a specific patch, identified by ID        |
+
| `PatchCreated`  | Patch was created                                         |
+
| `PatchUpdated`  | Patch was updated                                         |
+
| `Allow`         | Change is allowed                                         |
+
| `Deny`          | Changes is not allowed                                    |
+
| `Not`           | Change is allowed is the operand expressions are is false |
+
| `And`           | Change is allowed if all the operands are true            |
+
| `Or`            | Change is allows if any of the operands is true           |
+

+
## Example
+

+
The following example is a snippet of YAML for the CI broker
+
configuration file to match events that refer to the` main` in the CI
+
broker repository.
+

+
~~~yaml
+
filters:
+
  - !And
+
    - !Repository "rad:zwTxygwuz5LDGBq255RA2CbNGrz8"
+
    - !Branch "main"
+
~~~
+

+
The conditions are expressed using the `!Foo` syntax in YAML. `Foo`
+
must be one of the operands from the table above. Simple values are
+
expressed as doubly quoted strings, and lists of operands are
+
sub-lists in YAML syntax.
added doc/userguide.subplot
@@ -0,0 +1,6 @@
+
title: "Radicle CI broker user guide"
+
authors:
+
  - Lars Wirzenius
+
  - The Radicle Project
+
markdowns:
+
  - userguide.md
modified src/bin/cib.rs
@@ -69,7 +69,13 @@ impl Args {
    }

    fn open_db(&self, config: &Config) -> Result<Db, CibError> {
-
        Db::new(&config.db).map_err(CibError::db)
+
        let db = Db::new(&config.db).map_err(CibError::db)?;
+
        let events = db.queued_events().map_err(CibError::Db)?;
+
        if events.is_empty() {
+
            Ok(db)
+
        } else {
+
            Err(CibError::UnprocessedBrokerEvents)
+
        }
    }
}

@@ -190,7 +196,6 @@ impl ProcessEventsCmd {
            .events_tx(events_notification.tx())
            .db(args.open_db(config)?)
            .filters(&config.filters)
-
            .push_shutdown()
            .build()
            .map_err(CibError::QueueAdder)?;
        adder.add_events_in_thread();
@@ -243,6 +248,7 @@ impl ProcessEventsCmd {
}

#[derive(Debug, thiserror::Error)]
+
#[allow(clippy::large_enum_variant)]
enum CibError {
    #[error("failed to read configuration file {0}")]
    ReadConfig(PathBuf, #[source] ConfigError),
@@ -259,6 +265,9 @@ enum CibError {
    #[error("failed to use SQLite database")]
    Db(#[source] DbError),

+
    #[error("database has unprocessed broker events")]
+
    UnprocessedBrokerEvents,
+

    #[error("failed create broker data type")]
    NewBroker(#[source] BrokerError),

modified src/bin/cibtool.rs
@@ -7,6 +7,7 @@
//! suite (see the `ci-broker.subplot` document).

#![allow(clippy::result_large_err)]
+
#![allow(unused_imports)] // FIXME

use std::{
    error::Error,
@@ -29,8 +30,8 @@ use radicle_git_ext::Oid;

use radicle_ci_broker::{
    broker::BrokerError,
-
    db::{Db, DbError, QueueId, QueuedEvent},
-
    event::{BrokerEvent, BrokerEventError},
+
    ci_event::{CiEvent, CiEventError},
+
    db::{Db, DbError, QueueId, QueuedCiEvent},
    logger,
    msg::{RunId, RunResult},
    notif::NotificationChannel,
@@ -182,7 +183,7 @@ impl Subcommand for EventCmd {
            EventSubCmd::Show(x) => x.run(args)?,
            EventSubCmd::Remove(x) => x.run(args)?,
            EventSubCmd::Record(x) => x.run(args)?,
-
            EventSubCmd::Broker(x) => x.run(args)?,
+
            EventSubCmd::Ci(x) => x.run(args)?,
            EventSubCmd::Filter(x) => x.run(args)?,
        }
        Ok(())
@@ -200,7 +201,7 @@ enum EventSubCmd {
    Show(cibtoolcmd::ShowEvent),
    Remove(cibtoolcmd::RemoveEvent),
    Record(cibtoolcmd::RecordEvents),
-
    Broker(cibtoolcmd::BrokerEvents),
+
    Ci(cibtoolcmd::CiEvents),
    Filter(cibtoolcmd::FilterEvents),
}

@@ -274,8 +275,8 @@ enum CibToolError {
    #[error(transparent)]
    Db(#[from] DbError),

-
    #[error("failed to serialize broker event to JSON: {0:#?}")]
-
    EventToJson(BrokerEvent, #[source] serde_json::Error),
+
    #[error("failed to serialize CI event to JSON: {0:#?}")]
+
    EventToJson(CiEvent, #[source] serde_json::Error),

    #[error("failed to serialize node event to JSON: {0:#?}")]
    NodeEevnetToJson(radicle::node::Event, #[source] serde_json::Error),
@@ -344,11 +345,8 @@ enum CibToolError {
    CreateBrokerEventsFile(PathBuf, #[source] std::io::Error),

    #[error("failed to read filters from YAML file {0}")]
-
    ReadFilters(
-
        PathBuf,
-
        #[source] radicle_ci_broker::event::BrokerEventError,
-
    ),
+
    ReadFilters(PathBuf, #[source] radicle_ci_broker::filter::FilterError),

-
    #[error("failed to check if event is allowed: {0:#?}")]
-
    EventIsAllowed(BrokerEvent, #[source] BrokerEventError),
+
    #[error("failed to construct a CiEvent::BranchCreated")]
+
    BranchCreted(#[source] CiEventError),
}
modified src/bin/cibtoolcmd/event.rs
@@ -1,6 +1,7 @@
use std::io::Write;

-
use radicle_ci_broker::{event::EventFilter, node_event_source::NodeEventSource};
+
#[allow(unused_imports)] // FIXME
+
use radicle_ci_broker::{filter::EventFilter, node_event_source::NodeEventSource};

use super::*;

@@ -19,11 +20,11 @@ pub struct ListEvents {
impl Leaf for ListEvents {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;
-
        let event_ids = db.queued_events()?;
+
        let event_ids = db.queued_ci_events()?;
        if self.json {
-
            let events: Result<Vec<QueuedEvent>, DbError> = event_ids
+
            let events: Result<Vec<QueuedCiEvent>, DbError> = event_ids
                .iter()
-
                .filter_map(|id| match db.get_queued_event(id) {
+
                .filter_map(|id| match db.get_queued_ci_event(id) {
                    Ok(Some(event)) => Some(Ok(event)),
                    Err(e) => Some(Err(e)),
                    _ => None,
@@ -35,7 +36,7 @@ impl Leaf for ListEvents {
            println!("{}", json);
        } else if self.verbose {
            for id in event_ids {
-
                if let Some(e) = db.get_queued_event(&id)? {
+
                if let Some(e) = db.get_queued_ci_event(&id)? {
                    println!("{id}: {:?}", e);
                } else {
                    println!("{id}: No such event");
@@ -57,7 +58,7 @@ pub struct CountEvents {}
impl Leaf for CountEvents {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;
-
        println!("{}", db.queued_events()?.len());
+
        println!("{}", db.queued_ci_events()?.len());
        Ok(())
    }
}
@@ -148,6 +149,8 @@ impl AddEvent {

impl Leaf for AddEvent {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
+
        let nid = self.lookup_nid()?;
+

        let rid = if let Ok(rid) = RepoId::from_urn(&self.repo) {
            rid
        } else {
@@ -160,15 +163,16 @@ impl Leaf for AddEvent {
            self.lookup_commit(rid, &self.commit)?
        };

-
        let name = format!(
-
            "refs/namespaces/{}/refs/heads/{}",
-
            self.lookup_nid()?,
-
            self.name.as_str()
-
        );
+
        let name = format!("refs/namespaces/{}/refs/heads/{}", nid, self.name.as_str());
        let name =
            RefString::try_from(name.clone()).map_err(|e| CibToolError::RefString(name, e))?;

-
        let event = BrokerEvent::new(&rid, &name, &oid, self.base);
+
        let event = if let Some(base) = &self.base {
+
            CiEvent::branch_updated(nid, rid, &name, oid, *base)
+
                .map_err(CibToolError::BranchCreted)?
+
        } else {
+
            CiEvent::branch_created(nid, rid, &name, oid).map_err(CibToolError::BranchCreted)?
+
        };

        if let Some(output) = &self.output {
            let json = serde_json::to_string_pretty(&event)
@@ -177,7 +181,7 @@ impl Leaf for AddEvent {
                .map_err(|e| CibToolError::Write(output.into(), e))?;
        } else {
            let db = args.open_db()?;
-
            let id = db.push_queued_event(event)?;
+
            let id = db.push_queued_ci_event(event)?;
            println!("{id}");

            if let Some(filename) = &self.id_file {
@@ -224,7 +228,7 @@ impl Leaf for ShowEvent {
            return Err(CibToolError::MissingId);
        };

-
        if let Some(event) = db.get_queued_event(&id)? {
+
        if let Some(event) = db.get_queued_ci_event(&id)? {
            if self.json {
                let json = serde_json::to_string_pretty(&event.event())
                    .map_err(|e| CibToolError::EventToJson(event.event().clone(), e))?;
@@ -268,7 +272,7 @@ impl Leaf for RemoveEvent {
            return Err(CibToolError::MissingId);
        };

-
        db.remove_queued_event(&id)?;
+
        db.remove_queued_ci_event(&id)?;
        Ok(())
    }
}
@@ -287,7 +291,7 @@ pub struct Shutdown {
impl Leaf for Shutdown {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let db = args.open_db()?;
-
        let id = db.push_queued_event(BrokerEvent::Shutdown)?;
+
        let id = db.push_queued_ci_event(CiEvent::Shutdown)?;

        if let Some(filename) = &self.id_file {
            write(filename, id.to_string().as_bytes())
@@ -346,16 +350,16 @@ impl Leaf for RecordEvents {
    }
}

-
/// Convert node events into broker events.
+
/// Convert node events into CI events.
///
/// Node events are read from the specified file. Note that one node
/// event can result in any number of broker events.
///
-
/// The events are written to the standard output or to the specified
-
/// file, as one JSON object per line.
+
/// The CI events are written to the standard output or to the
+
/// specified file, as one JSON object per line.
#[derive(Parser)]
-
pub struct BrokerEvents {
-
    /// Write broker events to this file.
+
pub struct CiEvents {
+
    /// Write CI events to this file.
    #[clap(long)]
    output: Option<PathBuf>,

@@ -363,7 +367,7 @@ pub struct BrokerEvents {
    input: PathBuf,
}

-
impl Leaf for BrokerEvents {
+
impl Leaf for CiEvents {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
        let bytes = std::fs::read(&self.input)
            .map_err(|e| CibToolError::ReadEvents(self.input.clone(), e))?;
@@ -377,18 +381,18 @@ impl Leaf for BrokerEvents {
            node_events.push(event);
        }

-
        let mut broker_events = vec![];
+
        let mut ci_events: Vec<CiEvent> = vec![];
        for node_event in node_events.iter() {
-
            if let Some(mut bes) = BrokerEvent::from_event(node_event) {
-
                broker_events.append(&mut bes);
+
            if let Ok(mut cevs) = CiEvent::from_node_event(node_event) {
+
                ci_events.append(&mut cevs);
            }
        }

        let mut jsons = vec![];
-
        for be in broker_events.iter() {
+
        for ci_event in ci_events.iter() {
            jsons.push(
-
                serde_json::to_string(be)
-
                    .map_err(|err| CibToolError::EventToJson(be.clone(), err))?,
+
                serde_json::to_string(ci_event)
+
                    .map_err(|err| CibToolError::EventToJson(ci_event.clone(), err))?,
            );
        }

@@ -426,7 +430,7 @@ pub struct FilterEvents {

impl Leaf for FilterEvents {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
-
        let filters = EventFilter::from_yaml_file(&self.filters)
+
        let filters = EventFilter::from_file(&self.filters)
            .map_err(|e| CibToolError::ReadFilters(self.filters.clone(), e))?;

        let bytes = std::fs::read(&self.input)
@@ -434,19 +438,16 @@ impl Leaf for FilterEvents {
        let text = String::from_utf8(bytes)
            .map_err(|e| CibToolError::BrokerEventNotUtf8(self.input.clone(), e))?;

-
        let mut broker_events = vec![];
+
        let mut ci_events = vec![];
        for line in text.lines() {
-
            let event: BrokerEvent = serde_json::from_str(line)
+
            let event: CiEvent = serde_json::from_str(line)
                .map_err(|e| CibToolError::JsonToNodeEvent(self.input.clone(), e))?;
-
            broker_events.push(event);
+
            ci_events.push(event);
        }

-
        for event in broker_events.iter() {
+
        for event in ci_events.iter() {
            for filter in filters.iter() {
-
                if event
-
                    .is_allowed(filter)
-
                    .map_err(|e| CibToolError::EventIsAllowed(event.clone(), e))?
-
                {
+
                if filter.allows(event) {
                    let json = serde_json::to_string_pretty(event)
                        .map_err(|e| CibToolError::EventToJson(event.clone(), e))?;
                    println!("{json}");
modified src/bin/cibtoolcmd/trigger.rs
@@ -35,10 +35,11 @@ impl Leaf for TriggerCmd {
        let name = format!("refs/namespaces/{nid}/refs/heads/{}", self.name.as_str());
        let name =
            RefString::try_from(name.clone()).map_err(|e| CibToolError::RefString(name, e))?;
-
        let event = BrokerEvent::new(&rid, &name, &oid, Some(base));
+
        let event = CiEvent::branch_updated(nid, rid, &name, oid, base)
+
            .map_err(CibToolError::BranchCreted)?;

        let db = args.open_db()?;
-
        let id = db.push_queued_event(event)?;
+
        let id = db.push_queued_ci_event(event)?;
        println!("{id}");

        if let Some(filename) = &self.id_file {
modified src/broker.rs
@@ -133,10 +133,6 @@ pub enum BrokerError {
    #[error(transparent)]
    Timeformat(#[from] time::error::Format),

-
    /// Error from an node event subscriber.
-
    #[error(transparent)]
-
    NodeEvent(#[from] crate::event::BrokerEventError),
-

    /// Error from Radicle.
    #[error(transparent)]
    RadicleProfile(#[from] radicle::profile::Error),
added src/ci_event.rs
@@ -0,0 +1,605 @@
+
use std::path::{Path, PathBuf};
+

+
use regex::Regex;
+
use serde::{Deserialize, Serialize};
+

+
use radicle_git_ext::Oid;
+

+
use radicle::{
+
    cob::patch::PatchId,
+
    git::RefString,
+
    node::{Event, NodeId},
+
    prelude::RepoId,
+
    storage::RefUpdate,
+
};
+

+
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
+
pub enum CiEvent {
+
    Shutdown,
+
    BranchCreated {
+
        from_node: NodeId,
+
        repo: RepoId,
+
        branch: RefString,
+
        tip: Oid,
+
    },
+
    BranchUpdated {
+
        from_node: NodeId,
+
        repo: RepoId,
+
        branch: RefString,
+
        tip: Oid,
+
        old_tip: Oid,
+
    },
+
    BranchDeleted {
+
        repo: RepoId,
+
        branch: RefString,
+
        tip: Oid,
+
    },
+
    PatchCreated {
+
        from_node: NodeId,
+
        repo: RepoId,
+
        patch: PatchId,
+
        new_tip: Oid,
+
    },
+
    PatchUpdated {
+
        from_node: NodeId,
+
        repo: RepoId,
+
        patch: PatchId,
+
        new_tip: Oid,
+
    },
+
}
+

+
impl CiEvent {
+
    pub fn branch_created(
+
        node: NodeId,
+
        repo: RepoId,
+
        branch: &str,
+
        tip: Oid,
+
    ) -> Result<Self, CiEventError> {
+
        Ok(Self::BranchCreated {
+
            from_node: node,
+
            repo,
+
            branch: RefString::try_from(branch)
+
                .map_err(|e| CiEventError::RefString(branch.into(), e))?,
+
            tip,
+
        })
+
    }
+

+
    pub fn branch_updated(
+
        node: NodeId,
+
        repo: RepoId,
+
        branch: &str,
+
        tip: Oid,
+
        old_tip: Oid,
+
    ) -> Result<Self, CiEventError> {
+
        let branch =
+
            namespaced_branch(branch).map_err(|_| CiEventError::without_namespace2(branch))?;
+
        Ok(Self::BranchUpdated {
+
            from_node: node,
+
            repo,
+
            branch: RefString::try_from(branch.clone())
+
                .map_err(|e| CiEventError::RefString(branch.clone(), e))?,
+
            tip,
+
            old_tip,
+
        })
+
    }
+

+
    pub fn from_node_event(event: &Event) -> Result<Vec<Self>, CiEventError> {
+
        fn ref_string(s: String) -> Result<RefString, CiEventError> {
+
            RefString::try_from(s.clone()).map_err(|e| CiEventError::ref_string(s, e))
+
        }
+

+
        fn branch(ref_name: &str, update: &RefUpdate) -> Result<RefString, CiEventError> {
+
            ref_string(
+
                namespaced_branch(ref_name)
+
                    .map_err(|_| CiEventError::without_namespace(ref_name, update.clone()))?,
+
            )
+
        }
+

+
        match event {
+
            Event::RefsFetched {
+
                remote,
+
                rid,
+
                updated,
+
            } => {
+
                let mut events = vec![];
+
                for update in updated {
+
                    let e = match update {
+
                        RefUpdate::Created { name, oid } => {
+
                            eprintln!("created: {name:?}");
+
                            if let Ok(patch_id) = patch_id(name) {
+
                                CiEvent::PatchCreated {
+
                                    from_node: *remote,
+
                                    repo: *rid,
+
                                    patch: patch_id,
+
                                    new_tip: *oid,
+
                                }
+
                            } else if let Ok(branch) = namespaced_branch(name) {
+
                                CiEvent::BranchCreated {
+
                                    from_node: *remote,
+
                                    repo: *rid,
+
                                    branch: ref_string(branch)?,
+
                                    tip: *oid,
+
                                }
+
                            } else {
+
                                eprintln!("don't know what it is, ignoring");
+
                                continue;
+
                            }
+
                        }
+
                        RefUpdate::Updated { name, old, new } => {
+
                            eprintln!("updated: {name:?}");
+
                            if let Ok(patch_id) = patch_id(name) {
+
                                eprintln!("it's a patch");
+
                                CiEvent::PatchUpdated {
+
                                    from_node: *remote,
+
                                    repo: *rid,
+
                                    patch: patch_id,
+
                                    new_tip: *new,
+
                                }
+
                            } else if let Ok(branch) = namespaced_branch(name) {
+
                                eprintln!("it's a branch update");
+
                                CiEvent::BranchUpdated {
+
                                    from_node: *remote,
+
                                    repo: *rid,
+
                                    branch: ref_string(branch)?,
+
                                    tip: *new,
+
                                    old_tip: *old,
+
                                }
+
                            } else {
+
                                eprintln!("don't know what it is, ignoring");
+
                                continue;
+
                            }
+
                        }
+
                        RefUpdate::Deleted { name, oid } => CiEvent::BranchDeleted {
+
                            repo: *rid,
+
                            branch: branch(name, update)?,
+
                            tip: *oid,
+
                        },
+
                        RefUpdate::Skipped { .. } => continue,
+
                    };
+
                    events.push(e);
+
                }
+
                Ok(events)
+
            }
+
            Event::RefsSynced { .. }
+
            | Event::RefsAnnounced { .. }
+
            | Event::NodeAnnounced { .. }
+
            | Event::SeedDiscovered { .. }
+
            | Event::SeedDropped { .. }
+
            | Event::PeerConnected { .. }
+
            | Event::PeerDisconnected { .. }
+
            | Event::LocalRefsAnnounced { .. }
+
            | Event::UploadPack { .. }
+
            | Event::InventoryAnnounced { .. } => Ok(vec![]),
+
        }
+
    }
+
}
+

+
pub struct CiEvents {
+
    events: Vec<CiEvent>,
+
}
+

+
impl CiEvents {
+
    pub fn from_file(filename: &Path) -> Result<Self, CiEventError> {
+
        let events = std::fs::read(filename).map_err(|e| CiEventError::read_file(filename, e))?;
+
        let events = String::from_utf8(events).map_err(|e| CiEventError::not_utf8(filename, e))?;
+
        let events: Result<Vec<CiEvent>, _> = events.lines().map(serde_json::from_str).collect();
+
        let events = events.map_err(|e| CiEventError::not_json(filename, e))?;
+

+
        Ok(Self { events })
+
    }
+

+
    pub fn iter(&self) -> impl Iterator<Item = &CiEvent> {
+
        self.events.iter()
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum CiEventError {
+
    #[error("updated ref name has no name space: {0:?}): from {1:#?}")]
+
    WithoutNamespace(String, RefUpdate),
+

+
    #[error("updated ref name has no name space: {0:?})")]
+
    WithoutNamespace2(String),
+

+
    #[error("failed to create a RefString from {0:?}")]
+
    RefString(String, radicle::git::fmt::Error),
+

+
    #[error("failed to read broker events file {0}")]
+
    ReadFile(PathBuf, #[source] std::io::Error),
+

+
    #[error("broker events file is not UTF8: {0}")]
+
    NotUtf8(PathBuf, #[source] std::string::FromUtf8Error),
+

+
    #[error("broker events file is not valid JSON: {0}")]
+
    NotJson(PathBuf, #[source] serde_json::Error),
+
}
+

+
impl CiEventError {
+
    fn without_namespace(refname: &str, update: RefUpdate) -> Self {
+
        Self::WithoutNamespace(refname.into(), update)
+
    }
+

+
    fn without_namespace2(refname: &str) -> Self {
+
        Self::WithoutNamespace2(refname.into())
+
    }
+

+
    fn ref_string(name: String, err: radicle::git::fmt::Error) -> Self {
+
        Self::RefString(name, err)
+
    }
+

+
    fn read_file(filename: &Path, err: std::io::Error) -> Self {
+
        Self::ReadFile(filename.into(), err)
+
    }
+

+
    fn not_utf8(filename: &Path, err: std::string::FromUtf8Error) -> Self {
+
        Self::NotUtf8(filename.into(), err)
+
    }
+

+
    fn not_json(filename: &Path, err: serde_json::Error) -> Self {
+
        Self::NotJson(filename.into(), err)
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+
    use radicle::{prelude::NodeId, storage::RefUpdate};
+
    use std::str::FromStr;
+

+
    const MAIN_BRANCH_REF_NAME: &str = "refs/namespaces/NID/refs/heads/main";
+
    const MAIN_BRANCH_NAME: &str = "main";
+

+
    const PATCH_REF_NAME: &str =
+
        "refs/namespaces/NID/refs/heads/patches/f9fa90725474de9002be503ae3cda4670c9a174";
+
    const PATCH_ID: &str = "f9fa90725474de9002be503ae3cda4670c9a174";
+

+
    fn nid() -> NodeId {
+
        const NID: &str = "z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV";
+
        NodeId::from_str(NID).unwrap()
+
    }
+

+
    fn rid() -> RepoId {
+
        const RID: &str = "rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5";
+
        RepoId::from_urn(RID).unwrap()
+
    }
+

+
    fn oid_from(oid: &str) -> Oid {
+
        Oid::try_from(oid).unwrap()
+
    }
+

+
    fn oid() -> Oid {
+
        const OID: &str = "ff3099ba5de28d954c41d0b5a84316f943794ea4";
+
        oid_from(OID)
+
    }
+

+
    fn refstring(s: &str) -> RefString {
+
        RefString::try_from(s).unwrap()
+
    }
+

+
    #[test]
+
    fn nothing_updated() {
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid: rid(),
+
            updated: vec![],
+
        };
+
        let result = CiEvent::from_node_event(&event);
+
        assert!(result.is_ok());
+
        assert_eq!(result.unwrap(), vec![]);
+
    }
+

+
    #[test]
+
    fn skipped() {
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid: rid(),
+
            updated: vec![RefUpdate::Skipped {
+
                name: refstring(MAIN_BRANCH_REF_NAME),
+
                oid: oid(),
+
            }],
+
        };
+

+
        let result = CiEvent::from_node_event(&event);
+
        assert!(result.is_ok());
+
        assert_eq!(result.unwrap(), vec![]);
+
    }
+

+
    #[test]
+
    fn branch_created() {
+
        let rid = rid();
+
        let main = refstring(MAIN_BRANCH_NAME);
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Created {
+
                name: refstring(MAIN_BRANCH_REF_NAME),
+
                oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::BranchCreated {
+
                            from_node: _,
+
                            repo,
+
                            branch,
+
                            tip,
+
                        } if repo == rid && branch == main && tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+

+
    #[test]
+
    fn branch_updated() {
+
        let rid = rid();
+
        let main = refstring(MAIN_BRANCH_NAME);
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Updated {
+
                name: refstring(MAIN_BRANCH_REF_NAME),
+
                old: oid,
+
                new: oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::BranchUpdated {
+
                            from_node: _,
+
                            repo,
+
                            branch,
+
                            tip,
+
                            old_tip,
+
                        } if repo == rid && branch == main && tip == oid && old_tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+

+
    #[test]
+
    fn branch_deleted() {
+
        let rid = rid();
+
        let main = refstring(MAIN_BRANCH_NAME);
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Deleted {
+
                name: refstring(MAIN_BRANCH_REF_NAME),
+
                oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::BranchDeleted { repo, branch, tip }
+
                            if repo == rid && branch == main && tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+

+
    #[test]
+
    fn patch_created() {
+
        let rid = rid();
+
        let patch_id = oid_from(PATCH_ID).into();
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Created {
+
                name: refstring(PATCH_REF_NAME),
+
                oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::PatchCreated {
+
                            from_node: _,
+
                            repo,
+
                            patch,
+
                            new_tip,
+
                        } if repo == rid && patch == patch_id && new_tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+

+
    #[test]
+
    fn patch_updated() {
+
        let rid = rid();
+
        let patch_id = oid_from(PATCH_ID).into();
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Updated {
+
                name: refstring(PATCH_REF_NAME),
+
                old: oid,
+
                new: oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::PatchUpdated {
+
                            from_node: _,
+
                            repo,
+
                            patch,
+
                            new_tip,
+
                        } if repo == rid && patch == patch_id && new_tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+
}
+

+
fn namespaced_branch(refname: &str) -> Result<String, ParseError> {
+
    const PAT_BRANCH: &str = r"^refs/namespaces/[^/]+/refs/heads/(.+)$";
+
    let push_re = Regex::new(PAT_BRANCH).map_err(|e| ParseError::Regex(PAT_BRANCH, e))?;
+
    if let Some(push_captures) = push_re.captures(refname) {
+
        if let Some(branch) = push_captures.get(1) {
+
            return Ok(branch.as_str().to_string());
+
        }
+
    }
+
    Err(ParseError::NotBranch(refname.into()))
+
}
+

+
fn patch_id(refname: &str) -> Result<PatchId, ParseError> {
+
    eprintln!("refname: {refname:?}");
+
    const PAT_PATCH: &str = r"^refs/namespaces/[^/]+/refs/heads/patches/([^/]+)$";
+
    let patch_re = Regex::new(PAT_PATCH).map_err(|e| ParseError::regex(PAT_PATCH, e))?;
+
    if let Some(patch_captures) = patch_re.captures(refname) {
+
        eprintln!("captures: {patch_captures:?}");
+
        if let Some(patch_id) = patch_captures.get(1) {
+
            eprintln!("patch_id: {patch_id:?}");
+
            let oid = Oid::try_from(patch_id.as_str())
+
                .map_err(|e| ParseError::oid(patch_id.as_str(), e))?;
+
            eprintln!("oid: {oid:?}");
+
            return Ok(oid.into());
+
        }
+
    }
+

+
    Err(ParseError::not_patch(refname))
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
enum ParseError {
+
    #[error("programming error: unacceptable regular expression {0:?}")]
+
    Regex(&'static str, regex::Error),
+

+
    #[error("Git ref name without name space: {0:?}")]
+
    NotBranch(String),
+

+
    #[error("unacceptable Git ref for patch: {0:?}")]
+
    NotPatch(String),
+

+
    #[error("Git ref name includes unacceptable Git object id: {0:?}")]
+
    Oid(String, radicle::git::raw::Error),
+
}
+

+
impl ParseError {
+
    fn regex(pattern: &'static str, err: regex::Error) -> Self {
+
        Self::Regex(pattern, err)
+
    }
+

+
    fn not_patch(refname: &str) -> Self {
+
        Self::NotPatch(refname.into())
+
    }
+

+
    fn oid(refname: &str, err: radicle::git::raw::Error) -> Self {
+
        Self::Oid(refname.into(), err)
+
    }
+
}
+

+
#[cfg(test)]
+
mod test_namespaced_branch {
+
    use super::{namespaced_branch, ParseError};
+

+
    #[test]
+
    fn empty() {
+
        assert!(matches!(
+
            namespaced_branch(""),
+
            Err(ParseError::NotBranch(_))
+
        ));
+
    }
+

+
    #[test]
+
    fn lacks_namespace() {
+
        assert!(matches!(
+
            namespaced_branch(""),
+
            Err(ParseError::NotBranch(_))
+
        ));
+
    }
+

+
    #[test]
+
    fn has_namespace() {
+
        let x = namespaced_branch("refs/namespaces/DID/refs/heads/main");
+
        assert!(x.is_ok());
+
        assert_eq!(x.unwrap(), "main");
+
    }
+

+
    #[test]
+
    fn has_namespace_with_path() {
+
        let x = namespaced_branch("refs/namespaces/DID/refs/heads/liw/debug/this/path");
+
        assert!(x.is_ok());
+
        assert_eq!(x.unwrap(), "liw/debug/this/path");
+
    }
+
}
+

+
#[cfg(test)]
+
mod test_patch_id {
+
    use super::{patch_id, Oid, ParseError};
+

+
    #[test]
+
    fn empty() {
+
        assert!(matches!(patch_id(""), Err(ParseError::NotPatch(_))));
+
    }
+

+
    #[test]
+
    fn lacks_namespace() {
+
        assert!(matches!(patch_id(""), Err(ParseError::NotPatch(_))));
+
    }
+

+
    #[test]
+
    fn has_namespace() {
+
        let x = patch_id(
+
            "refs/namespaces/DID/refs/heads/patches/f9fa90725474de9002be503ae3cda4670c9a1741",
+
        );
+
        assert!(x.is_ok());
+
        assert_eq!(
+
            x.unwrap(),
+
            Oid::try_from("f9fa90725474de9002be503ae3cda4670c9a1741")
+
                .unwrap()
+
                .into()
+
        );
+
    }
+

+
    #[test]
+
    fn has_namespace_with_path() {
+
        assert!(matches!(
+
            patch_id("refs/namespaces/DID/refs/heads/patches/coffee/beef"),
+
            Err(ParseError::NotPatch(_))
+
        ));
+
    }
+
}
added src/ci_event_source.rs
@@ -0,0 +1,69 @@
+
use std::fmt;
+

+
use radicle::Profile;
+

+
use crate::{
+
    ci_event::CiEvent,
+
    logger,
+
    node_event_source::{NodeEventError, NodeEventSource},
+
};
+

+
pub struct CiEventSource {
+
    source: NodeEventSource,
+
}
+

+
impl CiEventSource {
+
    pub fn new(profile: &Profile) -> Result<Self, CiEventSourceError> {
+
        let source = Self {
+
            source: NodeEventSource::new(profile).map_err(CiEventSourceError::Subscribe)?,
+
        };
+
        logger::ci_event_source_created(&source);
+
        Ok(source)
+
    }
+

+
    pub fn event(&mut self) -> Result<Option<Vec<CiEvent>>, CiEventSourceError> {
+
        let result = self.source.node_event();
+
        logger::debug2(format!("ci_event_source: result={result:?}"));
+
        match &result {
+
            Err(NodeEventError::BrokenConnection) => {
+
                logger::ci_event_source_disconnected();
+
                Err(CiEventSourceError::BrokenConnection(result.unwrap_err()))
+
            }
+
            Err(err) => {
+
                logger::error("error reading event from node", &err);
+
                Err(CiEventSourceError::NodeEventError(result.unwrap_err()))
+
            }
+
            Ok(None) => {
+
                logger::ci_event_source_end();
+
                Ok(None)
+
            }
+
            Ok(Some(event)) => {
+
                let ci_events =
+
                    CiEvent::from_node_event(event).map_err(CiEventSourceError::CiEvent)?;
+
                logger::ci_event_source_got_events(&ci_events);
+
                Ok(Some(ci_events))
+
            }
+
        }
+
    }
+
}
+

+
impl fmt::Debug for CiEventSource {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        write!(f, "CiEventSource<path={:?}", self.source)
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum CiEventSourceError {
+
    #[error("failed to subscribe to node events")]
+
    Subscribe(#[source] crate::node_event_source::NodeEventError),
+

+
    #[error("connection to node control socket broke")]
+
    BrokenConnection(#[source] crate::node_event_source::NodeEventError),
+

+
    #[error("failed to read event from node")]
+
    NodeEventError(#[source] crate::node_event_source::NodeEventError),
+

+
    #[error("failed to create CI events from node event")]
+
    CiEvent(#[source] crate::ci_event::CiEventError),
+
}
modified src/config.rs
@@ -8,7 +8,7 @@ use std::{

use serde::{Deserialize, Serialize};

-
use crate::event::EventFilter;
+
use crate::filter::EventFilter;

const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;

modified src/db.rs
@@ -21,7 +21,7 @@ use sqlite::{Connection, State, Statement};
use time::{macros::format_description, OffsetDateTime};
use uuid::Uuid;

-
use crate::{event::BrokerEvent, msg::RunId, run::Run};
+
use crate::{ci_event::CiEvent, msg::RunId, run::Run};

// how long to retry when SQL fails for busy database
const MAX_WAIT: Duration = Duration::from_millis(60_000);
@@ -58,6 +58,7 @@ impl Db {
        const TABLES: &[&str] = &[
            "CREATE TABLE IF NOT EXISTS counter_test (counter INT)",
            "CREATE TABLE IF NOT EXISTS event_queue (id TEXT PRIMARY KEY, timestamp TEXT, event TEXT)",
+
            "CREATE TABLE IF NOT EXISTS ci_event_queue (id TEXT PRIMARY KEY, timestamp TEXT, event TEXT)",
            "CREATE TABLE IF NOT EXISTS ci_runs (broker_run_id TEXT PRIMARY KEY, json TEXT)",
        ];

@@ -170,6 +171,10 @@ impl Db {
    }

    /// Return list of identifiers for broker events currently in the queue.
+
    ///
+
    /// Note that there is no longer any way to retrieve the broker
+
    /// events. This method is meant only for making sure there are no
+
    /// unprocessed broker events in the queue.
    pub fn queued_events(&self) -> Result<Vec<QueueId>, DbError> {
        let mut select = self.prepare("SELECT id FROM event_queue")?;

@@ -196,16 +201,44 @@ impl Db {
        Ok(ids)
    }

+
    /// Return list of identifiers for CI events currently in the queue.
+
    pub fn queued_ci_events(&self) -> Result<Vec<QueueId>, DbError> {
+
        let mut select = self.prepare("SELECT id FROM ci_event_queue")?;
+

+
        let mut ids = vec![];
+

+
        loop {
+
            match select.stmt.next() {
+
                Ok(State::Row) => {
+
                    let id: String = select
+
                        .stmt
+
                        .read("id")
+
                        .map_err(|e| DbError::list_events(&select.sql, e))?;
+
                    ids.push(QueueId::from(&id));
+
                }
+
                Ok(State::Done) => {
+
                    break;
+
                }
+
                Err(e) => {
+
                    return Err(DbError::list_events(&select.sql, e));
+
                }
+
            }
+
        }
+

+
        Ok(ids)
+
    }
+

    /// Return a specific event, given is id, if one exists.
-
    pub fn get_queued_event(&self, id: &QueueId) -> Result<Option<QueuedEvent>, DbError> {
-
        let mut select = self.prepare("SELECT timestamp, event FROM event_queue WHERE id = :id")?;
+
    pub fn get_queued_ci_event(&self, id: &QueueId) -> Result<Option<QueuedCiEvent>, DbError> {
+
        let mut select =
+
            self.prepare("SELECT timestamp, event FROM ci_event_queue WHERE id = :id")?;
        select
            .stmt
            .bind((":id", id.as_str()))
            .map_err(|e| DbError::bind(&select.sql, e))?;

        let mut timestamp: Option<String> = None;
-
        let mut event: Option<BrokerEvent> = None;
+
        let mut event: Option<CiEvent> = None;

        loop {
            match select.stmt.next() {
@@ -235,7 +268,7 @@ impl Db {
        }

        if let (Some(ts), Some(ev)) = (timestamp, event) {
-
            let qe = QueuedEvent::new(id.clone(), ts, ev);
+
            let qe = QueuedCiEvent::new(id.clone(), ts, ev);
            Ok(Some(qe))
        } else {
            Ok(None)
@@ -243,14 +276,14 @@ impl Db {
    }

    /// Add a new event to the event queue, returning its id.
-
    pub fn push_queued_event(&self, event: BrokerEvent) -> Result<QueueId, DbError> {
+
    pub fn push_queued_ci_event(&self, event: CiEvent) -> Result<QueueId, DbError> {
        let json = serde_json::to_string(&event).map_err(DbError::event_to_json)?;

        let id = QueueId::default();
        let ts = now().map_err(DbError::time_format)?;

-
        let mut insert =
-
            self.prepare("INSERT INTO event_queue (id, timestamp, event) VALUES (:id, :ts, :e)")?;
+
        let mut insert = self
+
            .prepare("INSERT INTO ci_event_queue (id, timestamp, event) VALUES (:id, :ts, :e)")?;
        insert
            .stmt
            .bind((":id", id.as_str()))
@@ -271,10 +304,10 @@ impl Db {
        Ok(id)
    }

-
    /// Remove event from queue, given its id. It's OK if the event is
+
    /// Remove CI event from queue, given its id. It's OK if the event is
    /// not in the queue, that is just silently ignored.
-
    pub fn remove_queued_event(&self, id: &QueueId) -> Result<(), DbError> {
-
        let mut remove = self.prepare("DELETE FROM event_queue WHERE id = :id")?;
+
    pub fn remove_queued_ci_event(&self, id: &QueueId) -> Result<(), DbError> {
+
        let mut remove = self.prepare("DELETE FROM ci_event_queue WHERE id = :id")?;
        remove
            .stmt
            .bind((":id", id.as_str()))
@@ -524,14 +557,14 @@ impl QueueId {
}

#[derive(Clone, Debug, Serialize, Deserialize)]
-
pub struct QueuedEvent {
+
pub struct QueuedCiEvent {
    id: QueueId,
    ts: String,
-
    event: BrokerEvent,
+
    event: CiEvent,
}

-
impl QueuedEvent {
-
    fn new(id: QueueId, ts: String, event: BrokerEvent) -> Self {
+
impl QueuedCiEvent {
+
    fn new(id: QueueId, ts: String, event: CiEvent) -> Self {
        Self { id, ts, event }
    }

@@ -543,7 +576,7 @@ impl QueuedEvent {
        &self.ts
    }

-
    pub fn event(&self) -> &BrokerEvent {
+
    pub fn event(&self) -> &CiEvent {
        &self.event
    }
}
deleted src/event.rs
@@ -1,694 +0,0 @@
-
//! Get events from local node.
-
//!
-
//! [`BrokerEventSource`] subscribes to the local node for node
-
//! events, creates them to corresponding sets of broker events, and
-
//! returns the events allowed by the configured filters.
-
//!
-
//! Events can be filtered based on various criteria and can be
-
//! combined with logical operators. Filters can be read from a JSON
-
//! file so that they're easy for a user to define.
-
//!
-
//! # Example
-
//!
-
//! ```
-
//! use radicle_ci_broker::event::Filters;
-
//! let filters = r#"{
-
//!   "filters": [
-
//!     {
-
//!       "And": [
-
//!         {
-
//!           "Repository": "rad:z3bBRYgzcYYBNjipFdDTwPgHaihPX"
-
//!         },
-
//!         {
-
//!           "RefSuffix": "refs/heads/main"
-
//!         }
-
//!       ]
-
//!     }
-
//!   ]
-
//! }"#;
-
//! let e = Filters::try_from(filters).unwrap();
-
//! ```
-

-
use radicle::{
-
    node::{Event, NodeId},
-
    prelude::RepoId,
-
    storage::RefUpdate,
-
    Profile,
-
};
-
use radicle_git_ext::{ref_format::RefString, Oid};
-
use regex::Regex;
-
use serde::{Deserialize, Serialize};
-
use std::{
-
    fs::read,
-
    path::{Path, PathBuf},
-
};
-

-
use crate::{
-
    logger,
-
    node_event_source::{NodeEventError, NodeEventSource},
-
};
-

-
/// Source of events from the local Radicle node.
-
///
-
/// The events are filtered. Only events allowed by at least one
-
/// filter are returned. See [`BrokerEventSource::allow`] and
-
/// [`EventFilter`].
-
pub struct BrokerEventSource {
-
    source: NodeEventSource,
-
    allowed: Vec<EventFilter>,
-
}
-

-
impl BrokerEventSource {
-
    /// Create a new source of node events, for a given Radicle
-
    /// profile.
-
    pub fn new(profile: &Profile) -> Result<Self, BrokerEventError> {
-
        let source = NodeEventSource::new(profile).map_err(BrokerEventError::CannotSubscribe)?;
-
        Ok(Self {
-
            source,
-
            allowed: vec![],
-
        })
-
    }
-

-
    /// Add an event filter for allowed events for this event source.
-
    pub fn allow(&mut self, filter: EventFilter) {
-
        self.allowed.push(filter);
-
    }
-

-
    fn allowed(&self, event: &BrokerEvent) -> Result<bool, BrokerEventError> {
-
        for filter in self.allowed.iter() {
-
            if !event.is_allowed(filter)? {
-
                return Ok(false);
-
            }
-
        }
-
        Ok(true)
-
    }
-

-
    /// Get the allowed next event from an event source. This will
-
    /// block until there is an allowed event, or until there will be
-
    /// no more events from this source, or there's an error.
-
    pub fn event(&mut self) -> Result<Vec<BrokerEvent>, BrokerEventError> {
-
        loop {
-
            match self.source.node_event() {
-
                Err(NodeEventError::BrokenConnection) => {
-
                    logger::event_disconnected();
-
                    return Err(BrokerEventError::BrokenConnection);
-
                }
-
                Err(err) => {
-
                    logger::error("error reading event from node", &err);
-
                    return Err(BrokerEventError::NodeEventError(err));
-
                }
-
                Ok(None) => {
-
                    logger::event_end();
-
                    return Ok(vec![]);
-
                }
-
                Ok(Some(event)) => {
-
                    let mut result = vec![];
-
                    if let Some(broker_events) = BrokerEvent::from_event(&event) {
-
                        for e in broker_events {
-
                            if self.allowed(&e)? {
-
                                result.push(e);
-
                            }
-
                        }
-
                        if !result.is_empty() {
-
                            return Ok(result);
-
                        }
-
                    }
-
                }
-
            }
-
        }
-
    }
-
}
-

-
/// Possible errors from accessing the local Radicle node.
-
#[derive(Debug, thiserror::Error)]
-
pub enum BrokerEventError {
-
    /// Can't create a [`NodeEventSource`].
-
    #[error("failed to subscribe to node events")]
-
    CannotSubscribe(#[source] NodeEventError),
-

-
    /// Regex compilation error.
-
    #[error("programming error in regular expression {0:?}")]
-
    Regex(&'static str, regex::Error),
-

-
    /// Some error from getting an event from the node.
-
    #[error("failed to get next event from node")]
-
    NodeEventError(#[from] NodeEventError),
-

-
    /// Connection to the node control socket broke.
-
    #[error("connection to the node control socket broke")]
-
    BrokenConnection,
-

-
    /// Some error from parsing a repository id.
-
    #[error(transparent)]
-
    Id(#[from] radicle::identity::IdError),
-

-
    /// An error reading a filter file.
-
    #[error("failed to read filter file: {0}")]
-
    ReadFilterFile(PathBuf, #[source] std::io::Error),
-

-
    /// An error parsing JSON as filters, when read from a file.
-
    #[error("failed to parser filters file: {0}")]
-
    FiltersJsonFile(PathBuf, #[source] serde_json::Error),
-

-
    /// An error parsing YAML as filters, when read from a file.
-
    #[error("failed to parser filters file: {0}")]
-
    FiltersYamlFile(PathBuf, #[source] serde_yml::Error),
-

-
    /// An error parsing JSON as filters, from an in-memory string.
-
    #[error("failed to parser filters as JSON")]
-
    FiltersJsonString(#[source] serde_json::Error),
-

-
    /// An error parsing a Git object id as string into an Oid.
-
    #[error("failed to parse string as a Git object id: {0:?}")]
-
    ParseOid(String, #[source] radicle::git::raw::Error),
-
}
-

-
/// An event filter for allowing events. Or an "AND" combination of events.
-
///
-
/// NOTE: Adding "OR" and "NOT" would be easy, too.
-
#[derive(Debug, Clone, Deserialize, Serialize)]
-
#[serde(deny_unknown_fields)]
-
pub enum EventFilter {
-
    /// Event concerns a specific repository.
-
    Repository(RepoId),
-

-
    /// Event concerns a git ref that ends with a given string.
-
    RefSuffix(String),
-

-
    /// Event concerns a specific git branch.
-
    Branch(String),
-

-
    /// Event concerns any Radicle patch.
-
    AnyPatch,
-

-
    /// Event concerns changes on specific Radicle patch.
-
    Patch(String),
-

-
    /// Event concerns changed refs on any Radicle patch branch.
-
    AnyPatchRef,
-

-
    /// Event concerns changed refs on any Radicle branch.
-
    AnyPushRef,
-

-
    /// Event concerns changed refs on the branch of the specified Radicle patch.
-
    PatchRef(String),
-

-
    /// Combine any number of filters that both must allow the events.
-
    And(Vec<Box<Self>>),
-

-
    /// Combine any number of filters such that at least one allows the events.
-
    Or(Vec<Box<Self>>),
-

-
    /// Combine any number of filters such that none allows the events.
-
    Not(Vec<Box<Self>>),
-
}
-

-
impl EventFilter {
-
    /// Create a filter for a repository.
-
    pub fn repository(rid: &str) -> Result<Self, BrokerEventError> {
-
        Ok(Self::Repository(RepoId::from_urn(rid)?))
-
    }
-

-
    /// Create a filter for a git ref that ends with a string.
-
    pub fn glob(pattern: &str) -> Result<Self, BrokerEventError> {
-
        Ok(Self::RefSuffix(pattern.into()))
-
    }
-

-
    /// Create a filter combining other filters with AND.
-
    pub fn and(conds: &[Self]) -> Self {
-
        Self::And(conds.iter().map(|c| Box::new(c.clone())).collect())
-
    }
-

-
    /// Create a filter combining other filters with OR.
-
    pub fn or(conds: &[Self]) -> Self {
-
        Self::Or(conds.iter().map(|c| Box::new(c.clone())).collect())
-
    }
-

-
    /// Create a filter combining other filters with NOT.
-
    pub fn not(conds: &[Self]) -> Self {
-
        Self::Not(conds.iter().map(|c| Box::new(c.clone())).collect())
-
    }
-

-
    /// Read filters from a JSON file.
-
    ///
-
    /// This function is the same as reading a file and calling
-
    /// [`Filters::try_from`], but returns just a vector of filters
-
    /// instead of a `Filter`.
-
    ///
-
    /// See the module description for an example of the file content.
-
    pub fn from_file(filename: &Path) -> Result<Vec<Self>, BrokerEventError> {
-
        let filters =
-
            read(filename).map_err(|e| BrokerEventError::ReadFilterFile(filename.into(), e))?;
-
        let filters: Filters = serde_json::from_slice(&filters)
-
            .map_err(|e| BrokerEventError::FiltersJsonFile(filename.into(), e))?;
-
        Ok(filters.filters)
-
    }
-

-
    /// Read filters from a YAML file.
-
    pub fn from_yaml_file(filename: &Path) -> Result<Vec<Self>, BrokerEventError> {
-
        let filters =
-
            read(filename).map_err(|e| BrokerEventError::ReadFilterFile(filename.into(), e))?;
-
        let filters: Filters = serde_yml::from_slice(&filters)
-
            .map_err(|e| BrokerEventError::FiltersYamlFile(filename.into(), e))?;
-
        Ok(filters.filters)
-
    }
-
}
-

-
/// A set of filters for [`NodeEventSource`] to use. This struct
-
/// represents the serialized set of filters. See the module
-
/// description for an example.
-
#[derive(Debug, Deserialize, Serialize)]
-
pub struct Filters {
-
    filters: Vec<EventFilter>,
-
}
-

-
impl TryFrom<&str> for Filters {
-
    type Error = BrokerEventError;
-

-
    fn try_from(s: &str) -> Result<Self, Self::Error> {
-
        serde_json::from_str(s).map_err(BrokerEventError::FiltersJsonString)
-
    }
-
}
-

-
/// A single node event can represent many git refs having changed,
-
/// but that's hard to process or filter. The broker breaks up such
-
/// complex events to simpler ones that only affect one ref at a time.
-
#[derive(Debug, Clone, Serialize, Deserialize)]
-
pub enum BrokerEvent {
-
    /// Request the CI broker shuts down in an orderly fashion.
-
    Shutdown,
-

-
    /// A git ref in a git repository has changed to refer to a given
-
    /// commit. This covers both the case of a new ref, and the case
-
    /// of a changed ref.
-
    RefChanged {
-
        /// Repository id.
-
        rid: RepoId,
-

-
        /// Ref name.
-
        name: RefString,
-

-
        /// New git commit.
-
        oid: Oid,
-

-
        /// Old git commit
-
        old: Option<Oid>,
-
    },
-
}
-

-
impl BrokerEvent {
-
    pub fn new(rid: &RepoId, name: &RefString, oid: &Oid, old: Option<Oid>) -> Self {
-
        Self::RefChanged {
-
            rid: *rid,
-
            name: name.clone(),
-
            oid: *oid,
-
            old,
-
        }
-
    }
-

-
    pub fn shutdown() -> Self {
-
        Self::Shutdown
-
    }
-

-
    /// Break up a potentially complex node event into a vector of
-
    /// simpler broker events.
-
    pub fn from_event(e: &Event) -> Option<Vec<Self>> {
-
        if let Event::RefsFetched {
-
            remote: _,
-
            rid,
-
            updated,
-
        } = e
-
        {
-
            let mut events = vec![];
-
            for up in updated {
-
                match up {
-
                    RefUpdate::Skipped { name, oid }
-
                        if name.as_str() == "shutdown" && oid.is_zero() =>
-
                    {
-
                        events.push(Self::shutdown());
-
                    }
-
                    RefUpdate::Created { name, oid } => {
-
                        events.push(Self::new(rid, name, oid, None));
-
                    }
-
                    RefUpdate::Updated { name, old, new } => {
-
                        events.push(Self::new(rid, name, new, Some(*old)));
-
                    }
-
                    _ => (),
-
                }
-
            }
-
            Some(events)
-
        } else {
-
            None
-
        }
-
    }
-

-
    /// Is this broker event allowed by a filter?
-
    pub fn is_allowed(&self, filter: &EventFilter) -> Result<bool, BrokerEventError> {
-
        let res = self.is_allowed_helper(filter)?;
-
        Ok(res)
-
    }
-

-
    fn is_allowed_helper(&self, filter: &EventFilter) -> Result<bool, BrokerEventError> {
-
        let allowed = match self {
-
            Self::Shutdown => true,
-
            Self::RefChanged {
-
                rid,
-
                name,
-
                oid: _,
-
                old: _,
-
            } => {
-
                let parsed = parse_ref(name)?;
-

-
                match filter {
-
                    EventFilter::Repository(wanted) => rid == wanted,
-
                    EventFilter::RefSuffix(wanted) => name.ends_with(wanted),
-
                    EventFilter::Branch(wanted) => parsed == Some(ParsedRef::Push(wanted.into())),
-
                    EventFilter::AnyPatch => matches!(parsed, Some(ParsedRef::Patch(_))),
-
                    EventFilter::Patch(wanted) => {
-
                        let oid = Oid::try_from(wanted.as_str())
-
                            .map_err(|e| BrokerEventError::ParseOid(wanted.into(), e))?;
-
                        parsed == Some(ParsedRef::Patch(oid))
-
                    }
-
                    EventFilter::AnyPatchRef => matches!(parsed, Some(ParsedRef::Patch(_))),
-
                    EventFilter::AnyPushRef => matches!(parsed, Some(ParsedRef::Push(_))),
-
                    EventFilter::PatchRef(wanted) => {
-
                        let oid = Oid::try_from(wanted.as_str())
-
                            .map_err(|e| BrokerEventError::ParseOid(wanted.into(), e))?;
-
                        parsed == Some(ParsedRef::Patch(oid))
-
                    }
-
                    EventFilter::And(conds) => conds
-
                        .iter()
-
                        .all(|cond| self.is_allowed_helper(cond).unwrap_or(false)),
-
                    EventFilter::Or(conds) => conds
-
                        .iter()
-
                        .any(|cond| self.is_allowed_helper(cond).unwrap_or(false)),
-
                    EventFilter::Not(conds) => !conds
-
                        .iter()
-
                        .any(|cond| self.is_allowed_helper(cond).unwrap_or(false)),
-
                }
-
            }
-
        };
-

-
        Ok(allowed)
-
    }
-

-
    pub fn name(&self) -> Option<&RefString> {
-
        match self {
-
            BrokerEvent::Shutdown => None,
-
            BrokerEvent::RefChanged { name, .. } => Some(name),
-
        }
-
    }
-

-
    /// Extract the NID from the RefString.
-
    /// The RefString will start with `refs/namespaces/<nid>/...`
-
    pub fn nid(&self) -> Result<Option<NodeId>, BrokerEventError> {
-
        if let Some(name) = self.name() {
-
            Ok(parse_nid_from_refstring(name)?)
-
        } else {
-
            Ok(None)
-
        }
-
    }
-

-
    pub fn patch_id(&self) -> Result<Option<Oid>, BrokerEventError> {
-
        if let Some(name) = self.name() {
-
            if let Some(ParsedRef::Patch(oid)) = parse_ref(name)? {
-
                return Ok(Some(oid));
-
            }
-
        }
-
        Ok(None)
-
    }
-
}
-

-
/// Extract the NID from a the ref string in a repository.
-
/// The RefString should start with `refs/namespaces/<nid>/...`
-
pub fn parse_nid_from_refstring(name: &RefString) -> Result<Option<NodeId>, BrokerEventError> {
-
    const PAT: &str = r"^refs/namespaces/(?P<nid>[^/]+)/";
-
    let pat = Regex::new(PAT).map_err(|e| BrokerEventError::Regex(PAT, e))?;
-
    if let Some(captures) = pat.captures(name.as_str()) {
-
        if let Some(m) = captures.name("nid") {
-
            if let Ok(parsed) = m.as_str().parse() {
-
                return Ok(Some(parsed));
-
            }
-
        }
-
    }
-
    Ok(None)
-
}
-

-
#[cfg(test)]
-
mod test_broker_event {
-
    use std::str::FromStr;
-

-
    use super::{BrokerEvent, NodeId, Oid, RefString, RepoId};
-

-
    #[test]
-
    fn name_for_branch() -> anyhow::Result<()> {
-
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?;
-
        let name = RefString::try_from("main")?;
-
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a")?;
-
        let be = BrokerEvent::new(&rid, &name, &oid, None);
-
        assert_eq!(be.name(), Some(&name));
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn name_for_shutdown() {
-
        let be = BrokerEvent::shutdown();
-
        assert_eq!(be.name(), None);
-
    }
-

-
    #[test]
-
    fn nid_for_plain_branch_name() -> anyhow::Result<()> {
-
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?;
-
        let name = RefString::try_from("main")?;
-
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a")?;
-
        let be = BrokerEvent::new(&rid, &name, &oid, None);
-
        assert_eq!(be.nid()?, None);
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn nid_for_ref_without_namespace() -> anyhow::Result<()> {
-
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?;
-
        let name = RefString::try_from("something/else/main")?;
-
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a")?;
-
        let be = BrokerEvent::new(&rid, &name, &oid, None);
-
        assert_eq!(be.nid()?, None);
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn nid_for_ref_with_namespace() -> anyhow::Result<()> {
-
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?;
-
        let name = RefString::try_from(
-
            "refs/namespaces/z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV/main",
-
        )?;
-
        let nid = NodeId::from_str("z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV")?;
-
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a")?;
-
        let be = BrokerEvent::new(&rid, &name, &oid, None);
-
        assert_eq!(be.nid()?, Some(nid));
-
        Ok(())
-
    }
-
}
-

-
/// Parsed reference to one of the supported types
-
/// Patch with patch ID
-
/// Push with branch name
-
#[derive(Debug, Eq, PartialEq)]
-
pub enum ParsedRef {
-
    Patch(Oid),
-
    Push(String),
-
}
-

-
/// Parse the given reference to a ParsedRef.
-
///
-
/// # Example
-
/// ```Rust
-
/// if let Some(parsed_ref) = parse_ref(name) {
-
///     match parsed_ref {
-
///         ParsedRef::Patch(_oid) => {
-
///             debug!("build_trigger: is patch");
-
///         }
-
///         ParsedRef::Push(_branch) => {
-
///             debug!("build_trigger: is push");
-
///         }
-
///     }
-
/// }
-
/// ```
-
pub fn parse_ref(s: &str) -> Result<Option<ParsedRef>, BrokerEventError> {
-
    const PAT_PATCH: &str = r"^refs/namespaces/[^/]+/refs/heads/patches/([^/]+)$";
-
    let patch_re = Regex::new(PAT_PATCH).map_err(|e| BrokerEventError::Regex(PAT_PATCH, e))?;
-
    if let Some(patch_captures) = patch_re.captures(s) {
-
        if let Some(patch_id) = patch_captures.get(1) {
-
            let patch_id_str = patch_id.as_str();
-
            let oid = Oid::try_from(patch_id_str)
-
                .map_err(|e| BrokerEventError::ParseOid(patch_id_str.into(), e))?;
-
            return Ok(Some(ParsedRef::Patch(oid)));
-
        }
-
    }
-

-
    const PAT_BRANCH: &str = r"^refs/namespaces/[^/]+/refs/heads/(.+)$";
-
    let push_re = Regex::new(PAT_BRANCH).map_err(|e| BrokerEventError::Regex(PAT_BRANCH, e))?;
-
    if let Some(push_captures) = push_re.captures(s) {
-
        if let Some(branch) = push_captures.get(1) {
-
            return Ok(Some(ParsedRef::Push(branch.as_str().to_string())));
-
        }
-
    }
-

-
    Ok(None)
-
}
-

-
#[cfg(test)]
-
mod test_parse_ref {
-
    use super::{parse_ref, Oid, ParsedRef};
-

-
    #[test]
-
    fn plain_branch_name_is_none() -> anyhow::Result<()> {
-
        assert_eq!(parse_ref("main")?, None);
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn namespaced_branch() -> anyhow::Result<()> {
-
        assert_eq!(
-
            parse_ref(
-
                "refs/namespaces/z6MkfBU2cwcZfaE6Z1dLqb7Ve7u4pdgbSo9tP6qUVsqFn2xv/refs/heads/main"
-
            )?,
-
            Some(ParsedRef::Push("main".into()))
-
        );
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn namespaced_branch_with_slashes() -> anyhow::Result<()> {
-
        assert_eq!(
-
            parse_ref(
-
                "refs/namespaces/z6MkfBU2cwcZfaE6Z1dLqb7Ve7u4pdgbSo9tP6qUVsqFn2xv/refs/heads/liw/cob/draft/v2"
-
            )?,
-
            Some(ParsedRef::Push("liw/cob/draft/v2".into()))
-
        );
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn namespaced_patch() -> anyhow::Result<()> {
-
        const SHA: &str = "0a4c69183fc8b8d849f5ab977d70f2a1f4788bca";
-
        assert_eq!(
-
            parse_ref(&format!("refs/namespaces/NID/refs/heads/patches/{SHA}"))?,
-
            Some(ParsedRef::Patch(Oid::try_from(SHA)?))
-
        );
-
        Ok(())
-
    }
-
}
-

-
pub fn is_patch_update(name: &str) -> Option<&str> {
-
    let mut parts = name.split("/refs/cobs/xyz.radicle.patch/");
-
    if let Some(suffix) = parts.nth(1) {
-
        if parts.next().is_none() {
-
            return Some(suffix);
-
        }
-
    }
-
    None
-
}
-

-
pub fn push_branch(name: &str) -> String {
-
    let mut parts = name.split("/refs/heads/");
-
    if let Some(suffix) = parts.nth(1) {
-
        if parts.next().is_none() {
-
            return suffix.to_string();
-
        }
-
    }
-
    "".to_string()
-
}
-

-
#[cfg(test)]
-
mod test {
-
    use super::{is_patch_update, parse_ref, push_branch, Oid, ParsedRef};
-

-
    #[test]
-
    fn test_parse_patch_ref() -> anyhow::Result<()> {
-
        let patch_ref =
-
            "refs/namespaces/NID/refs/heads/patches/9183ed6232687d3105482960cecb01a53018b80a";
-

-
        assert_eq!(
-
            parse_ref(patch_ref)?,
-
            Some(ParsedRef::Patch(Oid::try_from(
-
                "9183ed6232687d3105482960cecb01a53018b80a"
-
            )?))
-
        );
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn test_parse_push_ref() -> anyhow::Result<()> {
-
        let push_ref =
-
            "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/main";
-
        let parsed_ref = parse_ref(push_ref)?;
-
        eprintln!("parsed_ref={parsed_ref:#?}");
-
        assert!(parsed_ref.is_some());
-
        if let Some(ref parsed) = parsed_ref {
-
            match parsed {
-
                ParsedRef::Push(branch) => assert_eq!(branch, "main"),
-
                _ => panic!("Expected Push ref"),
-
            }
-
        }
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn test_parse_invalid_ref() -> anyhow::Result<()> {
-
        let invalid_ref = "invalid_ref";
-
        let parsed_ref = parse_ref(invalid_ref)?;
-
        assert!(parsed_ref.is_none());
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn branch_is_not_patch_update() {
-
        assert_eq!(
-
            is_patch_update(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/main"
-
            ),
-
            None
-
        );
-
    }
-

-
    #[test]
-
    fn patch_branch_is_not_patch_update() {
-
        assert_eq!(
-
            is_patch_update(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/patches/bbb54a2c9314a528a4fff9d6c2aae874ed098433"
-
            ),
-
            None
-
        );
-
    }
-

-
    #[test]
-
    fn patch_update() {
-
        assert_eq!(
-
            is_patch_update(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/cobs/xyz.radicle.patch/bbb54a2c9314a528a4fff9d6c2aae874ed098433"
-
            ),
-
            Some("bbb54a2c9314a528a4fff9d6c2aae874ed098433")
-
        );
-
    }
-

-
    #[test]
-
    fn get_push_branch() {
-
        assert_eq!(
-
            push_branch(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/branch_name"
-
            ),
-
            "branch_name".to_string()
-
        );
-
    }
-

-
    #[test]
-
    fn get_no_push_branch() {
-
        assert_eq!(
-
            push_branch(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/rad/sigrefs"
-
            ),
-
            "".to_string()
-
        );
-
    }
-
}
added src/filter.rs
@@ -0,0 +1,526 @@
+
use std::path::{Path, PathBuf};
+

+
use serde::{Deserialize, Serialize};
+

+
use radicle::{cob::patch::PatchId, git::RefString, prelude::RepoId};
+
use radicle_git_ext::Oid;
+

+
use crate::{ci_event::CiEvent, logger};
+

+
/// A Boolean expression for filtering broker events.
+
#[derive(Debug, Clone, Deserialize, Serialize)]
+
#[serde(deny_unknown_fields)]
+
pub enum EventFilter {
+
    /// Event for a specific repository.
+
    Repository(RepoId),
+

+
    /// Event is for a specific branch.
+
    Branch(RefString),
+

+
    /// Branch was created.
+
    BranchCreated,
+

+
    /// Branch was updated.
+
    BranchUpdated,
+

+
    /// Branch was deleted.
+
    BranchDeleted,
+

+
    /// Event is for a specific patch.
+
    Patch(Oid),
+

+
    /// Patch was created.
+
    PatchCreated,
+

+
    /// Patch was updated,
+
    PatchUpdated,
+

+
    /// Allow any event.
+
    Allow,
+

+
    /// Don't allow any event.
+
    Deny,
+

+
    /// Allow the opposite of the contained filter.
+
    Not(Box<Self>),
+

+
    /// Allow if all contained filters allow.
+
    And(Vec<Box<Self>>),
+

+
    /// Allow if any contained filter allow.
+
    Or(Vec<Box<Self>>),
+
}
+

+
impl EventFilter {
+
    pub fn allows(&self, event: &CiEvent) -> bool {
+
        logger::debug2(format!("EventFilter::allows: event={event:?}"));
+
        match self {
+
            Self::Allow => return true,
+
            Self::Deny => return false,
+
            Self::Not(expr) => return !expr.allows(event),
+
            Self::And(exprs) => return exprs.iter().all(|e| e.allows(event)),
+
            Self::Or(exprs) => return exprs.iter().any(|e| e.allows(event)),
+
            _ => (),
+
        }
+

+
        match event {
+
            CiEvent::Shutdown => true,
+
            CiEvent::BranchCreated { repo, branch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Branch(wanted) => branch == wanted,
+
                Self::BranchCreated => true,
+
                _ => false,
+
            },
+
            CiEvent::BranchUpdated { repo, branch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Branch(wanted) => branch == wanted,
+
                Self::BranchUpdated => true,
+
                _ => false,
+
            },
+
            CiEvent::BranchDeleted { repo, branch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Branch(wanted) => branch == wanted,
+
                Self::BranchDeleted => true,
+
                _ => false,
+
            },
+
            CiEvent::PatchCreated { repo, patch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Patch(wanted) => *patch == PatchId::from(wanted),
+
                Self::PatchCreated => true,
+
                _ => false,
+
            },
+
            CiEvent::PatchUpdated { repo, patch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Patch(wanted) => *patch == PatchId::from(wanted),
+
                Self::PatchUpdated => true,
+
                _ => false,
+
            },
+
        }
+
    }
+

+
    pub fn from_file(filename: &Path) -> Result<Vec<Self>, FilterError> {
+
        Filters::from_file(filename)
+
    }
+
}
+

+
#[derive(Deserialize)]
+
struct Filters {
+
    filters: Vec<EventFilter>,
+
}
+

+
impl Filters {
+
    fn from_file(filename: &Path) -> Result<Vec<EventFilter>, FilterError> {
+
        let data =
+
            std::fs::read(filename).map_err(|e| FilterError::ReadFile(filename.into(), e))?;
+
        let filters: Self =
+
            serde_yml::from_slice(&data).map_err(|e| FilterError::ParseYaml(filename.into(), e))?;
+
        Ok(filters.filters)
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use radicle::prelude::{Did, RepoId};
+

+
    use super::*;
+

+
    fn did() -> Did {
+
        Did::decode("did:key:z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV").unwrap()
+
    }
+

+
    fn rid() -> RepoId {
+
        const RID: &str = "rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5";
+
        RepoId::from_urn(RID).unwrap()
+
    }
+

+
    fn other_rid() -> RepoId {
+
        const RID: &str = "rad:zwTxygwuz5LDGBq255RA2CbNGrz8";
+
        RepoId::from_urn(RID).unwrap()
+
    }
+

+
    fn oid_from(oid: &str) -> Oid {
+
        Oid::try_from(oid).unwrap()
+
    }
+

+
    fn oid() -> Oid {
+
        const OID: &str = "ff3099ba5de28d954c41d0b5a84316f943794ea4";
+
        oid_from(OID)
+
    }
+

+
    fn other_oid() -> Oid {
+
        const OID: &str = "bde68ac76ce093bcc583aa612f45e13fee2353a0";
+
        oid_from(OID)
+
    }
+

+
    fn patch_id() -> PatchId {
+
        PatchId::from(oid())
+
    }
+

+
    fn other_patch_id() -> PatchId {
+
        PatchId::from(other_oid())
+
    }
+

+
    fn refstring(s: &str) -> RefString {
+
        RefString::try_from(s).unwrap()
+
    }
+

+
    fn shutdown() -> CiEvent {
+
        CiEvent::Shutdown
+
    }
+

+
    fn all_events(
+
        did: Did,
+
        repo: RepoId,
+
        branch: RefString,
+
        patch: PatchId,
+
        tip: Oid,
+
        old_tip: Oid,
+
    ) -> Vec<CiEvent> {
+
        vec![
+
            CiEvent::BranchCreated {
+
                from_node: did.into(),
+
                repo,
+
                branch: branch.clone(),
+
                tip,
+
            },
+
            CiEvent::BranchUpdated {
+
                from_node: did.into(),
+
                repo,
+
                branch: branch.clone(),
+
                tip,
+
                old_tip,
+
            },
+
            CiEvent::BranchDeleted { repo, branch, tip },
+
            CiEvent::PatchCreated {
+
                from_node: did.into(),
+
                repo,
+
                patch,
+
                new_tip: tip,
+
            },
+
            CiEvent::PatchUpdated {
+
                from_node: did.into(),
+
                repo,
+
                patch,
+
                new_tip: tip,
+
            },
+
        ]
+
    }
+

+
    // Verify that shutdown is allowed, even when filtering for
+
    // something else.
+
    #[test]
+
    fn allows_shutdown() {
+
        let filter = EventFilter::Repository(rid());
+
        assert!(filter.allows(&shutdown()))
+
    }
+

+
    #[test]
+
    fn allows_all_for_default_repository() {
+
        let filter = EventFilter::Repository(rid());
+
        let events = all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid());
+
        assert!(events.iter().all(|e| filter.allows(e)));
+
    }
+

+
    #[test]
+
    fn doesnt_allow_any_for_other_repository() {
+
        let filter = EventFilter::Repository(rid());
+
        let events = all_events(
+
            did(),
+
            other_rid(),
+
            refstring("main"),
+
            patch_id(),
+
            oid(),
+
            oid(),
+
        );
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_all_for_main_branch() {
+
        let filter = EventFilter::Branch(refstring("main"));
+
        let events = all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid());
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter().filter(|e| {
+
            matches!(
+
                e,
+
                CiEvent::BranchCreated { .. }
+
                    | CiEvent::BranchUpdated { .. }
+
                    | CiEvent::BranchDeleted { .. }
+
            )
+
        }) {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn doesnt_allow_any_for_other_branch() {
+
        let filter = EventFilter::Branch(refstring("main"));
+
        let events = all_events(
+
            did(),
+
            other_rid(),
+
            refstring("other"),
+
            patch_id(),
+
            oid(),
+
            oid(),
+
        );
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_branch_creation() {
+
        let filter = EventFilter::BranchCreated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::BranchCreated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_branch_creation() {
+
        let filter = EventFilter::BranchCreated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::BranchCreated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_branch_update() {
+
        let filter = EventFilter::BranchUpdated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::BranchUpdated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_branch_update() {
+
        let filter = EventFilter::BranchUpdated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::BranchUpdated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_branch_deletion() {
+
        let filter = EventFilter::BranchDeleted;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::BranchDeleted { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_branch_deletion() {
+
        let filter = EventFilter::BranchDeleted;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::BranchDeleted { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_specific_patch() {
+
        let filter = EventFilter::Patch(oid());
+
        let events = all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid());
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter().filter(|e| {
+
            matches!(
+
                e,
+
                CiEvent::PatchCreated { .. } | CiEvent::PatchUpdated { .. }
+
            )
+
        }) {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn doesnt_allows_other_patch() {
+
        let filter = EventFilter::Patch(oid());
+
        let events = all_events(
+
            did(),
+
            rid(),
+
            refstring("main"),
+
            other_patch_id(),
+
            oid(),
+
            oid(),
+
        );
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter().filter(|e| {
+
            matches!(
+
                e,
+
                CiEvent::PatchCreated { .. } | CiEvent::PatchUpdated { .. }
+
            )
+
        }) {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_patch_creation() {
+
        let filter = EventFilter::PatchCreated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::PatchCreated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_patch_creation() {
+
        let filter = EventFilter::PatchCreated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::PatchCreated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_patch_update() {
+
        let filter = EventFilter::PatchUpdated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::PatchUpdated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_patch_update() {
+
        let filter = EventFilter::PatchUpdated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::PatchUpdated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_any_event() {
+
        let filter = EventFilter::Allow;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_no_event() {
+
        let filter = EventFilter::Deny;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_opposite() {
+
        let filter = EventFilter::Not(Box::new(EventFilter::Deny));
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_if_all_allow() {
+
        let filter = EventFilter::And(vec![
+
            Box::new(EventFilter::Allow),
+
            Box::new(EventFilter::Allow),
+
        ]);
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_if_any_allows() {
+
        let filter = EventFilter::Or(vec![
+
            Box::new(EventFilter::Deny),
+
            Box::new(EventFilter::Allow),
+
        ]);
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum FilterError {
+
    #[error("failed to read event filters file {0}")]
+
    ReadFile(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to parse YAML event filters file {0}")]
+
    ParseYaml(PathBuf, #[source] serde_yml::Error),
+
}
modified src/lib.rs
@@ -7,9 +7,11 @@

pub mod adapter;
pub mod broker;
+
pub mod ci_event;
+
pub mod ci_event_source;
pub mod config;
pub mod db;
-
pub mod event;
+
pub mod filter;
pub mod logger;
pub mod msg;
pub mod node_event_source;
modified src/logger.rs
@@ -4,11 +4,19 @@
use std::sync::Once;
use std::{path::Path, sync::Mutex, time::Duration};

-
use radicle::{git::raw::Oid, identity::RepoId};
+
use radicle::{git::raw::Oid, identity::RepoId, node::Event};
use slog::{debug, error, info, o, warn, Drain};
use slog_scope::GlobalLoggerGuard;

-
use crate::{config::Config, db::QueueId, event::BrokerEvent, msg::Request, run::Run};
+
use crate::{
+
    ci_event::CiEvent,
+
    ci_event_source::CiEventSource,
+
    config::Config,
+
    db::{QueueId, QueuedCiEvent},
+
    msg::Request,
+
    node_event_source::NodeEventSource,
+
    run::Run,
+
};

pub fn open() -> GlobalLoggerGuard {
    let logger = slog_json::Json::new(std::io::stderr())
@@ -64,6 +72,68 @@ pub fn end_cib_in_error() {
    );
}

+
pub fn node_event_source_created(source: &NodeEventSource) {
+
    info!(
+
        slog_scope::logger(),
+
        "created node event source";
+
        "source" => format!("{source:#?}")
+
    );
+
}
+

+
pub fn node_event_source_got_event(event: &Event) {
+
    info!(
+
        slog_scope::logger(),
+
        "node event source received event";
+
        "node_event" => format!("{event:#?}")
+
    );
+
}
+

+
pub fn node_event_source_eof(source: &NodeEventSource) {
+
    info!(
+
        slog_scope::logger(),
+
        "node event source end of file on control socket";
+
        "node_event_source" => format!("{source:#?}")
+
    );
+
}
+

+
pub fn ci_event_source_created(source: &CiEventSource) {
+
    info!(
+
        slog_scope::logger(),
+
        "created CI event source";
+
        "source" => format!("{source:#?}")
+
    );
+
}
+

+
pub fn ci_event_source_got_events(events: &[CiEvent]) {
+
    info!(
+
        slog_scope::logger(),
+
        "CI event source received events";
+
        "ci_events" => format!("{events:#?}")
+
    );
+
}
+

+
pub fn ci_event_source_disconnected() {
+
    info!(
+
        slog_scope::logger(),
+
        "CI event source received disconnection"
+
    );
+
}
+

+
pub fn ci_event_source_end() {
+
    info!(
+
        slog_scope::logger(),
+
        "CI event source was notified end of events"
+
    );
+
}
+

+
pub fn ci_event_source_eof(source: &CiEventSource) {
+
    info!(
+
        slog_scope::logger(),
+
        "CI event source end of file";
+
        "ci_event_source" => format!("{source:#?}")
+
    );
+
}
+

pub fn loaded_config(config: &Config) {
    debug!(slog_scope::logger(), "loaded configuration {config:#?}");
}
@@ -86,8 +156,11 @@ pub fn queueproc_channel_disconnect() {
    );
}

-
pub fn queueproc_picked_event(id: &QueueId) {
-
    info!(slog_scope::logger(), "picked event from queue: {id}");
+
pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent) {
+
    info!(
+
        slog_scope::logger(),
+
        "picked event from queue: {id}: {event:#?}"
+
    );
}

pub fn queueproc_remove_event(id: &QueueId) {
@@ -116,7 +189,7 @@ pub fn queueadd_control_socket_close() {
    );
}

-
pub fn queueadd_push_event(e: &BrokerEvent) {
+
pub fn queueadd_push_event(e: &CiEvent) {
    debug!(
        slog_scope::logger(),
        "insert broker event into queue: {e:?}"
@@ -124,7 +197,7 @@ pub fn queueadd_push_event(e: &BrokerEvent) {
}

pub fn queueadd_end() {
-
    info!(slog_scope::logger(), "start thread to process events ends");
+
    info!(slog_scope::logger(), "thread to process events ends");
}

pub fn pages_directory_unset() {
modified src/msg.rs
@@ -31,7 +31,7 @@ use radicle::{
    Profile,
};

-
use crate::event::{parse_ref, push_branch, BrokerEvent, ParsedRef};
+
use crate::ci_event::CiEvent;

// This gets put into every [`Request`] message so the adapter can
// detect its getting a message it knows how to handle.
@@ -110,7 +110,7 @@ impl fmt::Display for RunResult {
#[derive(Debug, Default)]
pub struct RequestBuilder<'a> {
    profile: Option<&'a Profile>,
-
    event: Option<&'a BrokerEvent>,
+
    ci_event: Option<&'a CiEvent>,
}

impl<'a> RequestBuilder<'a> {
@@ -120,172 +120,295 @@ impl<'a> RequestBuilder<'a> {
        self
    }

-
    /// Set the broker event to use.
-
    pub fn broker_event(mut self, event: &'a BrokerEvent) -> Self {
-
        self.event = Some(event);
+
    /// Set the CI event to use.
+
    pub fn ci_event(mut self, event: &'a CiEvent) -> Self {
+
        self.ci_event = Some(event);
        self
    }

-
    /// Create a [`Request::Trigger`] message.
-
    pub fn build_trigger(self) -> Result<Request, MessageError> {
+
    /// Create a [`Request::Trigger``] message from a [`crate::ci_event::Civet`].
+
    pub fn build_trigger_from_ci_event(self) -> Result<Request, MessageError> {
        let profile = self.profile.ok_or(MessageError::NoProfile)?;
-
        let event = self.event.ok_or(MessageError::NoEvent)?;
-

-
        let (rid, name, oid, old) = match event {
-
            BrokerEvent::Shutdown => panic!("got shutdown"),
-
            BrokerEvent::RefChanged {
-
                rid,
-
                name,
-
                oid,
-
                old,
-
            } => (rid, name, oid, old),
-
        };
-

-
        let rad_repo = profile.storage.repository(*rid)?;
-
        let git_repo = radicle_surf::Repository::open(paths::repository(&profile.storage, rid))?;
-
        let project_info = rad_repo.project()?;
-
        let msg_repository = Repository {
-
            id: *rid,
-
            name: project_info.name().to_string(),
-
            description: project_info.description().to_string(),
-
            private: !rad_repo.identity()?.visibility.is_public(),
-
            default_branch: project_info.default_branch().to_string(),
-
            delegates: rad_repo.delegates()?.iter().copied().collect(),
-
        };

-
        let author = match extract_author(profile, event) {
-
            Ok(author) => author,
-
            Err(err) => {
-
                return Err(err);
+
        match self.ci_event {
+
            None => Err(MessageError::CiEventNotSet),
+
            Some(CiEvent::BranchCreated {
+
                from_node,
+
                repo,
+
                branch,
+
                tip,
+
            }) => {
+
                let rad_repo = profile.storage.repository(*repo)?;
+
                let project_info = rad_repo.project()?;
+

+
                let common = EventCommonFields {
+
                    version: PROTOCOL_VERSION,
+
                    event_type: EventType::Push,
+
                    repository: Repository {
+
                        id: *repo,
+
                        name: project_info.name().to_string(),
+
                        description: project_info.description().to_string(),
+
                        private: !rad_repo.identity()?.visibility.is_public(),
+
                        default_branch: project_info.default_branch().to_string(),
+
                        delegates: rad_repo.delegates()?.iter().copied().collect(),
+
                    },
+
                };
+

+
                let did = Did::from(*from_node);
+
                let pusher = did_to_author(profile, &did)?;
+

+
                let push = PushEvent {
+
                    pusher,
+
                    before: *tip, // Branch created: we only use the tip
+
                    after: *tip,
+
                    branch: push_branch(branch),
+
                    commits: vec![*tip], // Branch created, only use tip.
+
                };
+
                Ok(Request::Trigger {
+
                    common,
+
                    push: Some(push),
+
                    patch: None,
+
                })
            }
-
        };
+
            Some(CiEvent::BranchUpdated {
+
                from_node,
+
                repo,
+
                branch,
+
                tip,
+
                old_tip,
+
            }) => {
+
                let rad_repo = profile.storage.repository(*repo)?;
+
                let git_repo =
+
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))?;
+
                let project_info = rad_repo.project()?;
+

+
                let common = EventCommonFields {
+
                    version: PROTOCOL_VERSION,
+
                    event_type: EventType::Push,
+
                    repository: Repository {
+
                        id: *repo,
+
                        name: project_info.name().to_string(),
+
                        description: project_info.description().to_string(),
+
                        private: !rad_repo.identity()?.visibility.is_public(),
+
                        default_branch: project_info.default_branch().to_string(),
+
                        delegates: rad_repo.delegates()?.iter().copied().collect(),
+
                    },
+
                };
+

+
                let did = Did::from(*from_node);
+
                let pusher = did_to_author(profile, &did)?;
+

+
                let mut commits: Vec<Oid> = git_repo
+
                    .history(tip)?
+
                    .take_while(|c| {
+
                        if let Ok(c) = c {
+
                            c.id != *old_tip
+
                        } else {
+
                            false
+
                        }
+
                    })
+
                    .map(|r| r.map(|c| c.id))
+
                    .collect::<Result<Vec<Oid>, _>>()?;
+
                if commits.is_empty() {
+
                    commits = vec![*old_tip];
+
                }

-
        match parse_ref(name)? {
-
            None => Err(MessageError::NoEventHandler),
-
            Some(ParsedRef::Patch(_oid)) => Ok(Request::Trigger {
-
                common: EventCommonFields {
+
                let push = PushEvent {
+
                    pusher,
+
                    before: *tip, // Branch created: we only use the tip
+
                    after: *tip,
+
                    branch: push_branch(branch),
+
                    commits,
+
                };
+

+
                Ok(Request::Trigger {
+
                    common,
+
                    push: Some(push),
+
                    patch: None,
+
                })
+
            }
+
            Some(CiEvent::PatchCreated {
+
                from_node,
+
                repo,
+
                patch: patch_id,
+
                new_tip,
+
            }) => {
+
                let rad_repo = profile.storage.repository(*repo)?;
+
                let git_repo =
+
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))?;
+
                let project_info = rad_repo.project()?;
+

+
                let common = EventCommonFields {
                    version: PROTOCOL_VERSION,
                    event_type: EventType::Patch,
-
                    repository: msg_repository,
-
                },
-
                push: None,
-
                patch: Some(
-
                    self.build_trigger_from_patch(event, rad_repo, &git_repo, profile, author)?,
-
                ),
-
            }),
-
            Some(ParsedRef::Push(_branch)) => Ok(Request::Trigger {
-
                common: EventCommonFields {
-
                    version: PROTOCOL_VERSION,
-
                    event_type: EventType::Push,
-
                    repository: msg_repository,
-
                },
-
                push: Some(self.build_trigger_from_push(
-
                    git_repo,
+
                    repository: Repository {
+
                        id: *repo,
+
                        name: project_info.name().to_string(),
+
                        description: project_info.description().to_string(),
+
                        private: !rad_repo.identity()?.visibility.is_public(),
+
                        default_branch: project_info.default_branch().to_string(),
+
                        delegates: rad_repo.delegates()?.iter().copied().collect(),
+
                    },
+
                };
+

+
                let did = Did::from(*from_node);
+
                let author = did_to_author(profile, &did)?;
+

+
                let patch_cob = patch::Patches::open(&rad_repo)?
+
                    .get(patch_id)?
+
                    .ok_or(MessageError::Trigger)?;
+

+
                let revisions: Vec<Revision> = patch_cob
+
                    .revisions()
+
                    .map(|(rid, r)| {
+
                        Ok::<Revision, MessageError>(Revision {
+
                            id: rid.into(),
+
                            author: did_to_author(profile, r.author().id())?,
+
                            description: r.description().to_string(),
+
                            base: *r.base(),
+
                            oid: r.head(),
+
                            timestamp: r.timestamp().as_secs(),
+
                        })
+
                    })
+
                    .collect::<Result<Vec<Revision>, MessageError>>()?;
+
                let patch_author_pk = radicle::crypto::PublicKey::from(author.id);
+
                let patch_latest_revision = patch_cob
+
                    .latest_by(&patch_author_pk)
+
                    .ok_or(MessageError::Trigger)?;
+
                let patch_base = patch_latest_revision.1.base();
+
                let commits: Vec<Oid> = git_repo
+
                    .history(*new_tip)?
+
                    .take_while(|c| {
+
                        if let Ok(c) = c {
+
                            c.id != *patch_base
+
                        } else {
+
                            false
+
                        }
+
                    })
+
                    .map(|r| r.map(|c| c.id))
+
                    .collect::<Result<Vec<Oid>, _>>()?;
+

+
                let patch = Patch {
+
                    id: **patch_id,
                    author,
-
                    name,
-
                    old.unwrap_or(*oid),
-
                    *oid,
-
                )?),
-
                patch: None,
-
            }),
-
        }
-
    }
-

-
    fn build_trigger_from_patch(
-
        &self,
-
        event: &BrokerEvent,
-
        repository: radicle::storage::git::Repository,
-
        repo: &radicle_surf::Repository,
-
        profile: &Profile,
-
        author: Author,
-
    ) -> Result<PatchEvent, MessageError> {
-
        let patch_id = event.patch_id()?.ok_or(MessageError::Trigger)?;
-
        let patch = patch::Patches::open(&repository)?
-
            .get(&patch_id.into())?
-
            .ok_or(MessageError::Trigger)?;
-

-
        let revs: Vec<Revision> = patch
-
            .revisions()
-
            .map(|(rid, r)| {
-
                Ok::<Revision, MessageError>(Revision {
-
                    id: rid.into(),
-
                    author: did_to_author(profile, r.author().id())?,
-
                    description: r.description().to_string(),
-
                    base: *r.base(),
-
                    oid: r.head(),
-
                    timestamp: r.timestamp().as_secs(),
+
                    title: patch_cob.title().to_string(),
+
                    state: State {
+
                        status: patch_cob.state().to_string(),
+
                        conflicts: match patch_cob.state() {
+
                            patch::State::Open { conflicts, .. } => conflicts.to_vec(),
+
                            _ => vec![],
+
                        },
+
                    },
+
                    before: *patch_base,
+
                    after: *new_tip,
+
                    commits,
+
                    target: patch_cob.target().head(&rad_repo)?,
+
                    labels: patch_cob.labels().map(|l| l.name().to_string()).collect(),
+
                    assignees: patch_cob.assignees().collect(),
+
                    revisions,
+
                };
+

+
                Ok(Request::Trigger {
+
                    common,
+
                    push: None,
+
                    patch: Some(PatchEvent {
+
                        action: PatchAction::Created,
+
                        patch,
+
                    }),
                })
-
            })
-
            .collect::<Result<Vec<Revision>, MessageError>>()?;
-
        let patch_author_pk = radicle::crypto::PublicKey::from(author.id);
-
        let patch_latest_revision = patch
-
            .latest_by(&patch_author_pk)
-
            .ok_or(MessageError::Trigger)?;
-
        let patch_head = patch_latest_revision.1.head();
-
        let patch_base = patch_latest_revision.1.base();
-
        let patch_commits: Vec<Oid> = repo
-
            .history(patch_head)?
-
            .take_while(|c| {
-
                if let Ok(c) = c {
-
                    c.id != *patch_base
-
                } else {
-
                    false
-
                }
-
            })
-
            .map(|r| r.map(|c| c.id))
-
            .collect::<Result<Vec<Oid>, _>>()?;
-
        let action = if patch.revisions().count() > 1 {
-
            PatchAction::Updated
-
        } else {
-
            PatchAction::Created
-
        };
-
        Ok(PatchEvent {
-
            action,
-
            patch: Patch {
-
                id: patch_id,
-
                author,
-
                title: patch.title().to_string(),
-
                state: State {
-
                    status: patch.state().to_string(),
-
                    conflicts: match patch.state() {
-
                        patch::State::Open { conflicts, .. } => conflicts.to_vec(),
-
                        _ => vec![],
+
            }
+
            Some(CiEvent::PatchUpdated {
+
                from_node,
+
                repo,
+
                patch: patch_id,
+
                new_tip,
+
            }) => {
+
                let rad_repo = profile.storage.repository(*repo)?;
+
                let git_repo =
+
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))?;
+
                let project_info = rad_repo.project()?;
+

+
                let common = EventCommonFields {
+
                    version: PROTOCOL_VERSION,
+
                    event_type: EventType::Patch,
+
                    repository: Repository {
+
                        id: *repo,
+
                        name: project_info.name().to_string(),
+
                        description: project_info.description().to_string(),
+
                        private: !rad_repo.identity()?.visibility.is_public(),
+
                        default_branch: project_info.default_branch().to_string(),
+
                        delegates: rad_repo.delegates()?.iter().copied().collect(),
                    },
-
                },
-
                before: *patch_base,
-
                after: patch_head,
-
                commits: patch_commits,
-
                target: patch.target().head(&repository)?,
-
                labels: patch.labels().map(|l| l.name().to_string()).collect(),
-
                assignees: patch.assignees().collect(),
-
                revisions: revs,
-
            },
-
        })
-
    }
-

-
    fn build_trigger_from_push(
-
        &self,
-
        repo: radicle_surf::Repository,
-
        pusher: Author,
-
        branch: &str,
-
        before: Oid,
-
        after: Oid,
-
    ) -> Result<PushEvent, MessageError> {
-
        let mut push_commits: Vec<Oid> = repo
-
            .history(after)?
-
            .take_while(|c| if let Ok(c) = c { c.id != before } else { false })
-
            .map(|r| r.map(|c| c.id))
-
            .collect::<Result<Vec<Oid>, _>>()?;
-
        if push_commits.is_empty() {
-
            push_commits = vec![before];
+
                };
+

+
                let did = Did::from(*from_node);
+
                let author = did_to_author(profile, &did)?;
+

+
                let patch_cob = patch::Patches::open(&rad_repo)?
+
                    .get(patch_id)?
+
                    .ok_or(MessageError::Trigger)?;
+

+
                let revisions: Vec<Revision> = patch_cob
+
                    .revisions()
+
                    .map(|(rid, r)| {
+
                        Ok::<Revision, MessageError>(Revision {
+
                            id: rid.into(),
+
                            author: did_to_author(profile, r.author().id())?,
+
                            description: r.description().to_string(),
+
                            base: *r.base(),
+
                            oid: r.head(),
+
                            timestamp: r.timestamp().as_secs(),
+
                        })
+
                    })
+
                    .collect::<Result<Vec<Revision>, MessageError>>()?;
+
                let patch_author_pk = radicle::crypto::PublicKey::from(author.id);
+
                let patch_latest_revision = patch_cob
+
                    .latest_by(&patch_author_pk)
+
                    .ok_or(MessageError::Trigger)?;
+
                let patch_base = patch_latest_revision.1.base();
+
                let commits: Vec<Oid> = git_repo
+
                    .history(*new_tip)?
+
                    .take_while(|c| {
+
                        if let Ok(c) = c {
+
                            c.id != *patch_base
+
                        } else {
+
                            false
+
                        }
+
                    })
+
                    .map(|r| r.map(|c| c.id))
+
                    .collect::<Result<Vec<Oid>, _>>()?;
+

+
                let patch = Patch {
+
                    id: **patch_id,
+
                    author,
+
                    title: patch_cob.title().to_string(),
+
                    state: State {
+
                        status: patch_cob.state().to_string(),
+
                        conflicts: match patch_cob.state() {
+
                            patch::State::Open { conflicts, .. } => conflicts.to_vec(),
+
                            _ => vec![],
+
                        },
+
                    },
+
                    before: *patch_base,
+
                    after: *new_tip,
+
                    commits,
+
                    target: patch_cob.target().head(&rad_repo)?,
+
                    labels: patch_cob.labels().map(|l| l.name().to_string()).collect(),
+
                    assignees: patch_cob.assignees().collect(),
+
                    revisions,
+
                };
+

+
                Ok(Request::Trigger {
+
                    common,
+
                    push: None,
+
                    patch: Some(PatchEvent {
+
                        action: PatchAction::Updated,
+
                        patch,
+
                    }),
+
                })
+
            }
+
            _ => Err(MessageError::UnknownCiEvent(self.ci_event.unwrap().clone())),
        }
-
        Ok(PushEvent {
-
            pusher,
-
            before,
-
            after,
-
            branch: push_branch(branch),
-
            commits: push_commits,
-
        })
    }
}

@@ -386,16 +509,6 @@ fn did_to_author(profile: &Profile, did: &Did) -> Result<Author, MessageError> {
    Ok(Author { id: *did, alias })
}

-
fn extract_author(profile: &Profile, event: &BrokerEvent) -> Result<Author, MessageError> {
-
    let nid = match event.nid()? {
-
        Some(nid) => nid,
-
        None => {
-
            return Err(MessageError::Trigger);
-
        }
-
    };
-
    did_to_author(profile, &Did::from(nid))
-
}
-

impl fmt::Display for Request {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
@@ -716,6 +829,14 @@ pub enum MessageError {
    #[error("RequestBuilder has no event handler set")]
    NoEventHandler,

+
    /// We got a CI event we don't know what to do with.
+
    #[error("programming error: unknown CI event {0:?}")]
+
    UnknownCiEvent(CiEvent),
+

+
    /// CI event was not set for [`Requestbuilder`].
+
    #[error("programming error: CI event was not set for request builder")]
+
    CiEventNotSet,
+

    /// Request message lacks commits to run CI on.
    #[error("unacceptable request message: lacks Git commits to run CI on")]
    NoCommits,
@@ -759,7 +880,7 @@ pub enum MessageError {
    #[error(transparent)]
    RadicleProfile(#[from] radicle::profile::Error),

-
    /// Error retrieving context to generate trigger from BrokerEvent
+
    /// Error retrieving context to generate trigger.
    #[error("could not generate trigger from event")]
    Trigger,

@@ -767,10 +888,6 @@ pub enum MessageError {
    #[error(transparent)]
    StorageError(#[from] radicle::storage::Error),

-
    /// Error from event module.
-
    #[error(transparent)]
-
    EventError(#[from] crate::event::BrokerEventError),
-

    /// Error from Radicle repository.
    #[error(transparent)]
    RepositoryError(#[from] radicle::storage::RepositoryError),
@@ -789,10 +906,9 @@ pub enum MessageError {
}

#[cfg(test)]
-
pub mod tests {
-
    use crate::event::BrokerEvent;
+
pub mod trigger_from_ci_event_tests {
+
    use crate::ci_event::CiEvent;
    use crate::msg::{EventType, Request, RequestBuilder};
-
    use radicle::git::raw::Oid;
    use radicle::git::RefString;
    use radicle::patch::{MergeTarget, Patches};
    use radicle::prelude::Did;
@@ -801,7 +917,58 @@ pub mod tests {
    use crate::test::{MockNode, TestResult};

    #[test]
-
    fn trigger_push() -> TestResult<()> {
+
    fn trigger_push_from_branch_created() -> TestResult<()> {
+
        let mock_node = MockNode::new()?;
+
        let profile = mock_node.profile()?;
+

+
        let project = mock_node.node().project();
+
        let (_, repo_head) = project.repo.head()?;
+
        let cmt = radicle::test::fixtures::commit(
+
            "my test commit",
+
            &[repo_head.into()],
+
            &project.backend,
+
        );
+

+
        let ci_event = CiEvent::BranchCreated {
+
            from_node: *profile.id(),
+
            repo: project.id,
+
            branch: RefString::try_from(
+
                "refs/namespaces/$nid/refs/heads/master".replace("$nid", &profile.id().to_string()),
+
            )?,
+
            tip: cmt,
+
        };
+

+
        let req = RequestBuilder::default()
+
            .profile(&profile)
+
            .ci_event(&ci_event)
+
            .build_trigger_from_ci_event()?;
+
        let Request::Trigger {
+
            common,
+
            push,
+
            patch,
+
        } = req;
+

+
        assert!(patch.is_none());
+
        assert!(push.is_some());
+
        assert_eq!(common.event_type, EventType::Push);
+
        assert_eq!(common.repository.id, project.id);
+
        assert_eq!(common.repository.name, project.repo.project()?.name());
+

+
        let push = push.unwrap();
+
        assert_eq!(push.after, cmt);
+
        assert_eq!(push.before, cmt); // in this case of branch creation
+
        assert_eq!(
+
            push.branch,
+
            "master".replace("$nid", &profile.id().to_string())
+
        );
+
        assert_eq!(push.commits, vec![cmt]);
+
        assert_eq!(push.pusher.id, Did::from(profile.id()));
+

+
        Ok(())
+
    }
+

+
    #[test]
+
    fn trigger_push_from_branch_updated() -> TestResult<()> {
        let mock_node = MockNode::new()?;
        let profile = mock_node.profile()?;

@@ -813,19 +980,20 @@ pub mod tests {
            &project.backend,
        );

-
        let be = BrokerEvent::RefChanged {
-
            rid: project.id,
-
            name: RefString::try_from(
+
        let ci_event = CiEvent::BranchUpdated {
+
            from_node: *profile.id(),
+
            repo: project.id,
+
            branch: RefString::try_from(
                "refs/namespaces/$nid/refs/heads/master".replace("$nid", &profile.id().to_string()),
            )?,
-
            oid: cmt,
-
            old: Some(repo_head),
+
            old_tip: repo_head,
+
            tip: cmt,
        };

        let req = RequestBuilder::default()
            .profile(&profile)
-
            .broker_event(&be)
-
            .build_trigger()?;
+
            .ci_event(&ci_event)
+
            .build_trigger_from_ci_event()?;
        let Request::Trigger {
            common,
            push,
@@ -840,7 +1008,7 @@ pub mod tests {

        let push = push.unwrap();
        assert_eq!(push.after, cmt);
-
        assert_eq!(push.before, repo_head);
+
        assert_eq!(push.before, cmt); // in this case of branch creation
        assert_eq!(
            push.branch,
            "master".replace("$nid", &profile.id().to_string())
@@ -852,7 +1020,7 @@ pub mod tests {
    }

    #[test]
-
    fn trigger_patch() -> TestResult<()> {
+
    fn trigger_patch_from_patch_created() -> TestResult<()> {
        let mock_node = MockNode::new()?;
        let profile = mock_node.profile()?;

@@ -879,21 +1047,17 @@ pub mod tests {
            &node.signer,
        )?;

-
        let be = BrokerEvent::RefChanged {
-
            rid: project.id,
-
            name: RefString::try_from(
-
                "refs/namespaces/$nid/refs/heads/patches/$patchId"
-
                    .replace("$nid", &profile.id().to_string())
-
                    .replace("$patchId", &patch_cob.id.to_string()),
-
            )?,
-
            oid: radicle_git_ext::Oid::from(Oid::from_str(&patch_cob.id.to_string())?),
-
            old: None,
+
        let ci_event = CiEvent::PatchCreated {
+
            from_node: *profile.id(),
+
            repo: project.id,
+
            patch: *patch_cob.id(),
+
            new_tip: cmt,
        };

        let req = RequestBuilder::default()
            .profile(&profile)
-
            .broker_event(&be)
-
            .build_trigger()?;
+
            .ci_event(&ci_event)
+
            .build_trigger_from_ci_event()?;
        let Request::Trigger {
            common,
            push,
@@ -926,4 +1090,113 @@ pub mod tests {

        Ok(())
    }
+

+
    #[test]
+
    fn trigger_patch_from_patch_updated() -> TestResult<()> {
+
        let mock_node = MockNode::new()?;
+
        let profile = mock_node.profile()?;
+

+
        let project = mock_node.node().project();
+
        let (_, repo_head) = project.repo.head()?;
+
        let cmt = radicle::test::fixtures::commit(
+
            "my test commit",
+
            &[repo_head.into()],
+
            &project.backend,
+
        );
+

+
        let node = mock_node.node();
+

+
        let mut patches = Patches::open(&project.repo)?;
+
        let mut cache = radicle::cob::cache::NoCache;
+
        let patch_cob = patches.create(
+
            "my patch title",
+
            "my patch description",
+
            MergeTarget::Delegates,
+
            repo_head,
+
            cmt,
+
            &[],
+
            &mut cache,
+
            &node.signer,
+
        )?;
+

+
        let ci_event = CiEvent::PatchUpdated {
+
            from_node: *profile.id(),
+
            repo: project.id,
+
            patch: *patch_cob.id(),
+
            new_tip: cmt,
+
        };
+

+
        let req = RequestBuilder::default()
+
            .profile(&profile)
+
            .ci_event(&ci_event)
+
            .build_trigger_from_ci_event()?;
+
        let Request::Trigger {
+
            common,
+
            push,
+
            patch,
+
        } = req;
+

+
        assert!(patch.is_some());
+
        assert!(push.is_none());
+
        assert_eq!(common.event_type, EventType::Patch);
+
        assert_eq!(common.repository.id, project.id);
+
        assert_eq!(common.repository.name, project.repo.project()?.name());
+

+
        let patch = patch.unwrap();
+
        assert_eq!(patch.action.as_str(), "updated");
+
        assert_eq!(patch.patch.id.to_string(), patch_cob.id.to_string());
+
        assert_eq!(patch.patch.title, patch_cob.title());
+
        assert_eq!(patch.patch.state.status, patch_cob.state().to_string());
+
        assert_eq!(patch.patch.target, repo_head);
+
        assert_eq!(patch.patch.revisions.len(), 1);
+
        let rev = patch.patch.revisions.first().unwrap();
+
        assert_eq!(rev.id.to_string(), patch_cob.id.to_string());
+
        assert_eq!(rev.base, repo_head);
+
        assert_eq!(rev.oid, cmt);
+
        assert_eq!(rev.author.id, Did::from(profile.id()));
+
        assert_eq!(rev.description, patch_cob.description());
+
        assert_eq!(rev.timestamp, patch_cob.timestamp().as_secs());
+
        assert_eq!(patch.patch.after, cmt);
+
        assert_eq!(patch.patch.before, repo_head);
+
        assert_eq!(patch.patch.commits, vec![cmt]);
+

+
        Ok(())
+
    }
+
}
+

+
// Parse a Git ref to get the branch it refers to. If it doesn't
+
// return to a branch, return the empty string.
+
fn push_branch(name: &str) -> String {
+
    let mut parts = name.split("/refs/heads/");
+
    if let Some(suffix) = parts.nth(1) {
+
        if parts.next().is_none() {
+
            return suffix.to_string();
+
        }
+
    }
+
    "".to_string()
+
}
+

+
#[cfg(test)]
+
mod test_push_branch {
+
    use super::push_branch;
+

+
    #[test]
+
    fn get_push_branch() {
+
        assert_eq!(
+
            push_branch(
+
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/branch_name"
+
            ),
+
            "branch_name".to_string()
+
        );
+
    }
+

+
    #[test]
+
    fn get_no_push_branch() {
+
        assert_eq!(
+
            push_branch(
+
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/rad/sigrefs"
+
            ),
+
            "".to_string()
+
        );
+
    }
}
modified src/node_event_source.rs
@@ -1,6 +1,6 @@
//! Read node events from the local node.

-
use std::{path::PathBuf, time};
+
use std::{fmt, path::PathBuf, time};

use radicle::{
    node::{Event, Handle},
@@ -10,11 +10,8 @@ use radicle::{
use crate::logger;

/// Source of events from the local Radicle node.
-
///
-
/// The events are filtered. Only events allowed by at least one
-
/// filter are returned. See [`NodeEventSource::allow`] and
-
/// [`EventFilter`].
pub struct NodeEventSource {
+
    profile_path: PathBuf,
    events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
}

@@ -27,30 +24,40 @@ impl NodeEventSource {
            return Err(NodeEventError::NoControlSocket(socket));
        }
        let node = radicle::Node::new(socket.clone());
-
        match node.subscribe(time::Duration::MAX) {
+
        let source = match node.subscribe(time::Duration::MAX) {
            Ok(events) => Ok(Self {
+
                profile_path: profile.home.path().into(),
                events: Box::new(events.into_iter()),
            }),
            Err(err) => {
                logger::error("failed to subscribe to node events", &err);
                Err(NodeEventError::CannotSubscribe(socket.clone(), err))
            }
-
        }
+
        }?;
+
        logger::node_event_source_created(&source);
+
        Ok(source)
    }

    /// Get the next node event from an event source, without
    /// filtering. This will block until there is an event, or until
    /// there will be no more events from this source, or there's an
    /// error.
+
    ///
+
    /// A closed or broken connection to the node is not an error,
+
    /// it's treated as end of file.
    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
+
        logger::debug("node_event: try to get an event");
        if let Some(event) = self.events.next() {
            match event {
-
                Ok(event) => Ok(Some(event)),
+
                Ok(event) => {
+
                    logger::node_event_source_got_event(&event);
+
                    Ok(Some(event))
+
                }
                Err(radicle::node::Error::Io(err))
                    if err.kind() == std::io::ErrorKind::ConnectionReset =>
                {
                    logger::event_disconnected();
-
                    Err(NodeEventError::BrokenConnection)
+
                    Ok(None)
                }
                Err(err) => {
                    logger::error("error reading event from node", &err);
@@ -58,11 +65,18 @@ impl NodeEventSource {
                }
            }
        } else {
+
            logger::node_event_source_eof(self);
            Ok(None)
        }
    }
}

+
impl fmt::Debug for NodeEventSource {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        write!(f, "NodeEventSource<path={}", self.profile_path.display())
+
    }
+
}
+

/// Possible errors from accessing the local Radicle node.
#[derive(Debug, thiserror::Error)]
pub enum NodeEventError {
modified src/pages.rs
@@ -21,14 +21,15 @@ use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};

use radicle::{
+
    git::ext::Oid,
    prelude::RepoId,
    storage::{ReadRepository, ReadStorage},
    Profile,
};

use crate::{
-
    db::{Db, DbError, QueuedEvent},
-
    event::{parse_ref, BrokerEvent, ParsedRef},
+
    ci_event::CiEvent,
+
    db::{Db, DbError, QueuedCiEvent},
    logger,
    msg::RunId,
    notif::NotificationReceiver,
@@ -77,9 +78,9 @@ struct PageData {
    ci_broker_git_commit: &'static str,
    node_alias: String,
    runs: HashMap<RunId, Run>,
-
    events: Vec<QueuedEvent>,
+
    events: Vec<QueuedCiEvent>,
    broker_event_counter: usize,
-
    latest_broker_event: Option<BrokerEvent>,
+
    latest_broker_event: Option<CiEvent>,
    latest_ci_run: Option<Run>,
}

@@ -160,41 +161,58 @@ impl PageData {
                    .with_child(Element::new(Tag::Th).with_text("Event")),
            );
        for event in self.events.iter() {
+
            fn render_event(repo: &RepoId, refname: &str, commit: &Oid) -> Element {
+
                Element::new(Tag::Span)
+
                    .with_child(
+
                        Element::new(Tag::Span)
+
                            .with_class("repoid")
+
                            .with_text(&repo.to_string()),
+
                    )
+
                    .with_child(Element::new(Tag::Br))
+
                    .with_child(Element::new(Tag::Span).with_class("ref").with_text(refname))
+
                    .with_child(Element::new(Tag::Br))
+
                    .with_child(
+
                        Element::new(Tag::Span)
+
                            .with_class("commit")
+
                            .with_text(&commit.to_string()),
+
                    )
+
            }
+

+
            let event_element = match event.event() {
+
                CiEvent::Shutdown => Element::new(Tag::Span).with_text("shutdown"),
+
                CiEvent::BranchCreated {
+
                    from_node: _,
+
                    repo,
+
                    branch,
+
                    tip,
+
                } => render_event(repo, branch, tip),
+
                CiEvent::BranchUpdated {
+
                    from_node: _,
+
                    repo,
+
                    branch,
+
                    tip,
+
                    old_tip: _,
+
                } => render_event(repo, branch, tip),
+
                CiEvent::BranchDeleted { repo, branch, tip } => render_event(repo, branch, tip),
+
                CiEvent::PatchCreated {
+
                    from_node: _,
+
                    repo,
+
                    patch,
+
                    new_tip,
+
                } => render_event(repo, &patch.to_string(), new_tip),
+
                CiEvent::PatchUpdated {
+
                    from_node: _,
+
                    repo,
+
                    patch,
+
                    new_tip,
+
                } => render_event(repo, &patch.to_string(), new_tip),
+
            };
+

            table.push_child(
                Element::new(Tag::Tr)
                    .with_child(Element::new(Tag::Td).with_text(&event.id().to_string()))
                    .with_child(Element::new(Tag::Td).with_text(event.timestamp()))
-
                    .with_child(
-
                        Element::new(Tag::Td).with_child(match event.event() {
-
                            BrokerEvent::Shutdown => Element::new(Tag::Span).with_text("shutdown"),
-
                            BrokerEvent::RefChanged {
-
                                rid,
-
                                name,
-
                                oid,
-
                                old: _,
-
                            } => Element::new(Tag::Span)
-
                                .with_child(
-
                                    Element::new(Tag::Span)
-
                                        .with_class("repoid")
-
                                        .with_text(&rid.to_string()),
-
                                )
-
                                .with_child(Element::new(Tag::Br))
-
                                .with_child(Element::new(Tag::Span).with_class("ref").with_text(
-
                                    &match parse_ref(name) {
-
                                        Ok(Some(ParsedRef::Push(branch))) => branch,
-
                                        Ok(Some(ParsedRef::Patch(patch))) => patch.to_string(),
-
                                        Ok(None) => "unknown".into(),
-
                                        Err(_) => "unknown".into(),
-
                                    },
-
                                ))
-
                                .with_child(Element::new(Tag::Br))
-
                                .with_child(
-
                                    Element::new(Tag::Span)
-
                                        .with_class("commit")
-
                                        .with_text(&oid.to_string()),
-
                                ),
-
                        }),
-
                    ),
+
                    .with_child(Element::new(Tag::Td).with_child(event_element)),
            );
        }
        doc.push_to_body(table);
@@ -523,19 +541,23 @@ impl StatusPage {

            // Create list of events, except ones for private
            // repositories.
-
            let events: Result<Vec<QueuedEvent>, PageError> = db
-
                .queued_events()?
+
            let events: Result<Vec<QueuedCiEvent>, PageError> = db
+
                .queued_ci_events()?
                .iter()
-
                .filter_map(|id| match db.get_queued_event(id) {
+
                .filter_map(|id| match db.get_queued_ci_event(id) {
                    Ok(Some(event)) => match event.event() {
-
                        BrokerEvent::Shutdown => Some(Ok(event)),
-
                        BrokerEvent::RefChanged { rid, .. } => {
-
                            if Self::is_public_repo(profile, rid) {
+
                        CiEvent::Shutdown => Some(Ok(event)),
+
                        CiEvent::BranchCreated { repo, .. }
+
                        | CiEvent::BranchUpdated { repo, .. }
+
                        | CiEvent::PatchCreated { repo, .. }
+
                        | CiEvent::PatchUpdated { repo, .. } => {
+
                            if Self::is_public_repo(profile, repo) {
                                Some(Ok(event))
                            } else {
                                None
                            }
                        }
+
                        _ => None,
                    },
                    Ok(None) => None, // Event is (no longer?) in database.
                    Err(_) => None,   // We ignore error here on purpose.
@@ -615,7 +637,7 @@ struct StatusData {
    broker_event_counter: usize,
    ci_broker_version: &'static str,
    ci_broker_git_commit: &'static str,
-
    latest_broker_event: Option<BrokerEvent>,
+
    latest_broker_event: Option<CiEvent>,
    latest_ci_run: Option<Run>,
    event_queue_length: usize,
}
modified src/queueadd.rs
@@ -3,8 +3,10 @@ use std::thread::{spawn, JoinHandle};
use radicle::Profile;

use crate::{
+
    ci_event::CiEvent,
+
    ci_event_source::{CiEventSource, CiEventSourceError},
    db::{Db, DbError},
-
    event::{BrokerEvent, BrokerEventError, BrokerEventSource, EventFilter},
+
    filter::EventFilter,
    logger,
    notif::NotificationSender,
};
@@ -13,7 +15,6 @@ use crate::{
pub struct QueueAdderBuilder {
    db: Option<Db>,
    filters: Option<Vec<EventFilter>>,
-
    push_shutdown: bool,
    events_tx: Option<NotificationSender>,
}

@@ -22,7 +23,6 @@ impl QueueAdderBuilder {
        Ok(QueueAdder {
            db: self.db.ok_or(AdderError::Missing("db"))?,
            filters: self.filters.ok_or(AdderError::Missing("filters"))?,
-
            push_shutdown: self.push_shutdown,
            events_tx: self.events_tx.ok_or(AdderError::Missing("events_tx"))?,
        })
    }
@@ -41,17 +41,11 @@ impl QueueAdderBuilder {
        self.filters = Some(filters.to_vec());
        self
    }
-

-
    pub fn push_shutdown(mut self) -> Self {
-
        self.push_shutdown = true;
-
        self
-
    }
}

pub struct QueueAdder {
    filters: Vec<EventFilter>,
    db: Db,
-
    push_shutdown: bool,
    events_tx: NotificationSender,
}

@@ -65,39 +59,40 @@ impl QueueAdder {

        let profile = Profile::load()?;

-
        let mut source = BrokerEventSource::new(&profile)?;
-

-
        for filter in self.filters.iter() {
-
            source.allow(filter.clone());
-
        }
+
        let mut source = CiEventSource::new(&profile)?;

        // This loop ends when there's an error, e.g., failure to read an
        // event from the node.
        'event_loop: loop {
-
            let events = source.event()?;
-
            if events.is_empty() {
-
                logger::queueadd_control_socket_close();
-
                break 'event_loop;
-
            } else {
-
                for e in events {
-
                    logger::queueadd_push_event(&e);
-
                    self.push_event(e)?;
+
            let events = source.event();
+
            logger::debug2(format!("queueadd: events={events:?}"));
+
            match events {
+
                Err(e) => {
+
                    logger::queueadd_control_socket_close();
+
                    return Err(e.into());
+
                }
+
                Ok(None) => {
+
                    break 'event_loop;
+
                }
+
                Ok(Some(events)) => {
+
                    for e in events {
+
                        for filter in self.filters.iter() {
+
                            if filter.allows(&e) {
+
                                logger::queueadd_push_event(&e);
+
                                self.push_event(e.clone())?;
+
                            }
+
                        }
+
                    }
                }
            }
        }

-
        // Add a shutdown event to the queue so that the queue
-
        // processing thread knows to stop.
-
        if self.push_shutdown {
-
            self.push_event(BrokerEvent::Shutdown)?;
-
        }
-

        logger::queueadd_end();
        Ok(())
    }

-
    fn push_event(&self, e: BrokerEvent) -> Result<(), AdderError> {
-
        self.db.push_queued_event(e)?;
+
    fn push_event(&self, e: CiEvent) -> Result<(), AdderError> {
+
        self.db.push_queued_ci_event(e)?;
        self.events_tx.send(()).map_err(|_| AdderError::Send)?;
        Ok(())
    }
@@ -112,7 +107,7 @@ pub enum AdderError {
    Profile(#[from] radicle::profile::Error),

    #[error(transparent)]
-
    NodeEvent(#[from] BrokerEventError),
+
    CiEvent(#[from] CiEventSourceError),

    #[error(transparent)]
    Db(#[from] DbError),
modified src/queueproc.rs
@@ -12,8 +12,8 @@ use radicle::Profile;

use crate::{
    broker::{Broker, BrokerError},
-
    db::{Db, DbError, QueueId, QueuedEvent},
-
    event::BrokerEvent,
+
    ci_event::CiEvent,
+
    db::{Db, DbError, QueueId, QueuedCiEvent},
    logger,
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
@@ -79,7 +79,7 @@ impl QueueProcessor {
        let mut done = false;
        while !done {
            while let Some(qe) = self.pick_event()? {
-
                logger::queueproc_picked_event(qe.id());
+
                logger::queueproc_picked_event(qe.id(), &qe);
                done = self.process_event(qe.event())?;
                self.drop_event(qe.id())?;
                self.run_tx.send(()).map_err(|_| QueueError::NotifyRun)?;
@@ -98,13 +98,13 @@ impl QueueProcessor {
        Ok(())
    }

-
    fn pick_event(&self) -> Result<Option<QueuedEvent>, QueueError> {
-
        let ids = self.db.queued_events().map_err(QueueError::db)?;
+
    fn pick_event(&self) -> Result<Option<QueuedCiEvent>, QueueError> {
+
        let ids = self.db.queued_ci_events().map_err(QueueError::db)?;
        logger::debug2(format!("queue in db has {} events", ids.len()));

        let mut queue = vec![];
        for id in ids.iter() {
-
            if let Some(qe) = self.db.get_queued_event(id).map_err(QueueError::db)? {
+
            if let Some(qe) = self.db.get_queued_ci_event(id).map_err(QueueError::db)? {
                queue.push(qe);
            }
        }
@@ -117,36 +117,54 @@ impl QueueProcessor {
        }
    }

-
    fn process_event(&mut self, event: &BrokerEvent) -> Result<bool, QueueError> {
+
    fn process_event(&mut self, event: &CiEvent) -> Result<bool, QueueError> {
        match event {
-
            BrokerEvent::RefChanged {
-
                rid,
-
                name: _,
-
                oid,
-
                old: _,
+
            CiEvent::Shutdown => {
+
                logger::queueproc_action_shutdown();
+
                Ok(true)
+
            }
+
            CiEvent::BranchCreated {
+
                from_node: _,
+
                repo,
+
                branch: _,
+
                tip,
            } => {
-
                logger::queueproc_action_run(rid, oid);
-

+
                logger::queueproc_action_run(repo, tip);
                let trigger = RequestBuilder::default()
                    .profile(&self.profile)
-
                    .broker_event(event)
-
                    .build_trigger()
+
                    .ci_event(event)
+
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e))?;
                self.broker
                    .execute_ci(&trigger)
                    .map_err(QueueError::execute_ci)?;
                Ok(false)
            }
-
            BrokerEvent::Shutdown => {
-
                logger::queueproc_action_shutdown();
-
                Ok(true)
+
            CiEvent::BranchUpdated {
+
                from_node: _,
+
                repo,
+
                branch: _,
+
                tip,
+
                old_tip: _,
+
            } => {
+
                logger::queueproc_action_run(repo, tip);
+
                let trigger = RequestBuilder::default()
+
                    .profile(&self.profile)
+
                    .ci_event(event)
+
                    .build_trigger_from_ci_event()
+
                    .map_err(|e| QueueError::build_trigger(event, e))?;
+
                self.broker
+
                    .execute_ci(&trigger)
+
                    .map_err(QueueError::execute_ci)?;
+
                Ok(false)
            }
+
            _ => unimplemented!("unknown CI event {event:#?}"),
        }
    }

    fn drop_event(&mut self, id: &QueueId) -> Result<(), QueueError> {
        logger::queueproc_remove_event(id);
-
        self.db.remove_queued_event(id).map_err(QueueError::db)?;
+
        self.db.remove_queued_ci_event(id).map_err(QueueError::db)?;
        Ok(())
    }
}
@@ -164,7 +182,7 @@ pub enum QueueError {
    Db(#[source] DbError),

    #[error("failed to create a trigger message from broker event {0:?}")]
-
    BuildTrigger(BrokerEvent, #[source] MessageError),
+
    BuildTrigger(CiEvent, #[source] MessageError),

    #[error("failed to run CI")]
    ExecuteCi(#[source] BrokerError),
@@ -178,7 +196,7 @@ impl QueueError {
        Self::Db(e)
    }

-
    fn build_trigger(event: &BrokerEvent, err: MessageError) -> Self {
+
    fn build_trigger(event: &CiEvent, err: MessageError) -> Self {
        Self::BuildTrigger(event.clone(), err)
    }

modified src/test.rs
@@ -5,7 +5,7 @@ use std::{
};

use crate::adapter::Adapter;
-
use crate::event::BrokerEvent;
+
use crate::ci_event::CiEvent;
use crate::msg::{Request, RequestBuilder};
use radicle::crypto::ssh::Keystore;
use radicle::crypto::test::signer::MockSigner;
@@ -64,19 +64,19 @@ pub fn trigger_request() -> TestResult<Request> {
    let cmt =
        radicle::test::fixtures::commit("my test commit", &[repo_head.into()], &project.backend);

-
    let be = BrokerEvent::RefChanged {
-
        rid: project.id,
-
        name: RefString::try_from(
+
    let ci_event = CiEvent::BranchCreated {
+
        from_node: *profile.id(),
+
        repo: project.id,
+
        branch: RefString::try_from(
            "refs/namespaces/$nid/refs/heads/master".replace("$nid", &profile.id().to_string()),
        )?,
-
        oid: cmt,
-
        old: Some(repo_head),
+
        tip: cmt,
    };

    Ok(RequestBuilder::default()
        .profile(&profile)
-
        .broker_event(&be)
-
        .build_trigger()?)
+
        .ci_event(&ci_event)
+
        .build_trigger_from_ci_event()?)
}

pub fn mock_adapter(filename: &Path, script: &str) -> TestResult<Adapter> {