Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix redundant announcements
Alexis Sellier committed 3 years ago
commit f33e79f14be741adebb76de3f47ea991f7ce3ce5
parent 3d04849b7b7b31d6cf7785cd6804a9f821f12a99
4 files changed +132 -67
modified radicle-node/src/service.rs
@@ -378,8 +378,8 @@ where
        })
    }

-
    pub fn initialize(&mut self, time: LocalTime) {
-
        trace!("Init {}", time.as_secs());
+
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
+
        debug!("Init @{}", time.as_secs());

        self.start_time = time;

@@ -388,6 +388,11 @@ where
        for (id, addr) in addrs {
            self.reactor.connect(id, addr);
        }
+
        // Ensure that our inventory is recorded in our routing table.
+
        for id in self.storage.inventory()? {
+
            self.routing.insert(id, self.node_id(), time.as_secs())?;
+
        }
+
        Ok(())
    }

    pub fn tick(&mut self, now: nakamoto::LocalTime) {
@@ -658,6 +663,11 @@ where
            message,
            ..
        } = announcement;
+

+
        // Ignore our own announcements, in case the relayer sent one by mistake.
+
        if *announcer == self.node_id() {
+
            return Ok(false);
+
        }
        let now = self.clock;
        let timestamp = message.timestamp();
        let relay = self.config.relay;
@@ -884,11 +894,14 @@ where
                }
            }
            (session::State::Connected { .. }, Message::Subscribe(subscribe)) => {
-
                for msg in self
+
                for ann in self
                    .gossip
+
                    // Filter announcements by interest.
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
+
                    // Don't send announcements authored by the remote, back to the remote.
+
                    .filter(|ann| &ann.node != remote)
                {
-
                    self.reactor.write(peer.id, msg);
+
                    self.reactor.write(peer.id, ann.into());
                }
                peer.subscribe = Some(subscribe);
            }
@@ -937,13 +950,17 @@ where
        let mut included = HashSet::new();
        for proj_id in inventory {
            included.insert(proj_id);
-
            if self.routing.insert(*proj_id, from, *timestamp)?
-
                && self
+
            if self.routing.insert(*proj_id, from, *timestamp)? {
+
                log::info!("Routing table updated for {proj_id} with seed {from}");
+

+
                if self
                    .tracking
                    .is_repo_tracked(proj_id)
                    .expect("Service::process_inventory: error accessing tracking configuration")
-
            {
-
                log::info!("Routing table updated for {} with seed {}", proj_id, from);
+
                {
+
                    // TODO: We should fetch here if we're already connected, case this seed has
+
                    // refs we don't have.
+
                }
            }
        }
        for id in self.routing.get_resources(&from)?.into_iter() {
@@ -1308,13 +1325,13 @@ mod gossip {
            filter: &'a Filter,
            start: Timestamp,
            end: Timestamp,
-
        ) -> impl Iterator<Item = Message> + '_ {
+
        ) -> impl Iterator<Item = Announcement> + '_ {
            self.received
                .iter()
                .filter(move |(t, _)| *t >= start && *t < end)
                .filter(move |(_, a)| a.matches(filter))
                .cloned()
-
                .map(|(_, a)| a.into())
+
                .map(|(_, ann)| ann)
        }
    }

modified radicle-node/src/test/peer.rs
@@ -144,7 +144,7 @@ where
            info!("{}: Initializing: address = {}", self.name, self.ip);

            self.initialized = true;
-
            self.service.initialize(LocalTime::now());
+
            self.service.initialize(LocalTime::now()).unwrap();
        }
    }

modified radicle-node/src/tests/e2e.rs
@@ -6,6 +6,7 @@ use std::{
};

use radicle::crypto::test::signer::MockSigner;
+
use radicle::crypto::Signer;
use radicle::git::refname;
use radicle::identity::Id;
use radicle::node::Handle;
@@ -23,51 +24,26 @@ use crate::test::logger;
use crate::wire::Wire;
use crate::{client, client::Runtime, service};

-
/// Represents a running node.
+
/// A node that can be run.
struct Node {
-
    id: NodeId,
-
    addr: net::SocketAddr,
-
    handle: client::handle::Handle<Wire<routing::Table, address::Book, Storage, MockSigner>>,
+
    home: Home,
    signer: MockSigner,
    storage: Storage,
+
}
+

