Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Get rid of worker response channel
Alexis Sellier committed 3 years ago
commit d25c47ad0ebc0a9c7618f17378ba284a38b6e8f0
parent 7e55ed346a1b848c55253059d95c45ed838a6418
6 files changed +65 -69
modified radicle-node/src/client.rs
@@ -11,7 +11,7 @@ use thiserror::Error;

use crate::address;
use crate::control;
-
use crate::crypto::Signature;
+
use crate::crypto::{Signature, Signer};
use crate::node::NodeId;
use crate::service::{routing, tracking};
use crate::wire;
@@ -55,26 +55,26 @@ pub enum Error {
}

/// Holds join handles to the client threads, as well as a client handle.
-
pub struct Runtime {
+
pub struct Runtime<G: Signer + EcSign> {
    pub id: NodeId,
-
    pub handle: Handle,
+
    pub handle: Handle<G>,
    pub control: thread::JoinHandle<Result<(), control::Error>>,
-
    pub reactor: Reactor<wire::Control>,
+
    pub reactor: Reactor<wire::Control<G>>,
    pub pool: WorkerPool,
    pub local_addrs: Vec<net::SocketAddr>,
}

-
impl Runtime {
+
impl<G: Signer + EcSign> Runtime<G> {
    /// Run the client.
    ///
    /// This function spawns threads.
-
    pub fn with<G>(
+
    pub fn with(
        home: Home,
        config: service::Config,
        listen: Vec<net::SocketAddr>,
        proxy: net::SocketAddr,
        signer: G,
-
    ) -> Result<Runtime, Error>
+
    ) -> Result<Runtime<G>, Error>
    where
        G: crypto::Signer + EcSign<Sig = Signature, Pk = NodeId> + Clone + 'static,
    {
modified radicle-node/src/client/handle.rs
@@ -1,16 +1,19 @@
-
use std::io::Write;
+
use std::io::{self, Write};
use std::os::unix::net::UnixStream;
use std::sync::Arc;

use crossbeam_channel as chan;
+
use cyphernet::EcSign;
use thiserror::Error;

+
use crate::crypto::Signer;
use crate::identity::Id;
use crate::profile::Home;
use crate::service;
use crate::service::{CommandError, FetchLookup, QueryState};
use crate::service::{NodeId, Sessions};
use crate::wire;
+
use crate::worker::WorkerResp;

/// An error resulting from a handle method.
#[derive(Error, Debug)]
@@ -50,12 +53,12 @@ impl<T> From<chan::SendError<T>> for Error {
    }
}

-
pub struct Handle {
+
pub struct Handle<G: Signer + EcSign> {
    pub(crate) home: Home,
-
    pub(crate) controller: reactor::Controller<wire::Control>,
+
    pub(crate) controller: reactor::Controller<wire::Control<G>>,
}

-
impl Clone for Handle {
+
impl<G: Signer + EcSign> Clone for Handle<G> {
    fn clone(&self) -> Self {
        Self {
            home: self.home.clone(),
@@ -64,25 +67,27 @@ impl Clone for Handle {
    }
}

-
impl Handle {
-
    pub fn new(home: Home, controller: reactor::Controller<wire::Control>) -> Self {
+
impl<G: Signer + EcSign + 'static> Handle<G> {
+
    pub fn new(home: Home, controller: reactor::Controller<wire::Control<G>>) -> Self {
        Self { home, controller }
    }

-
    pub fn wakeup(&mut self) -> Result<(), Error> {
-
        // TODO: Handle channel disconnect error correctly.
-
        //       This just returns `BrokenPipe`.
-
        self.controller.cmd(wire::Control::Wakeup)?;
+
    pub fn worker_result(&mut self, resp: WorkerResp<G>) -> Result<(), Error> {
+
        match self.controller.cmd(wire::Control::Worker(resp)) {
+
            Ok(()) => {}
+
            Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Err(Error::NotConnected),
+
            Err(err) => return Err(err.into()),
+
        }
        Ok(())
    }

    fn command(&self, cmd: service::Command) -> Result<(), Error> {
-
        self.controller.cmd(wire::Control::Command(cmd))?;
+
        self.controller.cmd(wire::Control::User(cmd))?;
        Ok(())
    }
}

-
impl radicle::node::Handle for Handle {
+
impl<G: Signer + EcSign + 'static> radicle::node::Handle for Handle<G> {
    type Sessions = Sessions;
    type FetchLookup = FetchLookup;
    type Error = Error;
modified radicle-node/src/service.rs
@@ -526,7 +526,7 @@ where
        }
    }

-
    pub fn fetch_complete(&mut self, result: FetchResult) {
+
    pub fn repo_fetched(&mut self, result: FetchResult) {
        // TODO(cloudhead): handle completed job with service business logic
        // TODO: Downgrade session to gossip protocol.
        if let Some(session) = self.sessions.get_mut(result.remote()) {
modified radicle-node/src/tests/e2e.rs
@@ -36,7 +36,7 @@ struct NodeHandle {
    storage: Storage,
    addr: net::SocketAddr,
    thread: ManuallyDrop<thread::JoinHandle<Result<(), client::Error>>>,
-
    handle: ManuallyDrop<Handle>,
+
    handle: ManuallyDrop<Handle<MockSigner>>,
}

impl Drop for NodeHandle {
modified radicle-node/src/wire/protocol.rs
@@ -7,7 +7,7 @@ use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
-
use std::{io, net};
+
use std::{fmt, io, net};

use amplify::Wrapper as _;
use crossbeam_channel as chan;
@@ -31,10 +31,22 @@ use crate::worker::{WorkerReq, WorkerResp};
use crate::Link;
use crate::{address, service};

-
#[derive(Debug)]
-
pub enum Control {
-
    Command(service::Command),
-
    Wakeup,
+
#[allow(clippy::large_enum_variant)]
+
/// Control message used internally between workers, users, and the service.
+
pub enum Control<G: Signer + EcSign> {
+
    /// Message from the user to the service.
+
    User(service::Command),
+
    /// Message from a worker to the service.
+
    Worker(WorkerResp<G>),
+
}
+

+
impl<G: Signer + EcSign> fmt::Debug for Control<G> {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match self {
+
            Self::User(cmd) => cmd.fmt(f),
+
            Self::Worker(resp) => resp.result.fmt(f),
+
        }
+
    }
}

/// Peer session type.
@@ -48,7 +60,7 @@ pub type WireWriter<G> = CypherWriter<G, Sha256>;
type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;

/// Peer connection state machine.
-
enum Peer<G: Signer + EcSign> {
+
enum Peer {
    /// The initial state before handshake is completed.
    Connecting { link: Link },
    /// The state after handshake is completed.
@@ -68,14 +80,10 @@ enum Peer<G: Signer + EcSign> {
        id: NodeId,
    },
    /// The peer is now upgraded and we are in control of the socket.
-
    Upgraded {
-
        link: Link,
-
        id: NodeId,
-
        response: chan::Receiver<WorkerResp<G>>,
-
    },
+
    Upgraded { link: Link, id: NodeId },
}

-
impl<G: Signer + EcSign> std::fmt::Debug for Peer<G> {
+
impl std::fmt::Debug for Peer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Connecting { link } => write!(f, "Connecting({:?})", link),
@@ -91,7 +99,7 @@ impl<G: Signer + EcSign> std::fmt::Debug for Peer<G> {
    }
}

-
impl<G: Signer + EcSign> Peer<G> {
+
impl Peer {
    /// Return the peer's id, if any.
    fn id(&self) -> Option<&NodeId> {
        match self {
@@ -145,7 +153,7 @@ impl<G: Signer + EcSign> Peer<G> {
    }

    /// Switch to upgraded state.
-
    fn upgraded(&mut self, listener: chan::Receiver<WorkerResp<G>>) -> Fetch {
+
    fn upgraded(&mut self) -> Fetch {
        if let Self::Upgrading { fetch, id, link } = self {
            let fetch = fetch.clone();
            log::debug!(target: "wire", "Peer {id} upgraded for fetch {}", fetch.repo);
@@ -153,7 +161,6 @@ impl<G: Signer + EcSign> Peer<G> {
            *self = Self::Upgraded {
                id: *id,
                link: *link,
-
                response: listener,
            };
            fetch
        } else {
@@ -187,7 +194,7 @@ pub struct Wire<R, S, W, G: Signer + EcSign> {
    /// Internal queue of actions to send to the reactor.
    actions: VecDeque<Action<G>>,
    /// Peer sessions.
-
    peers: HashMap<RawFd, Peer<G>>,
+
    peers: HashMap<RawFd, Peer>,
    /// SOCKS5 proxy address.
    proxy: net::SocketAddr,
    /// Buffer for incoming peer data.
@@ -229,14 +236,14 @@ where
        self.actions.push_back(Action::RegisterListener(socket));
    }

-
    fn peer_mut_by_fd(&mut self, fd: RawFd) -> &mut Peer<G> {
+
    fn peer_mut_by_fd(&mut self, fd: RawFd) -> &mut Peer {
        self.peers.get_mut(&fd).unwrap_or_else(|| {
            log::error!(target: "wire", "Peer with fd {fd} was not found");
            panic!("Peer with fd {fd} is not known");
        })
    }

-
    fn fd_by_id(&self, node_id: &NodeId) -> (RawFd, &Peer<G>) {
+
    fn fd_by_id(&self, node_id: &NodeId) -> (RawFd, &Peer) {
        self.peers
            .iter()
            .find(|(_, peer)| peer.id() == Some(node_id))
@@ -292,8 +299,7 @@ where
    fn upgraded(&mut self, transport: NetTransport<WireSession<G>>) {
        let fd = transport.as_raw_fd();
        let peer = self.peer_mut_by_fd(fd);
-
        let (send, recv) = chan::bounded::<WorkerResp<G>>(1);
-
        let fetch = peer.upgraded(recv);
+
        let fetch = peer.upgraded();
        let session = match transport.into_session() {
            Ok(session) => session,
            Err(_) => panic!("Transport::upgraded: peer write buffer not empty on upgrade"),
@@ -305,7 +311,6 @@ where
                fetch,
                session,
                drain: self.read_queue.drain(..).collect(),
-
                channel: send,
            })
            .is_err()
        {
@@ -313,21 +318,7 @@ where
        }
    }

-
    fn wakeup(&mut self) {
-
        let mut completed = Vec::new();
-
        for peer in self.peers.values() {
-
            if let Peer::Upgraded { response, .. } = peer {
-
                if let Ok(resp) = response.try_recv() {
-
                    completed.push(resp);
-
                }
-
            }
-
        }
-
        for resp in completed {
-
            self.fetch_complete(resp);
-
        }
-
    }
-

-
    fn fetch_complete(&mut self, resp: WorkerResp<G>) {
+
    fn worker_result(&mut self, resp: WorkerResp<G>) {
        log::debug!(target: "wire", "Fetch completed: {:?}", resp.result);

        let session = resp.session;
@@ -351,7 +342,7 @@ where
        peer.downgrade();

        self.actions.push_back(Action::RegisterTransport(session));
-
        self.service.fetch_complete(resp.result);
+
        self.service.repo_fetched(resp.result);
    }
}

@@ -364,7 +355,7 @@ where
{
    type Listener = NetAccept<WireSession<G>>;
    type Transport = NetTransport<WireSession<G>>;
-
    type Command = Control;
+
    type Command = Control<G>;

    fn tick(&mut self, _time: Duration) {
        // FIXME: Change this once a proper timestamp is passed into the function.
@@ -496,8 +487,8 @@ where

    fn handle_command(&mut self, cmd: Self::Command) {
        match cmd {
-
            Control::Command(cmd) => self.service.command(cmd),
-
            Control::Wakeup => self.wakeup(),
+
            Control::User(cmd) => self.service.command(cmd),
+
            Control::Worker(resp) => self.worker_result(resp),
        }
    }

modified radicle-node/src/worker.rs
@@ -23,7 +23,6 @@ pub struct WorkerReq<G: Signer + EcSign> {
    pub fetch: Fetch,
    pub session: WireSession<G>,
    pub drain: Vec<u8>,
-
    pub channel: chan::Sender<WorkerResp<G>>,
}

/// Worker response.
@@ -37,10 +36,10 @@ struct Worker<G: Signer + EcSign> {
    storage: Storage,
    tasks: chan::Receiver<WorkerReq<G>>,
    timeout: time::Duration,
-
    handle: Handle,
+
    handle: Handle<G>,
}

-
impl<G: Signer + EcSign> Worker<G> {
+
impl<G: Signer + EcSign + 'static> Worker<G> {
    /// Waits for tasks and runs them. Blocks indefinitely unless there is an error receiving
    /// the next task.
    fn run(mut self) -> Result<(), chan::RecvError> {
@@ -55,7 +54,6 @@ impl<G: Signer + EcSign> Worker<G> {
            fetch,
            session,
            drain,
-
            channel,
        } = task;

        let (session, result) = self._process(&fetch, drain, session);
@@ -71,10 +69,12 @@ impl<G: Signer + EcSign> Worker<G> {
        };
        log::debug!(target: "worker", "Sending response back to service..");

-
        if channel.send(WorkerResp { result, session }).is_err() {
+
        if self
+
            .handle
+
            .worker_result(WorkerResp { result, session })
+
            .is_err()
+
        {
            log::error!("Unable to report fetch result: worker channel disconnected");
-
        } else {
-
            self.handle.wakeup().unwrap();
        }
    }

@@ -253,7 +253,7 @@ impl WorkerPool {
        timeout: time::Duration,
        storage: Storage,
        tasks: chan::Receiver<WorkerReq<G>>,
-
        handle: Handle,
+
        handle: Handle<G>,
        name: String,
    ) -> Self {
        let mut pool = Vec::with_capacity(capacity);