Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Introduce a node event for canonical reference updates
Fintan Halpenny committed 8 months ago
commit a8255a2e074aa07df5fa4b4fee5a3703b8d41d94
parent 690f6b02c0e0d9201807e5711b0e6c6f5a0b5c5a
7 files changed +99 -9
modified crates/radicle-node/src/test/simulator.rs
@@ -663,6 +663,7 @@ where
                                remote,
                                Rc::new(Ok(fetch::FetchResult {
                                    updated: vec![],
+
                                    canonical: fetch::UpdatedCanonicalRefs::default(),
                                    namespaces: HashSet::new(),
                                    clone: true,
                                    doc: arbitrary::gen(1),
modified crates/radicle-node/src/tests.rs
@@ -1589,6 +1589,7 @@ fn test_queued_fetch_from_ann_same_rid() {
                name: refname.clone(),
                oid,
            }],
+
            canonical: fetch::UpdatedCanonicalRefs::default(),
            namespaces: [carol.id()].into_iter().collect(),
            clone: false,
            doc: arbitrary::gen(1),
modified crates/radicle-node/src/worker/fetch.rs
@@ -18,7 +18,7 @@ use radicle::storage::{
use radicle::{cob, git, node, Storage};
use radicle_fetch::git::refs::Applied;
use radicle_fetch::{Allowed, BlockList, FetchLimit};
-
pub use radicle_protocol::worker::fetch::FetchResult;
+
pub use radicle_protocol::worker::fetch::{FetchResult, UpdatedCanonicalRefs};

use super::channels::ChannelsFlush;

@@ -128,9 +128,13 @@ impl Handle {
                    Err(e) => return Err(e.into()),
                }

-
                if let Err(e) = set_canonical_refs(&repo, &applied) {
-
                    log::warn!(target: "worker", "Failed to set canonical references: {e}");
-
                }
+
                let canonical = match set_canonical_refs(&repo, &applied) {
+
                    Ok(updates) => updates.unwrap_or_default(),
+
                    Err(e) => {
+
                        log::warn!(target: "worker", "Failed to set canonical references: {e}");
+
                        UpdatedCanonicalRefs::default()
+
                    }
+
                };

                // Notifications are only posted for pulls, not clones.
                if let Some(mut store) = notifs {
@@ -147,6 +151,7 @@ impl Handle {

                Ok(FetchResult {
                    updated: applied.updated,
+
                    canonical,
                    namespaces: remotes.into_iter().collect(),
                    doc: repo.identity_doc()?,
                    clone,
@@ -356,17 +361,21 @@ where
    Ok(())
}

-
fn set_canonical_refs(repo: &Repository, applied: &Applied) -> Result<(), error::Canonical> {
+
fn set_canonical_refs(
+
    repo: &Repository,
+
    applied: &Applied,
+
) -> Result<Option<UpdatedCanonicalRefs>, error::Canonical> {
    let identity = repo.identity()?;
    let rules = match identity
        .canonical_refs()?
        .map(|crefs| crefs.rules().clone())
        .filter(|rules| !rules.is_empty())
    {
-
        None => return Ok(()),
+
        None => return Ok(None),
        Some(rules) => rules,
    };

+
    let mut updated_refs = UpdatedCanonicalRefs::default();
    for update in applied.updated.iter() {
        let name = match update {
            RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => name,
@@ -421,9 +430,11 @@ fn set_canonical_refs(repo: &Repository, applied: &Applied) -> Result<(), error:
                        target: "worker",
                        "Failed to set canonical reference {refname}->{oid}: {e}"
                    );
+
                } else {
+
                    updated_refs.updated(refname, oid);
                }
            }
        }
    }
-
    Ok(())
+
    Ok(Some(updated_refs))
}
modified crates/radicle-protocol/src/service.rs
@@ -1177,6 +1177,7 @@ where
        match result {
            Ok(crate::worker::fetch::FetchResult {
                updated,
+
                canonical,
                namespaces,
                clone,
                doc,
@@ -1198,6 +1199,16 @@ where
                    rid,
                    updated: updated.clone(),
                });
+
                self.emitter.emit_all(
+
                    canonical
+
                        .into_iter()
+
                        .map(|(refname, target)| Event::CanonicalRefUpdated {
+
                            rid,
+
                            refname,
+
                            target,
+
                        })
+
                        .collect(),
+
                );

                // Announce our new inventory if this fetch was a full clone.
                // Only update and announce inventory for public repositories.
modified crates/radicle-protocol/src/worker/fetch.rs
@@ -1,14 +1,17 @@
pub mod error;

-
use std::collections::HashSet;
+
use std::collections::{BTreeMap, HashSet};

use radicle::crypto::PublicKey;
+
use radicle::git;
use radicle::{identity::DocAt, storage::RefUpdate};

#[derive(Debug, Clone)]
pub struct FetchResult {
    /// The set of updated references.
    pub updated: Vec<RefUpdate>,
+
    /// The canonical references that were updated as part of the fetch process.
+
    pub canonical: UpdatedCanonicalRefs,
    /// The set of remote namespaces that were updated.
    pub namespaces: HashSet<PublicKey>,
    /// The fetch was a full clone.
@@ -21,6 +24,7 @@ impl FetchResult {
    pub fn new(doc: DocAt) -> Self {
        Self {
            updated: vec![],
+
            canonical: UpdatedCanonicalRefs::default(),
            namespaces: HashSet::new(),
            clone: false,
            doc,
@@ -28,11 +32,41 @@ impl FetchResult {
    }
}

+
/// The set of canonical references, updated after a fetch, and their
+
/// corresponding targets.
+
#[derive(Clone, Default, Debug)]
+
pub struct UpdatedCanonicalRefs {
+
    inner: BTreeMap<git::Qualified<'static>, git::Oid>,
+
}
+

+
impl IntoIterator for UpdatedCanonicalRefs {
+
    type Item = (git::Qualified<'static>, git::Oid);
+
    type IntoIter = std::collections::btree_map::IntoIter<git::Qualified<'static>, git::Oid>;
+

+
    fn into_iter(self) -> Self::IntoIter {
+
        self.inner.into_iter()
+
    }
+
}
+

+
impl UpdatedCanonicalRefs {
+
    /// Insert a new updated entry for the canonical reference identified by
+
    /// `refname` and its new `target.`
+
    pub fn updated(&mut self, refname: git::Qualified<'static>, target: git::Oid) {
+
        self.inner.insert(refname, target);
+
    }
+

+
    /// Return an iterator of all the updates.
+
    pub fn iter(&self) -> impl Iterator<Item = (&git::Qualified<'static>, &git::Oid)> {
+
        self.inner.iter()
+
    }
+
}
+

#[cfg(any(test, feature = "test"))]
impl qcheck::Arbitrary for FetchResult {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        FetchResult {
            updated: vec![],
+
            canonical: UpdatedCanonicalRefs::default(),
            namespaces: HashSet::arbitrary(g),
            clone: bool::arbitrary(g),
            doc: DocAt::arbitrary(g),
modified crates/radicle/CHANGELOG.md
@@ -9,6 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

+
- Introduce a node event for canonical reference updates, `Event::CanonicalRefUpdated`.
+
  Whenever the node fetches new updates, it checks if canonical references can
+
  be updated. The node has learned how to return these results and emit them as
+
  node events. This is a breaking change since it adds a new variant the `Event`
+
  type.
+
- Add `#[non_exhaustive]` to `Event` to prevent any further breaking changes
+
  when adding new variants.
+

### Changed

- `radicle::profile::Home::socket` defaults to the path `\\.\pipe\radicle-node`
modified crates/radicle/src/node/events.rs
@@ -9,7 +9,7 @@ use std::time;

use crossbeam_channel as chan;

-
use crate::git::Oid;
+
use crate::git::{Oid, Qualified};
use crate::node;
use crate::prelude::*;
use crate::storage::{refs, RefUpdate};
@@ -21,6 +21,7 @@ pub const MAX_PENDING_EVENTS: usize = 8192;
///
/// The node emits events of this type to its control socket for other
/// programs to consume.
+
#[non_exhaustive]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Event {
@@ -122,6 +123,15 @@ pub enum Event {
    },
    /// The node has uploaded a Git pack file to another node.
    UploadPack(upload_pack::UploadPack),
+
    /// A canonical reference was updated after a fetch.
+
    CanonicalRefUpdated {
+
        /// The repository the canonical reference was updated for.
+
        rid: RepoId,
+
        /// The reference name of the canonical reference update.
+
        refname: Qualified<'static>,
+
        /// The new target of the reference, after the update.
+
        target: Oid,
+
    },
}

impl From<upload_pack::UploadPack> for Event {
@@ -214,6 +224,20 @@ impl<T: Clone> Emitter<T> {
            .retain(|s| s.try_send(event.clone()).is_ok());
    }

+
    /// Emit a batch of events to subscribers and drop those who can't receive
+
    /// them.
+
    /// N.b. subscribers are also dropped if their channel is full.
+
    pub fn emit_all(&self, events: Vec<T>) {
+
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
+
        #[allow(clippy::unwrap_used)]
+
        self.subscribers.lock().unwrap().retain(|s| {
+
            events
+
                .clone()
+
                .into_iter()
+
                .all(|event| s.try_send(event).is_ok())
+
        });
+
    }
+

    /// Subscribe to events stream.
    pub fn subscribe(&self) -> chan::Receiver<T> {
        let (sender, receiver) = chan::bounded(MAX_PENDING_EVENTS);