Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Give useful thread names
Alexis Sellier committed 2 years ago
commit bb099faa4f6e6219f4cb61efbbe3ccd9011c4ae6
parent 765823c686caa89db0b76c19595189ac1fde4e8e
8 files changed +203 -181
modified radicle-node/src/control.rs
@@ -6,7 +6,7 @@ use std::os::unix::net::UnixListener;
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::str::FromStr;
-
use std::{io, net, thread, time};
+
use std::{io, net, time};

use radicle::node::Handle;
use serde_json as json;
@@ -15,6 +15,7 @@ use crate::identity::Id;
use crate::node::NodeId;
use crate::node::{Command, CommandName, CommandResult};
use crate::runtime;
+
use crate::runtime::thread;

/// Maximum timeout for waiting for node events.
const MAX_TIMEOUT: time::Duration = time::Duration::MAX;
@@ -35,28 +36,23 @@ pub fn listen<H: Handle<Error = runtime::HandleError> + 'static>(
    handle: H,
) -> Result<(), Error> {
    log::debug!(target: "control", "Control thread listening on socket..");
-
    let nid = handle.nid()?.to_human();
+
    let nid = handle.nid()?;

    for incoming in listener.incoming() {
        match incoming {
            Ok(mut stream) => {
                let handle = handle.clone();

-
                thread::Builder::new()
-
                    .name(nid.clone())
-
                    .spawn(move || {
-
                        if let Err(e) = command(&stream, handle) {
-
                            log::error!(target: "control", "Command returned error: {e}");
-

-
                            CommandResult::error(e).to_writer(&mut stream).ok();
-

-
                            stream.flush().ok();
-
                            stream.shutdown(net::Shutdown::Both).ok();
-
                        }
-
                    })
-
                    // SAFETY: Only panics if the thread name contained NULL bytes, which we can
-
                    // guarantee is not the case here.
-
                    .unwrap();
+
                thread::spawn(&nid, "control", move || {
+
                    if let Err(e) = command(&stream, handle) {
+
                        log::error!(target: "control", "Command returned error: {e}");
+

+
                        CommandResult::error(e).to_writer(&mut stream).ok();
+

+
                        stream.flush().ok();
+
                        stream.shutdown(net::Shutdown::Both).ok();
+
                    }
+
                });
            }
            Err(e) => log::error!(target: "control", "Failed to accept incoming connection: {}", e),
        }
modified radicle-node/src/runtime.rs
@@ -1,10 +1,11 @@
-
mod handle;
+
pub mod handle;
+
pub mod thread;

use std::io::{BufRead, BufReader};
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
-
use std::{fs, io, net, thread, time};
+
use std::{fs, io, net, time};

use crossbeam_channel as chan;
use cyphernet::Ecdh;
@@ -200,7 +201,7 @@ impl Runtime {

            log::info!(target: "node", "Listening on {local_addr}..");
        }
-
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
+
        let reactor = Reactor::named(wire, popol::Poller::new(), thread::name(&id, "service"))?;
        let handle = Handle::new(home.clone(), reactor.controller(), emitter);
        let atomic = git::version()? >= git::VERSION_REQUIRED;

@@ -217,7 +218,6 @@ impl Runtime {
            handle.clone(),
            worker::Config {
                capacity: 8,
-
                name: id.to_human(),
                timeout: time::Duration::from_secs(9),
                storage: storage.clone(),
                daemon,
@@ -254,23 +254,21 @@ impl Runtime {
        log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
        log::info!(target: "node", "Binding control socket {}..", home.socket().display());

-
        thread::Builder::new().name(self.id.to_human()).spawn({
+
        thread::spawn(&self.id, "control", {
            let handle = self.handle.clone();
-
            move || control::listen(self.control, handle)
-
        })?;
-
        let _signals = thread::Builder::new()
-
            .name(self.id.to_human())
-
            .spawn(move || {
-
                if let Ok(()) = self.signals.recv() {
-
                    log::info!(target: "node", "Termination signal received; shutting down..");
-
                    self.handle.shutdown().ok();
-
                }
-
            })?;
+
            || control::listen(self.control, handle)
+
        });
+
        let _signals = thread::spawn(&self.id, "signals", move || {
+
            if let Ok(()) = self.signals.recv() {
+
                log::info!(target: "node", "Termination signal received; shutting down..");
+
                self.handle.shutdown().ok();
+
            }
+
        });

        log::info!(target: "node", "Spawning git daemon at {}..", self.storage.path().display());

        let mut daemon = daemon::spawn(self.storage.path(), self.daemon)?;
-
        thread::Builder::new().name(self.id.to_human()).spawn({
+
        thread::spawn(&self.id, "daemon", {
            let stderr = daemon.stderr.take().unwrap();
            || {
                for line in BufReader::new(stderr).lines().flatten() {
@@ -281,7 +279,7 @@ impl Runtime {
                    }
                }
            }
-
        })?;
+
        });

        self.pool.run().unwrap();
        self.reactor.join().unwrap();
added radicle-node/src/runtime/thread.rs
@@ -0,0 +1,46 @@
+
use std::thread;
+

+
pub use thread::*;
+

+
use radicle::prelude::NodeId;
+

+
/// Spawn an OS thread.
+
pub fn spawn<D, F, T>(nid: &NodeId, label: D, f: F) -> thread::JoinHandle<T>
+
where
+
    D: std::fmt::Display,
+
    F: FnOnce() -> T,
+
    F: Send + 'static,
+
    T: Send + 'static,
+
{
+
    thread::Builder::new()
+
        .name(name(nid, label))
+
        .spawn(f)
+
        .expect("thread::spawn: thread label must not contain NULL bytes")
+
}
+

+
/// Spawn a scoped OS thread.
+
pub fn spawn_scoped<'scope, 'env, D, F, T>(
+
    nid: &NodeId,
+
    label: D,
+
    scope: &'scope thread::Scope<'scope, 'env>,
+
    f: F,
+
) -> thread::ScopedJoinHandle<'scope, T>
+
where
+
    D: std::fmt::Display,
+
    F: FnOnce() -> T,
+
    F: Send + 'scope,
+
    T: Send + 'scope,
+
{
+
    thread::Builder::new()
+
        .name(name(nid, label))
+
        .spawn_scoped(scope, f)
+
        .expect("thread::spawn_scoped: thread label must not contain NULL bytes")
+
}
+

+
pub fn name<D: std::fmt::Display>(nid: &NodeId, label: D) -> String {
+
    if cfg!(debug_assertions) {
+
        format!("{nid} {:<14}", format!("<{label}>"))
+
    } else {
+
        format!("{label}")
+
    }
+
}
modified radicle-node/src/test/environment.rs
@@ -343,12 +343,7 @@ impl<G: cyphernet::Ecdh<Pk = NodeId> + Signer + Clone> Node<G> {
        let addr = *rt.local_addrs.first().unwrap();
        let id = *self.signer.public_key();
        let handle = ManuallyDrop::new(rt.handle.clone());
-
        let thread = ManuallyDrop::new(
-
            thread::Builder::new()
-
                .name(id.to_string())
-
                .spawn(move || rt.run())
-
                .unwrap(),
-
        );
+
        let thread = ManuallyDrop::new(runtime::thread::spawn(&id, "runtime", move || rt.run()));

        NodeHandle {
            id,
modified radicle-node/src/test/logger.rs
@@ -23,7 +23,7 @@ impl Log for Logger {
            target => {
                if self.enabled(record.metadata()) {
                    let current = std::thread::current();
-
                    let msg = format!("{:>12} {}", format!("{target}:"), record.args());
+
                    let msg = format!("{:>10} {}", format!("{target}:"), record.args());
                    let time = LocalTime::now().as_secs();
                    let s = if let Some(name) = current.name() {
                        format!("{time} {name:<16} {msg}")
modified radicle-node/src/worker.rs
@@ -5,8 +5,7 @@ mod tunnel;
use std::collections::{BTreeSet, HashSet};
use std::io::{prelude::*, BufReader};
use std::ops::ControlFlow;
-
use std::thread::JoinHandle;
-
use std::{env, io, net, process, thread, time};
+
use std::{env, io, net, process, time};

use crossbeam_channel as chan;

@@ -15,7 +14,7 @@ use radicle::prelude::NodeId;
use radicle::storage::{Namespaces, ReadRepository, RefUpdate};
use radicle::{git, storage, Storage};

-
use crate::runtime::Handle;
+
use crate::runtime::{thread, Handle};
use crate::wire::StreamId;
use channels::{ChannelReader, ChannelWriter};
use tunnel::Tunnel;
@@ -28,8 +27,6 @@ pub struct Config {
    pub capacity: usize,
    /// Whether to use atomic fetches.
    pub atomic: bool,
-
    /// Thread name.
-
    pub name: String,
    /// Timeout for all operations.
    pub timeout: time::Duration,
    /// Git daemon address.
@@ -147,7 +144,6 @@ struct Worker {
    timeout: time::Duration,
    handle: Handle,
    atomic: bool,
-
    name: String,
}

impl Worker {
@@ -348,7 +344,7 @@ impl Worker {
        log::debug!(target: "worker", "Entering Git protocol loop for {rid}..");

        thread::scope(|s| {
-
            let daemon_to_stream = thread::Builder::new().name(self.name.clone()).spawn_scoped(s, || {
+
            let daemon_to_stream = thread::spawn_scoped(&self.nid, "upload-pack", s, || {
                let mut buffer = [0; u16::MAX as usize + 1];

                loop {
@@ -372,9 +368,9 @@ impl Worker {
                    }
                }
                Self::eof(remote, stream, stream_w, &mut self.handle)
-
            })?;
+
            });

-
            let stream_to_daemon = s.spawn(move || {
+
            let stream_to_daemon = thread::spawn_scoped(&self.nid, "upload-pack", s, move || {
                match stream_r
                    .pipe(&mut daemon_w)
                    .and_then(|()| daemon_w.shutdown(net::Shutdown::Both))
@@ -432,40 +428,36 @@ impl Worker {
        // Since `ls-remote` may return a lot of data, we read the child's stdout concurrently, to
        // prevent deadlocks that could arise if we fill the pipe buffer before the process exits.
        thread::scope(|s| {
-
            thread::Builder::new()
-
                .name(self.name.clone())
-
                .spawn_scoped(s, || {
-
                    for line in BufReader::new(stderr).lines().flatten() {
-
                        log::debug!(target: "worker", "Git: {}", line);
-
                    }
-
                })?;
-
            thread::Builder::new()
-
                .name(self.name.clone())
-
                .spawn_scoped(s, || {
-
                    for line in BufReader::new(stdout).lines().flatten() {
-
                        log::debug!(target: "worker", "Git: {}", line);
-

-
                        let r = match line.split_whitespace().next_back() {
-
                            Some(r) => r,
-
                            None => {
-
                                log::trace!(target: "worker", "Git: ls-remote returned unexpected format {line}");
-
                                continue;
-
                            }
-
                        };
-
                        match git::RefString::try_from(r) {
-
                            Ok(r) => {
-
                                if let Some(ns) = r.to_namespaced() {
-
                                    refs.insert(ns.to_owned());
-
                                } else {
-
                                    log::debug!(target: "worker", "Git: non-namespaced ref '{r}'")
-
                                }
-
                            }
-
                            Err(err) => {
-
                                log::warn!(target: "worker", "Git: invalid refname '{r}' {err}")
+
            thread::spawn_scoped(&self.nid, "ls-refs", s, || {
+
                for line in BufReader::new(stderr).lines().flatten() {
+
                    log::debug!(target: "worker", "Git: {}", line);
+
                }
+
            });
+
            thread::spawn_scoped(&self.nid, "ls-refs", s, || {
+
                for line in BufReader::new(stdout).lines().flatten() {
+
                    log::debug!(target: "worker", "Git: {}", line);
+

+
                    let r = match line.split_whitespace().next_back() {
+
                        Some(r) => r,
+
                        None => {
+
                            log::trace!(target: "worker", "Git: ls-remote returned unexpected format {line}");
+
                            continue;
+
                        }
+
                    };
+
                    match git::RefString::try_from(r) {
+
                        Ok(r) => {
+
                            if let Some(ns) = r.to_namespaced() {
+
                                refs.insert(ns.to_owned());
+
                            } else {
+
                                log::debug!(target: "worker", "Git: non-namespaced ref '{r}'")
                            }
                        }
+
                        Err(err) => {
+
                            log::warn!(target: "worker", "Git: invalid refname '{r}' {err}")
+
                        }
                    }
-
                })?;
+
                }
+
            });

            tunnel.run(self.timeout)?;

@@ -535,11 +527,11 @@ impl Worker {
        let mut child = cmd.spawn()?;
        let stderr = child.stderr.take().unwrap();

-
        thread::Builder::new().name(self.name.clone()).spawn(|| {
+
        thread::spawn(&self.nid, "fetch", || {
            for line in BufReader::new(stderr).lines().flatten() {
                log::debug!(target: "worker", "Git: {}", line);
            }
-
        })?;
+
        });

        tunnel.run(self.timeout)?;

@@ -574,14 +566,14 @@ impl Worker {

/// A pool of workers. One thread is allocated for each worker.
pub struct Pool {
-
    pool: Vec<JoinHandle<Result<(), chan::RecvError>>>,
+
    pool: Vec<thread::JoinHandle<Result<(), chan::RecvError>>>,
}

impl Pool {
    /// Create a new worker pool with the given parameters.
    pub fn with(nid: NodeId, tasks: chan::Receiver<Task>, handle: Handle, config: Config) -> Self {
        let mut pool = Vec::with_capacity(config.capacity);
-
        for _ in 0..config.capacity {
+
        for i in 0..config.capacity {
            let worker = Worker {
                nid,
                tasks: tasks.clone(),
@@ -589,13 +581,9 @@ impl Pool {
                storage: config.storage.clone(),
                daemon: config.daemon,
                timeout: config.timeout,
-
                name: config.name.clone(),
                atomic: config.atomic,
            };
-
            let thread = thread::Builder::new()
-
                .name(config.name.clone())
-
                .spawn(|| worker.run())
-
                .unwrap();
+
            let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());

            pool.push(thread);
        }
modified radicle-node/src/worker/tunnel.rs
@@ -1,10 +1,8 @@
-
use std::{
-
    io::{self, Read},
-
    net, thread, time,
-
};
+
use std::{io, io::Read, net, time};

use super::channels::Channels;
use super::{Handle, NodeId, StreamId, Worker};
+
use crate::runtime::thread;

/// Tunnels fetches to a remote peer.
pub struct Tunnel<'a> {
@@ -56,34 +54,31 @@ impl<'a> Tunnel<'a> {
        let stream_id = self.stream;

        thread::scope(|s| {
-
            let remote_to_local = thread::Builder::new()
-
                .name(self.local.to_string())
-
                .spawn_scoped(s, || remote_r.pipe(local_w))?;
+
            let remote_to_local =
+
                thread::spawn_scoped(&self.local, "tunnel", s, || remote_r.pipe(local_w));

-
            let local_to_remote = thread::Builder::new()
-
                .name(self.local.to_string())
-
                .spawn_scoped(s, || {
-
                    let mut buffer = [0; u16::MAX as usize + 1];
+
            let local_to_remote = thread::spawn_scoped(&self.local, "tunnel", s, || {
+
                let mut buffer = [0; u16::MAX as usize + 1];

-
                    loop {
-
                        match local_r.read(&mut buffer) {
-
                            Ok(0) => break,
-
                            Ok(n) => {
-
                                remote_w.send(buffer[..n].to_vec())?;
+
                loop {
+
                    match local_r.read(&mut buffer) {
+
                        Ok(0) => break,
+
                        Ok(n) => {
+
                            remote_w.send(buffer[..n].to_vec())?;

-
                                if let Err(e) = self.handle.flush(nid, stream_id) {
-
                                    log::error!(
-
                                        target: "worker", "Worker channel disconnected; aborting"
-
                                    );
-
                                    return Err(e);
-
                                }
+
                            if let Err(e) = self.handle.flush(nid, stream_id) {
+
                                log::error!(
+
                                    target: "worker", "Worker channel disconnected; aborting"
+
                                );
+
                                return Err(e);
                            }
-
                            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
-
                            Err(e) => return Err(e),
                        }
+
                        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
+
                        Err(e) => return Err(e),
                    }
-
                    Worker::eof(nid, stream_id, remote_w, &mut self.handle)
-
                })?;
+
                }
+
                Worker::eof(nid, stream_id, remote_w, &mut self.handle)
+
            });

            remote_to_local.join().unwrap()?;
            local_to_remote.join().unwrap()?;
modified radicle-term/src/spinner.rs
@@ -116,78 +116,82 @@ pub fn spinner_to(
) -> Spinner {
    let message = message.to_string();
    let progress = Arc::new(Mutex::new(Progress::new(Paint::new(message))));
-
    let handle = thread::spawn({
-
        let progress = progress.clone();
+
    let handle = thread::Builder::new()
+
        .name(String::from("spinner"))
+
        .spawn({
+
            let progress = progress.clone();

-
        move || {
-
            let mut stdout = completion;
-
            let mut stderr = termion::cursor::HideCursor::from(animation);
+
            move || {
+
                let mut stdout = completion;
+
                let mut stderr = termion::cursor::HideCursor::from(animation);

-
            loop {
-
                let Ok(mut progress) = progress.lock() else {
+
                loop {
+
                    let Ok(mut progress) = progress.lock() else {
                    break;
                };
-
                match &mut *progress {
-
                    Progress {
-
                        state: State::Running { cursor },
-
                        message,
-
                    } => {
-
                        let spinner = DEFAULT_STYLE[*cursor];
-

-
                        write!(
-
                            stderr,
-
                            "{}{}{spinner} {message}",
-
                            termion::cursor::Save,
-
                            termion::clear::AfterCursor,
-
                        )
-
                        .ok();
-

-
                        write!(stderr, "{}", termion::cursor::Restore).ok();
-

-
                        *cursor += 1;
-
                        *cursor %= DEFAULT_STYLE.len();
-
                    }
-
                    Progress {
-
                        state: State::Done,
-
                        message,
-
                    } => {
-
                        write!(stderr, "{}", termion::clear::AfterCursor).ok();
-
                        writeln!(stdout, "{} {message}", Paint::green("✓")).ok();
-
                        break;
-
                    }
-
                    Progress {
-
                        state: State::Canceled,
-
                        message,
-
                    } => {
-
                        write!(stderr, "{}", termion::clear::AfterCursor).ok();
-
                        writeln!(
-
                            stdout,
-
                            "{ERROR_PREFIX} {message} {}",
-
                            Paint::red("<canceled>")
-
                        )
-
                        .ok();
-
                        break;
-
                    }
-
                    Progress {
-
                        state: State::Warn,
-
                        message,
-
                    } => {
-
                        writeln!(stdout, "{WARNING_PREFIX} {message}").ok();
-
                        break;
-
                    }
-
                    Progress {
-
                        state: State::Error,
-
                        message,
-
                    } => {
-
                        writeln!(stdout, "{ERROR_PREFIX} {message}").ok();
-
                        break;
+
                    match &mut *progress {
+
                        Progress {
+
                            state: State::Running { cursor },
+
                            message,
+
                        } => {
+
                            let spinner = DEFAULT_STYLE[*cursor];
+

+
                            write!(
+
                                stderr,
+
                                "{}{}{spinner} {message}",
+
                                termion::cursor::Save,
+
                                termion::clear::AfterCursor,
+
                            )
+
                            .ok();
+

+
                            write!(stderr, "{}", termion::cursor::Restore).ok();
+

+
                            *cursor += 1;
+
                            *cursor %= DEFAULT_STYLE.len();
+
                        }
+
                        Progress {
+
                            state: State::Done,
+
                            message,
+
                        } => {
+
                            write!(stderr, "{}", termion::clear::AfterCursor).ok();
+
                            writeln!(stdout, "{} {message}", Paint::green("✓")).ok();
+
                            break;
+
                        }
+
                        Progress {
+
                            state: State::Canceled,
+
                            message,
+
                        } => {
+
                            write!(stderr, "{}", termion::clear::AfterCursor).ok();
+
                            writeln!(
+
                                stdout,
+
                                "{ERROR_PREFIX} {message} {}",
+
                                Paint::red("<canceled>")
+
                            )
+
                            .ok();
+
                            break;
+
                        }
+
                        Progress {
+
                            state: State::Warn,
+
                            message,
+
                        } => {
+
                            writeln!(stdout, "{WARNING_PREFIX} {message}").ok();
+
                            break;
+
                        }
+
                        Progress {
+
                            state: State::Error,
+
                            message,
+
                        } => {
+
                            writeln!(stdout, "{ERROR_PREFIX} {message}").ok();
+
                            break;
+
                        }
                    }
+
                    drop(progress);
+
                    thread::sleep(DEFAULT_TICK);
                }
-
                drop(progress);
-
                thread::sleep(DEFAULT_TICK);
            }
-
        }
-
    });
+
        })
+
        // SAFETY: Only panics if the thread name contains `null` bytes, which isn't the case here.
+
        .unwrap();

    Spinner {
        progress,