Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Use winpipe for control socket on Windows
Merged lorenz opened 8 months ago

As std::os::unix is obviously not available on Windows, resort to the winpipe crate, which implements a very similar API for named pipes.

Apart from simple changes to imports, we need to jump through a few hoops because the std::io::{Read, Write} impls are on &mut for winpipe, thus yielding different mutability requirements on Unix vs. Windows.

We resort to cloning, but as this fallible, swallow the added complexity of a platform-dependent implementation to not risk panics on Unix.

5 files changed +75 -20 ce11f03f 19a262d3
modified Cargo.lock
@@ -2870,6 +2870,7 @@ dependencies = [
 "tempfile",
 "test-log",
 "thiserror 1.0.69",
+
 "winpipe",
]

[[package]]
modified crates/radicle-node/Cargo.toml
@@ -51,6 +51,9 @@ thiserror = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]
radicle-systemd = { workspace = true, optional = true }

+
[target.'cfg(windows)'.dependencies]
+
winpipe = { workspace = true }
+

[dev-dependencies]
qcheck = { workspace = true }
qcheck-macros = { workspace = true }
modified crates/radicle-node/src/control.rs
@@ -2,11 +2,14 @@
use std::io::prelude::*;
use std::io::BufReader;
use std::io::LineWriter;
-
use std::os::unix::net::UnixListener;
-
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::{io, net, time};

+
#[cfg(unix)]
+
use std::os::unix::net::{UnixListener as Listener, UnixStream as Stream};
+
#[cfg(windows)]
+
use winpipe::{WinListener as Listener, WinStream as Stream};
+

use radicle::node::Handle;
use serde_json as json;

@@ -30,7 +33,7 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<E, H>(listener: UnixListener, handle: H) -> Result<(), Error>
+
pub fn listen<E, H>(listener: Listener, handle: H) -> Result<(), Error>
where
    H: Handle<Error = runtime::HandleError> + 'static,
    H::Sessions: serde::Serialize,
