Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Rename `reactor` module to `io`
Alexis Sellier committed 3 years ago
commit 1d9ee81b6deae8e97afd55acf0c480a7ef85653d
parent 8508eab8faddb8d02148db6255b1d52479c78b4a
8 files changed +173 -174
modified radicle-node/src/service.rs
@@ -3,8 +3,8 @@
#![allow(clippy::collapsible_if)]
pub mod config;
pub mod filter;
+
pub mod io;
pub mod message;
-
pub mod reactor;
pub mod session;
pub mod tracking;

@@ -47,8 +47,8 @@ pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::Session;

use self::gossip::Gossip;
+
use self::io::Outbox;
use self::message::InventoryAnnouncement;
-
use self::reactor::Reactor;
use self::tracking::NamespacesError;

/// Target number of peers to maintain connections to.
@@ -194,8 +194,8 @@ pub struct Service<R, A, S, G> {
    sessions: Sessions,
    /// Clock. Tells the time.
    clock: LocalTime,
-
    /// Interface to the I/O reactor.
-
    reactor: Reactor,
+
    /// I/O outbox.
+
    outbox: Outbox,
    /// Source of entropy.
    rng: Rng,
    /// Fetch requests initiated by user, which are waiting for results.
@@ -261,7 +261,7 @@ where
            clock,
            routing,
            gossip: Gossip::default(),
-
            reactor: Reactor::default(),
+
            outbox: Outbox::default(),
            sessions,
            fetch_reqs: HashMap::new(),
            filter: Filter::empty(),
@@ -276,8 +276,8 @@ where

    /// Return the next i/o action to execute.
    #[allow(clippy::should_implement_trait)]
-
    pub fn next(&mut self) -> Option<reactor::Io> {
-
        self.reactor.next()
+
    pub fn next(&mut self) -> Option<io::Io> {
+
        self.outbox.next()
    }

    /// Track a repository.
@@ -356,9 +356,9 @@ where
        Events::from(self.emitter.subscribe())
    }

-
    /// Get I/O reactor.
-
    pub fn reactor(&mut self) -> &mut Reactor {
-
        &mut self.reactor
+
    /// Get I/O outbox.
+
    pub fn outbox(&mut self) -> &mut Outbox {
+
        &mut self.outbox
    }

    /// Lookup a project, both locally and in the routing table.
@@ -403,7 +403,7 @@ where
                .filter_map(|t| (t.policy == tracking::Policy::Track).then_some(t.id)),
        );
        // Start periodic tasks.
-
        self.reactor.wakeup(IDLE_INTERVAL);
+
        self.outbox.wakeup(IDLE_INTERVAL);

        Ok(())
    }
@@ -425,7 +425,7 @@ where
            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
            self.maintain_connections();
-
            self.reactor.wakeup(IDLE_INTERVAL);
+
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
@@ -434,7 +434,7 @@ where
            if let Err(e) = self.fetch_missing_inventory() {
                error!(target: "service", "Error fetching missing inventory: {e}");
            }
-
            self.reactor.wakeup(SYNC_INTERVAL);
+
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
@@ -445,7 +445,7 @@ where
            {
                error!(target: "service", "Error announcing inventory: {}", err);
            }
-
            self.reactor.wakeup(ANNOUNCE_INTERVAL);
+
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
@@ -454,7 +454,7 @@ where
            if let Err(err) = self.prune_routing_entries(&now) {
                error!("Error pruning routing entries: {}", err);
            }
-
            self.reactor.wakeup(PRUNE_INTERVAL);
+
            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }

@@ -470,7 +470,7 @@ where
                self.connect(nid, addr);
            }
            Command::Disconnect(nid) => {
-
                self.reactor.disconnect(nid, DisconnectReason::Command);
+
                self.outbox.disconnect(nid, DisconnectReason::Command);
            }
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
                Ok(seeds) => {
@@ -493,7 +493,7 @@ where
                resp.send(tracked).ok();

                // Let all our peers know that we're interested in this repo from now on.
-
                self.reactor.broadcast(
+
                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.time(), Timestamp::MAX),
                    self.sessions.connected().map(|(_, s)| s),
                );
