Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Emit event when refs are synced
Alexis Sellier committed 3 years ago
commit bb07e255717893c588c5faaca6652d06450f7d11
parent 4b2bc365efbbd723f33031e467d85fc36e73ad5b
8 files changed +218 -93
modified radicle-node/src/runtime/handle.rs
@@ -1,8 +1,7 @@
-
use std::ops::Deref;
use std::os::unix::net::UnixStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
-
use std::{fmt, io, time};
+
use std::{fmt, io};

use crossbeam_channel as chan;
use radicle::node::Seeds;
@@ -14,8 +13,8 @@ use crate::profile::Home;
use crate::runtime::Emitter;
use crate::service;
use crate::service::tracking;
-
use crate::service::Event;
use crate::service::{CommandError, QueryState};
+
use crate::service::{Event, Events};
use crate::service::{NodeId, Sessions};
use crate::wire;
use crate::wire::StreamId;
@@ -69,57 +68,10 @@ pub struct Handle {
    emitter: Emitter<Event>,
}

-
/// Events feed.
-
pub struct Events(chan::Receiver<Event>);
-

-
impl Deref for Events {
-
    type Target = chan::Receiver<Event>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
-
    }
-
}
-

-
impl Events {
-
    /// Listen for events, and wait for the given predicate to return something,
-
    /// or timeout if the specified amount of time has elapsed.
-
    pub fn wait<F>(
-
        &self,
-
        mut f: F,
-
        timeout: time::Duration,
-
    ) -> Result<Event, chan::RecvTimeoutError>
-
    where
-
        F: FnMut(&Event) -> bool,
-
    {
-
        let start = time::Instant::now();
-

-
        loop {
-
            if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
-
                match self.recv_timeout(timeout) {
-
                    Ok(event) => {
-
                        if f(&event) {
-
                            return Ok(event);
-
                        }
-
                    }
-
                    Err(err @ chan::RecvTimeoutError::Disconnected) => {
-
                        return Err(err);
-
                    }
-
                    Err(chan::RecvTimeoutError::Timeout) => {
-
                        // Keep trying until our timeout reaches zero.
-
                        continue;
-
                    }
-
                }
-
            } else {
-
                return Err(chan::RecvTimeoutError::Timeout);
-
            }
-
        }
-
    }
-
}
-

impl Handle {
    /// Subscribe to events stream.
    pub fn events(&self) -> Events {
-
        Events(self.emitter.subscribe())
+
        Events::from(self.emitter.subscribe())
    }
}

modified radicle-node/src/service.rs
@@ -2,6 +2,7 @@
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
pub mod config;
+
pub mod events;
pub mod filter;
pub mod message;
pub mod reactor;
@@ -42,6 +43,7 @@ use crate::Link;

pub use crate::node::NodeId;
pub use crate::service::config::{Config, Network};
+
pub use crate::service::events::{Event, Events};
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::Session;

@@ -82,27 +84,6 @@ pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references imposed by message size limits.
pub use message::REF_REMOTE_LIMIT;

-
/// A service event.
-
#[derive(Debug, Clone)]
-
pub enum Event {
-
    RefsFetched {
-
        remote: NodeId,
-
        rid: Id,
-
        updated: Vec<RefUpdate>,
-
    },
-
    SeedDiscovered {
-
        rid: Id,
-
        nid: NodeId,
-
    },
-
    SeedDropped {
-
        rid: Id,
-
        nid: NodeId,
-
    },
-
    PeerConnected {
-
        nid: NodeId,
-
    },
-
}
-

/// Result of syncing our routing table with a node's inventory.
#[derive(Default)]
struct SyncedRouting {
@@ -369,8 +350,8 @@ where
    }

    /// Subscriber to inner `Emitter` events.
-
    pub fn events(&mut self) -> chan::Receiver<Event> {
-
        self.emitter.subscribe()
+
    pub fn events(&mut self) -> Events {
+
        Events::from(self.emitter.subscribe())
    }

    /// Get I/O reactor.
@@ -909,6 +890,22 @@ where
                    return Ok(false);
                }

