Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Cleanup working naming
Alexis Sellier committed 3 years ago
commit ac23f45d9b880421270b9b5bde5ac11352218c23
parent 7d5347b74f283566246741cd2ec0d917ad27da98
4 files changed +26 -26
modified radicle-node/src/runtime.rs
@@ -23,7 +23,6 @@ use crate::service::{routing, tracking};
use crate::wire;
use crate::wire::Wire;
use crate::worker;
-
use crate::worker::{WorkerPool, WorkerReq};
use crate::{service, LocalTime};

pub use handle::Error as HandleError;
@@ -76,7 +75,7 @@ pub struct Runtime<G: Signer + EcSign> {
    pub storage: Storage,
    pub reactor: Reactor<wire::Control<G>>,
    pub daemon: net::SocketAddr,
-
    pub pool: WorkerPool,
+
    pub pool: worker::Pool,
    pub local_addrs: Vec<net::SocketAddr>,
}

@@ -131,7 +130,7 @@ impl<G: Signer + EcSign + 'static> Runtime<G> {
            sig: EcSign::sign(&signer, id.as_slice()),
        };

-
        let (worker_send, worker_recv) = chan::unbounded::<WorkerReq<G>>();
+
        let (worker_send, worker_recv) = chan::unbounded::<worker::Task<G>>();
        let mut wire = Wire::new(service, worker_send, cert, signer, proxy, clock);
        let mut local_addrs = Vec::new();

@@ -155,7 +154,7 @@ impl<G: Signer + EcSign + 'static> Runtime<G> {
            );
        }

-
        let pool = WorkerPool::with(
+
        let pool = worker::Pool::with(
            worker_recv,
            handle.clone(),
            worker::Config {
modified radicle-node/src/runtime/handle.rs
@@ -16,7 +16,7 @@ use crate::service;
use crate::service::{CommandError, QueryState};
use crate::service::{NodeId, Sessions};
use crate::wire;
-
use crate::worker::WorkerResp;
+
use crate::worker::TaskResult;

/// An error resulting from a handle method.
#[derive(Error, Debug)]
@@ -89,7 +89,7 @@ impl<G: Signer + EcSign + 'static> Handle<G> {
        }
    }

-
    pub fn worker_result(&mut self, resp: WorkerResp<G>) -> Result<(), Error> {
+
    pub fn worker_result(&mut self, resp: TaskResult<G>) -> Result<(), Error> {
        match self.controller.cmd(wire::Control::Worker(resp)) {
            Ok(()) => {}
            Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Err(Error::NotConnected),
modified radicle-node/src/wire/protocol.rs
@@ -27,7 +27,7 @@ use crate::crypto::Signer;
use crate::service::reactor::{Fetch, Io};
use crate::service::{routing, session, DisconnectReason, Message, Service};
use crate::wire::{Decode, Encode};
-
use crate::worker::{WorkerReq, WorkerResp};
+
use crate::worker::{Task, TaskResult};
use crate::Link;
use crate::{address, service};

@@ -37,7 +37,7 @@ pub enum Control<G: Signer + EcSign> {
    /// Message from the user to the service.
    User(service::Command),
    /// Message from a worker to the service.
-
    Worker(WorkerResp<G>),
+
    Worker(TaskResult<G>),
}

impl<G: Signer + EcSign> fmt::Debug for Control<G> {
@@ -186,7 +186,7 @@ pub struct Wire<R, S, W, G: Signer + EcSign> {
    /// Backing service instance.
    service: Service<R, S, W, G>,
    /// Worker pool interface.
-
    worker: chan::Sender<WorkerReq<G>>,
+
    worker: chan::Sender<Task<G>>,
    /// Used for authentication; keeps local identity.
    cert: Cert<Signature>,
    /// Used for authentication.
@@ -210,7 +210,7 @@ where
{
    pub fn new(
        mut service: Service<R, S, W, G>,
-
        worker: chan::Sender<WorkerReq<G>>,
+
        worker: chan::Sender<Task<G>>,
        cert: Cert<Signature>,
        signer: G,
        proxy: net::SocketAddr,
@@ -307,7 +307,7 @@ where

        if self
            .worker
-
            .send(WorkerReq {
+
            .send(Task {
                fetch,
                session,
                drain: self.read_queue.drain(..).collect(),
@@ -318,10 +318,10 @@ where
        }
    }

-
    fn worker_result(&mut self, resp: WorkerResp<G>) {
-
        log::debug!(target: "wire", "Fetch completed: {:?}", resp.result);
+
    fn worker_result(&mut self, task: TaskResult<G>) {
+
        log::debug!(target: "wire", "Fetch completed: {:?}", task.result);

-
        let session = resp.session;
+
        let session = task.session;
        let fd = session.as_connection().as_raw_fd();
        let peer = self.peer_mut_by_fd(fd);

@@ -342,7 +342,7 @@ where
        peer.downgrade();

        self.actions.push_back(Action::RegisterTransport(session));
-
        self.service.fetched(resp.result);
+
        self.service.fetched(task.result);
    }
}

@@ -488,7 +488,7 @@ where
    fn handle_command(&mut self, cmd: Self::Command) {
        match cmd {
            Control::User(cmd) => self.service.command(cmd),
-
            Control::Worker(resp) => self.worker_result(resp),
+
            Control::Worker(result) => self.worker_result(result),
        }
    }

modified radicle-node/src/worker.rs
@@ -34,15 +34,16 @@ pub struct Config {
    pub storage: Storage,
}

-
/// Worker request.
-
pub struct WorkerReq<G: Signer + EcSign> {
+
/// Task to be accomplished on a worker thread.
+
/// This is either going to be an outgoing or incoming fetch.
+
pub struct Task<G: Signer + EcSign> {
    pub fetch: Fetch,
    pub session: WireSession<G>,
    pub drain: Vec<u8>,
}

/// Worker response.
-
pub struct WorkerResp<G: Signer + EcSign> {
+
pub struct TaskResult<G: Signer + EcSign> {
    pub result: FetchResult,
    pub session: WireSession<G>,
}
@@ -50,7 +51,7 @@ pub struct WorkerResp<G: Signer + EcSign> {
/// A worker that replicates git objects.
struct Worker<G: Signer + EcSign> {
    storage: Storage,
-
    tasks: chan::Receiver<WorkerReq<G>>,
+
    tasks: chan::Receiver<Task<G>>,
    daemon: net::SocketAddr,
    timeout: time::Duration,
    handle: Handle<G>,
@@ -68,8 +69,8 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        }
    }

-
    fn process(&mut self, task: WorkerReq<G>) {
-
        let WorkerReq {
+
    fn process(&mut self, task: Task<G>) {
+
        let Task {
            fetch,
            session,
            drain,
@@ -86,7 +87,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {

        if self
            .handle
-
            .worker_result(WorkerResp { result, session })
+
            .worker_result(TaskResult { result, session })
            .is_err()
        {
            log::error!(target: "worker", "Unable to report fetch result: worker channel disconnected");
@@ -256,14 +257,14 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
}

/// A pool of workers. One thread is allocated for each worker.
-
pub struct WorkerPool {
+
pub struct Pool {
    pool: Vec<JoinHandle<Result<(), chan::RecvError>>>,
}

-
impl WorkerPool {
+
impl Pool {
    /// Create a new worker pool with the given parameters.
    pub fn with<G: Signer + EcSign + 'static>(
-
        tasks: chan::Receiver<WorkerReq<G>>,
+
        tasks: chan::Receiver<Task<G>>,
        handle: Handle<G>,
        config: Config,
    ) -> Self {