Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: add new CI events and filter for them
Lars Wirzenius committed 1 year ago
commit 426a9d3315126619a60d0371e62bcad7d75b74ee
parent 96c17ea7aff9d97b100b3710cafddaa0d4f7c869
3 files changed +1134 -0
added src/ci_event.rs
@@ -0,0 +1,605 @@
+
use std::path::{Path, PathBuf};
+

+
use regex::Regex;
+
use serde::{Deserialize, Serialize};
+

+
use radicle_git_ext::Oid;
+

+
use radicle::{
+
    cob::patch::PatchId,
+
    git::RefString,
+
    node::{Event, NodeId},
+
    prelude::RepoId,
+
    storage::RefUpdate,
+
};
+

+
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
+
pub enum CiEvent {
+
    Shutdown,
+
    BranchCreated {
+
        from_node: NodeId,
+
        repo: RepoId,
+
        branch: RefString,
+
        tip: Oid,
+
    },
+
    BranchUpdated {
+
        from_node: NodeId,
+
        repo: RepoId,
+
        branch: RefString,
+
        tip: Oid,
+
        old_tip: Oid,
+
    },
+
    BranchDeleted {
+
        repo: RepoId,
+
        branch: RefString,
+
        tip: Oid,
+
    },
+
    PatchCreated {
+
        from_node: NodeId,
+
        repo: RepoId,
+
        patch: PatchId,
+
        new_tip: Oid,
+
    },
+
    PatchUpdated {
+
        from_node: NodeId,
+
        repo: RepoId,
+
        patch: PatchId,
+
        new_tip: Oid,
+
    },
+
}
+

+
impl CiEvent {
+
    pub fn branch_created(
+
        node: NodeId,
+
        repo: RepoId,
+
        branch: &str,
+
        tip: Oid,
+
    ) -> Result<Self, CiEventError> {
+
        Ok(Self::BranchCreated {
+
            from_node: node,
+
            repo,
+
            branch: RefString::try_from(branch)
+
                .map_err(|e| CiEventError::RefString(branch.into(), e))?,
+
            tip,
+
        })
+
    }
+

+
    pub fn branch_updated(
+
        node: NodeId,
+
        repo: RepoId,
+
        branch: &str,
+
        tip: Oid,
+
        old_tip: Oid,
+
    ) -> Result<Self, CiEventError> {
+
        let branch =
+
            namespaced_branch(branch).map_err(|_| CiEventError::without_namespace2(branch))?;
+
        Ok(Self::BranchUpdated {
+
            from_node: node,
+
            repo,
+
            branch: RefString::try_from(branch.clone())
+
                .map_err(|e| CiEventError::RefString(branch.clone(), e))?,
+
            tip,
+
            old_tip,
+
        })
+
    }
+

+
    pub fn from_node_event(event: &Event) -> Result<Vec<Self>, CiEventError> {
+
        fn ref_string(s: String) -> Result<RefString, CiEventError> {
+
            RefString::try_from(s.clone()).map_err(|e| CiEventError::ref_string(s, e))
+
        }
+

+
        fn branch(ref_name: &str, update: &RefUpdate) -> Result<RefString, CiEventError> {
+
            ref_string(
+
                namespaced_branch(ref_name)
+
                    .map_err(|_| CiEventError::without_namespace(ref_name, update.clone()))?,
+
            )
+
        }
+

+
        match event {
+
            Event::RefsFetched {
+
                remote,
+
                rid,
+
                updated,
+
            } => {
+
                let mut events = vec![];
+
                for update in updated {
+
                    let e = match update {
+
                        RefUpdate::Created { name, oid } => {
+
                            eprintln!("created: {name:?}");
+
                            if let Ok(patch_id) = patch_id(name) {
+
                                CiEvent::PatchCreated {
+
                                    from_node: *remote,
+
                                    repo: *rid,
+
                                    patch: patch_id,
+
                                    new_tip: *oid,
+
                                }
+
                            } else if let Ok(branch) = namespaced_branch(name) {
+
                                CiEvent::BranchCreated {
+
                                    from_node: *remote,
+
                                    repo: *rid,
+
                                    branch: ref_string(branch)?,
+
                                    tip: *oid,
+
                                }
+
                            } else {
+
                                eprintln!("don't know what it is, ignoring");
+
                                continue;
+
                            }
+
                        }
+
                        RefUpdate::Updated { name, old, new } => {
+
                            eprintln!("updated: {name:?}");
+
                            if let Ok(patch_id) = patch_id(name) {
+
                                eprintln!("it's a patch");
+
                                CiEvent::PatchUpdated {
+
                                    from_node: *remote,
+
                                    repo: *rid,
+
                                    patch: patch_id,
+
                                    new_tip: *new,
+
                                }
+
                            } else if let Ok(branch) = namespaced_branch(name) {
+
                                eprintln!("it's a branch update");
+
                                CiEvent::BranchUpdated {
+
                                    from_node: *remote,
+
                                    repo: *rid,
+
                                    branch: ref_string(branch)?,
+
                                    tip: *new,
+
                                    old_tip: *old,
+
                                }
+
                            } else {
+
                                eprintln!("don't know what it is, ignoring");
+
                                continue;
+
                            }
+
                        }
+
                        RefUpdate::Deleted { name, oid } => CiEvent::BranchDeleted {
+
                            repo: *rid,
+
                            branch: branch(name, update)?,
+
                            tip: *oid,
+
                        },
+
                        RefUpdate::Skipped { .. } => continue,
+
                    };
+
                    events.push(e);
+
                }
+
                Ok(events)
+
            }
+
            Event::RefsSynced { .. }
+
            | Event::RefsAnnounced { .. }
+
            | Event::NodeAnnounced { .. }
+
            | Event::SeedDiscovered { .. }
+
            | Event::SeedDropped { .. }
+
            | Event::PeerConnected { .. }
+
            | Event::PeerDisconnected { .. }
+
            | Event::LocalRefsAnnounced { .. }
+
            | Event::UploadPack { .. }
+
            | Event::InventoryAnnounced { .. } => Ok(vec![]),
+
        }
+
    }
+
}
+

