Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Add eventing system to `Handler` and `Service`
xphoniex committed 3 years ago
commit a49eec9892e6f1a1811cf862f4d5297fc599ba84
parent ebfd445034150956866bff17a861c0808e1ec9a0
8 files changed +85 -30
modified radicle-node/src/runtime.rs
@@ -3,6 +3,7 @@ mod handle;
use std::io::{BufRead, BufReader};
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
+
use std::sync::{Arc, Mutex};
use std::{fs, io, net, thread, time};

use crossbeam_channel as chan;
@@ -22,7 +23,7 @@ use crate::address;
use crate::control;
use crate::crypto::Signer;
use crate::node::{routing, NodeId};
-
use crate::service::tracking;
+
use crate::service::{tracking, Event};
use crate::wire;
use crate::wire::Wire;
use crate::worker;
@@ -61,6 +62,39 @@ pub enum Error {
    GitVersion(#[from] git::VersionError),
}

+
/// Publishes events to subscribers.
+
#[derive(Debug, Clone)]
+
pub struct Emitter<T> {
+
    pub(crate) subscribers: Arc<Mutex<Vec<chan::Sender<T>>>>,
+
}
+

+
impl<T> Default for Emitter<T> {
+
    fn default() -> Emitter<T> {
+
        Emitter {
+
            subscribers: Default::default(),
+
        }
+
    }
+
}
+

+
impl<T: Clone> Emitter<T> {
+
    /// Emit event to subscribers and drop those who can't receive it.
+
    pub(crate) fn emit(&self, event: T) {
+
        self.subscribers
+
            .lock()
+
            .unwrap()
+
            .retain(|s| s.try_send(event.clone()).is_ok());
+
    }
+

+
    /// Subscribe to events stream.
+
    pub fn events(&mut self) -> chan::Receiver<T> {
+
        let (sender, receiver) = chan::unbounded();
+
        let mut subs = self.subscribers.lock().unwrap();
+
        subs.push(sender);
+

+
        receiver
+
    }
+
}
+

/// Holds join handles to the client threads, as well as a client handle.
pub struct Runtime<G: Signer + Ecdh> {
    pub id: NodeId,
@@ -113,6 +147,7 @@ impl<G: Signer + Ecdh + 'static> Runtime<G> {

        log::info!(target: "node", "Default tracking policy set to '{}'", &config.policy);
        log::info!(target: "node", "Initializing service ({:?})..", network);
+
        let emitter: Emitter<Event> = Default::default();
        let service = service::Service::new(
            config,
            clock,
@@ -122,6 +157,7 @@ impl<G: Signer + Ecdh + 'static> Runtime<G> {
            tracking,
            signer.clone(),
            rng,
+
            emitter.clone(),
        );

        let (worker_send, worker_recv) = chan::unbounded::<worker::Task<G>>();
@@ -138,7 +174,7 @@ impl<G: Signer + Ecdh + 'static> Runtime<G> {
            log::info!(target: "node", "Listening on {local_addr}..");
        }
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
-
        let handle = Handle::new(home.clone(), reactor.controller());
+
        let handle = Handle::new(home.clone(), reactor.controller(), emitter);
        let atomic = git::version()? >= git::VERSION_REQUIRED;

        if !atomic {
modified radicle-node/src/runtime/handle.rs
@@ -13,8 +13,10 @@ use crate::crypto::Signer;
use crate::identity::Id;
use crate::node::{Command, FetchResult};
use crate::profile::Home;
+
use crate::runtime::Emitter;
use crate::service;
use crate::service::tracking;
+
use crate::service::Event;
use crate::service::{CommandError, QueryState};
use crate::service::{NodeId, Sessions};
use crate::wire;
@@ -64,6 +66,20 @@ pub struct Handle<G: Signer + Ecdh> {

    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
    shutdown: Arc<AtomicBool>,
+

+
    /// Publishes events to subscribers.
+
    emitter: Emitter<Event>,
+
}
+

+
impl<G: Signer + Ecdh> Handle<G> {
+
    /// Subscribe to events stream.
+
    pub fn events(&mut self) -> chan::Receiver<Event> {
+
        let (sender, receiver) = chan::unbounded();
+
        let mut subs = self.emitter.subscribers.lock().unwrap();
+
        subs.push(sender);
+

+
        receiver
+
    }
}

impl<G: Signer + Ecdh> fmt::Debug for Handle<G> {
@@ -78,16 +94,22 @@ impl<G: Signer + Ecdh> Clone for Handle<G> {
            home: self.home.clone(),
            controller: self.controller.clone(),
            shutdown: self.shutdown.clone(),
+
            emitter: self.emitter.clone(),
        }
    }
}

impl<G: Signer + Ecdh + 'static> Handle<G> {
-
    pub fn new(home: Home, controller: reactor::Controller<wire::Control<G>>) -> Self {
+
    pub fn new(
+
        home: Home,
+
        controller: reactor::Controller<wire::Control<G>>,
+
        emitter: Emitter<Event>,
+
    ) -> Self {
        Self {
            home,
            controller,
            shutdown: Arc::default(),
+
            emitter,
        }
    }

modified radicle-node/src/service.rs
@@ -29,6 +29,7 @@ use crate::node;
use crate::node::routing;
use crate::node::{Address, Features, FetchResult, Seed, Seeds};
use crate::prelude::*;
+
use crate::runtime::Emitter;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::reactor::FetchDirection;
@@ -203,6 +204,8 @@ pub struct Service<R, A, S, G> {
    last_announce: LocalTime,
    /// Time when the service was initialized.
    start_time: LocalTime,
+
    /// Publishes events to subscribers.
+
    emitter: Emitter<Event>,
}

impl<R, A, S, G> Service<R, A, S, G>
@@ -236,6 +239,7 @@ where
        tracking: tracking::Config,
        signer: G,
        rng: Rng,
+
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());

@@ -261,6 +265,7 @@ where
            last_prune: LocalTime::default(),
            last_announce: LocalTime::default(),
            start_time: LocalTime::default(),
+
            emitter,
        }
    }

