Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
chore: drop old broker event code, which isn't used anymore
Lars Wirzenius committed 1 year ago
commit f147c77d3270ca14b08670fe7e4c8253fc449922
parent 07612f53e9f77c08991a37d898724b337071b33a
7 files changed +52 -1062
modified src/bin/cibtool.rs
@@ -32,7 +32,6 @@ use radicle_ci_broker::{
    broker::BrokerError,
    ci_event::{CiEvent, CiEventError},
    db::{Db, DbError, QueueId, QueuedCiEvent},
-
    event::{BrokerEvent, BrokerEventError},
    logger,
    msg::{RunId, RunResult},
    notif::NotificationChannel,
modified src/db.rs
@@ -21,7 +21,7 @@ use sqlite::{Connection, State, Statement};
use time::{macros::format_description, OffsetDateTime};
use uuid::Uuid;

-
use crate::{ci_event::CiEvent, event::BrokerEvent, msg::RunId, run::Run};
+
use crate::{ci_event::CiEvent, msg::RunId, run::Run};

// how long to retry when SQL fails for busy database
const MAX_WAIT: Duration = Duration::from_millis(60_000);
@@ -171,6 +171,10 @@ impl Db {
    }

    /// Return list of identifiers for broker events currently in the queue.
+
    ///
+
    /// Note that there is no longer any way to retrieve the broker
+
    /// events. This method is meant only for making sure there are no
+
    /// unprocessed broker events in the queue.
    pub fn queued_events(&self) -> Result<Vec<QueueId>, DbError> {
        let mut select = self.prepare("SELECT id FROM event_queue")?;

@@ -553,31 +557,6 @@ impl QueueId {
}

#[derive(Clone, Debug, Serialize, Deserialize)]
-
pub struct QueuedEvent {
-
    id: QueueId,
-
    ts: String,
-
    event: BrokerEvent,
-
}
-

-
impl QueuedEvent {
-
    // fn new(id: QueueId, ts: String, event: BrokerEvent) -> Self {
-
    //     Self { id, ts, event }
-
    // }
-

-
    pub fn id(&self) -> &QueueId {
-
        &self.id
-
    }
-

-
    pub fn timestamp(&self) -> &str {
-
        &self.ts
-
    }
-

-
    pub fn event(&self) -> &BrokerEvent {
-
        &self.event
-
    }
-
}
-

-
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QueuedCiEvent {
    id: QueueId,
    ts: String,
deleted src/event.rs
@@ -1,694 +0,0 @@
-
//! Get events from local node.
-
//!
-
//! [`BrokerEventSource`] subscribes to the local node for node
-
//! events, creates them to corresponding sets of broker events, and
-
//! returns the events allowed by the configured filters.
-
//!
-
//! Events can be filtered based on various criteria and can be
-
//! combined with logical operators. Filters can be read from a JSON
-
//! file so that they're easy for a user to define.
-
//!
-
//! # Example
-
//!
-
//! ```
-
//! use radicle_ci_broker::event::Filters;
-
//! let filters = r#"{
-
//!   "filters": [
-
//!     {
-
//!       "And": [
-
//!         {
-
//!           "Repository": "rad:z3bBRYgzcYYBNjipFdDTwPgHaihPX"
-
//!         },
-
//!         {
-
//!           "RefSuffix": "refs/heads/main"
-
//!         }
-
//!       ]
-
//!     }
-
//!   ]
-
//! }"#;
-
//! let e = Filters::try_from(filters).unwrap();
-
//! ```
-

-
use radicle::{
-
    node::{Event, NodeId},
-
    prelude::RepoId,
-
    storage::RefUpdate,
-
    Profile,
-
};
-
use radicle_git_ext::{ref_format::RefString, Oid};
-
use regex::Regex;
-
use serde::{Deserialize, Serialize};
-
use std::{
-
    fs::read,
-
    path::{Path, PathBuf},
-
};
-

-
use crate::{
-
    logger,
-
    node_event_source::{NodeEventError, NodeEventSource},
-
};
-

-
/// Source of events from the local Radicle node.
-
///
-
/// The events are filtered. Only events allowed by at least one
-
/// filter are returned. See [`BrokerEventSource::allow`] and
-
/// [`EventFilter`].
-
pub struct BrokerEventSource {
-
    source: NodeEventSource,
-
    allowed: Vec<EventFilter>,
-
}
-

-
impl BrokerEventSource {
-
    /// Create a new source of node events, for a given Radicle
-
    /// profile.
-
    pub fn new(profile: &Profile) -> Result<Self, BrokerEventError> {
-
        let source = NodeEventSource::new(profile).map_err(BrokerEventError::CannotSubscribe)?;
-
        Ok(Self {
-
            source,
-
            allowed: vec![],
-
        })
-
    }
-

-
    /// Add an event filter for allowed events for this event source.
-
    pub fn allow(&mut self, filter: EventFilter) {
-
        self.allowed.push(filter);
-
    }
-

-
    fn allowed(&self, event: &BrokerEvent) -> Result<bool, BrokerEventError> {
-
        for filter in self.allowed.iter() {
-
            if !event.is_allowed(filter)? {
-
                return Ok(false);
-
            }
-
        }
-
        Ok(true)
-
    }
-

-
    /// Get the allowed next event from an event source. This will
-
    /// block until there is an allowed event, or until there will be
-
    /// no more events from this source, or there's an error.
-
    pub fn event(&mut self) -> Result<Vec<BrokerEvent>, BrokerEventError> {
-
        loop {
-
            match self.source.node_event() {
-
                Err(NodeEventError::BrokenConnection) => {
-
                    logger::event_disconnected();
-
                    return Err(BrokerEventError::BrokenConnection);
-
                }
-
                Err(err) => {
-
                    logger::error("error reading event from node", &err);
-
                    return Err(BrokerEventError::NodeEventError(err));
-
                }
-
                Ok(None) => {
-
                    logger::event_end();
-
                    return Ok(vec![]);
-
                }
-
                Ok(Some(event)) => {
-
                    let mut result = vec![];
-
                    if let Some(broker_events) = BrokerEvent::from_event(&event) {
-
                        for e in broker_events {
-
                            if self.allowed(&e)? {
-
                                result.push(e);
-
                            }
-
                        }
-
                        if !result.is_empty() {
-
                            return Ok(result);
-
                        }
-
                    }
-
                }
-
            }
-
        }
-
    }
-
}
-

-
/// Possible errors from accessing the local Radicle node.
-
#[derive(Debug, thiserror::Error)]
-
pub enum BrokerEventError {
-
    /// Can't create a [`NodeEventSource`].
-
    #[error("failed to subscribe to node events")]
-
    CannotSubscribe(#[source] NodeEventError),
-

-
    /// Regex compilation error.
-
    #[error("programming error in regular expression {0:?}")]
-
    Regex(&'static str, regex::Error),
-

-
    /// Some error from getting an event from the node.
-
    #[error("failed to get next event from node")]
-
    NodeEventError(#[from] NodeEventError),
-

-
    /// Connection to the node control socket broke.
-
    #[error("connection to the node control socket broke")]
-
    BrokenConnection,
-

-
    /// Some error from parsing a repository id.
-
    #[error(transparent)]
-
    Id(#[from] radicle::identity::IdError),
-

-
    /// An error reading a filter file.
-
    #[error("failed to read filter file: {0}")]
-
    ReadFilterFile(PathBuf, #[source] std::io::Error),
-

-
    /// An error parsing JSON as filters, when read from a file.
-
    #[error("failed to parser filters file: {0}")]
-
    FiltersJsonFile(PathBuf, #[source] serde_json::Error),
-

-
    /// An error parsing YAML as filters, when read from a file.
-
    #[error("failed to parser filters file: {0}")]
-
    FiltersYamlFile(PathBuf, #[source] serde_yml::Error),
-

-
    /// An error parsing JSON as filters, from an in-memory string.
-
    #[error("failed to parser filters as JSON")]
-
    FiltersJsonString(#[source] serde_json::Error),
-

-
    /// An error parsing a Git object id as string into an Oid.
-
    #[error("failed to parse string as a Git object id: {0:?}")]
-
    ParseOid(String, #[source] radicle::git::raw::Error),
-
}
-

-
/// An event filter for allowing events. Or an "AND" combination of events.
-
///
-
/// NOTE: Adding "OR" and "NOT" would be easy, too.
-
#[derive(Debug, Clone, Deserialize, Serialize)]
-
#[serde(deny_unknown_fields)]
-
pub enum EventFilter {
-
    /// Event concerns a specific repository.
-
    Repository(RepoId),
-

-
    /// Event concerns a git ref that ends with a given string.
-
    RefSuffix(String),
-

-
    /// Event concerns a specific git branch.
-
    Branch(String),
-

-
    /// Event concerns any Radicle patch.
-
    AnyPatch,
-

-
    /// Event concerns changes on specific Radicle patch.
-
    Patch(String),
-

-
    /// Event concerns changed refs on any Radicle patch branch.
-
    AnyPatchRef,
-

-
    /// Event concerns changed refs on any Radicle branch.
-
    AnyPushRef,
-

-
    /// Event concerns changed refs on the branch of the specified Radicle patch.
-
    PatchRef(String),
-

-
    /// Combine any number of filters that both must allow the events.
-
    And(Vec<Box<Self>>),
-

-
    /// Combine any number of filters such that at least one allows the events.
-
    Or(Vec<Box<Self>>),
-

-
    /// Combine any number of filters such that none allows the events.
-
    Not(Vec<Box<Self>>),
-
}
-

-
impl EventFilter {
-
    /// Create a filter for a repository.
-
    pub fn repository(rid: &str) -> Result<Self, BrokerEventError> {
-
        Ok(Self::Repository(RepoId::from_urn(rid)?))
-
    }
-

-
    /// Create a filter for a git ref that ends with a string.
-
    pub fn glob(pattern: &str) -> Result<Self, BrokerEventError> {
-
        Ok(Self::RefSuffix(pattern.into()))
-
    }
-

-
    /// Create a filter combining other filters with AND.
-
    pub fn and(conds: &[Self]) -> Self {
-
        Self::And(conds.iter().map(|c| Box::new(c.clone())).collect())
-
    }
-

-
    /// Create a filter combining other filters with OR.
-
    pub fn or(conds: &[Self]) -> Self {
-
        Self::Or(conds.iter().map(|c| Box::new(c.clone())).collect())
-
    }
-

-
    /// Create a filter combining other filters with NOT.
-
    pub fn not(conds: &[Self]) -> Self {
-
        Self::Not(conds.iter().map(|c| Box::new(c.clone())).collect())
-
    }
-

-
    /// Read filters from a JSON file.
-
    ///
-
    /// This function is the same as reading a file and calling
-
    /// [`Filters::try_from`], but returns just a vector of filters
-
    /// instead of a `Filter`.
-
    ///
-
    /// See the module description for an example of the file content.
-
    pub fn from_file(filename: &Path) -> Result<Vec<Self>, BrokerEventError> {
-
        let filters =
-
            read(filename).map_err(|e| BrokerEventError::ReadFilterFile(filename.into(), e))?;
-
        let filters: Filters = serde_json::from_slice(&filters)
-
            .map_err(|e| BrokerEventError::FiltersJsonFile(filename.into(), e))?;
-
        Ok(filters.filters)
-
    }
-

-
    /// Read filters from a YAML file.
-
    pub fn from_yaml_file(filename: &Path) -> Result<Vec<Self>, BrokerEventError> {
-
        let filters =
-
            read(filename).map_err(|e| BrokerEventError::ReadFilterFile(filename.into(), e))?;
-
        let filters: Filters = serde_yml::from_slice(&filters)
-
            .map_err(|e| BrokerEventError::FiltersYamlFile(filename.into(), e))?;
-
        Ok(filters.filters)
-
    }
-
}
-

-
/// A set of filters for [`NodeEventSource`] to use. This struct
-
/// represents the serialized set of filters. See the module
-
/// description for an example.
-
#[derive(Debug, Deserialize, Serialize)]
-
pub struct Filters {
-
    filters: Vec<EventFilter>,
-
}
-

-
impl TryFrom<&str> for Filters {
-
    type Error = BrokerEventError;
-

-
    fn try_from(s: &str) -> Result<Self, Self::Error> {
-
        serde_json::from_str(s).map_err(BrokerEventError::FiltersJsonString)
-
    }
-
}
-

-
/// A single node event can represent many git refs having changed,
-
/// but that's hard to process or filter. The broker breaks up such
-
/// complex events to simpler ones that only affect one ref at a time.
-
#[derive(Debug, Clone, Serialize, Deserialize)]
-
pub enum BrokerEvent {
-
    /// Request the CI broker shuts down in an orderly fashion.
-
    Shutdown,
-

-
    /// A git ref in a git repository has changed to refer to a given
-
    /// commit. This covers both the case of a new ref, and the case
-
    /// of a changed ref.
-
    RefChanged {
-
        /// Repository id.
-
        rid: RepoId,
-

-
        /// Ref name.
-
        name: RefString,
-

-
        /// New git commit.
-
        oid: Oid,
-

-
        /// Old git commit
-
        old: Option<Oid>,
-
    },
-
}
-

-
impl BrokerEvent {
-
    pub fn new(rid: &RepoId, name: &RefString, oid: &Oid, old: Option<Oid>) -> Self {
-
        Self::RefChanged {
-
            rid: *rid,
-
            name: name.clone(),
-
            oid: *oid,
-
            old,
-
        }
-
    }
-

-
    pub fn shutdown() -> Self {
-
        Self::Shutdown
-
    }
-

-
    /// Break up a potentially complex node event into a vector of
-
    /// simpler broker events.
-
    pub fn from_event(e: &Event) -> Option<Vec<Self>> {
-
        if let Event::RefsFetched {
-
            remote: _,
-
            rid,
-
            updated,
-
        } = e
-
        {
-
            let mut events = vec![];
-
            for up in updated {
-
                match up {
-
                    RefUpdate::Skipped { name, oid }
-
                        if name.as_str() == "shutdown" && oid.is_zero() =>
-
                    {
-
                        events.push(Self::shutdown());
-
                    }
-
                    RefUpdate::Created { name, oid } => {
-
                        events.push(Self::new(rid, name, oid, None));
-
                    }
-
                    RefUpdate::Updated { name, old, new } => {
-
                        events.push(Self::new(rid, name, new, Some(*old)));
-
                    }
-
                    _ => (),
-
                }
-
            }
-
            Some(events)
-
        } else {
-
            None
-
        }
-
    }
-

-
    /// Is this broker event allowed by a filter?
-
    pub fn is_allowed(&self, filter: &EventFilter) -> Result<bool, BrokerEventError> {
-
        let res = self.is_allowed_helper(filter)?;
-
        Ok(res)
-
    }
-

-
    fn is_allowed_helper(&self, filter: &EventFilter) -> Result<bool, BrokerEventError> {
-
        let allowed = match self {
-
            Self::Shutdown => true,
-
            Self::RefChanged {
-
                rid,
-
                name,
-
                oid: _,
-
                old: _,
-
            } => {
-
                let parsed = parse_ref(name)?;
-

-
                match filter {
-
                    EventFilter::Repository(wanted) => rid == wanted,
-
                    EventFilter::RefSuffix(wanted) => name.ends_with(wanted),
-
                    EventFilter::Branch(wanted) => parsed == Some(ParsedRef::Push(wanted.into())),
-
                    EventFilter::AnyPatch => matches!(parsed, Some(ParsedRef::Patch(_))),
-
                    EventFilter::Patch(wanted) => {
-
                        let oid = Oid::try_from(wanted.as_str())
-
                            .map_err(|e| BrokerEventError::ParseOid(wanted.into(), e))?;
-
                        parsed == Some(ParsedRef::Patch(oid))
-
                    }
-
                    EventFilter::AnyPatchRef => matches!(parsed, Some(ParsedRef::Patch(_))),
-
                    EventFilter::AnyPushRef => matches!(parsed, Some(ParsedRef::Push(_))),
-
                    EventFilter::PatchRef(wanted) => {
-
                        let oid = Oid::try_from(wanted.as_str())
-
                            .map_err(|e| BrokerEventError::ParseOid(wanted.into(), e))?;
-
                        parsed == Some(ParsedRef::Patch(oid))
-
                    }
-
                    EventFilter::And(conds) => conds
-
                        .iter()
-
                        .all(|cond| self.is_allowed_helper(cond).unwrap_or(false)),
-
                    EventFilter::Or(conds) => conds
-
                        .iter()
-
                        .any(|cond| self.is_allowed_helper(cond).unwrap_or(false)),
-
                    EventFilter::Not(conds) => !conds
-
                        .iter()
-
                        .any(|cond| self.is_allowed_helper(cond).unwrap_or(false)),
-
                }
-
            }
-
        };
-

-
        Ok(allowed)
-
    }
-

-
    pub fn name(&self) -> Option<&RefString> {
-
        match self {
-
            BrokerEvent::Shutdown => None,
-
            BrokerEvent::RefChanged { name, .. } => Some(name),
-
        }
-
    }
-

-
    /// Extract the NID from the RefString.
-
    /// The RefString will start with `refs/namespaces/<nid>/...`
-
    pub fn nid(&self) -> Result<Option<NodeId>, BrokerEventError> {
-
        if let Some(name) = self.name() {
-
            Ok(parse_nid_from_refstring(name)?)
-
        } else {
-
            Ok(None)
-
        }
-
    }
-

-
    pub fn patch_id(&self) -> Result<Option<Oid>, BrokerEventError> {
-
        if let Some(name) = self.name() {
-
            if let Some(ParsedRef::Patch(oid)) = parse_ref(name)? {
-
                return Ok(Some(oid));
-
            }
-
        }
-
        Ok(None)
-
    }
-
}
-

-
/// Extract the NID from a the ref string in a repository.
-
/// The RefString should start with `refs/namespaces/<nid>/...`
-
pub fn parse_nid_from_refstring(name: &RefString) -> Result<Option<NodeId>, BrokerEventError> {
-
    const PAT: &str = r"^refs/namespaces/(?P<nid>[^/]+)/";
-
    let pat = Regex::new(PAT).map_err(|e| BrokerEventError::Regex(PAT, e))?;
-
    if let Some(captures) = pat.captures(name.as_str()) {
-
        if let Some(m) = captures.name("nid") {
-
            if let Ok(parsed) = m.as_str().parse() {
-
                return Ok(Some(parsed));
-
            }
-
        }
-
    }
-
    Ok(None)
-
}
-

-
#[cfg(test)]
-
mod test_broker_event {
-
    use std::str::FromStr;
-

-
    use super::{BrokerEvent, NodeId, Oid, RefString, RepoId};
-

-
    #[test]
-
    fn name_for_branch() -> anyhow::Result<()> {
-
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?;
-
        let name = RefString::try_from("main")?;
-
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a")?;
-
        let be = BrokerEvent::new(&rid, &name, &oid, None);
-
        assert_eq!(be.name(), Some(&name));
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn name_for_shutdown() {
-
        let be = BrokerEvent::shutdown();
-
        assert_eq!(be.name(), None);
-
    }
-

-
    #[test]
-
    fn nid_for_plain_branch_name() -> anyhow::Result<()> {
-
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?;
-
        let name = RefString::try_from("main")?;
-
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a")?;
-
        let be = BrokerEvent::new(&rid, &name, &oid, None);
-
        assert_eq!(be.nid()?, None);
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn nid_for_ref_without_namespace() -> anyhow::Result<()> {
-
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?;
-
        let name = RefString::try_from("something/else/main")?;
-
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a")?;
-
        let be = BrokerEvent::new(&rid, &name, &oid, None);
-
        assert_eq!(be.nid()?, None);
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn nid_for_ref_with_namespace() -> anyhow::Result<()> {
-
        let rid = RepoId::from_urn("rad:zwTxygwuz5LDGBq255RA2CbNGrz8")?;
-
        let name = RefString::try_from(
-
            "refs/namespaces/z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV/main",
-
        )?;
-
        let nid = NodeId::from_str("z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV")?;
-
        let oid = Oid::from_str("11d03559fb10183b0f14331175f254fbb077159a")?;
-
        let be = BrokerEvent::new(&rid, &name, &oid, None);
-
        assert_eq!(be.nid()?, Some(nid));
-
        Ok(())
-
    }
-
}
-

-
/// Parsed reference to one of the supported types
-
/// Patch with patch ID
-
/// Push with branch name
-
#[derive(Debug, Eq, PartialEq)]
-
pub enum ParsedRef {
-
    Patch(Oid),
-
    Push(String),
-
}
-

-
/// Parse the given reference to a ParsedRef.
-
///
-
/// # Example
-
/// ```Rust
-
/// if let Some(parsed_ref) = parse_ref(name) {
-
///     match parsed_ref {
-
///         ParsedRef::Patch(_oid) => {
-
///             debug!("build_trigger: is patch");
-
///         }
-
///         ParsedRef::Push(_branch) => {
-
///             debug!("build_trigger: is push");
-
///         }
-
///     }
-
/// }
-
/// ```
-
pub fn parse_ref(s: &str) -> Result<Option<ParsedRef>, BrokerEventError> {
-
    const PAT_PATCH: &str = r"^refs/namespaces/[^/]+/refs/heads/patches/([^/]+)$";
-
    let patch_re = Regex::new(PAT_PATCH).map_err(|e| BrokerEventError::Regex(PAT_PATCH, e))?;
-
    if let Some(patch_captures) = patch_re.captures(s) {
-
        if let Some(patch_id) = patch_captures.get(1) {
-
            let patch_id_str = patch_id.as_str();
-
            let oid = Oid::try_from(patch_id_str)
-
                .map_err(|e| BrokerEventError::ParseOid(patch_id_str.into(), e))?;
-
            return Ok(Some(ParsedRef::Patch(oid)));
-
        }
-
    }
-

-
    const PAT_BRANCH: &str = r"^refs/namespaces/[^/]+/refs/heads/(.+)$";
-
    let push_re = Regex::new(PAT_BRANCH).map_err(|e| BrokerEventError::Regex(PAT_BRANCH, e))?;
-
    if let Some(push_captures) = push_re.captures(s) {
-
        if let Some(branch) = push_captures.get(1) {
-
            return Ok(Some(ParsedRef::Push(branch.as_str().to_string())));
-
        }
-
    }
-

-
    Ok(None)
-
}
-

-
#[cfg(test)]
-
mod test_parse_ref {
-
    use super::{parse_ref, Oid, ParsedRef};
-

-
    #[test]
-
    fn plain_branch_name_is_none() -> anyhow::Result<()> {
-
        assert_eq!(parse_ref("main")?, None);
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn namespaced_branch() -> anyhow::Result<()> {
-
        assert_eq!(
-
            parse_ref(
-
                "refs/namespaces/z6MkfBU2cwcZfaE6Z1dLqb7Ve7u4pdgbSo9tP6qUVsqFn2xv/refs/heads/main"
-
            )?,
-
            Some(ParsedRef::Push("main".into()))
-
        );
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn namespaced_branch_with_slashes() -> anyhow::Result<()> {
-
        assert_eq!(
-
            parse_ref(
-
                "refs/namespaces/z6MkfBU2cwcZfaE6Z1dLqb7Ve7u4pdgbSo9tP6qUVsqFn2xv/refs/heads/liw/cob/draft/v2"
-
            )?,
-
            Some(ParsedRef::Push("liw/cob/draft/v2".into()))
-
        );
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn namespaced_patch() -> anyhow::Result<()> {
-
        const SHA: &str = "0a4c69183fc8b8d849f5ab977d70f2a1f4788bca";
-
        assert_eq!(
-
            parse_ref(&format!("refs/namespaces/NID/refs/heads/patches/{SHA}"))?,
-
            Some(ParsedRef::Patch(Oid::try_from(SHA)?))
-
        );
-
        Ok(())
-
    }
-
}
-

-
pub fn is_patch_update(name: &str) -> Option<&str> {
-
    let mut parts = name.split("/refs/cobs/xyz.radicle.patch/");
-
    if let Some(suffix) = parts.nth(1) {
-
        if parts.next().is_none() {
-
            return Some(suffix);
-
        }
-
    }
-
    None
-
}
-

-
pub fn push_branch(name: &str) -> String {
-
    let mut parts = name.split("/refs/heads/");
-
    if let Some(suffix) = parts.nth(1) {
-
        if parts.next().is_none() {
-
            return suffix.to_string();
-
        }
-
    }
-
    "".to_string()
-
}
-

-
#[cfg(test)]
-
mod test {
-
    use super::{is_patch_update, parse_ref, push_branch, Oid, ParsedRef};
-

-
    #[test]
-
    fn test_parse_patch_ref() -> anyhow::Result<()> {
-
        let patch_ref =
-
            "refs/namespaces/NID/refs/heads/patches/9183ed6232687d3105482960cecb01a53018b80a";
-

-
        assert_eq!(
-
            parse_ref(patch_ref)?,
-
            Some(ParsedRef::Patch(Oid::try_from(
-
                "9183ed6232687d3105482960cecb01a53018b80a"
-
            )?))
-
        );
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn test_parse_push_ref() -> anyhow::Result<()> {
-
        let push_ref =
-
            "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/main";
-
        let parsed_ref = parse_ref(push_ref)?;
-
        eprintln!("parsed_ref={parsed_ref:#?}");
-
        assert!(parsed_ref.is_some());
-
        if let Some(ref parsed) = parsed_ref {
-
            match parsed {
-
                ParsedRef::Push(branch) => assert_eq!(branch, "main"),
-
                _ => panic!("Expected Push ref"),
-
            }
-
        }
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn test_parse_invalid_ref() -> anyhow::Result<()> {
-
        let invalid_ref = "invalid_ref";
-
        let parsed_ref = parse_ref(invalid_ref)?;
-
        assert!(parsed_ref.is_none());
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn branch_is_not_patch_update() {
-
        assert_eq!(
-
            is_patch_update(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/main"
-
            ),
-
            None
-
        );
-
    }
-

-
    #[test]
-
    fn patch_branch_is_not_patch_update() {
-
        assert_eq!(
-
            is_patch_update(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/patches/bbb54a2c9314a528a4fff9d6c2aae874ed098433"
-
            ),
-
            None
-
        );
-
    }
-

-
    #[test]
-
    fn patch_update() {
-
        assert_eq!(
-
            is_patch_update(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/cobs/xyz.radicle.patch/bbb54a2c9314a528a4fff9d6c2aae874ed098433"
-
            ),
-
            Some("bbb54a2c9314a528a4fff9d6c2aae874ed098433")
-
        );
-
    }
-

-
    #[test]
-
    fn get_push_branch() {
-
        assert_eq!(
-
            push_branch(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/branch_name"
-
            ),
-
            "branch_name".to_string()
-
        );
-
    }
-

-
    #[test]
-
    fn get_no_push_branch() {
-
        assert_eq!(
-
            push_branch(
-
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/rad/sigrefs"
-
            ),
-
            "".to_string()
-
        );
-
    }
-
}
modified src/lib.rs
@@ -11,7 +11,6 @@ pub mod ci_event;
pub mod ci_event_source;
pub mod config;
pub mod db;
-
pub mod event;
pub mod filter;
pub mod logger;
pub mod msg;
modified src/msg.rs
@@ -31,10 +31,7 @@ use radicle::{
    Profile,
};

-
use crate::{
-
    ci_event::CiEvent,
-
    event::{parse_ref, push_branch, BrokerEvent, ParsedRef},
-
};
+
use crate::ci_event::CiEvent;

// This gets put into every [`Request`] message so the adapter can
// detect its getting a message it knows how to handle.
@@ -113,7 +110,6 @@ impl fmt::Display for RunResult {
#[derive(Debug, Default)]
pub struct RequestBuilder<'a> {
    profile: Option<&'a Profile>,
-
    event: Option<&'a BrokerEvent>,
    ci_event: Option<&'a CiEvent>,
}

@@ -124,12 +120,6 @@ impl<'a> RequestBuilder<'a> {
        self
    }

-
    /// Set the broker event to use.
-
    pub fn broker_event(mut self, event: &'a BrokerEvent) -> Self {
-
        self.event = Some(event);
-
        self
-
    }
-

    /// Set the CI event to use.
    pub fn ci_event(mut self, event: &'a CiEvent) -> Self {
        self.ci_event = Some(event);
@@ -420,168 +410,6 @@ impl<'a> RequestBuilder<'a> {
            _ => Err(MessageError::UnknownCiEvent(self.ci_event.unwrap().clone())),
        }
    }
-

-
    /// Create a [`Request::Trigger`] message.
-
    pub fn build_trigger(self) -> Result<Request, MessageError> {
-
        let profile = self.profile.ok_or(MessageError::NoProfile)?;
-
        let event = self.event.ok_or(MessageError::NoEvent)?;
-

-
        let (rid, name, oid, old) = match event {
-
            BrokerEvent::Shutdown => panic!("got shutdown"),
-
            BrokerEvent::RefChanged {
-
                rid,
-
                name,
-
                oid,
-
                old,
-
            } => (rid, name, oid, old),
-
        };
-

-
        let rad_repo = profile.storage.repository(*rid)?;
-
        let git_repo = radicle_surf::Repository::open(paths::repository(&profile.storage, rid))?;
-
        let project_info = rad_repo.project()?;
-
        let msg_repository = Repository {
-
            id: *rid,
-
            name: project_info.name().to_string(),
-
            description: project_info.description().to_string(),
-
            private: !rad_repo.identity()?.visibility.is_public(),
-
            default_branch: project_info.default_branch().to_string(),
-
            delegates: rad_repo.delegates()?.iter().copied().collect(),
-
        };
-

-
        let author = match extract_author(profile, event) {
-
            Ok(author) => author,
-
            Err(err) => {
-
                return Err(err);
-
            }
-
        };
-

-
        match parse_ref(name)? {
-
            None => Err(MessageError::NoEventHandler),
-
            Some(ParsedRef::Patch(_oid)) => Ok(Request::Trigger {
-
                common: EventCommonFields {
-
                    version: PROTOCOL_VERSION,
-
                    event_type: EventType::Patch,
-
                    repository: msg_repository,
-
                },
-
                push: None,
-
                patch: Some(
-
                    self.build_trigger_from_patch(event, rad_repo, &git_repo, profile, author)?,
-
                ),
-
            }),
-
            Some(ParsedRef::Push(_branch)) => Ok(Request::Trigger {
-
                common: EventCommonFields {
-
                    version: PROTOCOL_VERSION,
-
                    event_type: EventType::Push,
-
                    repository: msg_repository,
-
                },
-
                push: Some(self.build_trigger_from_push(
-
                    git_repo,
-
                    author,
-
                    name,
-
                    old.unwrap_or(*oid),
-
                    *oid,
-
                )?),
-
                patch: None,
-
            }),
-
        }
-
    }
-

-
    fn build_trigger_from_patch(
-
        &self,
-
        event: &BrokerEvent,
-
        repository: radicle::storage::git::Repository,
-
        repo: &radicle_surf::Repository,
-
        profile: &Profile,
-
        author: Author,
-
    ) -> Result<PatchEvent, MessageError> {
-
        let patch_id = event.patch_id()?.ok_or(MessageError::Trigger)?;
-
        let patch = patch::Patches::open(&repository)?
-
            .get(&patch_id.into())?
-
            .ok_or(MessageError::Trigger)?;
-

-
        let revs: Vec<Revision> = patch
-
            .revisions()
-
            .map(|(rid, r)| {
-
                Ok::<Revision, MessageError>(Revision {
-
                    id: rid.into(),
-
                    author: did_to_author(profile, r.author().id())?,
-
                    description: r.description().to_string(),
-
                    base: *r.base(),
-
                    oid: r.head(),
-
                    timestamp: r.timestamp().as_secs(),
-
                })
-
            })
-
            .collect::<Result<Vec<Revision>, MessageError>>()?;
-
        let patch_author_pk = radicle::crypto::PublicKey::from(author.id);
-
        let patch_latest_revision = patch
-
            .latest_by(&patch_author_pk)
-
            .ok_or(MessageError::Trigger)?;
-
        let patch_head = patch_latest_revision.1.head();
-
        let patch_base = patch_latest_revision.1.base();
-
        let patch_commits: Vec<Oid> = repo
-
            .history(patch_head)?
-
            .take_while(|c| {
-
                if let Ok(c) = c {
-
                    c.id != *patch_base
-
                } else {
-
                    false
-
                }
-
            })
-
            .map(|r| r.map(|c| c.id))
-
            .collect::<Result<Vec<Oid>, _>>()?;
-
        let action = if patch.revisions().count() > 1 {
-
            PatchAction::Updated
-
        } else {
-
            PatchAction::Created
-
        };
-
        Ok(PatchEvent {
-
            action,
-
            patch: Patch {
-
                id: patch_id,
-
                author,
-
                title: patch.title().to_string(),
-
                state: State {
-
                    status: patch.state().to_string(),
-
                    conflicts: match patch.state() {
-
                        patch::State::Open { conflicts, .. } => conflicts.to_vec(),
-
                        _ => vec![],
-
                    },
-
                },
-
                before: *patch_base,
-
                after: patch_head,
-
                commits: patch_commits,
-
                target: patch.target().head(&repository)?,
-
                labels: patch.labels().map(|l| l.name().to_string()).collect(),
-
                assignees: patch.assignees().collect(),
-
                revisions: revs,
-
            },
-
        })
-
    }
-

-
    fn build_trigger_from_push(
-
        &self,
-
        repo: radicle_surf::Repository,
-
        pusher: Author,
-
        branch: &str,
-
        before: Oid,
-
        after: Oid,
-
    ) -> Result<PushEvent, MessageError> {
-
        let mut push_commits: Vec<Oid> = repo
-
            .history(after)?
-
            .take_while(|c| if let Ok(c) = c { c.id != before } else { false })
-
            .map(|r| r.map(|c| c.id))
-
            .collect::<Result<Vec<Oid>, _>>()?;
-
        if push_commits.is_empty() {
-
            push_commits = vec![before];
-
        }
-
        Ok(PushEvent {
-
            pusher,
-
            before,
-
            after,
-
            branch: push_branch(branch),
-
            commits: push_commits,
-
        })
-
    }
}

/// A request message sent by the broker to its adapter child process.
@@ -681,16 +509,6 @@ fn did_to_author(profile: &Profile, did: &Did) -> Result<Author, MessageError> {
    Ok(Author { id: *did, alias })
}

-
fn extract_author(profile: &Profile, event: &BrokerEvent) -> Result<Author, MessageError> {
-
    let nid = match event.nid()? {
-
        Some(nid) => nid,
-
        None => {
-
            return Err(MessageError::Trigger);
-
        }
-
    };
-
    did_to_author(profile, &Did::from(nid))
-
}
-

impl fmt::Display for Request {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
@@ -1062,7 +880,7 @@ pub enum MessageError {
    #[error(transparent)]
    RadicleProfile(#[from] radicle::profile::Error),

-
    /// Error retrieving context to generate trigger from BrokerEvent
+
    /// Error retrieving context to generate trigger.
    #[error("could not generate trigger from event")]
    Trigger,

@@ -1070,10 +888,6 @@ pub enum MessageError {
    #[error(transparent)]
    StorageError(#[from] radicle::storage::Error),

-
    /// Error from event module.
-
    #[error(transparent)]
-
    EventError(#[from] crate::event::BrokerEventError),
-

    /// Error from Radicle repository.
    #[error(transparent)]
    RepositoryError(#[from] radicle::storage::RepositoryError),
@@ -1092,146 +906,6 @@ pub enum MessageError {
}

#[cfg(test)]
-
pub mod tests {
-
    use crate::event::BrokerEvent;
-
    use crate::msg::{EventType, Request, RequestBuilder};
-
    use radicle::git::raw::Oid;
-
    use radicle::git::RefString;
-
    use radicle::patch::{MergeTarget, Patches};
-
    use radicle::prelude::Did;
-
    use radicle::storage::ReadRepository;
-

-
    use crate::test::{MockNode, TestResult};
-

-
    #[test]
-
    fn trigger_push() -> TestResult<()> {
-
        let mock_node = MockNode::new()?;
-
        let profile = mock_node.profile()?;
-

-
        let project = mock_node.node().project();
-
        let (_, repo_head) = project.repo.head()?;
-
        let cmt = radicle::test::fixtures::commit(
-
            "my test commit",
-
            &[repo_head.into()],
-
            &project.backend,
-
        );
-

-
        let be = BrokerEvent::RefChanged {
-
            rid: project.id,
-
            name: RefString::try_from(
-
                "refs/namespaces/$nid/refs/heads/master".replace("$nid", &profile.id().to_string()),
-
            )?,
-
            oid: cmt,
-
            old: Some(repo_head),
-
        };
-

-
        let req = RequestBuilder::default()
-
            .profile(&profile)
-
            .broker_event(&be)
-
            .build_trigger()?;
-
        let Request::Trigger {
-
            common,
-
            push,
-
            patch,
-
        } = req;
-

-
        assert!(patch.is_none());
-
        assert!(push.is_some());
-
        assert_eq!(common.event_type, EventType::Push);
-
        assert_eq!(common.repository.id, project.id);
-
        assert_eq!(common.repository.name, project.repo.project()?.name());
-

-
        let push = push.unwrap();
-
        assert_eq!(push.after, cmt);
-
        assert_eq!(push.before, repo_head);
-
        assert_eq!(
-
            push.branch,
-
            "master".replace("$nid", &profile.id().to_string())
-
        );
-
        assert_eq!(push.commits, vec![cmt]);
-
        assert_eq!(push.pusher.id, Did::from(profile.id()));
-

-
        Ok(())
-
    }
-

-
    #[test]
-
    fn trigger_patch() -> TestResult<()> {
-
        let mock_node = MockNode::new()?;
-
        let profile = mock_node.profile()?;
-

-
        let project = mock_node.node().project();
-
        let (_, repo_head) = project.repo.head()?;
-
        let cmt = radicle::test::fixtures::commit(
-
            "my test commit",
-
            &[repo_head.into()],
-
            &project.backend,
-
        );
-

-
        let node = mock_node.node();
-

-
        let mut patches = Patches::open(&project.repo)?;
-
        let mut cache = radicle::cob::cache::NoCache;
-
        let patch_cob = patches.create(
-
            "my patch title",
-
            "my patch description",
-
            MergeTarget::Delegates,
-
            repo_head,
-
            cmt,
-
            &[],
-
            &mut cache,
-
            &node.signer,
-
        )?;
-

-
        let be = BrokerEvent::RefChanged {
-
            rid: project.id,
-
            name: RefString::try_from(
-
                "refs/namespaces/$nid/refs/heads/patches/$patchId"
-
                    .replace("$nid", &profile.id().to_string())
-
                    .replace("$patchId", &patch_cob.id.to_string()),
-
            )?,
-
            oid: radicle_git_ext::Oid::from(Oid::from_str(&patch_cob.id.to_string())?),
-
            old: None,
-
        };
-

-
        let req = RequestBuilder::default()
-
            .profile(&profile)
-
            .broker_event(&be)
-
            .build_trigger()?;
-
        let Request::Trigger {
-
            common,
-
            push,
-
            patch,
-
        } = req;
-

-
        assert!(patch.is_some());
-
        assert!(push.is_none());
-
        assert_eq!(common.event_type, EventType::Patch);
-
        assert_eq!(common.repository.id, project.id);
-
        assert_eq!(common.repository.name, project.repo.project()?.name());
-

-
        let patch = patch.unwrap();
-
        assert_eq!(patch.action.as_str(), "created");
-
        assert_eq!(patch.patch.id.to_string(), patch_cob.id.to_string());
-
        assert_eq!(patch.patch.title, patch_cob.title());
-
        assert_eq!(patch.patch.state.status, patch_cob.state().to_string());
-
        assert_eq!(patch.patch.target, repo_head);
-
        assert_eq!(patch.patch.revisions.len(), 1);
-
        let rev = patch.patch.revisions.first().unwrap();
-
        assert_eq!(rev.id.to_string(), patch_cob.id.to_string());
-
        assert_eq!(rev.base, repo_head);
-
        assert_eq!(rev.oid, cmt);
-
        assert_eq!(rev.author.id, Did::from(profile.id()));
-
        assert_eq!(rev.description, patch_cob.description());
-
        assert_eq!(rev.timestamp, patch_cob.timestamp().as_secs());
-
        assert_eq!(patch.patch.after, cmt);
-
        assert_eq!(patch.patch.before, repo_head);
-
        assert_eq!(patch.patch.commits, vec![cmt]);
-

-
        Ok(())
-
    }
-
}
-

-
#[cfg(test)]
pub mod trigger_from_ci_event_tests {
    use crate::ci_event::CiEvent;
    use crate::msg::{EventType, Request, RequestBuilder};
@@ -1489,3 +1163,40 @@ pub mod trigger_from_ci_event_tests {
        Ok(())
    }
}
+

+
// Parse a Git ref to get the branch it refers to. If it doesn't
+
// return to a branch, return the empty string.
+
fn push_branch(name: &str) -> String {
+
    let mut parts = name.split("/refs/heads/");
+
    if let Some(suffix) = parts.nth(1) {
+
        if parts.next().is_none() {
+
            return suffix.to_string();
+
        }
+
    }
+
    "".to_string()
+
}
+

+
#[cfg(test)]
+
mod test_push_branch {
+
    use super::push_branch;
+

+
    #[test]
+
    fn get_push_branch() {
+
        assert_eq!(
+
            push_branch(
+
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/branch_name"
+
            ),
+
            "branch_name".to_string()
+
        );
+
    }
+

+
    #[test]
+
    fn get_no_push_branch() {
+
        assert_eq!(
+
            push_branch(
+
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/rad/sigrefs"
+
            ),
+
            "".to_string()
+
        );
+
    }
+
}
modified src/node_event_source.rs
@@ -10,10 +10,6 @@ use radicle::{
use crate::logger;

/// Source of events from the local Radicle node.
-
///
-
/// The events are filtered. Only events allowed by at least one
-
/// filter are returned. See [`NodeEventSource::allow`] and
-
/// [`EventFilter`].
pub struct NodeEventSource {
    events: Box<dyn Iterator<Item = Result<Event, radicle::node::Error>>>,
}
modified src/test.rs
@@ -5,7 +5,7 @@ use std::{
};

use crate::adapter::Adapter;
-
use crate::event::BrokerEvent;
+
use crate::ci_event::CiEvent;
use crate::msg::{Request, RequestBuilder};
use radicle::crypto::ssh::Keystore;
use radicle::crypto::test::signer::MockSigner;
@@ -64,19 +64,19 @@ pub fn trigger_request() -> TestResult<Request> {
    let cmt =
        radicle::test::fixtures::commit("my test commit", &[repo_head.into()], &project.backend);

-
    let be = BrokerEvent::RefChanged {
-
        rid: project.id,
-
        name: RefString::try_from(
+
    let ci_event = CiEvent::BranchCreated {
+
        from_node: *profile.id(),
+
        repo: project.id,
+
        branch: RefString::try_from(
            "refs/namespaces/$nid/refs/heads/master".replace("$nid", &profile.id().to_string()),
        )?,
-
        oid: cmt,
-
        old: Some(repo_head),
+
        tip: cmt,
    };

    Ok(RequestBuilder::default()
        .profile(&profile)
-
        .broker_event(&be)
-
        .build_trigger()?)
+
        .ci_event(&ci_event)
+
        .build_trigger_from_ci_event()?)
}

pub fn mock_adapter(filename: &Path, script: &str) -> TestResult<Adapter> {