@@ -563,7 +563,7 @@ where

                match self.tracking.namespaces_for(&self.storage, &rid) {
                    Ok(namespaces) => {
-
                        self.reactor.fetch(session, rid, namespaces);
+
                        self.outbox.fetch(session, rid, namespaces);
                    }
                    Err(err) => {
                        error!(target: "service", "Error getting namespaces for {rid}: {err}");
@@ -617,8 +617,7 @@ where
                // For now, we only disconnect the remote in case of timeout. In the future,
                // there may be other reasons to disconnect.
                if err.is_timeout() {
-
                    self.reactor
-
                        .disconnect(remote, DisconnectReason::Fetch(err));
+
                    self.outbox.disconnect(remote, DisconnectReason::Fetch(err));
                }
                FetchResult::Failed { reason }
            }
@@ -692,7 +691,7 @@ where
        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
                let attempted = peer.to_connected(self.clock);
-
                self.reactor.write_all(peer, msgs);
+
                self.outbox.write_all(peer, msgs);

                if let Err(e) = self.addresses.connected(&remote, &attempted, self.time()) {
                    error!(target: "service", "Error updating address book with connection: {e}");
@@ -714,7 +713,7 @@ where
                        self.clock,
                        self.config.limits.clone(),
                    ));
-
                    self.reactor.write_all(peer, msgs);
+
                    self.outbox.write_all(peer, msgs);
                }
            }
        }
@@ -756,7 +755,7 @@ where

            debug!(target: "service", "Reconnecting to {remote} in {delay}..");

-
            self.reactor.wakeup(delay);
+
            self.outbox.wakeup(delay);
        } else {
            self.sessions.remove(&remote);
            // Only re-attempt outbound connections, since we don't care if an inbound connection
@@ -773,7 +772,7 @@ where
            Err(err) => {
                // If there's an error, stop processing messages from this peer.
                // However, we still relay messages returned up to this point.
-
                self.reactor
+
                self.outbox
                    .disconnect(remote, DisconnectReason::Session(err));

                // FIXME: The peer should be set in a state such that we don't
@@ -1089,7 +1088,7 @@ where
                        .filter(|(id, _)| *id != remote && *id != &announcer)
                        .map(|(_, p)| p);

-
                    self.reactor.relay(ann, relay_to);
+
                    self.outbox.relay(ann, relay_to);

                    return Ok(());
                }
@@ -1102,7 +1101,7 @@ where
                    // Don't send announcements authored by the remote, back to the remote.
                    .filter(|ann| &ann.node != remote)
                {
-
                    self.reactor.write(peer, ann.into());
+
                    self.outbox.write(peer, ann.into());
                }
                peer.subscribe = Some(subscribe);
            }
@@ -1111,7 +1110,7 @@ where
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
                }
-
                self.reactor.write(
+
                self.outbox.write(
                    peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
@@ -1238,7 +1237,7 @@ where
        });
        let ann = msg.signed(&self.signer);

-
        self.reactor.broadcast(ann, peers);
+
        self.outbox.broadcast(ann, peers);

        Ok(())
    }
@@ -1266,7 +1265,7 @@ where
    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_initial();
-
            self.reactor.connect(nid, addr);
+
            self.outbox.connect(nid, addr);

            return true;
        }
@@ -1292,7 +1291,7 @@ where
                self.config.limits.clone(),
            ),
        );
-
        self.reactor.connect(nid, addr);
+
        self.outbox.connect(nid, addr);

        true
    }
@@ -1358,7 +1357,7 @@ where
        let time = self.time();
        let inv = Message::inventory(gossip::inventory(time, inventory), &self.signer);
        for (_, sess) in self.sessions.connected() {
-
            self.reactor.write(sess, inv.clone());
+
            self.outbox.write(sess, inv.clone());
        }
        Ok(())
    }