+
pub struct CiEvents {
+
    events: Vec<CiEvent>,
+
}
+

+
impl CiEvents {
+
    pub fn from_file(filename: &Path) -> Result<Self, CiEventError> {
+
        let events = std::fs::read(filename).map_err(|e| CiEventError::read_file(filename, e))?;
+
        let events = String::from_utf8(events).map_err(|e| CiEventError::not_utf8(filename, e))?;
+
        let events: Result<Vec<CiEvent>, _> = events.lines().map(serde_json::from_str).collect();
+
        let events = events.map_err(|e| CiEventError::not_json(filename, e))?;
+

+
        Ok(Self { events })
+
    }
+

+
    pub fn iter(&self) -> impl Iterator<Item = &CiEvent> {
+
        self.events.iter()
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum CiEventError {
+
    #[error("updated ref name has no name space: {0:?}): from {1:#?}")]
+
    WithoutNamespace(String, RefUpdate),
+

+
    #[error("updated ref name has no name space: {0:?})")]
+
    WithoutNamespace2(String),
+

+
    #[error("failed to create a RefString from {0:?}")]
+
    RefString(String, radicle::git::fmt::Error),
+

+
    #[error("failed to read broker events file {0}")]
+
    ReadFile(PathBuf, #[source] std::io::Error),
+

+
    #[error("broker events file is not UTF8: {0}")]
+
    NotUtf8(PathBuf, #[source] std::string::FromUtf8Error),
+

+
    #[error("broker events file is not valid JSON: {0}")]
+
    NotJson(PathBuf, #[source] serde_json::Error),
+
}
+

+
impl CiEventError {
+
    fn without_namespace(refname: &str, update: RefUpdate) -> Self {
+
        Self::WithoutNamespace(refname.into(), update)
+
    }
+

+
    fn without_namespace2(refname: &str) -> Self {
+
        Self::WithoutNamespace2(refname.into())
+
    }
+

+
    fn ref_string(name: String, err: radicle::git::fmt::Error) -> Self {
+
        Self::RefString(name, err)
+
    }
+

+
    fn read_file(filename: &Path, err: std::io::Error) -> Self {
+
        Self::ReadFile(filename.into(), err)
+
    }
+

+
    fn not_utf8(filename: &Path, err: std::string::FromUtf8Error) -> Self {
+
        Self::NotUtf8(filename.into(), err)
+
    }
+

+
    fn not_json(filename: &Path, err: serde_json::Error) -> Self {
+
        Self::NotJson(filename.into(), err)
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+
    use radicle::{prelude::NodeId, storage::RefUpdate};
+
    use std::str::FromStr;
+

+
    const MAIN_BRANCH_REF_NAME: &str = "refs/namespaces/NID/refs/heads/main";
+
    const MAIN_BRANCH_NAME: &str = "main";
+

+
    const PATCH_REF_NAME: &str =
+
        "refs/namespaces/NID/refs/heads/patches/f9fa90725474de9002be503ae3cda4670c9a174";
+
    const PATCH_ID: &str = "f9fa90725474de9002be503ae3cda4670c9a174";
+

+
    fn nid() -> NodeId {
+
        const NID: &str = "z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV";
+
        NodeId::from_str(NID).unwrap()
+
    }
+

+
    fn rid() -> RepoId {
+
        const RID: &str = "rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5";
+
        RepoId::from_urn(RID).unwrap()
+
    }
+

+
    fn oid_from(oid: &str) -> Oid {
+
        Oid::try_from(oid).unwrap()
+
    }
+

+
    fn oid() -> Oid {
+
        const OID: &str = "ff3099ba5de28d954c41d0b5a84316f943794ea4";
+
        oid_from(OID)
+
    }
+

+
    fn refstring(s: &str) -> RefString {
+
        RefString::try_from(s).unwrap()
+
    }
+

+
    #[test]
+
    fn nothing_updated() {
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid: rid(),
+
            updated: vec![],
+
        };
+
        let result = CiEvent::from_node_event(&event);
+
        assert!(result.is_ok());
+
        assert_eq!(result.unwrap(), vec![]);
+
    }
+

+
    #[test]
+
    fn skipped() {
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid: rid(),
+
            updated: vec![RefUpdate::Skipped {
+
                name: refstring(MAIN_BRANCH_REF_NAME),
+
                oid: oid(),
+
            }],
+
        };
+

+
        let result = CiEvent::from_node_event(&event);
+
        assert!(result.is_ok());
+
        assert_eq!(result.unwrap(), vec![]);
+
    }
+

+
    #[test]
+
    fn branch_created() {
+
        let rid = rid();
+
        let main = refstring(MAIN_BRANCH_NAME);
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Created {
+
                name: refstring(MAIN_BRANCH_REF_NAME),
+
                oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::BranchCreated {
+
                            from_node: _,
+
                            repo,
+
                            branch,
+
                            tip,
+
                        } if repo == rid && branch == main && tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+

+
    #[test]
+
    fn branch_updated() {
+
        let rid = rid();
+
        let main = refstring(MAIN_BRANCH_NAME);
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Updated {
+
                name: refstring(MAIN_BRANCH_REF_NAME),
+
                old: oid,
+
                new: oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::BranchUpdated {
+
                            from_node: _,
+
                            repo,
+
                            branch,
+
                            tip,
+
                            old_tip,
+
                        } if repo == rid && branch == main && tip == oid && old_tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+

+
    #[test]
+
    fn branch_deleted() {
+
        let rid = rid();
+
        let main = refstring(MAIN_BRANCH_NAME);
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Deleted {
+
                name: refstring(MAIN_BRANCH_REF_NAME),
+
                oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::BranchDeleted { repo, branch, tip }
+
                            if repo == rid && branch == main && tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+

+
    #[test]
+
    fn patch_created() {
+
        let rid = rid();
+
        let patch_id = oid_from(PATCH_ID).into();
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Created {
+
                name: refstring(PATCH_REF_NAME),
+
                oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::PatchCreated {
+
                            from_node: _,
+
                            repo,
+
                            patch,
+
                            new_tip,
+
                        } if repo == rid && patch == patch_id && new_tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+

+
    #[test]
+
    fn patch_updated() {
+
        let rid = rid();
+
        let patch_id = oid_from(PATCH_ID).into();
+
        let oid = oid();
+
        let event = Event::RefsFetched {
+
            remote: nid(),
+
            rid,
+
            updated: vec![RefUpdate::Updated {
+
                name: refstring(PATCH_REF_NAME),
+
                old: oid,
+
                new: oid,
+
            }],
+
        };
+
        let x = CiEvent::from_node_event(&event);
+
        eprintln!("result: {x:#?}");
+
        match x {
+
            Err(_) => panic!("should succeed"),
+
            Ok(events) if !events.is_empty() => {
+
                for e in events {
+
                    match e {
+
                        CiEvent::PatchUpdated {
+
                            from_node: _,
+
                            repo,
+
                            patch,
+
                            new_tip,
+
                        } if repo == rid && patch == patch_id && new_tip == oid => {}
+
                        _ => panic!("should not succeed that way"),
+
                    }
+
                }
+
            }
+
            Ok(_) => panic!("empty list of events should not happen"),
+
        }
+
    }
+
}
+

+
fn namespaced_branch(refname: &str) -> Result<String, ParseError> {
+
    const PAT_BRANCH: &str = r"^refs/namespaces/[^/]+/refs/heads/(.+)$";
+
    let push_re = Regex::new(PAT_BRANCH).map_err(|e| ParseError::Regex(PAT_BRANCH, e))?;
+
    if let Some(push_captures) = push_re.captures(refname) {
+
        if let Some(branch) = push_captures.get(1) {
+
            return Ok(branch.as_str().to_string());
+
        }
+
    }
+
    Err(ParseError::NotBranch(refname.into()))
+
}
+

+
fn patch_id(refname: &str) -> Result<PatchId, ParseError> {
+
    eprintln!("refname: {refname:?}");
+
    const PAT_PATCH: &str = r"^refs/namespaces/[^/]+/refs/heads/patches/([^/]+)$";
+
    let patch_re = Regex::new(PAT_PATCH).map_err(|e| ParseError::regex(PAT_PATCH, e))?;
+
    if let Some(patch_captures) = patch_re.captures(refname) {
+
        eprintln!("captures: {patch_captures:?}");
+
        if let Some(patch_id) = patch_captures.get(1) {
+
            eprintln!("patch_id: {patch_id:?}");
+
            let oid = Oid::try_from(patch_id.as_str())
+
                .map_err(|e| ParseError::oid(patch_id.as_str(), e))?;
+
            eprintln!("oid: {oid:?}");
+
            return Ok(oid.into());
+
        }
+
    }
+

+
    Err(ParseError::not_patch(refname))
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
enum ParseError {
+
    #[error("programming error: unacceptable regular expression {0:?}")]
+
    Regex(&'static str, regex::Error),
+

+
    #[error("Git ref name without name space: {0:?}")]
+
    NotBranch(String),
+

+
    #[error("unacceptable Git ref for patch: {0:?}")]
+
    NotPatch(String),
+

+
    #[error("Git ref name includes unacceptable Git object id: {0:?}")]
+
    Oid(String, radicle::git::raw::Error),
+
}
+

+
impl ParseError {
+
    fn regex(pattern: &'static str, err: regex::Error) -> Self {
+
        Self::Regex(pattern, err)
+
    }
+

+
    fn not_patch(refname: &str) -> Self {
+
        Self::NotPatch(refname.into())
+
    }
+

+
    fn oid(refname: &str, err: radicle::git::raw::Error) -> Self {
+
        Self::Oid(refname.into(), err)
+
    }
+
}
+

+
#[cfg(test)]
+
mod test_namespaced_branch {
+
    use super::{namespaced_branch, ParseError};
+

+
    #[test]
+
    fn empty() {
+
        assert!(matches!(
+
            namespaced_branch(""),
+
            Err(ParseError::NotBranch(_))
+
        ));
+
    }
+

+
    #[test]
+
    fn lacks_namespace() {
+
        assert!(matches!(
+
            namespaced_branch(""),
+
            Err(ParseError::NotBranch(_))
+
        ));
+
    }
+

+
    #[test]
+
    fn has_namespace() {
+
        let x = namespaced_branch("refs/namespaces/DID/refs/heads/main");
+
        assert!(x.is_ok());
+
        assert_eq!(x.unwrap(), "main");
+
    }
+

+
    #[test]
+
    fn has_namespace_with_path() {
+
        let x = namespaced_branch("refs/namespaces/DID/refs/heads/liw/debug/this/path");
+
        assert!(x.is_ok());
+
        assert_eq!(x.unwrap(), "liw/debug/this/path");
+
    }
+
}
+

+
#[cfg(test)]
+
mod test_patch_id {
+
    use super::{patch_id, Oid, ParseError};
+

+
    #[test]
+
    fn empty() {
+
        assert!(matches!(patch_id(""), Err(ParseError::NotPatch(_))));
+
    }
+

+
    #[test]
+
    fn lacks_namespace() {
+
        assert!(matches!(patch_id(""), Err(ParseError::NotPatch(_))));
+
    }
+

+
    #[test]
+
    fn has_namespace() {
+
        let x = patch_id(
+
            "refs/namespaces/DID/refs/heads/patches/f9fa90725474de9002be503ae3cda4670c9a1741",
+
        );
+
        assert!(x.is_ok());
+
        assert_eq!(
+
            x.unwrap(),
+
            Oid::try_from("f9fa90725474de9002be503ae3cda4670c9a1741")
+
                .unwrap()
+
                .into()
+
        );
+
    }
+

+
    #[test]
+
    fn has_namespace_with_path() {
+
        assert!(matches!(
+
            patch_id("refs/namespaces/DID/refs/heads/patches/coffee/beef"),
+
            Err(ParseError::NotPatch(_))
+
        ));
+
    }
+
}
added src/filter.rs
@@ -0,0 +1,526 @@
+
use std::path::{Path, PathBuf};
+

+
use serde::{Deserialize, Serialize};
+

+
use radicle::{cob::patch::PatchId, git::RefString, prelude::RepoId};
+
use radicle_git_ext::Oid;
+

+
use crate::{ci_event::CiEvent, logger};
+

+
/// A Boolean expression for filtering broker events.
+
#[derive(Debug, Clone, Deserialize, Serialize)]
+
#[serde(deny_unknown_fields)]
+
pub enum EventFilter {
+
    /// Event for a specific repository.
+
    Repository(RepoId),
+

+
    /// Event is for a specific branch.
+
    Branch(RefString),
+

+
    /// Branch was created.
+
    BranchCreated,
+

+
    /// Branch was updated.
+
    BranchUpdated,
+

+
    /// Branch was deleted.
+
    BranchDeleted,
+

+
    /// Event is for a specific patch.
+
    Patch(Oid),
+

+
    /// Patch was created.
+
    PatchCreated,
+

+
    /// Patch was updated,
+
    PatchUpdated,
+

+
    /// Allow any event.
+
    Allow,
+

+
    /// Don't allow any event.
+
    Deny,
+

+
    /// Allow the opposite of the contained filter.
+
    Not(Box<Self>),
+

+
    /// Allow if all contained filters allow.
+
    And(Vec<Box<Self>>),
+

+
    /// Allow if any contained filter allow.
+
    Or(Vec<Box<Self>>),
+
}
+

+
impl EventFilter {
+
    pub fn allows(&self, event: &CiEvent) -> bool {
+
        logger::debug2(format!("EventFilter::allows: event={event:?}"));
+
        match self {
+
            Self::Allow => return true,
+
            Self::Deny => return false,
+
            Self::Not(expr) => return !expr.allows(event),
+
            Self::And(exprs) => return exprs.iter().all(|e| e.allows(event)),
+
            Self::Or(exprs) => return exprs.iter().any(|e| e.allows(event)),
+
            _ => (),
+
        }
+

+
        match event {
+
            CiEvent::Shutdown => true,
+
            CiEvent::BranchCreated { repo, branch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Branch(wanted) => branch == wanted,
+
                Self::BranchCreated => true,
+
                _ => false,
+
            },
+
            CiEvent::BranchUpdated { repo, branch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Branch(wanted) => branch == wanted,
+
                Self::BranchUpdated => true,
+
                _ => false,
+
            },
+
            CiEvent::BranchDeleted { repo, branch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Branch(wanted) => branch == wanted,
+
                Self::BranchDeleted => true,
+
                _ => false,
+
            },
+
            CiEvent::PatchCreated { repo, patch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Patch(wanted) => *patch == PatchId::from(wanted),
+
                Self::PatchCreated => true,
+
                _ => false,
+
            },
+
            CiEvent::PatchUpdated { repo, patch, .. } => match self {
+
                Self::Repository(wanted) => repo == wanted,
+
                Self::Patch(wanted) => *patch == PatchId::from(wanted),
+
                Self::PatchUpdated => true,
+
                _ => false,
+
            },
+
        }
+
    }
+

+
    pub fn from_file(filename: &Path) -> Result<Vec<Self>, FilterError> {
+
        Filters::from_file(filename)
+
    }
+
}
+

+
#[derive(Deserialize)]
+
struct Filters {
+
    filters: Vec<EventFilter>,
+
}
+

+
impl Filters {
+
    fn from_file(filename: &Path) -> Result<Vec<EventFilter>, FilterError> {
+
        let data =
+
            std::fs::read(filename).map_err(|e| FilterError::ReadFile(filename.into(), e))?;
+
        let filters: Self =
+
            serde_yml::from_slice(&data).map_err(|e| FilterError::ParseYaml(filename.into(), e))?;
+
        Ok(filters.filters)
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use radicle::prelude::{Did, RepoId};
+

+
    use super::*;
+

+
    fn did() -> Did {
+
        Did::decode("did:key:z6MkgEMYod7Hxfy9qCvDv5hYHkZ4ciWmLFgfvm3Wn1b2w2FV").unwrap()
+
    }
+

+
    fn rid() -> RepoId {
+
        const RID: &str = "rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5";
+
        RepoId::from_urn(RID).unwrap()
+
    }
+

+
    fn other_rid() -> RepoId {
+
        const RID: &str = "rad:zwTxygwuz5LDGBq255RA2CbNGrz8";
+
        RepoId::from_urn(RID).unwrap()
+
    }
+

+
    fn oid_from(oid: &str) -> Oid {
+
        Oid::try_from(oid).unwrap()
+
    }
+

+
    fn oid() -> Oid {
+
        const OID: &str = "ff3099ba5de28d954c41d0b5a84316f943794ea4";
+
        oid_from(OID)
+
    }
+

+
    fn other_oid() -> Oid {
+
        const OID: &str = "bde68ac76ce093bcc583aa612f45e13fee2353a0";
+
        oid_from(OID)
+
    }
+

+
    fn patch_id() -> PatchId {
+
        PatchId::from(oid())
+
    }
+

+
    fn other_patch_id() -> PatchId {
+
        PatchId::from(other_oid())
+
    }
+

+
    fn refstring(s: &str) -> RefString {
+
        RefString::try_from(s).unwrap()
+
    }
+

+
    fn shutdown() -> CiEvent {
+
        CiEvent::Shutdown
+
    }
+

+
    fn all_events(
+
        did: Did,
+
        repo: RepoId,
+
        branch: RefString,
+
        patch: PatchId,
+
        tip: Oid,
+
        old_tip: Oid,
+
    ) -> Vec<CiEvent> {
+
        vec![
+
            CiEvent::BranchCreated {
+
                from_node: did.into(),
+
                repo,
+
                branch: branch.clone(),
+
                tip,
+
            },
+
            CiEvent::BranchUpdated {
+
                from_node: did.into(),
+
                repo,
+
                branch: branch.clone(),
+
                tip,
+
                old_tip,
+
            },
+
            CiEvent::BranchDeleted { repo, branch, tip },
+
            CiEvent::PatchCreated {
+
                from_node: did.into(),
+
                repo,
+
                patch,
+
                new_tip: tip,
+
            },
+
            CiEvent::PatchUpdated {
+
                from_node: did.into(),
+
                repo,
+
                patch,
+
                new_tip: tip,
+
            },
+
        ]
+
    }
+

+
    // Verify that shutdown is allowed, even when filtering for
+
    // something else.
+
    #[test]
+
    fn allows_shutdown() {
+
        let filter = EventFilter::Repository(rid());
+
        assert!(filter.allows(&shutdown()))
+
    }
+

+
    #[test]
+
    fn allows_all_for_default_repository() {
+
        let filter = EventFilter::Repository(rid());
+
        let events = all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid());
+
        assert!(events.iter().all(|e| filter.allows(e)));
+
    }
+

+
    #[test]
+
    fn doesnt_allow_any_for_other_repository() {
+
        let filter = EventFilter::Repository(rid());
+
        let events = all_events(
+
            did(),
+
            other_rid(),
+
            refstring("main"),
+
            patch_id(),
+
            oid(),
+
            oid(),
+
        );
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_all_for_main_branch() {
+
        let filter = EventFilter::Branch(refstring("main"));
+
        let events = all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid());
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter().filter(|e| {
+
            matches!(
+
                e,
+
                CiEvent::BranchCreated { .. }
+
                    | CiEvent::BranchUpdated { .. }
+
                    | CiEvent::BranchDeleted { .. }
+
            )
+
        }) {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn doesnt_allow_any_for_other_branch() {
+
        let filter = EventFilter::Branch(refstring("main"));
+
        let events = all_events(
+
            did(),
+
            other_rid(),
+
            refstring("other"),
+
            patch_id(),
+
            oid(),
+
            oid(),
+
        );
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_branch_creation() {
+
        let filter = EventFilter::BranchCreated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::BranchCreated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_branch_creation() {
+
        let filter = EventFilter::BranchCreated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::BranchCreated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_branch_update() {
+
        let filter = EventFilter::BranchUpdated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::BranchUpdated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_branch_update() {
+
        let filter = EventFilter::BranchUpdated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::BranchUpdated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_branch_deletion() {
+
        let filter = EventFilter::BranchDeleted;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::BranchDeleted { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_branch_deletion() {
+
        let filter = EventFilter::BranchDeleted;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::BranchDeleted { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_specific_patch() {
+
        let filter = EventFilter::Patch(oid());
+
        let events = all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid());
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter().filter(|e| {
+
            matches!(
+
                e,
+
                CiEvent::PatchCreated { .. } | CiEvent::PatchUpdated { .. }
+
            )
+
        }) {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn doesnt_allows_other_patch() {
+
        let filter = EventFilter::Patch(oid());
+
        let events = all_events(
+
            did(),
+
            rid(),
+
            refstring("main"),
+
            other_patch_id(),
+
            oid(),
+
            oid(),
+
        );
+
        eprintln!("filter: {filter:#?}");
+
        for e in events.iter().filter(|e| {
+
            matches!(
+
                e,
+
                CiEvent::PatchCreated { .. } | CiEvent::PatchUpdated { .. }
+
            )
+
        }) {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_patch_creation() {
+
        let filter = EventFilter::PatchCreated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::PatchCreated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_patch_creation() {
+
        let filter = EventFilter::PatchCreated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::PatchCreated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_patch_update() {
+
        let filter = EventFilter::PatchUpdated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| matches!(e, CiEvent::PatchUpdated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn only_allows_patch_update() {
+
        let filter = EventFilter::PatchUpdated;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid())
+
            .iter()
+
            .filter(|e| !matches!(e, CiEvent::PatchUpdated { .. }))
+
        {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_any_event() {
+
        let filter = EventFilter::Allow;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_no_event() {
+
        let filter = EventFilter::Deny;
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(!filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_opposite() {
+
        let filter = EventFilter::Not(Box::new(EventFilter::Deny));
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_if_all_allow() {
+
        let filter = EventFilter::And(vec![
+
            Box::new(EventFilter::Allow),
+
            Box::new(EventFilter::Allow),
+
        ]);
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+

+
    #[test]
+
    fn allows_if_any_allows() {
+
        let filter = EventFilter::Or(vec![
+
            Box::new(EventFilter::Deny),
+
            Box::new(EventFilter::Allow),
+
        ]);
+

+
        eprintln!("filter: {filter:#?}");
+
        for e in all_events(did(), rid(), refstring("main"), patch_id(), oid(), oid()).iter() {
+
            eprintln!("{:#?} → {}", e, filter.allows(e));
+
            assert!(filter.allows(e));
+
        }
+
    }
+
}
+

+
#[derive(Debug, thiserror::Error)]
+
pub enum FilterError {
+
    #[error("failed to read event filters file {0}")]
+
    ReadFile(PathBuf, #[source] std::io::Error),
+

+
    #[error("failed to parse YAML event filters file {0}")]
+
    ParseYaml(PathBuf, #[source] serde_yml::Error),
+
}
modified src/lib.rs
@@ -7,9 +7,12 @@

pub mod adapter;
pub mod broker;
+
pub mod ci_event;
+
pub mod ci_event_source;
pub mod config;
pub mod db;
pub mod event;
+
pub mod filter;
pub mod logger;
pub mod msg;
pub mod node_event_source;