@@ -341,6 +346,11 @@ where
        &self.signer
    }

+
    /// Subscriber to inner `Emitter` events.
+
    pub fn events(&mut self) -> chan::Receiver<Event> {
+
        self.emitter.events()
+
    }
+

    /// Get I/O reactor.
    pub fn reactor(&mut self) -> &mut Reactor {
        &mut self.reactor
@@ -586,11 +596,12 @@ where
                    Ok(updated) => {
                        log::debug!(target: "service", "Fetched {rid} from {remote}");

-
                        self.reactor.event(Event::RefsFetched {
+
                        self.emitter.emit(Event::RefsFetched {
                            remote,
                            rid,
                            updated: updated.clone(),
                        });
+

                        FetchResult::Success { updated }
                    }
                    Err(err) => {
modified radicle-node/src/service/reactor.rs
@@ -22,8 +22,6 @@ pub enum Io {
    Fetch(Fetch),
    /// Ask for a wakeup in a specified amount of time.
    Wakeup(LocalDuration),
-
    /// Emit an event.
-
    Event(Event),
}

/// Fetch job sent to worker thread.
@@ -81,11 +79,6 @@ pub struct Reactor {
}

impl Reactor {
-
    /// Emit an event.
-
    pub fn event(&mut self, event: Event) {
-
        self.io.push_back(Io::Event(event));
-
    }
-

    /// Connect to a peer.
    pub fn connect(&mut self, id: NodeId, addr: Address) {
        self.io.push_back(Io::Connect(id, addr));
modified radicle-node/src/test/peer.rs
@@ -3,6 +3,7 @@ use std::iter;
use std::net;
use std::ops::{Deref, DerefMut};

+
use crossbeam_channel as chan;
use log::*;

use crate::address;
@@ -13,6 +14,7 @@ use crate::identity::Id;
use crate::node;
use crate::node::routing;
use crate::prelude::*;
+
use crate::runtime::Emitter;
use crate::service;
use crate::service::message::*;
use crate::service::reactor::Io;
@@ -130,6 +132,8 @@ where
        let tracking = tracking::Store::memory().unwrap();
        let tracking = tracking::Config::new(config.policy, config.scope, tracking);
        let id = *config.signer.public_key();
+

+
        let emitter: Emitter<Event> = Default::default();
        let service = Service::new(
            config.config,
            config.local_time,
@@ -139,6 +143,7 @@ where
            tracking,
            config.signer,
            config.rng.clone(),
+
            emitter,
        );
        let ip = ip.into();
        let local_addr = net::SocketAddr::new(ip, config.rng.u16(..));
@@ -320,10 +325,9 @@ where
        msgs.into_iter()
    }

-
    /// Get a draining iterator over the peer's emitted events.
-
    pub fn events(&mut self) -> impl Iterator<Item = Event> + '_ {
-
        self.outbox()
-
            .filter_map(|io| if let Io::Event(e) = io { Some(e) } else { None })
+
    /// Get a stream of the peer's emitted events.
+
    pub fn events(&mut self) -> chan::Receiver<Event> {
+
        self.service.events()
    }

    /// Get a draining iterator over the peer's I/O outbox.
modified radicle-node/src/test/simulator.rs
@@ -606,14 +606,6 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    );
                }
            }
-
            Io::Event(event) => {
-
                let events = self.events.entry(node).or_insert_with(VecDeque::new);
-
                if events.len() >= MAX_EVENTS {
-
                    warn!(target: "sim", "Dropping event: buffer is full");
-
                } else {
-
                    events.push_back(event);
-
                }
-
            }
            Io::Fetch(fetch) => {
                let remote = fetch.remote;

modified radicle-node/src/tests.rs
@@ -1069,6 +1069,8 @@ fn test_push_and_pull() {
    )
    .initialize([&mut alice, &mut bob, &mut eve]);

+
    let bob_events = bob.events();
+

    // Here we expect Alice to connect to Eve.
    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

@@ -1098,8 +1100,8 @@ fn test_push_and_pull() {
        .unwrap()
        .is_some());
    assert_matches!(
-
        sim.events(&bob.id).next(),
-
        Some(service::Event::RefsFetched { remote, .. })
+
        bob_events.recv(),
+
        Ok(service::Event::RefsFetched { remote, .. })
        if remote == eve.node_id(),
        "Bob fetched from Eve"
    );
modified radicle-node/src/wire/protocol.rs
@@ -703,11 +703,6 @@ where
                    }
                    self.actions.push_back(reactor::Action::Send(fd, data));
                }
-
                Io::Event(_e) => {
-
                    log::warn!(
-
                        target: "wire", "Events are not currently supported"
-
                    );
-
                }
                Io::Connect(node_id, addr) => {
                    if self.connected().any(|(_, id)| id == &node_id) {
                        log::error!(