+
                // Check if the announcer is in sync with our own refs, and if so emit an event.
+
                // This event is used for showing sync progress to users.
+
                match message.is_synced(&self.node_id(), &self.storage) {
+
                    Ok(synced) => {
+
                        if synced {
+
                            self.emitter.emit(Event::RefsSynced {
+
                                rid: message.rid,
+
                                remote: *announcer,
+
                            });
+
                        }
+
                    }
+
                    Err(e) => {
+
                        error!(target: "service", "Error checking refs announcement sync status: {e}");
+
                    }
+
                }
+

                // TODO: Buffer/throttle fetches.
                let repo_entry = self.tracking.repo_policy(&message.rid).expect(
                    "Service::handle_announcement: error accessing repo tracking configuration",
added radicle-node/src/service/events.rs
@@ -0,0 +1,85 @@
+
use std::ops::Deref;
+
use std::time;
+

+
use crossbeam_channel as chan;
+

+
use radicle::prelude::*;
+
use radicle::storage::RefUpdate;
+

+
/// A service event.
+
#[derive(Debug, Clone)]
+
pub enum Event {
+
    RefsFetched {
+
        remote: NodeId,
+
        rid: Id,
+
        updated: Vec<RefUpdate>,
+
    },
+
    RefsSynced {
+
        remote: NodeId,
+
        rid: Id,
+
    },
+
    SeedDiscovered {
+
        rid: Id,
+
        nid: NodeId,
+
    },
+
    SeedDropped {
+
        rid: Id,
+
        nid: NodeId,
+
    },
+
    PeerConnected {
+
        nid: NodeId,
+
    },
+
}
+

+
/// Events feed.
+
pub struct Events(chan::Receiver<Event>);
+

+
impl From<chan::Receiver<Event>> for Events {
+
    fn from(value: chan::Receiver<Event>) -> Self {
+
        Self(value)
+
    }
+
}
+

+
impl Deref for Events {
+
    type Target = chan::Receiver<Event>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl Events {
+
    /// Listen for events, and wait for the given predicate to return something,
+
    /// or timeout if the specified amount of time has elapsed.
+
    pub fn wait<F>(
+
        &self,
+
        mut f: F,
+
        timeout: time::Duration,
+
    ) -> Result<Event, chan::RecvTimeoutError>
+
    where
+
        F: FnMut(&Event) -> bool,
+
    {
+
        let start = time::Instant::now();
+

+
        loop {
+
            if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
+
                match self.recv_timeout(timeout) {
+
                    Ok(event) => {
+
                        if f(&event) {
+
                            return Ok(event);
+
                        }
+
                    }
+
                    Err(err @ chan::RecvTimeoutError::Disconnected) => {
+
                        return Err(err);
+
                    }
+
                    Err(chan::RecvTimeoutError::Timeout) => {
+
                        // Keep trying until our timeout reaches zero.
+
                        continue;
+
                    }
+
                }
+
            } else {
+
                return Err(chan::RecvTimeoutError::Timeout);
+
            }
+
        }
+
    }
+
}
modified radicle-node/src/service/message.rs
@@ -174,6 +174,26 @@ impl RefsAnnouncement {
        }
        Ok(false)
    }
+

+
    /// Check if an announcement tells us that a node is in sync with a local remote.
+
    pub fn is_synced<S: ReadStorage>(
+
        &self,
+
        remote: &NodeId,
+
        storage: S,
+
    ) -> Result<bool, storage::Error> {
+
        let repo = match storage.repository(self.rid) {
+
            // If the repo doesn't exist, we're not in sync.
+
            Err(e) if e.is_not_found() => return Ok(false),
+
            Err(e) => return Err(e),
+
            Ok(r) => r,
+
        };
+

+
        if let Some((_, refs)) = self.refs.iter().find(|(nid, _)| nid == remote) {
+
            let local_refs = repo.remote(remote)?.refs.unverified();
+
            return Ok(&local_refs == refs);
+
        }
+
        Ok(false)
+
    }
}