@@ -1384,7 +1383,7 @@ where
            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);

        for (_, session) in stale {
-
            self.reactor.disconnect(
+
            self.outbox.disconnect(
                session.id,
                DisconnectReason::Session(session::Error::Timeout),
            );
@@ -1399,7 +1398,7 @@ where
            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
            .map(|(_, session)| session);
        for session in inactive_sessions {
-
            session.ping(&mut self.reactor).ok();
+
            session.ping(&mut self.outbox).ok();
        }
    }

@@ -1488,7 +1487,7 @@ where

    /// Maintain persistent peer connections.
    fn maintain_persistent(&mut self) {
-
        debug!(target: "service", "Maintaining persistent peers..");
+
        trace!(target: "service", "Maintaining persistent peers..");

        let now = self.local_time();
        let mut reconnect = Vec::new();
added radicle-node/src/service/io.rs
@@ -0,0 +1,131 @@
+
use std::collections::VecDeque;
+

+
use log::*;
+

+
use crate::prelude::*;
+
use crate::service::session::Session;
+
use crate::service::Link;
+
use crate::storage::Namespaces;
+

+
use super::message::{Announcement, AnnouncementMessage};
+

+
/// I/O operation to execute at the network/wire level.
+
#[derive(Debug)]
+
pub enum Io {
+
    /// There are some messages ready to be sent to a peer.
+
    Write(NodeId, Vec<Message>),
+
    /// Connect to a peer.
+
    Connect(NodeId, Address),
+
    /// Disconnect from a peer.
+
    Disconnect(NodeId, DisconnectReason),
+
    /// Fetch repository data from a peer.
+
    Fetch {
+
        /// Repo being fetched.
+
        rid: Id,
+
        /// Remote node being fetched from.
+
        remote: NodeId,
+
        /// Namespaces being fetched.
+
        namespaces: Namespaces,
+
    },
+
    /// Ask for a wakeup in a specified amount of time.
+
    Wakeup(LocalDuration),
+
}
+

+
/// Interface to the network.
+
#[derive(Debug, Default)]
+
pub struct Outbox {
+
    /// Outgoing I/O queue.
+
    io: VecDeque<Io>,
+
}
+

+
impl Outbox {
+
    /// Connect to a peer.
+
    pub fn connect(&mut self, id: NodeId, addr: Address) {
+
        self.io.push_back(Io::Connect(id, addr));
+
    }
+

+
    /// Disconnect a peer.
+
    pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
+
        self.io.push_back(Io::Disconnect(id, reason));
+
    }
+

+
    pub fn write(&mut self, remote: &Session, msg: Message) {
+
        msg.log(log::Level::Debug, &remote.id, Link::Outbound);
+
        trace!(target: "service", "Write {:?} to {}", &msg, remote);
+

+
        self.io.push_back(Io::Write(remote.id, vec![msg]));
+
    }
+

+
    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
+
        let msgs = msgs.into_iter().collect::<Vec<_>>();
+

+
        for (ix, msg) in msgs.iter().enumerate() {
+
            trace!(
+
                target: "service",
+
                "Write {:?} to {} ({}/{})",
+
                msg,
+
                remote,
+
                ix + 1,
+
                msgs.len()
+
            );
+
            msg.log(log::Level::Debug, &remote.id, Link::Outbound);
+
        }
+
        self.io.push_back(Io::Write(remote.id, msgs));
+
    }
+

+
    pub fn wakeup(&mut self, after: LocalDuration) {
+
        self.io.push_back(Io::Wakeup(after));
+
    }
+

+
    pub fn fetch(&mut self, remote: &mut Session, rid: Id, namespaces: Namespaces) {
+
        self.io.push_back(Io::Fetch {
+
            rid,
+
            namespaces,
+
            remote: remote.id,
+
        });
+
    }
+

+
    /// Broadcast a message to a list of peers.
+
    pub fn broadcast<'a>(
+
        &mut self,
+
        msg: impl Into<Message>,
+
        peers: impl IntoIterator<Item = &'a Session>,
+
    ) {
+
        let msg = msg.into();
+
        for peer in peers {
+
            self.write(peer, msg.clone());
+
        }
+
    }
+

+
    /// Relay a message to interested peers.
+
    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
+
        if let AnnouncementMessage::Refs(msg) = &ann.message {
+
            let id = msg.rid;
+
            let peers = peers.into_iter().filter(|p| {
+
                if let Some(subscribe) = &p.subscribe {
+
                    subscribe.filter.contains(&id)
+
                } else {
+
                    // If the peer did not send us a `subscribe` message, we don'the
+
                    // relay any messages to them.
+
                    false
+
                }
+
            });
+
            self.broadcast(ann, peers);
+
        } else {
+
            self.broadcast(ann, peers);
+
        }
