Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
Introduce a node event for canonical reference updates
Merged fintohaps opened 8 months ago

Whenever the node fetches new updates, it checks if canonical references can be updated. The node has learned how to return these results and emit them as node events. This is a breaking change since it adds a new variant the Event type, which is not forwards-compatible.

9 files changed +230 -34 690f6b02 7d2f0e38
modified crates/radicle-node/src/test/node.rs
@@ -352,6 +352,72 @@ impl<G: Signer<Signature> + cyphernet::Ecdh> NodeHandle<G> {
            .unwrap()
            .id()
    }
+

+
    /// Perform a commit to `refname` by generating a blob of random data to a
+
    /// random path in a new tree.
+
    ///
+
    /// If the reference does not exist, a new one will be created with the new
+
    /// commit as its target.
+
    ///
+
    /// If the reference already exists, then its target is used as the parent
+
    /// of the new commit, and the reference will be updated.
+
    ///
+
    /// The `rad/sigrefs` are then updated to reflect the new change.
+
    pub fn commit_to(&self, rid: RepoId, refname: impl AsRef<git::RefStr>) {
+
        use radicle::test::arbitrary;
+

+
        let refname = refname.as_ref();
+
        let repo = self.storage.repository(rid).unwrap();
+
        let raw = &repo.backend;
+

+
        let info = self.storage.info();
+
        let author = git::raw::Signature::now(&info.name(), &info.email()).unwrap();
+

+
        let tree = {
+
            let mut tb = raw.treebuilder(None).unwrap();
+
            let blob = raw.blob(&arbitrary::vec::<u8>(100)).unwrap();
+
            tb.insert(
+
                arbitrary::alphanumeric(10),
+
                blob,
+
                git::raw::FileMode::Blob.into(),
+
            )
+
            .unwrap();
+
            let oid = tb.write().unwrap();
+
            raw.find_tree(oid).unwrap()
+
        };
+
        let parent = {
+
            let target = raw
+
                .find_reference(refname.as_str())
+
                .ok()
+
                .and_then(|r| r.target());
+
            target.and_then(|oid| raw.find_commit(oid).ok())
+
        };
+
        match parent {
+
            None => repo
+
                .backend
+
                .commit(
+
                    Some(refname.as_str()),
+
                    &author,
+
                    &author,
+
                    "New commit",
+
                    &tree,
+
                    &[],
+
                )
+
                .unwrap(),
+
            Some(parent) => repo
+
                .backend
+
                .commit(
+
                    Some(refname.as_str()),
+
                    &author,
+
                    &author,
+
                    "New commit",
+
                    &tree,
+
                    &[&parent],
+
                )
+
                .unwrap(),
+
        };
+
        repo.sign_refs(&self.signer).unwrap();
+
    }
}