/// Node announcing its inventory to the network.
modified radicle-node/src/test/peer.rs
@@ -3,9 +3,10 @@ use std::iter;
use std::net;
use std::ops::{Deref, DerefMut};

-
use crossbeam_channel as chan;
use log::*;
+
use radicle::rad;
use radicle::storage::ReadRepository;
+
use radicle::Storage;

use crate::address;
use crate::address::Store;
@@ -24,9 +25,8 @@ use crate::service::*;
use crate::storage::git::transport::remote;
use crate::storage::Inventory;
use crate::storage::{Namespaces, RemoteId, WriteStorage};
-
use crate::test::arbitrary;
-
use crate::test::simulator;
use crate::test::storage::MockStorage;
+
use crate::test::{arbitrary, fixtures, simulator};
use crate::Link;
use crate::{LocalDuration, LocalTime};

@@ -41,6 +41,7 @@ pub struct Peer<S, G> {
    pub ip: net::IpAddr,
    pub rng: fastrand::Rng,
    pub local_addr: net::SocketAddr,
+
    pub tempdir: tempfile::TempDir,

    initialized: bool,
}
@@ -81,12 +82,13 @@ impl Peer<MockStorage, MockSigner> {
    pub fn new(name: &'static str, ip: impl Into<net::IpAddr>) -> Self {
        Self::with_storage(name, ip, MockStorage::empty())
    }
+
}

-
    pub fn with_storage(
-
        name: &'static str,
-
        ip: impl Into<net::IpAddr>,
-
        storage: MockStorage,
-
    ) -> Self {
+
impl<S> Peer<S, MockSigner>
+
where
+
    S: WriteStorage + 'static,
+
{
+
    pub fn with_storage(name: &'static str, ip: impl Into<net::IpAddr>, storage: S) -> Self {
        Self::config(name, ip, storage, Config::default())
    }
}
@@ -118,6 +120,25 @@ impl Default for Config<MockSigner> {
    }
}

+
impl<G: Signer> Peer<Storage, G> {
+
    pub fn project(&mut self, name: &str, description: &str) -> Id {
+
        radicle::storage::git::transport::local::register(self.storage().clone());
+

+
        let (repo, _) = fixtures::repository(self.tempdir.path().join(name));
+
        let (rid, _, _) = rad::init(
+
            &repo,
+
            name,
+
            description,
+
            radicle::git::refname!("master"),
+
            self.signer(),
+
            self.storage(),
+
        )
+
        .unwrap();
+

+
        rid
+
    }
+
}
+

impl<S, G> Peer<S, G>
where
    S: WriteStorage + 'static,
@@ -132,6 +153,7 @@ where
        let routing = routing::Table::memory().unwrap();
        let tracking = tracking::Store::memory().unwrap();
        let tracking = tracking::Config::new(config.policy, config.scope, tracking);
+
        let tempdir = tempfile::tempdir().unwrap();
        let id = *config.signer.public_key();
        let ip = ip.into();
        let local_addr = net::SocketAddr::new(ip, config.rng.u16(..));
@@ -160,6 +182,7 @@ where
            local_addr,
            rng: config.rng,
            initialized: false,
+
            tempdir,
        }
    }

@@ -294,9 +317,12 @@ where
        .expect("`inventory-announcement` must be sent");
    }

