Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Update `io-reactor`
Alexis Sellier committed 3 years ago
commit 723cdcf2336b12a47b570853fdfa4cf03ef01d3e
parent b9484b6832f660b102f80c7a4a2efbaa3487d1ca
6 files changed +46 -36
modified Cargo.lock
@@ -699,11 +699,12 @@ dependencies = [

[[package]]
name = "dialoguer"
-
version = "0.10.2"
+
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "a92e7e37ecef6857fdc0c0c5d42fd5b0938e46590c2183cc92dd310a6d078eb1"
+
checksum = "af3c796f3b0b408d9fd581611b47fa850821fcb84aa640b83a3c1a5be2d691f2"
dependencies = [
 "console",
+
 "shell-words",
 "tempfile",
 "zeroize",
]
@@ -1408,7 +1409,7 @@ dependencies = [
[[package]]
name = "io-reactor"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#ee0ae68f5be18977b9a66e709db997801d42ab17"
+
source = "git+https://github.com/rust-amplify/io-reactor?rev=6148aec926c82f08373687d1a9da07a2bca67ea3#6148aec926c82f08373687d1a9da07a2bca67ea3"
dependencies = [
 "amplify",
 "crossbeam-channel",
@@ -1514,9 +1515,9 @@ checksum = "478ee9e62aaeaf5b140bd4138753d1f109765488581444218d3ddda43234f3e8"

[[package]]
name = "libc"
-
version = "0.2.138"
+
version = "0.2.139"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8"
+
checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"

[[package]]
name = "libgit2-sys"
@@ -1658,7 +1659,7 @@ dependencies = [
[[package]]
name = "netservices"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#ee0ae68f5be18977b9a66e709db997801d42ab17"
+
source = "git+https://github.com/cyphernet-dao/rust-netservices#50944dfcf1593ca568b8a1c3ee4b44fc05deb8ca"
dependencies = [
 "amplify",
 "cyphernet",
@@ -2749,6 +2750,12 @@ dependencies = [
]

[[package]]
+
name = "shell-words"
+
version = "1.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde"
+

+
[[package]]
name = "shlex"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified Cargo.toml
@@ -34,11 +34,12 @@ rev = "90cc3eac67aa5cfd5f42cf7cb1e2b155af3214fb"
version = "0.3.0"

[patch.crates-io.io-reactor]
-
git = "https://github.com/cyphernet-wg/rust-netservices"
+
git = "https://github.com/rust-amplify/io-reactor"
+
rev = "6148aec926c82f08373687d1a9da07a2bca67ea3"
version = "0.1.0"

[patch.crates-io.netservices]
-
git = "https://github.com/cyphernet-wg/rust-netservices"
+
git = "https://github.com/cyphernet-dao/rust-netservices"
version = "0.1.0"

[patch.crates-io.radicle-git-ext]
modified radicle-node/src/client.rs
@@ -53,26 +53,29 @@ pub enum Error {
}

/// Holds join handles to the client threads, as well as a client handle.
-
pub struct Runtime<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone> {
+
pub struct Runtime {
    pub id: NodeId,
-
    pub handle: Handle<Wire<routing::Table, address::Book, radicle::Storage, G>>,
+
    pub handle: Handle,
    pub control: thread::JoinHandle<Result<(), control::Error>>,
-
    pub reactor: Reactor<Wire<routing::Table, address::Book, radicle::Storage, G>>,
+
    pub reactor: Reactor<service::Command>,
    pub pool: WorkerPool,
    pub local_addrs: Vec<net::SocketAddr>,
}

-
impl<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone + 'static> Runtime<G> {
+
impl Runtime {
    /// Run the client.
    ///
    /// This function spawns threads.
-
    pub fn with(
+
    pub fn with<G>(
        home: Home,
        config: service::Config,
        listen: Vec<net::SocketAddr>,
        proxy: net::SocketAddr,
        signer: G,
-
    ) -> Result<Runtime<G>, Error> {
+
    ) -> Result<Runtime, Error>
+
    where
+
        G: crypto::Signer + EcSign<Sig = Signature, Pk = NodeId> + Clone + 'static,
+
    {
        let id = *signer.public_key();
        let node_sock = home.socket();
        let node_dir = home.node();
@@ -118,14 +121,7 @@ impl<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone + 'static>
            worker_recv,
            id.to_human(),
        );
-
        let wire = Wire::new(service, worker_send, cert, signer, proxy, clock);
-
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
-
        let handle = Handle::new(home, reactor.controller());
-
        let control = thread::spawn({
-
            let handle = handle.clone();
-
            move || control::listen(node_sock, handle)
-
        });
-
        let controller = reactor.controller();
+
        let mut wire = Wire::new(service, worker_send, cert, signer, proxy, clock);
        let mut local_addrs = Vec::new();

        for addr in listen {
@@ -133,10 +129,16 @@ impl<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone + 'static>
            let local_addr = listener.local_addr();

            local_addrs.push(local_addr);
-
            controller.register_listener(listener)?;
+
            wire.listen(listener);

            log::info!("Listening on {local_addr}..");
        }
+
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
+
        let handle = Handle::new(home, reactor.controller());
+
        let control = thread::spawn({
+
            let handle = handle.clone();
+
            move || control::listen(node_sock, handle)
+
        });

        Ok(Runtime {
            id,
modified radicle-node/src/client/handle.rs
@@ -49,12 +49,12 @@ impl<T> From<chan::SendError<T>> for Error {
    }
}

-
pub struct Handle<T: reactor::Handler> {
+
pub struct Handle {
    pub(crate) home: Home,
-
    pub(crate) controller: reactor::Controller<T>,
+
    pub(crate) controller: reactor::Controller<service::Command>,
}

-
impl<T: reactor::Handler> Clone for Handle<T> {
+
impl Clone for Handle {
    fn clone(&self) -> Self {
        Self {
            home: self.home.clone(),
@@ -63,20 +63,18 @@ impl<T: reactor::Handler> Clone for Handle<T> {
    }
}

-
impl<T: reactor::Handler> Handle<T> {
-
    pub fn new(home: Home, controller: reactor::Controller<T>) -> Self {
+
impl Handle {
+
    pub fn new(home: Home, controller: reactor::Controller<service::Command>) -> Self {
        Self { home, controller }
    }
-
}

-
impl<T: reactor::Handler<Command = service::Command>> Handle<T> {
    fn command(&self, cmd: service::Command) -> Result<(), Error> {
-
        self.controller.send(cmd)?;
+
        self.controller.cmd(cmd)?;
        Ok(())
    }
}

-
impl<T: reactor::Handler<Command = service::Command>> radicle::node::Handle for Handle<T> {
+
impl radicle::node::Handle for Handle {
    type Sessions = Sessions;
    type FetchLookup = FetchLookup;
    type Error = Error;
modified radicle-node/src/tests/e2e.rs
@@ -17,12 +17,10 @@ use radicle::test::fixtures;
use radicle::Storage;
use radicle::{assert_matches, rad};

-
use crate::address;
use crate::node::NodeId;
-
use crate::service::{routing, FetchLookup, FetchResult};
+
use crate::service::{FetchLookup, FetchResult};
use crate::storage::git::transport;
use crate::test::logger;
-
use crate::wire::Wire;
use crate::{client, client::handle::Handle, client::Runtime, service};

/// A node that can be run.
@@ -38,7 +36,7 @@ struct NodeHandle {
    storage: Storage,
    addr: net::SocketAddr,
    thread: ManuallyDrop<thread::JoinHandle<Result<(), client::Error>>>,
-
    handle: ManuallyDrop<Handle<Wire<routing::Table, address::Book, Storage, MockSigner>>>,
+
    handle: ManuallyDrop<Handle>,
}

impl Drop for NodeHandle {
modified radicle-node/src/wire/protocol.rs
@@ -219,6 +219,10 @@ where
        }
    }

+
    pub fn listen(&mut self, socket: NetAccept<WireSession<G>>) {
+
        self.actions.push_back(Action::RegisterListener(socket));
+
    }
+

    fn peer_mut_by_fd(&mut self, fd: RawFd) -> &mut Peer<G> {
        self.peers.get_mut(&fd).unwrap_or_else(|| {
            log::error!(target: "transport", "Peer with fd {fd} was not found");