Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle src node events.rs
//! Events for `upload-pack` processes.
pub mod upload_pack;
pub use upload_pack::UploadPack;

use std::ops::Deref;
use std::sync::Arc;
use std::sync::Mutex;
use std::time;

use crossbeam_channel as chan;

use crate::git::Oid;
use crate::git::fmt::Qualified;
use crate::node;
use crate::prelude::*;
use crate::storage::{RefUpdate, refs};

/// Maximum unconsumed events allowed per subscription.
pub const MAX_PENDING_EVENTS: usize = 8192;

/// A service event.
///
/// The node emits events of this type to its control socket for other
/// programs to consume.
#[non_exhaustive]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Event {
    /// The node has received changes to Git references in a
    /// repository stored on the node, from another node.
    RefsFetched {
        /// The node identifier of the other node.
        remote: NodeId,
        /// The identifier of the repository in question.
        rid: RepoId,
        /// The list of Git references that were updated.
        updated: Vec<RefUpdate>,
    },
    /// The node has sent its list of Git references to another node
    /// and the other has fetched the updated references.
    RefsSynced {
        /// The node identifier of the other node.
        remote: NodeId,
        /// The identifier of the repository in question.
        rid: RepoId,
        /// The `rad/sigrefs` reference that was fetched.
        at: Oid,
    },
    /// The node has discovered a repository on new node on the
    /// Radicle network.
    SeedDiscovered {
        /// The identifier of the repository in question.
        rid: RepoId,
        /// The node identifier of the other node.
        nid: NodeId,
    },
    /// The node has dropped a repository on a node from its list of
    /// known repositories and nodes.
    SeedDropped {
        /// The identifier of the repository in question.
        rid: RepoId,
        /// The node identifier of the other node.
        nid: NodeId,
    },
    /// The node has connected directly to another node.
    PeerConnected {
        /// The node identifier of the other node.
        nid: NodeId,
    },
    /// The node has terminated its direct connection to another node.
    PeerDisconnected {
        /// The node identifier of the other node.
        nid: NodeId,
        /// The reason why the connection was terminated.
        reason: String,
    },
    /// The local node has received changes to Git references from its
    /// local user. In other words, the local user has pushed to the
    /// node, updated COBs, or otherwise updated refs in their local node.
    LocalRefsAnnounced {
        /// The identifier of the repository in question.
        rid: RepoId,
        /// List of changed Git references for the repository.
        refs: refs::RefsAt,
        /// When were the new references received? In other words,
        /// when did the user run `git push`?
        timestamp: Timestamp,
    },
    /// The node has received a message with a list of repositories on
    /// another node on the network.
    InventoryAnnounced {
        /// The node identifier of the other node.
        nid: NodeId,
        /// List of repositories sent.
        inventory: Vec<RepoId>,
        /// When was the list sent?
        timestamp: Timestamp,
    },
    /// The node has received a message about new signed Git
    /// references ("sigrefs") for a repository on another node on the
    /// network.
    RefsAnnounced {
        /// The node identifier of the other node.
        nid: NodeId,
        /// The identifier of the repository in question.
        rid: RepoId,
        /// List of Git references for the repository.
        refs: Vec<refs::RefsAt>,
        /// When was the list sent?
        timestamp: Timestamp,
    },
    /// The node received a message about a new node on the network.
    NodeAnnounced {
        /// The node identifier of the other node.
        nid: NodeId,
        /// Alias for the other node.
        alias: Alias,
        /// When was the announcement sent?
        timestamp: Timestamp,
        /// What features did the node advertise to the other node.
        features: node::Features,
        /// What of its addresses did the node tell the other node about?
        addresses: Vec<node::Address>,
    },
    /// The node has uploaded a Git pack file to another node.
    UploadPack(upload_pack::UploadPack),
    /// A canonical reference was updated after a fetch.
    CanonicalRefUpdated {
        /// The repository the canonical reference was updated for.
        rid: RepoId,
        /// The reference name of the canonical reference update.
        refname: Qualified<'static>,
        /// The new target of the reference, after the update.
        target: Oid,
    },
}

impl From<upload_pack::UploadPack> for Event {
    fn from(value: upload_pack::UploadPack) -> Self {
        Self::UploadPack(value)
    }
}

/// Events feed.
pub struct Events(chan::Receiver<Event>);

impl IntoIterator for Events {
    type Item = Event;
    type IntoIter = chan::IntoIter<Event>;

    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}

impl From<chan::Receiver<Event>> for Events {
    fn from(value: chan::Receiver<Event>) -> Self {
        Self(value)
    }
}

impl Deref for Events {
    type Target = chan::Receiver<Event>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Events {
    /// Listen for events, and wait for the given predicate to return something,
    /// or timeout if the specified amount of time has elapsed.
    pub fn wait<F, T>(&self, mut f: F, timeout: time::Duration) -> Result<T, chan::RecvTimeoutError>
    where
        F: FnMut(&Event) -> Option<T>,
    {
        let start = time::Instant::now();

        loop {
            if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
                match self.recv_timeout(timeout) {
                    Ok(event) => {
                        if let Some(output) = f(&event) {
                            return Ok(output);
                        }
                    }
                    Err(err @ chan::RecvTimeoutError::Disconnected) => {
                        return Err(err);
                    }
                    Err(chan::RecvTimeoutError::Timeout) => {
                        // Keep trying until our timeout reaches zero.
                        continue;
                    }
                }
            } else {
                return Err(chan::RecvTimeoutError::Timeout);
            }
        }
    }
}

/// Publishes events to subscribers.
#[derive(Debug, Clone)]
pub struct Emitter<T> {
    subscribers: Arc<Mutex<Vec<chan::Sender<T>>>>,
}

impl<T> Default for Emitter<T> {
    fn default() -> Emitter<T> {
        Emitter {
            subscribers: Default::default(),
        }
    }
}

impl<T: Clone> Emitter<T> {
    /// Emit event to subscribers and drop those who can't receive it.
    /// Nb. subscribers are also dropped if their channel is full.
    pub fn emit(&self, event: T) {
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        self.subscribers
            .lock()
            .unwrap()
            .retain(|s| s.try_send(event.clone()).is_ok());
    }

    /// Emit a batch of events to subscribers and drop those who can't receive
    /// them.
    /// N.b. subscribers are also dropped if their channel is full.
    pub fn emit_all(&self, events: impl IntoIterator<Item = T>) {
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        let mut subscribers = self.subscribers.lock().unwrap();
        for event in events {
            subscribers.retain(|s| s.try_send(event.clone()).is_ok());
        }
    }

    /// Subscribe to events stream.
    pub fn subscribe(&self) -> chan::Receiver<T> {
        let (sender, receiver) = chan::bounded(MAX_PENDING_EVENTS);
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        let mut subs = self.subscribers.lock().unwrap();
        subs.push(sender);

        receiver
    }

    /// Number of subscribers.
    pub fn subscriptions(&self) -> usize {
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        self.subscribers.lock().unwrap().len()
    }

    /// Number of messages that have not yet been received.
    pub fn pending(&self) -> usize {
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        self.subscribers
            .lock()
            .unwrap()
            .iter()
            .map(|ch| ch.len())
            .sum()
    }
}