-
    pub fn connect_to(&mut self, peer: &Self) {
-
        let remote_id = simulator::Peer::<S, G>::id(peer);
-
        let remote_addr = simulator::Peer::<S, G>::addr(peer);
+
    pub fn connect_to<T: WriteStorage + 'static, H: Signer + 'static>(
+
        &mut self,
+
        peer: &Peer<T, H>,
+
    ) {
+
        let remote_id = simulator::Peer::<T, H>::id(peer);
+
        let remote_addr = simulator::Peer::<T, H>::addr(peer);

        self.initialize();
        self.service
@@ -343,7 +369,7 @@ where
    }

    /// Get a stream of the peer's emitted events.
-
    pub fn events(&mut self) -> chan::Receiver<Event> {
+
    pub fn events(&mut self) -> Events {
        self.service.events()
    }

modified radicle-node/src/tests.rs
@@ -4,9 +4,11 @@ use std::collections::BTreeSet;
use std::default::*;
use std::io;
use std::sync::Arc;
+
use std::time;

use crossbeam_channel as chan;
use netservices::LinkDirection as Link;
+
use radicle::storage::ReadRepository;

use crate::collections::{HashMap, HashSet};
use crate::crypto::test::signer::MockSigner;
@@ -1132,6 +1134,49 @@ fn test_queued_fetch() {
}

#[test]
+
fn test_refs_synced_event() {
+
    let temp = tempfile::tempdir().unwrap();
+
    let storage = Storage::open(temp.path()).unwrap();
+
    let mut alice = Peer::with_storage("alice", [8, 8, 8, 8], storage);
+
    let bob = Peer::new("eve", [9, 9, 9, 9]);
+
    let acme = alice.project("acme", "");
+
    let events = alice.events();
+
    let refs = alice
+
        .storage()
+
        .repository(acme)
+
        .unwrap()
+
        .remote(&alice.id)
+
        .unwrap()
+
        .refs
+
        .unverified();
+
    let ann = AnnouncementMessage::from(RefsAnnouncement {
+
        rid: acme,
+
        refs: vec![(alice.id, refs)].try_into().unwrap(),
+
        timestamp: bob.timestamp(),
+
    });
+
    let msg = ann.signed(bob.signer());
+

+
    alice.connect_to(&bob);
+
    alice.receive(bob.id, Message::Announcement(msg));
+

+
    events
+
        .wait(
+
            |e| {
+
                if let Event::RefsSynced { remote, rid } = e {
+
                    assert_eq!(remote, &bob.id);
+
                    assert_eq!(rid, &acme);
+

+
                    true
+
                } else {
+
                    false
+
                }
+
            },
+
            time::Duration::from_secs(3),
+
        )
+
        .unwrap();
+
}
+

+
#[test]
fn test_push_and_pull() {
    let tempdir = tempfile::tempdir().unwrap();

modified radicle/src/rad.rs
@@ -12,10 +12,10 @@ use crate::identity::doc::{DocError, Id};
use crate::identity::project::Project;
use crate::identity::{doc, IdentityError};
use crate::storage::git::transport;
-
use crate::storage::git::{Repository, Storage};
+
use crate::storage::git::Repository;
use crate::storage::refs::SignedRefs;
-
use crate::storage::WriteRepository;
use crate::storage::{BranchName, ReadRepository as _, RemoteId};
+
use crate::storage::{WriteRepository, WriteStorage};
use crate::{identity, storage};

/// Name of the radicle storage remote.
@@ -44,13 +44,13 @@ pub enum InitError {
}

/// Initialize a new radicle project from a git repository.
-
pub fn init<G: Signer>(
+
pub fn init<G: Signer, S: WriteStorage>(
    repo: &git2::Repository,
    name: &str,
    description: &str,
    default_branch: BranchName,
    signer: &G,
-
    storage: &Storage,
+
    storage: S,
) -> Result<(Id, identity::Doc<Verified>, SignedRefs<Verified>), InitError> {
    // TODO: Better error when project id already exists in storage, but remote doesn't.
    let pk = signer.public_key();
modified radicle/src/storage/git.rs
@@ -239,15 +239,15 @@ impl Repository {
    }

    /// Create the repository's identity branch.
-
    pub fn init<G: Signer>(
+
    pub fn init<G: Signer, S: WriteStorage>(
        doc: &Doc<Verified>,
        remote: &RemoteId,
-
        storage: &Storage,
+
        storage: S,
        signer: &G,
    ) -> Result<(Self, git::Oid), Error> {
        let (doc_oid, doc) = doc.encode()?;
        let id = Id::from(doc_oid);
-
        let repo = Self::create(paths::repository(storage, &id), id)?;
+
        let repo = Self::create(paths::repository(&storage, &id), id)?;
        let oid = Doc::init(
            doc.as_slice(),
            remote,