+
/// Handle to a running node.
+
struct NodeHandle {
+
    id: NodeId,
+
    storage: Storage,
+
    addr: net::SocketAddr,
    #[allow(dead_code)]
    thread: thread::JoinHandle<Result<(), client::Error>>,
+
    handle: client::handle::Handle<Wire<routing::Table, address::Book, Storage, MockSigner>>,
}

-
impl Node {
-
    /// Spawn a node in its own thread.
-
    fn spawn(base: &Path, config: service::Config) -> Self {
-
        let home = base.join(
-
            iter::repeat_with(fastrand::alphanumeric)
-
                .take(8)
-
                .collect::<String>(),
-
        );
-
        let paths = Home::init(home).unwrap();
-
        let signer = MockSigner::default();
-
        let listen = vec![([0, 0, 0, 0], 0).into()];
-
        let proxy = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
-
        let storage = Storage::open(paths.storage()).unwrap();
-
        let rt = Runtime::with(paths, config, listen, proxy, signer.clone()).unwrap();
-
        let addr = *rt.local_addrs.first().unwrap();
-
        let id = rt.id;
-
        let handle = rt.handle.clone();
-
        let thread = thread::Builder::new()
-
            .name(id.to_string())
-
            .spawn(|| rt.run())
-
            .unwrap();
-

-
        Self {
-
            id,
-
            addr,
-
            handle,
-
            signer,
-
            storage,
-
            thread,
-
        }
-
    }
-

+
impl NodeHandle {
    /// Connect this node to another node, and wait for the connection to be established both ways.
-
    fn connect(&mut self, remote: &Node) {
+
    fn connect(&mut self, remote: &NodeHandle) {
        self.handle.connect(remote.id, remote.addr.into()).unwrap();

        loop {
@@ -84,11 +60,54 @@ impl Node {
                .collect::<BTreeSet<_>>();

            if local_sessions.contains(&remote.id) && remote_sessions.contains(&self.id) {
+
                log::debug!(target: "test", "Connection between {} and {} established", self.id, remote.id);
                break;
            }
            thread::sleep(Duration::from_millis(100));
        }
    }
+
}
+

+
impl Node {
+
    /// Create a new node.
+
    fn new(base: &Path) -> Self {
+
        let home = base.join(
+
            iter::repeat_with(fastrand::alphanumeric)
+
                .take(8)
+
                .collect::<String>(),
+
        );
+
        let home = Home::init(home).unwrap();
+
        let signer = MockSigner::default();
+
        let storage = Storage::open(home.storage()).unwrap();
+

+
        Self {
+
            home,
+
            signer,
+
            storage,
+
        }
+
    }
+

+
    /// Spawn a node in its own thread.
+
    fn spawn(self, config: service::Config) -> NodeHandle {
+
        let listen = vec![([0, 0, 0, 0], 0).into()];
+
        let proxy = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
+
        let rt = Runtime::with(self.home, config, listen, proxy, self.signer.clone()).unwrap();
+
        let handle = rt.handle.clone();
+
        let addr = *rt.local_addrs.first().unwrap();
+
        let id = *self.signer.public_key();
+
        let thread = thread::Builder::new()
+
            .name(id.to_string())
+
            .spawn(move || rt.run())
+
            .unwrap();
+

+
        NodeHandle {
+
            id,
+
            storage: self.storage,
+
            addr,
+
            handle,
+
            thread,
+
        }
+
    }

    /// Populate a storage instance with a project.
    fn project(&mut self, name: &str) -> Id {
@@ -110,7 +129,10 @@ impl Node {
        .map(|(id, _, _)| id)
        .unwrap();

-
        log::debug!(target: "test", "Initialized project {id} for node {}", self.id);
+
        log::debug!(
+
            target: "test",
+
            "Initialized project {id} for node {}", self.signer.public_key()
+
        );

        id
    }
@@ -118,7 +140,7 @@ impl Node {

/// Checks whether the nodes have converged in their routing tables.
#[track_caller]
-
fn check<'a>(nodes: impl IntoIterator<Item = &'a Node>) -> BTreeSet<(Id, NodeId)> {
+
fn check<'a>(nodes: impl IntoIterator<Item = &'a NodeHandle>) -> BTreeSet<(Id, NodeId)> {
    let nodes = nodes.into_iter().collect::<Vec<_>>();

    let mut all_routes = BTreeSet::<(Id, NodeId)>::new();
@@ -143,6 +165,8 @@ fn check<'a>(nodes: impl IntoIterator<Item = &'a Node>) -> BTreeSet<(Id, NodeId)
            if routes == all_routes {
                log::debug!(target: "test", "Node {} has converged", node.id);
                return false;
+
            } else {
+
                log::debug!(target: "test", "Node {} has {:?}", node.id, routes);
            }
            true
        });
@@ -160,11 +184,15 @@ fn test_inventory_sync_basic() {

    let tmp = tempfile::tempdir().unwrap();

-
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
-
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
+
    let mut alice = Node::new(tmp.path());
+
    let mut bob = Node::new(tmp.path());

    alice.project("alice");
    bob.project("bob");
+

+
    let mut alice = alice.spawn(service::Config::default());
+
    let bob = bob.spawn(service::Config::default());
+

    alice.connect(&bob);

    let routes = check([&alice, &bob]);
@@ -180,14 +208,18 @@ fn test_inventory_sync_bridge() {

    let tmp = tempfile::tempdir().unwrap();

-
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
-
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
-
    let mut eve = Node::spawn(tmp.path(), service::Config::default());
+
    let mut alice = Node::new(tmp.path());
+
    let mut bob = Node::new(tmp.path());
+
    let mut eve = Node::new(tmp.path());

    alice.project("alice");
    bob.project("bob");
    eve.project("eve");

+
    let mut alice = alice.spawn(service::Config::default());
+
    let mut bob = bob.spawn(service::Config::default());
+
    let eve = eve.spawn(service::Config::default());
+

    alice.connect(&bob);
    bob.connect(&eve);

@@ -206,16 +238,21 @@ fn test_inventory_sync_ring() {

    let tmp = tempfile::tempdir().unwrap();

-
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
-
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
-
    let mut eve = Node::spawn(tmp.path(), service::Config::default());
-
    let mut carol = Node::spawn(tmp.path(), service::Config::default());
+
    let mut alice = Node::new(tmp.path());
+
    let mut bob = Node::new(tmp.path());
+
    let mut eve = Node::new(tmp.path());
+
    let mut carol = Node::new(tmp.path());

    alice.project("alice");
    bob.project("bob");
    eve.project("eve");
    carol.project("carol");

+
    let mut alice = alice.spawn(service::Config::default());
+
    let mut bob = bob.spawn(service::Config::default());
+
    let mut eve = eve.spawn(service::Config::default());
+
    let mut carol = carol.spawn(service::Config::default());
+

    alice.connect(&bob);
    bob.connect(&eve);
    eve.connect(&carol);
@@ -238,11 +275,11 @@ fn test_inventory_sync_star() {

    let tmp = tempfile::tempdir().unwrap();

-
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
-
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
-
    let mut eve = Node::spawn(tmp.path(), service::Config::default());
-
    let mut carol = Node::spawn(tmp.path(), service::Config::default());
-
    let mut dave = Node::spawn(tmp.path(), service::Config::default());
+
    let mut alice = Node::new(tmp.path());
+
    let mut bob = Node::new(tmp.path());
+
    let mut eve = Node::new(tmp.path());
+
    let mut carol = Node::new(tmp.path());
+
    let mut dave = Node::new(tmp.path());

    alice.project("alice");
    bob.project("bob");
@@ -250,6 +287,12 @@ fn test_inventory_sync_star() {
    carol.project("carol");
    dave.project("dave");

+
    let alice = alice.spawn(service::Config::default());
+
    let mut bob = bob.spawn(service::Config::default());
+
    let mut eve = eve.spawn(service::Config::default());
+
    let mut carol = carol.spawn(service::Config::default());
+
    let mut dave = dave.spawn(service::Config::default());
+

    bob.connect(&alice);
    eve.connect(&alice);
    carol.connect(&alice);
@@ -265,10 +308,13 @@ fn test_replication() {
    logger::init(log::Level::Debug);

    let tmp = tempfile::tempdir().unwrap();
-
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
-
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
+
    let alice = Node::new(tmp.path());
+
    let mut bob = Node::new(tmp.path());
    let acme = bob.project("acme");

+
    let mut alice = alice.spawn(service::Config::default());
+
    let bob = bob.spawn(service::Config::default());
+

    alice.connect(&bob);
    check([&alice, &bob]);

modified radicle-node/src/wire/protocol.rs
@@ -203,7 +203,9 @@ where
        proxy: net::SocketAddr,
        clock: LocalTime,
    ) -> Self {
-
        service.initialize(clock);
+
        service
+
            .initialize(clock)
+
            .expect("Wire::new: error initializing service");

        Self {
            service,