Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Replace most sleeps with events
Alexis Sellier committed 3 years ago
commit 7c41c5edffa368f1a7a7f44d820fc67a57dd49fa
parent 6fb6cff70825fbac2aef72722181d39c9cd4113e
7 files changed +156 -69
modified radicle-cli/tests/commands.rs
@@ -12,6 +12,7 @@ use radicle::test::fixtures;

use radicle_cli_test::TestFormula;
use radicle_node::service::tracking::{Policy, Scope};
+
use radicle_node::service::Event;
use radicle_node::test::{
    environment::{Config, Environment},
    logger,
@@ -368,7 +369,7 @@ fn rad_init_sync_and_clone() {
    // Necessary for now, if we don't want the new inventry announcement to be considered stale
    // for Bob.
    // TODO: Find a way to advance internal clocks instead.
-
    thread::sleep(time::Duration::from_secs(1));
+
    thread::sleep(time::Duration::from_millis(3));

    // Alice initializes a repo after her node has started, and after bob has connected to it.
    test(
@@ -416,8 +417,6 @@ fn test_clone_without_seeds() {

#[test]
fn test_cob_replication() {
-
    logger::init(log::Level::Debug);
-

    let mut environment = Environment::new();
    let working = tempfile::tempdir().unwrap();
    let mut alice = environment.node("alice");
@@ -427,6 +426,7 @@ fn test_cob_replication() {

    let mut alice = alice.spawn(Config::default());
    let mut bob = bob.spawn(Config::default());
+
    let events = alice.handle.events();

    alice.handle.track_node(bob.id, None).unwrap();
    alice.connect(&bob);
@@ -435,6 +435,14 @@ fn test_cob_replication() {
    bob.rad("clone", &[rid.to_string().as_str()], working.path())
        .unwrap();

+
    // Wait for Alice to fetch the clone refs.
+
    events
+
        .wait(
+
            |e| matches!(e, Event::RefsFetched { .. }),
+
            time::Duration::from_secs(6),
+
        )
+
        .unwrap();
+

    let bob_repo = bob.storage.repository(rid).unwrap();
    let mut bob_issues = radicle::cob::issue::Issues::open(&bob_repo).unwrap();
    let issue = bob_issues
@@ -455,7 +463,10 @@ fn test_cob_replication() {
    bob.handle.announce_refs(rid).unwrap();

    // Wait for Alice to fetch the issue refs.
-
    thread::sleep(time::Duration::from_secs(1));
+
    events
+
        .iter()
+
        .find(|e| matches!(e, Event::RefsFetched { .. }))
+
        .unwrap();

    let alice_repo = alice.storage.repository(rid).unwrap();
    let alice_issues = radicle::cob::issue::Issues::open(&alice_repo).unwrap();
@@ -469,8 +480,6 @@ fn test_cob_replication() {
//     alice -- seed -- bob
//
fn test_replication_via_seed() {
-
    logger::init(log::Level::Debug);
-

    let mut environment = Environment::new();
    let alice = environment.node("alice");
    let bob = environment.node("bob");
@@ -490,7 +499,7 @@ fn test_replication_via_seed() {
    bob.connect(&seed);

    // Enough time for the next inventory from Seed to not be considered stale by Bob.
-
    thread::sleep(time::Duration::from_secs(1));
+
    thread::sleep(time::Duration::from_millis(3));

    alice.routes_to(&[]);
    seed.routes_to(&[]);
@@ -522,6 +531,9 @@ fn test_replication_via_seed() {
    seed.routes_to(&[(rid, alice.id), (rid, seed.id)]);
    bob.routes_to(&[(rid, alice.id), (rid, seed.id)]);

+
    let seed_events = seed.handle.events();
+
    let alice_events = alice.handle.events();
+

    bob.rad("clone", &[rid.to_string().as_str()], working.join("bob"))
        .unwrap();

@@ -529,8 +541,12 @@ fn test_replication_via_seed() {
    seed.routes_to(&[(rid, alice.id), (rid, seed.id), (rid, bob.id)]);
    bob.routes_to(&[(rid, alice.id), (rid, seed.id), (rid, bob.id)]);

-
    // Enough time for the Seed to be able to fetch Bob's fork.
-
    thread::sleep(time::Duration::from_secs(1));
+
    seed_events
+
        .iter()
+
        .any(|e| matches!(e, Event::RefsFetched { remote, .. } if remote == bob.id));
+
    alice_events
+
        .iter()
+
        .any(|e| matches!(e, Event::RefsFetched { remote, .. } if remote == seed.id));

    seed.storage
        .repository(rid)
modified radicle-node/src/runtime.rs
@@ -65,7 +65,7 @@ pub enum Error {
/// Publishes events to subscribers.
#[derive(Debug, Clone)]
pub struct Emitter<T> {
-
    pub(crate) subscribers: Arc<Mutex<Vec<chan::Sender<T>>>>,
+
    subscribers: Arc<Mutex<Vec<chan::Sender<T>>>>,
}

impl<T> Default for Emitter<T> {
@@ -86,7 +86,7 @@ impl<T: Clone> Emitter<T> {
    }

    /// Subscribe to events stream.
-
    pub fn events(&mut self) -> chan::Receiver<T> {
+
    pub fn subscribe(&self) -> chan::Receiver<T> {
        let (sender, receiver) = chan::unbounded();
        let mut subs = self.subscribers.lock().unwrap();
        subs.push(sender);
modified radicle-node/src/runtime/handle.rs
@@ -1,8 +1,8 @@
-
use std::fmt;
-
use std::io;
+
use std::ops::Deref;
use std::os::unix::net::UnixStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
+
use std::{fmt, io, time};

use crossbeam_channel as chan;
use cyphernet::Ecdh;
@@ -66,19 +66,61 @@ pub struct Handle<G: Signer + Ecdh> {

    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
    shutdown: Arc<AtomicBool>,
-

    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
}

+
/// Events feed.
+
pub struct Events(chan::Receiver<Event>);
+

+
impl Deref for Events {
+
    type Target = chan::Receiver<Event>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl Events {
+
    /// Listen for events, and wait for the given predicate to return something,
+
    /// or timeout if the specified amount of time has elapsed.
+
    pub fn wait<F>(
+
        &self,
+
        mut f: F,
+
        timeout: time::Duration,
+
    ) -> Result<Event, chan::RecvTimeoutError>
+
    where
+
        F: FnMut(&Event) -> bool,
+
    {
+
        let start = time::Instant::now();
+

+
        loop {
+
            if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
+
                match self.recv_timeout(timeout) {
+
                    Ok(event) => {
+
                        if f(&event) {
+
                            return Ok(event);
+
                        }
+
                    }
+
                    Err(err @ chan::RecvTimeoutError::Disconnected) => {
+
                        return Err(err);
+
                    }
+
                    Err(chan::RecvTimeoutError::Timeout) => {
+
                        // Keep trying until our timeout reaches zero.
+
                        continue;
+
                    }
+
                }
+
            } else {
+
                return Err(chan::RecvTimeoutError::Timeout);
+
            }
+
        }
+
    }
+
}
+

impl<G: Signer + Ecdh> Handle<G> {
    /// Subscribe to events stream.
-
    pub fn events(&mut self) -> chan::Receiver<Event> {
-
        let (sender, receiver) = chan::unbounded();
-
        let mut subs = self.emitter.subscribers.lock().unwrap();
-
        subs.push(sender);
-

-
        receiver
+
    pub fn events(&self) -> Events {
+
        Events(self.emitter.subscribe())
    }
}

modified radicle-node/src/service.rs
@@ -92,6 +92,17 @@ pub enum Event {
        rid: Id,
        updated: Vec<RefUpdate>,
    },
+
    SeedDiscovered {
+
        rid: Id,
+
        nid: NodeId,
+
    },
+
    SeedDropped {
+
        rid: Id,
+
        nid: NodeId,
+
    },
+
    PeerConnected {
+
        nid: NodeId,
+
    },
}

/// General service error.
@@ -344,7 +355,7 @@ where

    /// Subscriber to inner `Emitter` events.
    pub fn events(&mut self) -> chan::Receiver<Event> {
-
        self.emitter.events()
+
        self.emitter.subscribe()
    }

    /// Get I/O reactor.
@@ -668,6 +679,7 @@ where

    pub fn connected(&mut self, remote: NodeId, link: Link) {
        info!(target: "service", "Connected to {} ({:?})", remote, link);
+
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);

@@ -807,6 +819,7 @@ where
                match self.sync_routing(&message.inventory, *announcer, message.timestamp) {
                    Ok(updated) => {
                        if updated.is_empty() {
+
                            debug!(target: "service", "No routes updated by inventory announcement from {announcer}");
                            return Ok(false);
                        }
                    }
@@ -863,6 +876,10 @@ where
                    .insert(message.rid, *relayer, message.timestamp)
                {
                    if updated {
+
                        self.emitter.emit(Event::SeedDiscovered {
+
                            rid: message.rid,
+
                            nid: *relayer,
+
                        });
                        info!(target: "service", "Routing table updated for {} with seed {relayer}", message.rid);
                    }
                }
@@ -886,7 +903,7 @@ where
                        match self.should_fetch_refs_announcement(message, &repo_entry.scope) {
                            Ok(true) => self.fetch(message.rid, announcer),
                            Ok(false) => {
-
                                debug!(target: "service", "Skip fetch the refs from {announcer}")
+
                                debug!(target: "service", "Skipping fetch from {announcer}")
                            }
                            Err(e) => {
                                error!(target: "service", "Failed to check refs announcement: {e}");
@@ -977,7 +994,7 @@ where
    ) -> Result<bool, Error> {
        // First, check the freshness.
        if !message.is_fresh(&self.storage)? {
-
            debug!(target: "service", "All refs of {} are already in the local node", &message.rid);
+
            debug!(target: "service", "All refs of {} are already in local storage", &message.rid);
            return Ok(false);
        }

@@ -1206,26 +1223,31 @@ where
        let mut updated = Vec::new();
        let mut included = HashSet::new();

-
        for proj_id in inventory {
-
            included.insert(proj_id);
-
            if self.routing.insert(*proj_id, from, timestamp)? {
-
                info!(target: "service", "Routing table updated for {proj_id} with seed {from}");
+
        for rid in inventory {
+
            included.insert(rid);
+
            if self.routing.insert(*rid, from, timestamp)? {
+
                info!(target: "service", "Routing table updated for {rid} with seed {from}");
+
                self.emitter.emit(Event::SeedDiscovered {
+
                    rid: *rid,
+
                    nid: from,
+
                });

                if self
                    .tracking
-
                    .is_repo_tracked(proj_id)
+
                    .is_repo_tracked(rid)
                    .expect("Service::process_inventory: error accessing tracking configuration")
                {
                    // TODO: We should fetch here if we're already connected, case this seed has
                    // refs we don't have.
                }
-
                updated.push(*proj_id);
+
                updated.push(*rid);
            }
        }
-
        for id in self.routing.get_resources(&from)?.into_iter() {
-
            if !included.contains(&id) {
-
                if self.routing.remove(&id, &from)? {
-
                    updated.push(id);
+
        for rid in self.routing.get_resources(&from)?.into_iter() {
+
            if !included.contains(&rid) {
+
                if self.routing.remove(&rid, &from)? {
+
                    updated.push(rid);
+
                    self.emitter.emit(Event::SeedDropped { rid, nid: from });
                }
            }
        }
modified radicle-node/src/test/environment.rs
@@ -3,7 +3,7 @@ use std::mem::ManuallyDrop;
use std::path::{Path, PathBuf};
use std::{
    collections::{BTreeMap, BTreeSet},
-
    env, fs, io, iter, net, process, thread,
+
    env, fs, io, iter, net, process, thread, time,
    time::Duration,
};

@@ -27,6 +27,7 @@ use radicle::test::fixtures;
use radicle::Storage;

use crate::node::NodeId;
+
use crate::service::Event;
use crate::storage::git::transport;
use crate::{runtime, runtime::Handle, service, Runtime};

@@ -133,27 +134,28 @@ impl<G: Signer + cyphernet::Ecdh + 'static> Drop for NodeHandle<G> {
impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
    /// Connect this node to another node, and wait for the connection to be established both ways.
    pub fn connect(&mut self, remote: &NodeHandle<G>) -> &mut Self {
+
        let local_events = self.handle.events();
+
        let remote_events = remote.handle.events();
+

        self.handle.connect(remote.id, remote.addr.into()).unwrap();

-
        loop {
-
            let local_sessions = self.handle.sessions().unwrap();
-
            let remote_sessions = remote.handle.sessions().unwrap();
-

-
            let local_sessions = local_sessions
-
                .connected()
-
                .map(|(id, _)| id)
-
                .collect::<BTreeSet<_>>();
-
            let remote_sessions = remote_sessions
-
                .connected()
-
                .map(|(id, _)| id)
-
                .collect::<BTreeSet<_>>();
-

-
            if local_sessions.contains(&remote.id) && remote_sessions.contains(&self.id) {
-
                log::debug!(target: "test", "Connection between {} and {} established", self.id, remote.id);
-
                break;
-
            }
-
            thread::sleep(Duration::from_millis(100));
-
        }
+
        local_events
+
            .iter()
+
            .find(|e| {
+
                matches!(
+
                    e, Event::PeerConnected { nid } if nid == &remote.id
+
                )
+
            })
+
            .unwrap();
+
        remote_events
+
            .iter()
+
            .find(|e| {
+
                matches!(
+
                    e, Event::PeerConnected { nid } if nid == &self.id
+
                )
+
            })
+
            .unwrap();
+

        self
    }

@@ -176,13 +178,10 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
    /// Wait until this node's routing table contains the given routes.
    #[track_caller]
    pub fn routes_to(&self, routes: &[(Id, NodeId)]) {
-
        let mut tries = 0;
-
        loop {
-
            // ~3s to converge to the correct routes
-
            if tries > 30 {
-
                panic!("Node::routes_to: routing tables did not converge to include given routes")
-
            }
+
        log::debug!(target: "test", "Waiting for {} to route to {:?}", self.id, routes);
+
        let events = self.handle.events();

+
        loop {
            let mut remaining: BTreeSet<_> = routes.iter().collect();

            for (rid, nid) in self.routing() {
@@ -196,10 +195,13 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
            if remaining.is_empty() {
                break;
            }
-
            tries += 1;
-
            thread::sleep(Duration::from_millis(100));
+
            events
+
                .wait(
+
                    |e| matches!(e, Event::SeedDiscovered { .. }),
+
                    time::Duration::from_secs(6),
+
                )
+
                .unwrap();
        }
-
        log::debug!(target: "test", "Node {} routes to {:?}", self.id, routes);
    }

    /// Run a `rad` CLI command.
modified radicle-node/src/tests.rs
@@ -1256,12 +1256,17 @@ fn test_push_and_pull() {
        .get(&alice.node_id(), proj_id)
        .unwrap()
        .is_some());
-
    assert_matches!(
-
        bob_events.recv(),
-
        Ok(service::Event::RefsFetched { remote, .. })
-
        if remote == eve.node_id(),
-
        "Bob fetched from Eve"
-
    );
+

+
    bob_events
+
        .iter()
+
        .find(|e| {
+
            matches!(
+
                e,
+
                service::Event::RefsFetched { remote, .. }
+
                if *remote == eve.node_id(),
+
            )
+
        })
+
        .expect("Bob fetched from Eve");
}

#[test]
modified radicle-node/src/worker.rs
@@ -446,7 +446,7 @@ impl Pool {
    pub fn run(self) -> thread::Result<()> {
        for (i, worker) in self.pool.into_iter().enumerate() {
            if let Err(err) = worker.join()? {
-
                log::debug!(target: "pool", "Worker {i} exited: {err}");
+
                log::trace!(target: "pool", "Worker {i} exited: {err}");
            }
        }
        log::debug!(target: "pool", "Worker pool shutting down..");