impl Node<MockSigner> {
modified crates/radicle-node/src/test/simulator.rs
@@ -663,6 +663,7 @@ where
                                remote,
                                Rc::new(Ok(fetch::FetchResult {
                                    updated: vec![],
+
                                    canonical: fetch::UpdatedCanonicalRefs::default(),
                                    namespaces: HashSet::new(),
                                    clone: true,
                                    doc: arbitrary::gen(1),
modified crates/radicle-node/src/tests.rs
@@ -1589,6 +1589,7 @@ fn test_queued_fetch_from_ann_same_rid() {
                name: refname.clone(),
                oid,
            }],
+
            canonical: fetch::UpdatedCanonicalRefs::default(),
            namespaces: [carol.id()].into_iter().collect(),
            clone: false,
            doc: arbitrary::gen(1),
modified crates/radicle-node/src/tests/e2e.rs
@@ -1504,3 +1504,43 @@ fn test_channel_reader_limit() {
        "actual: {reason}"
    );
}
+

+
#[test]
+
fn test_fetch_emits_canonical_ref_update() {
+
    let tmp = tempfile::tempdir().unwrap();
+
    let scale = config::scale();
+
    let mut alice = Node::init(tmp.path(), config::relay("alice"));
+
    let bob = Node::init(tmp.path(), config::relay("bob"));
+

+
    let (repo, _) = fixtures::repository(tmp.path());
+
    fixtures::populate(&repo, scale.max(3));
+

+
    let rid = alice.project_from("acme", "", &repo);
+

+
    let mut alice = alice.spawn();
+
    let mut bob = bob.spawn();
+
    let bob_events = bob.handle.events();
+

+
    bob.handle.seed(rid, Scope::All).unwrap();
+
    alice.connect(&bob);
+

+
    let result = bob.handle.fetch(rid, alice.id, DEFAULT_TIMEOUT).unwrap();
+
    assert!(result.is_success());
+

+
    let default_branch: git::Qualified = {
+
        let repo = alice.storage.repository(rid).unwrap();
+
        let proj = repo.project().unwrap();
+
        git::lit::refs_heads(proj.default_branch()).into()
+
    };
+
    alice.commit_to(rid, &default_branch);
+

+
    bob_events
+
        .wait(
+
            |e| {
+
                matches!(e, Event::CanonicalRefUpdated { refname, .. } if *refname == default_branch)
+
                    .then_some(())
+
            },
+
            time::Duration::from_secs(9 * scale as u64),
+
        )
+
        .unwrap();
+
}
modified crates/radicle-node/src/worker/fetch.rs
@@ -1,5 +1,8 @@
+
use radicle::identity::doc::CanonicalRefsError;
+
use radicle::identity::CanonicalRefs;
pub(crate) use radicle_protocol::worker::fetch::error;

+
use std::collections::BTreeSet;
use std::str::FromStr;

use localtime::LocalTime;
@@ -18,7 +21,7 @@ use radicle::storage::{
use radicle::{cob, git, node, Storage};
use radicle_fetch::git::refs::Applied;
use radicle_fetch::{Allowed, BlockList, FetchLimit};
-
pub use radicle_protocol::worker::fetch::FetchResult;
+
pub use radicle_protocol::worker::fetch::{FetchResult, UpdatedCanonicalRefs};

use super::channels::ChannelsFlush;

@@ -128,9 +131,13 @@ impl Handle {
                    Err(e) => return Err(e.into()),
                }

-
                if let Err(e) = set_canonical_refs(&repo, &applied) {
-
                    log::warn!(target: "worker", "Failed to set canonical references: {e}");
-
                }
+
                let canonical = match set_canonical_refs(&repo, &applied) {
+
                    Ok(updates) => updates.unwrap_or_default(),
+
                    Err(e) => {
+
                        log::warn!(target: "worker", "Failed to set canonical references: {e}");
+
                        UpdatedCanonicalRefs::default()
+
                    }
+
                };

                // Notifications are only posted for pulls, not clones.
                if let Some(mut store) = notifs {
@@ -147,6 +154,7 @@ impl Handle {

                Ok(FetchResult {
                    updated: applied.updated,
+
                    canonical,
                    namespaces: remotes.into_iter().collect(),
                    doc: repo.identity_doc()?,
                    clone,
@@ -356,36 +364,37 @@ where
    Ok(())
}

-
fn set_canonical_refs(repo: &Repository, applied: &Applied) -> Result<(), error::Canonical> {
+
fn set_canonical_refs(
+
    repo: &Repository,
+
    applied: &Applied,
+
) -> Result<Option<UpdatedCanonicalRefs>, error::Canonical> {
    let identity = repo.identity()?;
-
    let rules = match identity
-
        .canonical_refs()?
-
        .map(|crefs| crefs.rules().clone())
-
        .filter(|rules| !rules.is_empty())
-
    {
-
        None => return Ok(()),
-
        Some(rules) => rules,
-
    };
-

-
    for update in applied.updated.iter() {
-
        let name = match update {
-
            RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => name,
-
            _ => {
-
                log::trace!(target: "worker", "Skipping update {update}");
-
                continue;
+
    // TODO(finto): it's unfortunate that we may end up computing the default
+
    // branch again after `set_head` is called after the fetch. This is due to
+
    // the storage capabilities being leaked to this part of the code base.
+
    let rules = identity
+
        .canonical_refs_or_default(|| {
+
            let rule = identity.doc().default_branch_rule()?;
+
            Ok::<_, CanonicalRefsError>(CanonicalRefs::from_iter([rule]))
+
        })?
+
        .rules()
+
        .clone();
+

+
    let mut updated_refs = UpdatedCanonicalRefs::default();
+
    let refnames = applied
+
        .updated
+
        .iter()
+
        .filter_map(|update| match update {
+
            RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => {
+
                let name = name.clone().into_qualified()?;
+
                let name = name.to_namespaced()?;
+
                Some(name.strip_namespace())
            }
-
        };
-
        let Some(name) = name.clone().into_qualified() else {
-
            log::warn!(target: "worker", "Skipping update for canonical reference '{name}' because it is not qualified.");
-
            continue;
-
        };
-
        let Some(name) = name.to_namespaced() else {
-
            log::warn!(target: "worker", "Skipping update for canonical reference '{name}' because it is not namespaced.");
-
            continue;
-
        };
-

-
        let name = name.strip_namespace();
+
            _ => None,
+
        })
+
        .collect::<BTreeSet<_>>();

+
    for name in refnames {
        let canonical = match rules.canonical(name.clone(), repo) {
            Some(canonical) => canonical,
            None => continue,
@@ -421,9 +430,11 @@ fn set_canonical_refs(repo: &Repository, applied: &Applied) -> Result<(), error:
                        target: "worker",
                        "Failed to set canonical reference {refname}->{oid}: {e}"
                    );
+
                } else {
+
                    updated_refs.updated(refname, oid);
                }
            }
        }
    }
-
    Ok(())
+
    Ok(Some(updated_refs))
}
modified crates/radicle-protocol/src/service.rs
@@ -1177,6 +1177,7 @@ where
        match result {
            Ok(crate::worker::fetch::FetchResult {
                updated,
+
                canonical,
                namespaces,
                clone,
                doc,
@@ -1198,6 +1199,16 @@ where
                    rid,
                    updated: updated.clone(),
                });
+
                self.emitter.emit_all(
+
                    canonical
+
                        .into_iter()
+
                        .map(|(refname, target)| Event::CanonicalRefUpdated {
+
                            rid,
+
                            refname,
+
                            target,
+
                        })
+
                        .collect(),
+
                );

                // Announce our new inventory if this fetch was a full clone.
                // Only update and announce inventory for public repositories.
modified crates/radicle-protocol/src/worker/fetch.rs
@@ -1,14 +1,17 @@
pub mod error;

-
use std::collections::HashSet;
+
use std::collections::{BTreeMap, HashSet};

use radicle::crypto::PublicKey;
+
use radicle::git;
use radicle::{identity::DocAt, storage::RefUpdate};

#[derive(Debug, Clone)]
pub struct FetchResult {
    /// The set of updated references.
    pub updated: Vec<RefUpdate>,
+
    /// The canonical references that were updated as part of the fetch process.
+
    pub canonical: UpdatedCanonicalRefs,
    /// The set of remote namespaces that were updated.
    pub namespaces: HashSet<PublicKey>,
    /// The fetch was a full clone.
@@ -21,6 +24,7 @@ impl FetchResult {
    pub fn new(doc: DocAt) -> Self {
        Self {
            updated: vec![],
+
            canonical: UpdatedCanonicalRefs::default(),
            namespaces: HashSet::new(),
            clone: false,
            doc,
@@ -28,11 +32,41 @@ impl FetchResult {
    }
}

+
/// The set of canonical references, updated after a fetch, and their
+
/// corresponding targets.
+
#[derive(Clone, Default, Debug)]
+
pub struct UpdatedCanonicalRefs {
+
    inner: BTreeMap<git::Qualified<'static>, git::Oid>,
+
}
+

+
impl IntoIterator for UpdatedCanonicalRefs {
+
    type Item = (git::Qualified<'static>, git::Oid);
+
    type IntoIter = std::collections::btree_map::IntoIter<git::Qualified<'static>, git::Oid>;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.inner.into_iter()
+
    }
+
}
+

+
impl UpdatedCanonicalRefs {
+
    /// Insert a new updated entry for the canonical reference identified by
+
    /// `refname` and its new `target.`
+
    pub fn updated(&mut self, refname: git::Qualified<'static>, target: git::Oid) {
+
        self.inner.insert(refname, target);
+
    }
+

+
    /// Return an iterator of all the updates.
+
    pub fn iter(&self) -> impl Iterator<Item = (&git::Qualified<'static>, &git::Oid)> {
+
        self.inner.iter()
+
    }
+
}
+

#[cfg(any(test, feature = "test"))]
impl qcheck::Arbitrary for FetchResult {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        FetchResult {
            updated: vec![],
+
            canonical: UpdatedCanonicalRefs::default(),
            namespaces: HashSet::arbitrary(g),
            clone: bool::arbitrary(g),
            doc: DocAt::arbitrary(g),
modified crates/radicle/CHANGELOG.md
@@ -9,6 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

+
- Introduce a node event for canonical reference updates, `Event::CanonicalRefUpdated`.
+
  Whenever the node fetches new updates, it checks if canonical references can
+
  be updated. The node has learned how to return these results and emit them as
+
  node events. This is a breaking change since it adds a new variant the `Event`
+
  type.
+
- Add `#[non_exhaustive]` to `Event` to prevent any further breaking changes
+
  when adding new variants.
+

### Changed

- `radicle::profile::Home::socket` defaults to the path `\\.\pipe\radicle-node`
modified crates/radicle/src/node/events.rs
@@ -9,7 +9,7 @@ use std::time;

use crossbeam_channel as chan;

-
use crate::git::Oid;
+
use crate::git::{Oid, Qualified};
use crate::node;
use crate::prelude::*;
use crate::storage::{refs, RefUpdate};
@@ -21,6 +21,7 @@ pub const MAX_PENDING_EVENTS: usize = 8192;
///
/// The node emits events of this type to its control socket for other
/// programs to consume.
+
#[non_exhaustive]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Event {
@@ -122,6 +123,15 @@ pub enum Event {
    },
    /// The node has uploaded a Git pack file to another node.
    UploadPack(upload_pack::UploadPack),
+
    /// A canonical reference was updated after a fetch.
+
    CanonicalRefUpdated {
+
        /// The repository the canonical reference was updated for.
+
        rid: RepoId,
+
        /// The reference name of the canonical reference update.
+
        refname: Qualified<'static>,
+
        /// The new target of the reference, after the update.
+
        target: Oid,
+
    },
}

impl From<upload_pack::UploadPack> for Event {
@@ -214,6 +224,20 @@ impl<T: Clone> Emitter<T> {
            .retain(|s| s.try_send(event.clone()).is_ok());
    }

+
    /// Emit a batch of events to subscribers and drop those who can't receive
+
    /// them.
+
    /// N.b. subscribers are also dropped if their channel is full.
+
    pub fn emit_all(&self, events: Vec<T>) {
+
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
+
        #[allow(clippy::unwrap_used)]
+
        self.subscribers.lock().unwrap().retain(|s| {
+
            events
+
                .clone()
+
                .into_iter()
+
                .all(|event| s.try_send(event).is_ok())
+
        });
+
    }
+

    /// Subscribe to events stream.
    pub fn subscribe(&self) -> chan::Receiver<T> {
        let (sender, receiver) = chan::bounded(MAX_PENDING_EVENTS);