@@ -42,11 +45,11 @@ where

    for incoming in listener.incoming() {
        match incoming {
-
            Ok(mut stream) => {
+
            Ok(stream) => {
                let handle = handle.clone();

                thread::spawn(&nid, "control", move || {
-
                    if let Err(e) = command(&stream, handle) {
+
                    if let Err((e, mut stream)) = command(stream, handle) {
                        log::error!(target: "control", "Command returned error: {e}");

                        CommandResult::error(e).to_writer(&mut stream).ok();
@@ -74,15 +77,56 @@ enum CommandError {
    Io(#[from] io::Error),
}

-
fn command<E, H>(stream: &UnixStream, mut handle: H) -> Result<(), CommandError>
+
#[cfg(unix)]
+
fn command<E, H>(stream: Stream, handle: H) -> Result<(), (CommandError, Stream)>
+
where
+
    H: Handle<Error = runtime::HandleError> + 'static,
+
    H::Sessions: serde::Serialize,
+
    CommandResult<E>: From<H::Event>,
+
    E: serde::Serialize,
+
{
+
    let reader = BufReader::new(&stream);
+
    let writer = LineWriter::new(&stream);
+

+
    command_internal(reader, writer, handle).map_err(|e| (e, stream))
+
}
+

+

+
/// Due to different mutability requirements between Unix and Windows,
+
/// we are forced to clone the stream on Windows.
+
///
+
/// # Errors
+
///
+
/// As of winpipe 0.1.1, [`WinStream::try_clone`] is actually infallible.
+
#[cfg(windows)]
+
fn command<E, H>(mut stream: Stream, handle: H) -> Result<(), (CommandError, Stream)>
+
where
+
    H: Handle<Error = runtime::HandleError> + 'static,
+
    H::Sessions: serde::Serialize,
+
    CommandResult<E>: From<H::Event>,
+
    E: serde::Serialize,
+
{
+
    let mut clone = stream.try_clone().map_err(|e| (e.into(), stream))?;
+
    let reader = BufReader::new(&mut clone);
+
    let writer = LineWriter::new(&mut stream);
+

+
    command_internal(reader, writer, handle).map_err(|e| (e, stream))
+
}
+

+
#[inline(always)]
+
fn command_internal<E, H, R, W>(
+
    mut reader: BufReader<R>,
+
    mut writer: LineWriter<W>,
+
    mut handle: H,
+
) -> Result<(), CommandError>
where
    H: Handle<Error = runtime::HandleError> + 'static,
    H::Sessions: serde::Serialize,
    CommandResult<E>: From<H::Event>,
    E: serde::Serialize,
+
    R: io::Read,
+
    W: io::Write,
{
-
    let mut reader = BufReader::new(stream);
-
    let mut writer = LineWriter::new(stream);
    let mut line = String::new();

    reader.read_line(&mut line)?;
@@ -241,7 +285,6 @@ fn fetch<W: Write, H: Handle<Error = runtime::HandleError>>(
#[cfg(test)]
mod tests {
    use std::io::prelude::*;
-
    use std::os::unix::net::UnixStream;
    use std::thread;

    use super::*;
@@ -257,7 +300,7 @@ mod tests {
        let handle = test::handle::Handle::default();
        let socket = tmp.path().join("alice.sock");
        let rids = test::arbitrary::set::<RepoId>(1..3);
-
        let listener = UnixListener::bind(&socket).unwrap();
+
        let listener = Listener::bind(&socket).unwrap();

        thread::spawn({
            let handle = handle.clone();
@@ -267,7 +310,7 @@ mod tests {

        for rid in &rids {
            let stream = loop {
-
                if let Ok(stream) = UnixStream::connect(&socket) {
+
                if let Ok(stream) = Stream::connect(&socket) {
                    break stream;
                }
            };
@@ -305,7 +348,7 @@ mod tests {
        let socket = tmp.path().join("node.sock");
        let proj = test::arbitrary::gen::<RepoId>(1);
        let peer = test::arbitrary::gen::<NodeId>(1);
-
        let listener = UnixListener::bind(&socket).unwrap();
+
        let listener = Listener::bind(&socket).unwrap();
        let mut handle = Node::new(&socket);

        thread::spawn({
modified crates/radicle-node/src/runtime.rs
@@ -1,10 +1,14 @@
pub mod handle;
pub mod thread;

-
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::{fs, io, net};

+
#[cfg(unix)]
+
use std::os::unix::net::UnixListener as Listener;
+
#[cfg(windows)]
+
use winpipe::WinListener as Listener;
+

use crossbeam_channel as chan;
use cyphernet::Ecdh;
use netservices::resource::NetAccept;
@@ -100,9 +104,9 @@ impl From<service::Error> for Error {
/// Wraps a [`UnixListener`] but tracks its origin.
pub enum ControlSocket {
    /// The listener was created by binding to it.
-
    Bound(UnixListener, PathBuf),
+
    Bound(Listener, PathBuf),
    /// The listener was received via socket activation.
-
    Received(UnixListener),
+
    Received(Listener),
}

/// Holds join handles to the client threads, as well as a client handle.
@@ -313,7 +317,7 @@ impl Runtime {
    }

    #[cfg(all(feature = "systemd", target_os = "linux"))]
-
    fn receive_listener() -> Option<UnixListener> {
+
    fn receive_listener() -> Option<Listener> {
        use std::os::fd::FromRawFd;
        match radicle_systemd::listen::fd("control") {
            Ok(Some(fd)) => {
@@ -327,7 +331,7 @@ impl Runtime {
                Some(unsafe {
                    // SAFETY: We take ownership of this FD from systemd,
                    // which guarantees that it is open.
-
                    UnixListener::from_raw_fd(fd)
+
                    Listener::from_raw_fd(fd)
                })
            }
            Ok(None) => None,
@@ -348,7 +352,7 @@ impl Runtime {
        }

        log::info!(target: "node", "Binding control socket {}..", &path.display());
-
        match UnixListener::bind(&path) {
+
        match Listener::bind(&path) {
            Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
            Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
            Err(err) => Err(err.into()),
modified crates/radicle-node/src/runtime/handle.rs
@@ -1,9 +1,13 @@
use std::net;
-
use std::os::unix::net::UnixStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{fmt, io, time};

+
#[cfg(unix)]
+
use std::os::unix::net::UnixStream as Stream;
+
#[cfg(windows)]
+
use winpipe::WinStream as Stream;
+

use crossbeam_channel as chan;
use radicle::node::events::{Event, Events};
use radicle::node::policy;
@@ -320,7 +324,7 @@ impl radicle::node::Handle for Handle {
        // Send a shutdown request to our own control socket. This is the only way to kill the
        // control thread gracefully. Since the control thread may have called this function,
        // the control socket may already be disconnected. Ignore errors.
-
        UnixStream::connect(self.home.socket())
+
        Stream::connect(self.home.socket())
            .and_then(|sock| Command::Shutdown.to_writer(sock))
            .ok();