Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Introduce a node event for canonical reference updates
✓ CI success Fintan Halpenny committed 9 months ago
commit 9396a3bb7bf213f831b26ef7f47ed1e4b45502d0
parent 8953ec4c87a4d69c92f78c649b82c9d009822017
1 passed (1 total) View logs
7 files changed +98 -9
modified CHANGELOG.md
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

+
### Breaking Changes
+

+
- Introduce a node event for canonical reference updates. Whenever the node
+
  fetches new updates, it checks if canonical references can be updated. The
+
  node has learned how to return these results and emit them as node events.
+
  This is a breaking change since it adds a new variant the `Event` type, which
+
  is not forwards-compatible.
+

## Release Highlights

## Deprecations
modified crates/radicle-node/src/test/simulator.rs
@@ -663,6 +663,7 @@ where
                                remote,
                                Rc::new(Ok(fetch::FetchResult {
                                    updated: vec![],
+
                                    canonical: fetch::UpdatedCanonicalRefs::default(),
                                    namespaces: HashSet::new(),
                                    clone: true,
                                    doc: arbitrary::gen(1),
modified crates/radicle-node/src/tests.rs
@@ -1581,6 +1581,7 @@ fn test_queued_fetch_from_ann_same_rid() {
                name: refname.clone(),
                oid,
            }],
+
            canonical: fetch::UpdatedCanonicalRefs::default(),
            namespaces: [carol.id()].into_iter().collect(),
            clone: false,
            doc: arbitrary::gen(1),
modified crates/radicle-node/src/worker/fetch.rs
@@ -18,7 +18,7 @@ use radicle::storage::{
use radicle::{cob, git, node, Storage};
use radicle_fetch::git::refs::Applied;
use radicle_fetch::{Allowed, BlockList, FetchLimit};
-
pub use radicle_protocol::worker::fetch::FetchResult;
+
pub use radicle_protocol::worker::fetch::{FetchResult, UpdatedCanonicalRefs};

use super::channels::ChannelsFlush;

@@ -131,9 +131,13 @@ impl Handle {
                    Err(e) => return Err(e.into()),
                }

-
                if let Err(e) = set_canonical_refs(&repo, &applied) {
-
                    log::warn!(target: "worker", "Failed to set canonical references: {e}");
-
                }
+
                let canonical = match set_canonical_refs(&repo, &applied) {
+
                    Ok(updates) => updates.unwrap_or_default(),
+
                    Err(e) => {
+
                        log::warn!(target: "worker", "Failed to set canonical references: {e}");
+
                        UpdatedCanonicalRefs::default()
+
                    }
+
                };

                // Notifications are only posted for pulls, not clones.
                if let Some(mut store) = notifs {
@@ -150,6 +154,7 @@ impl Handle {

                Ok(FetchResult {
                    updated: applied.updated,
+
                    canonical,
                    namespaces: remotes.into_iter().collect(),
                    doc: repo.identity_doc()?,
                    clone,
@@ -359,17 +364,21 @@ where
    Ok(())
}

-
fn set_canonical_refs(repo: &Repository, applied: &Applied) -> Result<(), error::Canonical> {
+
fn set_canonical_refs(
+
    repo: &Repository,
+
    applied: &Applied,
+
) -> Result<Option<UpdatedCanonicalRefs>, error::Canonical> {
    let identity = repo.identity()?;
    let rules = match identity
        .canonical_refs()?
        .map(|crefs| crefs.rules().clone())
        .filter(|rules| !rules.is_empty())
    {
-
        None => return Ok(()),
+
        None => return Ok(None),
        Some(rules) => rules,
    };

+
    let mut updated_refs = UpdatedCanonicalRefs::default();
    for update in applied.updated.iter() {
        let name = match update {
            RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => name,
@@ -417,9 +426,11 @@ fn set_canonical_refs(repo: &Repository, applied: &Applied) -> Result<(), error:
                        target: "worker",
                        "Failed to set canonical reference {refname}->{oid}: {e}"
                    );
+
                } else {
+
                    updated_refs.updated(refname, oid);
                }
            }
        }
    }
-
    Ok(())
+
    Ok(Some(updated_refs))
}
modified crates/radicle-protocol/src/service.rs
@@ -1187,6 +1187,7 @@ where
        match result {
            Ok(crate::worker::fetch::FetchResult {
                updated,
+
                canonical,
                namespaces,
                clone,
                doc,
@@ -1208,6 +1209,16 @@ where
                    rid,
                    updated: updated.clone(),
                });
+
                self.emitter.emit_all(
+
                    canonical
+
                        .into_iter()
+
                        .map(|(refname, target)| Event::CanonicalRef {
+
                            rid,
+
                            refname,
+
                            target,
+
                        })
+
                        .collect(),
+
                );

                // Announce our new inventory if this fetch was a full clone.
                // Only update and announce inventory for public repositories.
modified crates/radicle-protocol/src/worker/fetch.rs
@@ -1,14 +1,17 @@
pub mod error;

-
use std::collections::HashSet;
+
use std::collections::{BTreeMap, HashSet};

use radicle::crypto::PublicKey;
+
use radicle::git;
use radicle::{identity::DocAt, storage::RefUpdate};

#[derive(Debug, Clone)]
pub struct FetchResult {
    /// The set of updated references.
    pub updated: Vec<RefUpdate>,
+
    /// The canonical references that were updated as part of the fetch process.
+
    pub canonical: UpdatedCanonicalRefs,
    /// The set of remote namespaces that were updated.
    pub namespaces: HashSet<PublicKey>,
    /// The fetch was a full clone.
@@ -21,6 +24,7 @@ impl FetchResult {
    pub fn new(doc: DocAt) -> Self {
        Self {
            updated: vec![],
+
            canonical: UpdatedCanonicalRefs::default(),
            namespaces: HashSet::new(),
            clone: false,
            doc,
@@ -28,11 +32,41 @@ impl FetchResult {
    }
}

+
/// The set of canonical references, updated after a fetch, and their
+
/// corresponding targets.
+
#[derive(Clone, Default, Debug)]
+
pub struct UpdatedCanonicalRefs {
+
    inner: BTreeMap<git::Qualified<'static>, git::Oid>,
+
}
+

+
impl IntoIterator for UpdatedCanonicalRefs {
+
    type Item = (git::Qualified<'static>, git::Oid);
+
    type IntoIter = std::collections::btree_map::IntoIter<git::Qualified<'static>, git::Oid>;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.inner.into_iter()
+
    }
+
}
+

+
impl UpdatedCanonicalRefs {
+
    /// Insert a new updated entry for the canonical reference identified by
+
    /// `refname` and its new `target.`
+
    pub fn updated(&mut self, refname: git::Qualified<'static>, target: git::Oid) {
+
        self.inner.insert(refname, target);
+
    }
+

+
    /// Return an iterator of all the updates.
+
    pub fn iter(&self) -> impl Iterator<Item = (&git::Qualified<'static>, &git::Oid)> {
+
        self.inner.iter()
+
    }
+
}
+

#[cfg(any(test, feature = "test"))]
impl qcheck::Arbitrary for FetchResult {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        FetchResult {
            updated: vec![],
+
            canonical: UpdatedCanonicalRefs::default(),
            namespaces: HashSet::arbitrary(g),
            clone: bool::arbitrary(g),
            doc: DocAt::arbitrary(g),
modified crates/radicle/src/node/events.rs
@@ -9,7 +9,7 @@ use std::time;

use crossbeam_channel as chan;

-
use crate::git::Oid;
+
use crate::git::{Oid, Qualified};
use crate::node;
use crate::prelude::*;
use crate::storage::{refs, RefUpdate};
@@ -122,6 +122,15 @@ pub enum Event {
    },
    /// The node has uploaded a Git pack file to another node.
    UploadPack(upload_pack::UploadPack),
+
    /// A canonical reference was updated after a fetch.
+
    CanonicalRef {
+
        /// The repository the canonical reference was updated for.
+
        rid: RepoId,
+
        /// The reference name of the canonical reference update.
+
        refname: Qualified<'static>,
+
        /// The new target of the reference, after the update.
+
        target: Oid,
+
    },
}

impl From<upload_pack::UploadPack> for Event {
@@ -214,6 +223,20 @@ impl<T: Clone> Emitter<T> {
            .retain(|s| s.try_send(event.clone()).is_ok());
    }

+
    /// Emit a batch of events to subscribers and drop those who can't receive
+
    /// them.
+
    /// N.b. subscribers are also dropped if their channel is full.
+
    pub fn emit_all(&self, events: Vec<T>) {
+
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
+
        #[allow(clippy::unwrap_used)]
+
        self.subscribers.lock().unwrap().retain(|s| {
+
            events
+
                .clone()
+
                .into_iter()
+
                .all(|event| s.try_send(event).is_ok())
+
        });
+
    }
+

    /// Subscribe to events stream.
    pub fn subscribe(&self) -> chan::Receiver<T> {
        let (sender, receiver) = chan::bounded(MAX_PENDING_EVENTS);