Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: handle `LocalRefsAnnounced` events
Defelo committed 3 months ago
commit 85d2def97dcc9a61253fff45e6efbaa88caf88c4
parent cdeb159
4 files changed +243 -78
modified src/bin/cibtoolcmd/event.rs
@@ -419,6 +419,8 @@ pub struct CiEvents {

impl Leaf for CiEvents {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
+
        let profile = Profile::load().map_err(CibToolError::Profile)?;
+

        let bytes = std::fs::read(&self.input)
            .map_err(|e| CibToolError::ReadEvents(self.input.clone(), e))?;
        let text = String::from_utf8(bytes)
@@ -433,7 +435,7 @@ impl Leaf for CiEvents {

        let mut ci_events: Vec<CiEvent> = vec![];
        for node_event in node_events.iter() {
-
            if let Ok(mut cevs) = CiEvent::from_node_event(node_event) {
+
            if let Ok(mut cevs) = CiEvent::from_node_event(node_event, &profile) {
                ci_events.append(&mut cevs);
            }
        }
modified src/ci_event.rs
@@ -1,15 +1,22 @@
-
use std::path::{Path, PathBuf};
+
use std::{
+
    collections::BTreeMap,
+
    path::{Path, PathBuf},
+
};

use regex::Regex;
use serde::{Deserialize, Serialize};

use radicle::{
+
    Profile,
    cob::patch::PatchId,
    crypto::PublicKey,
-
    git::{BranchName, Namespaced, Oid, RefString},
+
    git::{BranchName, Namespaced, Oid, Qualified, RefString},
    node::{Event, NodeId},
    prelude::RepoId,
-
    storage::RefUpdate,
+
    storage::{
+
        ReadStorage, RefUpdate, RepositoryError,
+
        refs::{Refs, RefsAt},
+
    },
};

use crate::{
@@ -277,70 +284,110 @@ impl CiEvent {
    }

    #[allow(clippy::unwrap_used)]
-
    pub fn from_node_event(event: &Event) -> Result<Vec<Self>, CiEventError> {
-
        match event {
+
    pub fn from_node_event(event: &Event, profile: &Profile) -> Result<Vec<Self>, CiEventError> {
+
        let (rid, updates) = match event {
            Event::RefsFetched {
                remote: _,
                rid,
                updated,
+
            } => (*rid, updated),
+
            Event::LocalRefsAnnounced {
+
                rid,
+
                refs,
+
                timestamp: _,
            } => {
-
                let mut events = vec![];
-
                for update in updated {
-
                    let e = match update {
-
                        RefUpdate::Created { name, oid } => {
-
                            let origin = originator(name.to_namespaced().unwrap())?;
-
                            match ParsedRef::parse_ref(name) {
-
                                Some(ParsedRef::Branch(branch)) => {
-
                                    Self::branch_created(origin, *rid, &branch, *oid)?
-
                                }
-
                                Some(ParsedRef::Patch(patch_id)) => {
-
                                    Self::patch_created(origin, *rid, patch_id, *oid)
-
                                }
-
                                Some(ParsedRef::Tag(tag_name)) => {
-
                                    Self::tag_created(origin, *rid, &tag_name, *oid)?
-
                                }
-
                                None => continue,
-
                            }
-
                        }
-
                        RefUpdate::Updated { name, old, new } => {
-
                            let origin = originator(name.to_namespaced().unwrap())?;
-
                            match ParsedRef::parse_ref(name) {
-
                                Some(ParsedRef::Branch(branch)) => {
-
                                    Self::branch_updated(origin, *rid, &branch, *new, *old)?
-
                                }
-
                                Some(ParsedRef::Patch(patch_id)) => {
-
                                    Self::patch_updated(origin, *rid, patch_id, *new)
-
                                }
-
                                Some(ParsedRef::Tag(tag_name)) => {
-
                                    Self::tag_updated(origin, *rid, &tag_name, *new, *old)?
-
                                }
-
                                None => continue,
-
                            }
-
                        }
-
                        RefUpdate::Deleted { name, oid } => {
-
                            let origin = originator(name.to_namespaced().unwrap())?;
-
                            match ParsedRef::parse_ref(name) {
-
                                Some(ParsedRef::Branch(branch)) => {
-
                                    Self::branch_deleted(origin, *rid, &branch, *oid)?
-
                                }
-
                                Some(ParsedRef::Patch(_patch_id)) => continue,
-
                                Some(ParsedRef::Tag(tag_name)) => {
-
                                    Self::tag_deleted(origin, *rid, &tag_name, *oid)?
-
                                }
-
                                None => continue,
-
                            }
-
                        }
-
                        RefUpdate::Skipped { .. } => continue,
-
                    };
-
                    events.push(e);
-
                }
-
                Ok(events)
+
                let repo = profile
+
                    .storage
+
                    .repository(*rid)
+
                    .map_err(CiEventError::Repository)?;
+
                let parent = repo
+
                    .backend
+
                    .find_commit(*refs.at)
+
                    .map_err(CiEventError::Git)?
+
                    .parent(0)
+
                    .map_err(CiEventError::Git)?
+
                    .id();
+
                let parent_refs = RefsAt {
+
                    remote: refs.remote,
+
                    at: parent.into(),
+
                };
+

+
                let signed_refs_new = refs.load(&repo).map_err(CiEventError::StorageRefs)?.sigrefs;
+
                let signed_refs_old = parent_refs
+
                    .load(&repo)
+
                    .map_err(CiEventError::StorageRefs)?
+
                    .sigrefs;
+

+
                let updates = diff_refs(signed_refs_old.refs, signed_refs_new.refs)
+
                    .into_iter()
+
                    .filter_map(|mut ref_update| {
+
                        let name = ref_update_name_mut(&mut ref_update);
+
                        *name = Qualified::from_refstr(&*name)?
+
                            .with_namespace(profile.public_key.to_component())
+
                            .into_qualified()
+
                            .into_refstring();
+
                        Some(ref_update)
+
                    });
+

+
                (*rid, &updates.collect())
            }
            _ => {
                logger::node_event_not_handled(event);
-
                Ok(vec![])
+
                return Ok(vec![]);
            }
+
        };
+

+
        let mut events = vec![];
+
        for update in updates {
+
            let e = match update {
+
                RefUpdate::Created { name, oid } => {
+
                    let origin = originator(name.to_namespaced().unwrap())?;
+
                    match ParsedRef::parse_ref(name) {
+
                        Some(ParsedRef::Branch(branch)) => {
+
                            Self::branch_created(origin, rid, &branch, *oid)?
+
                        }
+
                        Some(ParsedRef::Patch(patch_id)) => {
+
                            Self::patch_created(origin, rid, patch_id, *oid)
+
                        }
+
                        Some(ParsedRef::Tag(tag_name)) => {
+
                            Self::tag_created(origin, rid, &tag_name, *oid)?
+
                        }
+
                        None => continue,
+
                    }
+
                }
+
                RefUpdate::Updated { name, old, new } => {
+
                    let origin = originator(name.to_namespaced().unwrap())?;
+
                    match ParsedRef::parse_ref(name) {
+
                        Some(ParsedRef::Branch(branch)) => {
+
                            Self::branch_updated(origin, rid, &branch, *new, *old)?
+
                        }
+
                        Some(ParsedRef::Patch(patch_id)) => {
+
                            Self::patch_updated(origin, rid, patch_id, *new)
+
                        }
+
                        Some(ParsedRef::Tag(tag_name)) => {
+
                            Self::tag_updated(origin, rid, &tag_name, *new, *old)?
+
                        }
+
                        None => continue,
+
                    }
+
                }
+
                RefUpdate::Deleted { name, oid } => {
+
                    let origin = originator(name.to_namespaced().unwrap())?;
+
                    match ParsedRef::parse_ref(name) {
+
                        Some(ParsedRef::Branch(branch)) => {
+
                            Self::branch_deleted(origin, rid, &branch, *oid)?
+
                        }
+
                        Some(ParsedRef::Patch(_patch_id)) => continue,
+
                        Some(ParsedRef::Tag(tag_name)) => {
+
                            Self::tag_deleted(origin, rid, &tag_name, *oid)?
+
                        }
+
                        None => continue,
+
                    }
+
                }
+
                RefUpdate::Skipped { .. } => continue,
+
            };
+
            events.push(e);
        }
+
        Ok(events)
    }

    pub fn to_pretty_json(&self) -> Result<String, CiEventError> {
@@ -352,6 +399,47 @@ fn originator(name: Namespaced) -> Result<PublicKey, CiEventError> {
    PublicKey::from_namespaced(&name).map_err(|err| CiEventError::key_from_namespaced(&name, err))
}

+
fn diff_refs(old_refs: Refs, new_refs: Refs) -> Vec<RefUpdate> {
+
    let mut updates = Vec::new();
+
    let mut old_refs = BTreeMap::from(old_refs);
+
    let mut new_refs = BTreeMap::from(new_refs);
+

+
    old_refs.retain(|old_ref, old_oid| match new_refs.remove_entry(old_ref) {
+
        Some((new_ref, new_oid)) => {
+
            if new_oid != *old_oid {
+
                updates.push(RefUpdate::Updated {
+
                    name: new_ref,
+
                    old: *old_oid,
+
                    new: new_oid,
+
                })
+
            }
+
            false
+
        }
+
        None => true,
+
    });
+
    updates.extend(
+
        old_refs
+
            .into_iter()
+
            .map(|(old_ref, oid)| RefUpdate::Deleted { name: old_ref, oid }),
+
    );
+
    updates.extend(
+
        new_refs
+
            .into_iter()
+
            .map(|(new_ref, oid)| RefUpdate::Created { name: new_ref, oid }),
+
    );
+

+
    updates
+
}
+

+
fn ref_update_name_mut(ref_update: &mut RefUpdate) -> &mut RefString {
+
    match ref_update {
+
        RefUpdate::Updated { name, .. } => name,
+
        RefUpdate::Created { name, .. } => name,
+
        RefUpdate::Deleted { name, .. } => name,
+
        RefUpdate::Skipped { name, .. } => name,
+
    }
+
}
+

pub struct CiEvents {
    events: Vec<CiEvent>,
}
@@ -399,6 +487,15 @@ pub enum CiEventError {

    #[error("failed to encode CI event as JSON")]
    ToJson(#[source] Box<dyn std::error::Error + Send + 'static>),
+

+
    #[error(transparent)]
+
    Repository(RepositoryError),
+

+
    #[error(transparent)]
+
    StorageRefs(radicle::storage::refs::Error),
+

+
    #[error(transparent)]
+
    Git(radicle::git::raw::Error),
}

impl CiEventError {
@@ -430,7 +527,10 @@ mod test {
    use radicle::{prelude::NodeId, storage::RefUpdate};
    use std::str::FromStr;

-
    use crate::refs::{branch_from_namespaced, ref_string};
+
    use crate::{
+
        refs::{branch_from_namespaced, ref_string},
+
        test::{MockNode, TestResult},
+
    };

    const MAIN_BRANCH_REF_NAME: &str =
        "refs/namespaces/z6MkiB8T5cBEQHnrs2MgjMVqvpSVj42X81HjKfFi2XBoMbtr/refs/heads/main";
@@ -470,19 +570,24 @@ mod test {
    }

    #[test]
-
    fn nothing_updated() {
+
    fn nothing_updated() -> TestResult<()> {
+
        let mock_node = MockNode::new()?;
+
        let profile = mock_node.profile()?;
        let event = Event::RefsFetched {
            remote: nid(),
            rid: rid(),
            updated: vec![],
        };
-
        let result = CiEvent::from_node_event(&event);
+
        let result = CiEvent::from_node_event(&event, &profile);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), vec![]);
+
        Ok(())
    }

    #[test]
-
    fn skipped() {
+
    fn skipped() -> TestResult<()> {
+
        let mock_node = MockNode::new()?;
+
        let profile = mock_node.profile()?;
        let event = Event::RefsFetched {
            remote: nid(),
            rid: rid(),
@@ -492,13 +597,16 @@ mod test {
            }],
        };

-
        let result = CiEvent::from_node_event(&event);
+
        let result = CiEvent::from_node_event(&event, &profile);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), vec![]);
+
        Ok(())
    }

    #[test]
-
    fn branch_created() {
+
    fn branch_created() -> TestResult<()> {
+
        let mock_node = MockNode::new()?;
+
        let profile = mock_node.profile()?;
        let rid = rid();
        let oid = oid();
        let event = Event::RefsFetched {
@@ -509,7 +617,7 @@ mod test {
                oid,
            }],
        };
-
        let x = CiEvent::from_node_event(&event);
+
        let x = CiEvent::from_node_event(&event, &profile);
        eprintln!("result: {x:#?}");
        match x {
            Err(_) => panic!("should succeed"),
@@ -528,10 +636,13 @@ mod test {
            }
            Ok(_) => panic!("empty list of events should not happen"),
        }
+
        Ok(())
    }

    #[test]
-
    fn branch_updated() {
+
    fn branch_updated() -> TestResult<()> {
+
        let mock_node = MockNode::new()?;
+
        let profile = mock_node.profile()?;
        let rid = rid();
        let oid = oid();
        let event = Event::RefsFetched {
@@ -543,7 +654,7 @@ mod test {
                new: oid,
            }],
        };
-
        let x = CiEvent::from_node_event(&event);
+
        let x = CiEvent::from_node_event(&event, &profile);
        eprintln!("result: {x:#?}");
        match x {
            Err(_) => panic!("should succeed"),
@@ -566,10 +677,13 @@ mod test {
            }
            Ok(_) => panic!("empty list of events should not happen"),
        }
+
        Ok(())
    }

    #[test]
-
    fn branch_deleted() {
+
    fn branch_deleted() -> TestResult<()> {
+
        let mock_node = MockNode::new()?;
+
        let profile = mock_node.profile()?;
        let rid = rid();
        let oid = oid();
        let event = Event::RefsFetched {
@@ -580,7 +694,7 @@ mod test {
                oid,
            }],
        };
-
        let x = CiEvent::from_node_event(&event);
+
        let x = CiEvent::from_node_event(&event, &profile);
        eprintln!("result: {x:#?}");
        match x {
            Err(_) => panic!("should succeed"),
@@ -596,10 +710,13 @@ mod test {
            }
            Ok(_) => panic!("empty list of events should not happen"),
        }
+
        Ok(())
    }

    #[test]
-
    fn patch_created() {
+
    fn patch_created() -> TestResult<()> {
+
        let mock_node = MockNode::new()?;
+
        let profile = mock_node.profile()?;
        let rid = rid();
        let patch_id = oid_from(PATCH_ID).into();
        let oid = oid();
@@ -611,7 +728,7 @@ mod test {
                oid,
            }],
        };
-
        let x = CiEvent::from_node_event(&event);
+
        let x = CiEvent::from_node_event(&event, &profile);
        eprintln!("result: {x:#?}");
        match x {
            Err(_) => panic!("should succeed"),
@@ -630,10 +747,13 @@ mod test {
            }
            Ok(_) => panic!("empty list of events should not happen"),
        }
+
        Ok(())
    }

    #[test]
-
    fn patch_updated() {
+
    fn patch_updated() -> TestResult<()> {
+
        let mock_node = MockNode::new()?;
+
        let profile = mock_node.profile()?;
        let rid = rid();
        let patch_id = oid_from(PATCH_ID).into();
        let oid = oid();
@@ -646,7 +766,7 @@ mod test {
                new: oid,
            }],
        };
-
        let x = CiEvent::from_node_event(&event);
+
        let x = CiEvent::from_node_event(&event, &profile);
        eprintln!("result: {x:#?}");
        match x {
            Err(_) => panic!("should succeed"),
@@ -665,6 +785,47 @@ mod test {
            }
            Ok(_) => panic!("empty list of events should not happen"),
        }
+
        Ok(())
+
    }
+

+
    #[test]
+
    fn diff_refs() {
+
        let oid1 = oid_from("c474d78c37a5c664aba2042db78ec235c0a5d569");
+
        let oid2 = oid_from("5b6886f63ff90d61afaeec2f3569a9a1a0984bdb");
+
        let foo = RefString::try_from("refs/heads/foo").unwrap();
+
        let bar = RefString::try_from("refs/heads/bar").unwrap();
+
        let baz = RefString::try_from("refs/heads/baz").unwrap();
+
        let xyz = RefString::try_from("refs/heads/xyz").unwrap();
+

+
        let old = Refs::from(BTreeMap::from([
+
            (foo.clone(), oid1),
+
            (bar.clone(), oid1),
+
            (xyz.clone(), oid1),
+
        ]));
+
        let new = Refs::from(BTreeMap::from([
+
            (foo.clone(), oid2),
+
            (baz.clone(), oid2),
+
            (xyz.clone(), oid1),
+
        ]));
+
        let updates = super::diff_refs(old, new);
+
        assert_eq!(
+
            updates,
+
            [
+
                RefUpdate::Updated {
+
                    name: foo,
+
                    old: oid1,
+
                    new: oid2
+
                },
+
                RefUpdate::Deleted {
+
                    name: bar,
+
                    oid: oid1,
+
                },
+
                RefUpdate::Created {
+
                    name: baz,
+
                    oid: oid2,
+
                }
+
            ]
+
        );
    }
}

modified src/ci_event_source.rs
@@ -10,12 +10,14 @@ use crate::{

pub struct CiEventSource {
    source: NodeEventSource,
+
    profile: Profile,
}

impl CiEventSource {
-
    pub fn new(profile: &Profile) -> Result<Self, CiEventSourceError> {
+
    pub fn new(profile: Profile) -> Result<Self, CiEventSourceError> {
        let source = Self {
-
            source: NodeEventSource::new(profile).map_err(CiEventSourceError::Subscribe)?,
+
            source: NodeEventSource::new(&profile).map_err(CiEventSourceError::Subscribe)?,
+
            profile,
        };
        logger::ci_event_source_created(&source);
        Ok(source)
@@ -41,8 +43,8 @@ impl CiEventSource {
                Ok(None)
            }
            Ok(Some(event)) => {
-
                let ci_events =
-
                    CiEvent::from_node_event(&event).map_err(CiEventSourceError::CiEvent)?;
+
                let ci_events = CiEvent::from_node_event(&event, &self.profile)
+
                    .map_err(CiEventSourceError::CiEvent)?;
                if !ci_events.is_empty() {
                    logger::ci_event_source_got_events(&ci_events);
                }
modified src/queueadd.rs
@@ -43,7 +43,7 @@ impl QueueAdder {
    fn add_events(&self) -> Result<(), AdderError> {
        let profile = Profile::load().map_err(AdderError::profile)?;

-
        let mut source = CiEventSource::new(&profile)?;
+
        let mut source = CiEventSource::new(profile)?;

        // This loop ends when there's an error, e.g., failure to read an
        // event from the node.