+
    }
+

+
    #[cfg(any(test, feature = "test"))]
+
    pub(crate) fn queue(&mut self) -> &mut VecDeque<Io> {
+
        &mut self.io
+
    }
+
}
+

+
impl Iterator for Outbox {
+
    type Item = Io;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        self.io.pop_front()
+
    }
+
}
deleted radicle-node/src/service/reactor.rs
@@ -1,131 +0,0 @@
-
use std::collections::VecDeque;
-

-
use log::*;
-

-
use crate::prelude::*;
-
use crate::service::session::Session;
-
use crate::service::Link;
-
use crate::storage::Namespaces;
-

-
use super::message::{Announcement, AnnouncementMessage};
-

-
/// Output of a state transition.
-
#[derive(Debug)]
-
pub enum Io {
-
    /// There are some messages ready to be sent to a peer.
-
    Write(NodeId, Vec<Message>),
-
    /// Connect to a peer.
-
    Connect(NodeId, Address),
-
    /// Disconnect from a peer.
-
    Disconnect(NodeId, DisconnectReason),
-
    /// Fetch repository data from a peer.
-
    Fetch {
-
        /// Repo being fetched.
-
        rid: Id,
-
        /// Remote node being fetched from.
-
        remote: NodeId,
-
        /// Namespaces being fetched.
-
        namespaces: Namespaces,
-
    },
-
    /// Ask for a wakeup in a specified amount of time.
-
    Wakeup(LocalDuration),
-
}
-

-
/// Interface to the network reactor.
-
#[derive(Debug, Default)]
-
pub struct Reactor {
-
    /// Outgoing I/O queue.
-
    io: VecDeque<Io>,
-
}
-

-
impl Reactor {
-
    /// Connect to a peer.
-
    pub fn connect(&mut self, id: NodeId, addr: Address) {
-
        self.io.push_back(Io::Connect(id, addr));
-
    }
-

-
    /// Disconnect a peer.
-
    pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
-
        self.io.push_back(Io::Disconnect(id, reason));
-
    }
-

-
    pub fn write(&mut self, remote: &Session, msg: Message) {
-
        msg.log(log::Level::Debug, &remote.id, Link::Outbound);
-
        trace!(target: "service", "Write {:?} to {}", &msg, remote);
-

-
        self.io.push_back(Io::Write(remote.id, vec![msg]));
-
    }
-

-
    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
-
        let msgs = msgs.into_iter().collect::<Vec<_>>();
-

-
        for (ix, msg) in msgs.iter().enumerate() {
-
            trace!(
-
                target: "service",
-
                "Write {:?} to {} ({}/{})",
-
                msg,
-
                remote,
-
                ix + 1,
-
                msgs.len()
-
            );
-
            msg.log(log::Level::Debug, &remote.id, Link::Outbound);
-
        }
-
        self.io.push_back(Io::Write(remote.id, msgs));
-
    }
-

-
    pub fn wakeup(&mut self, after: LocalDuration) {
-
        self.io.push_back(Io::Wakeup(after));
-
    }
-

-
    pub fn fetch(&mut self, remote: &mut Session, rid: Id, namespaces: Namespaces) {
-
        self.io.push_back(Io::Fetch {
-
            rid,
-
            namespaces,
-
            remote: remote.id,
-
        });
-
    }
-

-
    /// Broadcast a message to a list of peers.
-
    pub fn broadcast<'a>(
-
        &mut self,
-
        msg: impl Into<Message>,
-
        peers: impl IntoIterator<Item = &'a Session>,
-
    ) {
-
        let msg = msg.into();
-
        for peer in peers {
-
            self.write(peer, msg.clone());
-
        }
-
    }
-

-
    /// Relay a message to interested peers.
-
    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
-
        if let AnnouncementMessage::Refs(msg) = &ann.message {
-
            let id = msg.rid;
-
            let peers = peers.into_iter().filter(|p| {
-
                if let Some(subscribe) = &p.subscribe {
-
                    subscribe.filter.contains(&id)
-
                } else {
-
                    // If the peer did not send us a `subscribe` message, we don'the
-
                    // relay any messages to them.
-
                    false
-
                }
-
            });
-
            self.broadcast(ann, peers);
-
        } else {
-
            self.broadcast(ann, peers);
-
        }
