Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Spawn a thread for each control connection
Alexis Sellier committed 2 years ago
commit 664aa570e6ec1f948142da1b44032d465c118836
parent b8c2f2a647274869ef51a6ea576e6de8b2e4523a
6 files changed +85 -42
modified radicle-node/src/control.rs
@@ -25,6 +25,8 @@ pub enum Error {
    Bind(io::Error),
    #[error("invalid socket path specified: {0}")]
    InvalidPath(PathBuf),
+
    #[error("node: {0}")]
+
    Node(#[from] runtime::HandleError),
}

/// Listen for commands on the control socket, and process them.
@@ -33,25 +35,28 @@ pub fn listen<H: Handle<Error = runtime::HandleError> + 'static>(
    handle: H,
) -> Result<(), Error> {
    log::debug!(target: "control", "Control thread listening on socket..");
+
    let nid = handle.nid()?.to_human();

    for incoming in listener.incoming() {
        match incoming {
            Ok(mut stream) => {
-
                if let Err(e) = command(&stream, handle.clone()) {
-
                    if let CommandError::Shutdown = e {
-
                        log::debug!(target: "control", "Shutdown requested..");
-
                        // Channel might already be disconnected if shutdown
-
                        // came from somewhere else. Ignore errors.
-
                        handle.shutdown().ok();
-
                        break;
-
                    }
-
                    log::error!(target: "control", "Command returned error: {e}");
-

-
                    CommandResult::error(e).to_writer(&mut stream).ok();
-

-
                    stream.flush().ok();
-
                    stream.shutdown(net::Shutdown::Both).ok();
-
                }
+
                let handle = handle.clone();
+

+
                thread::Builder::new()
+
                    .name(nid.clone())
+
                    .spawn(move || {
+
                        if let Err(e) = command(&stream, handle) {
+
                            log::error!(target: "control", "Command returned error: {e}");
+

+
                            CommandResult::error(e).to_writer(&mut stream).ok();
+

+
                            stream.flush().ok();
+
                            stream.shutdown(net::Shutdown::Both).ok();
+
                        }
+
                    })
+
                    // SAFETY: Only panics if the thread name contained NULL bytes, which we can
+
                    // guarantee is not the case here.
+
                    .unwrap();
            }
            Err(e) => log::error!(target: "control", "Failed to accept incoming connection: {}", e),
        }
@@ -73,8 +78,6 @@ enum CommandError {
    Runtime(#[from] runtime::HandleError),
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
-
    #[error("shutdown requested")]
-
    Shutdown,
}

fn command<H: Handle<Error = runtime::HandleError> + 'static>(
@@ -82,7 +85,7 @@ fn command<H: Handle<Error = runtime::HandleError> + 'static>(
    mut handle: H,
) -> Result<(), CommandError> {
    let mut reader = BufReader::new(stream);
-
    let writer = LineWriter::new(stream);
+
    let mut writer = LineWriter::new(stream);
    let mut line = String::new();

    reader.read_line(&mut line)?;
@@ -187,29 +190,31 @@ fn command<H: Handle<Error = runtime::HandleError> + 'static>(
                return Err(CommandError::Runtime(e));
            }
        },
-
        CommandName::Subscribe => {
-
            let mut stream = stream.try_clone()?;
-

-
            thread::spawn(move || {
-
                match handle.subscribe(MAX_TIMEOUT) {
-
                    Ok(events) => {
-
                        for e in events {
-
                            let event = e?;
-
                            let event = serde_json::to_string(&event)?;
+
        CommandName::Subscribe => match handle.subscribe(MAX_TIMEOUT) {
+
            Ok(events) => {
+
                for e in events {
+
                    let event = e?;
+
                    let event = serde_json::to_string(&event)?;

-
                            writeln!(stream, "{event}")?;
-
                        }
-
                    }
-
                    Err(e) => log::error!(target: "control", "Error subscribing to events: {e}"),
+
                    writeln!(&mut writer, "{event}")?;
                }
-
                Ok::<_, io::Error>(())
-
            });
-
        }
+
            }
+
            Err(e) => log::error!(target: "control", "Error subscribing to events: {e}"),
+
        },
        CommandName::Status => {
            CommandResult::ok().to_writer(writer).ok();
        }
+
        CommandName::NodeId => match handle.nid() {
+
            Ok(nid) => {
+
                writeln!(writer, "{nid}")?;
+
            }
+
            Err(e) => return Err(CommandError::Runtime(e)),
+
        },
        CommandName::Shutdown => {
-
            return Err(CommandError::Shutdown);
+
            log::debug!(target: "control", "Shutdown requested..");
+
            // Channel might already be disconnected if shutdown
+
            // came from somewhere else. Ignore errors.
+
            handle.shutdown().ok();
        }
    }
    Ok(())
modified radicle-node/src/runtime.rs
@@ -4,7 +4,7 @@ use std::io::{BufRead, BufReader};
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
-
use std::{fs, io, net, thread, time};
+
use std::{io, net, thread, time};

use crossbeam_channel as chan;
use cyphernet::Ecdh;
@@ -227,7 +227,7 @@ impl Runtime {
        log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
        log::info!(target: "node", "Binding control socket {}..", home.socket().display());

-
        let control = thread::Builder::new().name(self.id.to_human()).spawn({
+
        thread::Builder::new().name(self.id.to_human()).spawn({
            let handle = self.handle.clone();
            move || control::listen(self.control, handle)
        })?;
@@ -262,11 +262,9 @@ impl Runtime {
        daemon::kill(&daemon).ok(); // Ignore error if daemon has already exited, for whatever reason.
        daemon.wait()?;

-
        // If the socket file was deleted by some other process, for whatever reason,
-
        // the control thread will not be able to join.
-
        if fs::remove_file(home.socket()).is_ok() {
-
            control.join().unwrap()?;
-
        }
+
        // Nb. We don't join the control thread here, as we have no way of notifying it that the
+
        // node is shutting down.
+

        log::debug!(target: "node", "Node shutdown completed for {}", self.id);

        Ok(())
modified radicle-node/src/runtime/handle.rs
@@ -123,6 +123,21 @@ impl radicle::node::Handle for Handle {
    type Sessions = Sessions;
    type Error = Error;

+
    fn nid(&self) -> Result<NodeId, Self::Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            sender.send(*state.nid()).ok();
+
            Ok(())
+
        });
+
        let (err_sender, err_receiver) = chan::bounded(1);
+
        self.command(service::Command::QueryState(query, err_sender))?;
+
        err_receiver.recv()??;
+

+
        let nid = receiver.recv()?;
+

+
        Ok(nid)
+
    }
+

    fn is_running(&self) -> bool {
        true
    }
modified radicle-node/src/service.rs
@@ -1517,6 +1517,8 @@ where

/// Gives read access to the service state.
pub trait ServiceState {
+
    /// Get the Node ID.
+
    fn nid(&self) -> &NodeId;
    /// Get the connected peers.
    fn sessions(&self) -> &Sessions;
    /// Get a repository from storage, using the local node's key.
@@ -1537,6 +1539,10 @@ where
    G: Signer,
    S: ReadStorage,
{
+
    fn nid(&self) -> &NodeId {
+
        self.signer.public_key()
+
    }
+

    fn sessions(&self) -> &Sessions {
        &self.sessions
    }
modified radicle-node/src/test/handle.rs
@@ -1,4 +1,5 @@
use std::collections::HashSet;
+
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::{io, time};

@@ -19,6 +20,10 @@ impl radicle::node::Handle for Handle {
    type Error = HandleError;
    type Sessions = service::Sessions;

+
    fn nid(&self) -> Result<NodeId, Self::Error> {
+
        Ok(NodeId::from_str("z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK").unwrap())
+
    }
+

    fn is_running(&self) -> bool {
        true
    }
modified radicle/src/node.rs
@@ -145,6 +145,8 @@ pub enum CommandName {
    UntrackNode,
    /// Get the node's status.
    Status,
+
    /// Get the node's NID.
+
    NodeId,
    /// Shutdown the node.
    Shutdown,
    /// Subscribe to events.
@@ -407,6 +409,8 @@ pub trait Handle: Clone + Sync + Send {
    /// The error returned by all methods.
    type Error: std::error::Error + Send + Sync + 'static;

+
    /// Get the local Node ID.
+
    fn nid(&self) -> Result<NodeId, Self::Error>;
    /// Check if the node is running. to a peer.
    fn is_running(&self) -> bool;
    /// Connect to a peer.
@@ -532,6 +536,15 @@ impl Handle for Node {
    type Sessions = ();
    type Error = Error;

+
    fn nid(&self) -> Result<NodeId, Error> {
+
        self.call::<&str, NodeId>(CommandName::NodeId, [], DEFAULT_TIMEOUT)?
+
            .next()
+
            .ok_or(Error::EmptyResponse {
+
                cmd: CommandName::NodeId,
+
            })?
+
            .map_err(Error::from)
+
    }
+

    fn is_running(&self) -> bool {
        let Ok(mut lines) = self.call::<&str, CommandResult>(CommandName::Status, [], DEFAULT_TIMEOUT) else {
            return false;
@@ -552,6 +565,7 @@ impl Handle for Node {
        .ok_or(Error::EmptyResponse {
            cmd: CommandName::Connect,
        })??;
+

        Ok(())
    }