Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement simple worker thread pool
Dr. Maxim Orlovsky committed 3 years ago
commit 947c89a398f5c06c0544072bbd23618fa7b63a8e
parent a7b1c19746d280551f6bd8fce9ca8fa2872d8d1d
2 files changed +75 -25
modified radicle-node/src/main.rs
@@ -8,14 +8,13 @@ use reactor::poller::popol;
use reactor::Reactor;

use radicle::profile;
-
use radicle::storage::WriteStorage;
use radicle_node::client::handle::Handle;
use radicle_node::client::{ADDRESS_DB_FILE, NODE_DIR, ROUTING_DB_FILE, TRACKING_DB_FILE};
use radicle_node::crypto::ssh::keystore::MemorySigner;
use radicle_node::prelude::{Address, NodeId};
-
use radicle_node::service::{routing, tracking, FetchResult};
+
use radicle_node::service::{routing, tracking};
use radicle_node::wire::Transport;
-
use radicle_node::worker::{WorkerReq, WorkerResp};
+
use radicle_node::worker::{WorkerPool, WorkerReq};
use radicle_node::{address, control, logger, service};

#[derive(Debug)]
@@ -126,27 +125,7 @@ fn main() -> anyhow::Result<()> {
    );

    let (worker_send, worker_recv) = crossbeam_channel::unbounded::<WorkerReq<MemorySigner>>();
-
    let workers = thread::spawn(move || {
-
        while let Ok(WorkerReq {
-
            fetch,
-
            session,
-
            channel,
-
            ..
-
        }) = worker_recv.recv()
-
        {
-
            let result = match worker_storage.repository(fetch.repo) {
-
                Ok(_) => todo!(),
-
                Err(err) => FetchResult::Error {
-
                    from: fetch.remote,
-
                    error: err.into(),
-
                },
-
            };
-
            if channel.send(WorkerResp { result, session }).is_err() {
-
                log::error!("Unable to report fetch result: worker channel disconnected");
-
            }
-
        }
-
    });
-

+
    let pool = WorkerPool::with(10, worker_storage, worker_recv);
    let wire = Transport::new(service, worker_send, negotiator.clone(), proxy_addr, clock);
    let reactor =
        Reactor::new(wire, popol::Poller::new()).expect("unable to instantiate P2P reactor");
@@ -162,9 +141,9 @@ fn main() -> anyhow::Result<()> {
    let handle = Handle::from(controller);
    let control = thread::spawn(move || control::listen(node, handle));

+
    pool.join().unwrap();
    control.join().unwrap()?;
    reactor.join().unwrap();
-
    workers.join().unwrap();

    Ok(())
}
modified radicle-node/src/worker.rs
@@ -1,7 +1,11 @@
use crossbeam_channel as chan;
use netservices::noise::NoiseXk;
+
use std::thread;
+
use std::thread::JoinHandle;

use radicle::crypto::Negotiator;
+
use radicle::storage::WriteStorage;
+
use radicle::Storage;

use crate::service::reactor::Fetch;
use crate::service::FetchResult;
@@ -19,3 +23,70 @@ pub struct WorkerResp<G: Negotiator> {
    pub result: FetchResult,
    pub session: NoiseXk<G>,
}
+

+
pub struct Worker<G: Negotiator> {
+
    storage: Storage,
+
    tasks: chan::Receiver<WorkerReq<G>>,
+
}
+

+
impl<G: Negotiator> Worker<G> {
+
    pub fn run(self) -> Result<(), chan::RecvError> {
+
        loop {
+
            let task = self.tasks.recv()?;
+
            self.process(task);
+
        }
+
    }
+

+
    pub fn process(&self, task: WorkerReq<G>) {
+
        let WorkerReq {
+
            fetch,
+
            session,
+
            // TODO: Implement logic.
+
            drain: _drain,
+
            channel,
+
        } = task;
+
        let result = match self.storage.repository(fetch.repo) {
+
            Ok(_) => todo!(),
+
            Err(err) => FetchResult::Error {
+
                from: fetch.remote,
+
                error: err.into(),
+
            },
+
        };
+
        if channel.send(WorkerResp { result, session }).is_err() {
+
            log::error!("Unable to report fetch result: worker channel disconnected");
+
        }
+
    }
+
}
+

+
pub struct WorkerPool {
+
    pool: Vec<JoinHandle<Result<(), chan::RecvError>>>,
+
}
+

+
impl WorkerPool {
+
    pub fn with<G: Negotiator + 'static>(
+
        capacity: usize,
+
        storage: Storage,
+
        tasks: chan::Receiver<WorkerReq<G>>,
+
    ) -> Self {
+
        let mut pool = Vec::with_capacity(capacity);
+
        for _ in 0..capacity {
+
            let runtime = Worker {
+
                tasks: tasks.clone(),
+
                storage: storage.clone(),
+
            };
+
            let thread = thread::spawn(|| runtime.run());
+
            pool.push(thread);
+
        }
+
        Self { pool }
+
    }
+

+
    pub fn join(self) -> thread::Result<()> {
+
        for worker in self.pool {
+
            let result = worker.join()?;
+
            if let Err(err) = result {
+
                log::error!(target: "pool", "Worker failed: {err}");
+
            }
+
        }
+
        Ok(())
+
    }
+
}