-
    }
-

-
    #[cfg(any(test, feature = "test"))]
-
    pub(crate) fn outbox(&mut self) -> &mut VecDeque<Io> {
-
        &mut self.io
-
    }
-
}
-

-
impl Iterator for Reactor {
-
    type Item = Io;
-

-
    fn next(&mut self) -> Option<Self::Item> {
-
        self.io.pop_front()
-
    }
-
}
modified radicle-node/src/service/session.rs
@@ -4,7 +4,7 @@ use std::{fmt, mem};
use crate::service::config::Limits;
use crate::service::message;
use crate::service::message::Message;
-
use crate::service::{Address, Id, LocalTime, NodeId, Reactor, Rng};
+
use crate::service::{Address, Id, LocalTime, NodeId, Outbox, Rng};
use crate::Link;

#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
@@ -295,7 +295,7 @@ impl Session {
        }
    }

-
    pub fn ping(&mut self, reactor: &mut Reactor) -> Result<(), Error> {
+
    pub fn ping(&mut self, reactor: &mut Outbox) -> Result<(), Error> {
        if let State::Connected { ping, .. } = &mut self.state {
            let msg = message::Ping::new(&mut self.rng);
            *ping = PingState::AwaitingResponse(msg.ponglen);
modified radicle-node/src/test/peer.rs
@@ -18,8 +18,8 @@ use crate::node::routing;
use crate::prelude::*;
use crate::runtime::Emitter;
use crate::service;
+
use crate::service::io::Io;
use crate::service::message::*;
-
use crate::service::reactor::Io;
use crate::service::tracking::{Policy, Scope};
use crate::service::*;
use crate::storage::git::transport::remote;
@@ -354,7 +354,7 @@ where
    pub fn messages(&mut self, remote: NodeId) -> impl Iterator<Item = Message> {
        let mut msgs = Vec::new();

-
        self.service.reactor().outbox().retain(|o| match o {
+
        self.service.outbox().queue().retain(|o| match o {
            Io::Write(a, messages) if *a == remote => {
                msgs.extend(messages.clone());
                false
@@ -372,12 +372,12 @@ where

    /// Get a draining iterator over the peer's I/O outbox.
    pub fn outbox(&mut self) -> impl Iterator<Item = Io> + '_ {
-
        iter::from_fn(|| self.service.reactor().next())
+
        iter::from_fn(|| self.service.outbox().next())
    }

    /// Get a draining iterator over the peer's I/O outbox, which only returns fetches.
    pub fn fetches(&mut self) -> impl Iterator<Item = (Id, NodeId, Namespaces)> + '_ {
-
        iter::from_fn(|| self.service.reactor().next()).filter_map(|io| {
+
        iter::from_fn(|| self.service.outbox().next()).filter_map(|io| {
            if let Io::Fetch {
                rid,
                remote,
modified radicle-node/src/test/simulator.rs
@@ -16,7 +16,7 @@ use log::*;
use crate::crypto::Signer;
use crate::git::raw as git;
use crate::prelude::{Address, Id};
-
use crate::service::reactor::Io;
+
use crate::service::io::Io;
use crate::service::{DisconnectReason, Event, Message, NodeId};
use crate::storage::{Namespaces, RefUpdate};
use crate::storage::{WriteRepository, WriteStorage};
modified radicle-node/src/tests.rs
@@ -18,8 +18,8 @@ use crate::prelude::*;
use crate::prelude::{LocalDuration, Timestamp};
use crate::service::config::*;
use crate::service::filter::Filter;
+
use crate::service::io::Io;
use crate::service::message::*;
-
use crate::service::reactor::Io;
use crate::service::ServiceState as _;
use crate::service::*;
use crate::storage::git::transport::{local, remote};
modified radicle-node/src/wire/protocol.rs
@@ -27,7 +27,7 @@ use radicle::storage::WriteStorage;

use crate::crypto::Signer;
use crate::prelude::Deserializer;
-
use crate::service::reactor::Io;
+
use crate::service::io::Io;
use crate::service::{session, DisconnectReason, Service};
use crate::wire::frame;
use crate::wire::frame::{Frame, FrameData, StreamId};