Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Control via `uds_windows` not `winpipe`
Merged lorenz opened 2 months ago

Recent versions of Windows support Unix Domain Sockets natively, see https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/.

By using that feature instead of Windows named pipes, the difference for handling communication via the control socket comparing Unix-like systems and Windows becomes smaller:

  1. No special paths like \.\pipe\… have to be handled.
  2. Not two I/O mechanisms are abstracted (named pipe and UDS) but just one.
  3. winpipe relies on background threads while uds_windows does not.

Once the feature windows_unix_domain_sockets (which is tracked at https://github.com/rust-lang/rust/issues/150487) stabilizes, it will likely be possible to just drop the dependency uds_windows and use the implementation in std::os::windows::net directly.

9 files changed +54 -101 90cf37c4 ebf7d876
modified Cargo.lock
@@ -2229,6 +2229,15 @@ dependencies = [
]

[[package]]
+
name = "memoffset"
+
version = "0.9.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a"
+
dependencies = [
+
 "autocfg",
+
]
+

+
[[package]]
name = "miniz_oxide"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2840,8 +2849,8 @@ dependencies = [
 "sqlite",
 "tempfile",
 "thiserror 2.0.17",
+
 "uds_windows",
 "unicode-normalization",
-
 "winpipe",
]

[[package]]
@@ -3071,7 +3080,7 @@ dependencies = [
 "tempfile",
 "test-log",
 "thiserror 2.0.17",
-
 "winpipe",
+
 "uds_windows",
]

[[package]]
@@ -4492,6 +4501,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"

[[package]]
+
name = "uds_windows"
+
version = "1.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "89daebc3e6fd160ac4aa9fc8b3bf71e1f74fbf92367ae71fb83a037e8bf164b9"
+
dependencies = [
+
 "memoffset",
+
 "tempfile",
+
 "winapi",
+
]
+

+
[[package]]
name = "unarray"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified Cargo.toml
@@ -69,6 +69,7 @@ snapbox = "0.4.3"
sqlite = "0.32.0"
tempfile = "3.3.0"
thiserror = { version = "2", default-features = false }
+
uds_windows = "1.1.0"
winpipe = "0.1.1"
winsplit = "0.1.0"
zeroize = "1.5.7"
modified crates/radicle-node/Cargo.toml
@@ -47,8 +47,8 @@ thiserror = { workspace = true, default-features = true }
radicle-systemd = { workspace = true, optional = true }

[target.'cfg(windows)'.dependencies]
-
winpipe = { workspace = true }
radicle-windows = { workspace = true }
+
uds_windows = { workspace = true }

[dev-dependencies]
mio = { version = "1", features = ["os-ext"] }
modified crates/radicle-node/src/control.rs
@@ -6,9 +6,9 @@ use std::path::PathBuf;
use std::{io, net, time};

#[cfg(unix)]
-
use std::os::unix::net::{UnixListener as Listener, UnixStream as Stream};
+
use std::os::unix::net::{UnixListener, UnixStream};
#[cfg(windows)]
-
use winpipe::{WinListener as Listener, WinStream as Stream};
+
use uds_windows::{UnixListener, UnixStream};

use radicle::node::Handle;
use serde_json as json;
@@ -33,7 +33,7 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<E, H>(listener: Listener, handle: H) -> Result<(), Error>
+
pub fn listen<E, H>(listener: UnixListener, handle: H) -> Result<(), Error>
where
    H: Handle<Error = runtime::HandleError> + 'static,
    H::Sessions: serde::Serialize,
@@ -45,11 +45,11 @@ where

    for incoming in listener.incoming() {
        match incoming {
-
            Ok(stream) => {
+
            Ok(mut stream) => {
                let handle = handle.clone();

                thread::spawn(&nid, "control", move || {
-
                    if let Err((e, mut stream)) = command(stream, handle) {
+
                    if let Err(e) = command(&stream, handle) {
                        log::debug!(target: "control", "Command returned error: {e}");

                        CommandResult::error(e).to_writer(&mut stream).ok();
@@ -77,63 +77,16 @@ enum CommandError {
    Io(#[from] io::Error),
}

-
#[cfg(unix)]
-
fn command<E, H>(stream: Stream, handle: H) -> Result<(), (CommandError, Stream)>
-
where
-
    H: Handle<Error = runtime::HandleError> + 'static,
-
    H::Sessions: serde::Serialize,
-
    CommandResult<E>: From<H::Event>,
-
    E: serde::Serialize,
-
{
-
    let reader = BufReader::new(&stream);
-
    let writer = LineWriter::new(&stream);
-

-
    command_internal(reader, writer, handle).map_err(|e| (e, stream))
-
}
-

-
/// Due to different mutability requirements between Unix and Windows,
-
/// we are forced to clone the stream on Windows.
-
///
-
/// # Errors
-
///
-
/// As of winpipe 0.1.1, [`WinStream::try_clone`] is actually infallible.
-
#[cfg(windows)]
-
fn command<E, H>(stream: Stream, handle: H) -> Result<(), (CommandError, Stream)>
+
fn command<E, H>(stream: &UnixStream, mut handle: H) -> Result<(), CommandError>
where
    H: Handle<Error = runtime::HandleError> + 'static,
    H::Sessions: serde::Serialize,
    CommandResult<E>: From<H::Event>,
    E: serde::Serialize,
{
-
    let mut reader = match stream.try_clone() {
-
        Ok(reader) => reader,
-
        Err(err) => return Err((err.into(), stream)),
-
    };
-
    let reader = BufReader::new(&mut reader);
-

-
    let mut writer = match stream.try_clone() {
-
        Ok(writer) => writer,
-
        Err(err) => return Err((err.into(), stream)),
-
    };
-
    let writer = LineWriter::new(&mut writer);
-

-
    command_internal(reader, writer, handle).map_err(|e| (e, stream))
-
}
+
    let mut reader = BufReader::new(stream);
+
    let mut writer = LineWriter::new(stream);

-
#[inline(always)]
-
fn command_internal<E, H, R, W>(
-
    mut reader: BufReader<R>,
-
    mut writer: LineWriter<W>,
-
    mut handle: H,
-
) -> Result<(), CommandError>
-
where
-
    H: Handle<Error = runtime::HandleError> + 'static,
-
    H::Sessions: serde::Serialize,
-
    CommandResult<E>: From<H::Event>,
-
    E: serde::Serialize,
-
    R: io::Read,
-
    W: io::Write,
-
{
    let mut line = String::new();

    reader.read_line(&mut line)?;
@@ -319,7 +272,7 @@ mod tests {
        let handle = test::handle::Handle::default();
        let socket = tmp.path().join("alice.sock");
        let rids = test::arbitrary::set::<RepoId>(1..3);
-
        let listener = Listener::bind(&socket).unwrap();
+
        let listener = UnixListener::bind(&socket).unwrap();
        let nid = handle.nid().unwrap();

        thread::spawn({
@@ -330,7 +283,7 @@ mod tests {

        for rid in &rids {
            let mut stream = loop {
-
                if let Ok(stream) = Stream::connect(&socket) {
+
                if let Ok(stream) = UnixStream::connect(&socket) {
                    break stream;
                }
            };
@@ -369,7 +322,7 @@ mod tests {
        let socket = tmp.path().join("node.sock");
        let proj = test::arbitrary::gen::<RepoId>(1);
        let peer = test::arbitrary::gen::<NodeId>(1);
-
        let listener = Listener::bind(&socket).unwrap();
+
        let listener = UnixListener::bind(&socket).unwrap();
        let mut handle = Node::new(&socket);

        thread::spawn({
modified crates/radicle-node/src/runtime.rs
@@ -6,9 +6,9 @@ use std::path::PathBuf;
use std::{fs, io, net};

#[cfg(unix)]
-
use std::os::unix::net::UnixListener as Listener;
+
use std::os::unix::net::UnixListener;
#[cfg(windows)]
-
use winpipe::WinListener as Listener;
+
use uds_windows::UnixListener;

use crossbeam_channel as chan;
use cyphernet::Ecdh;
@@ -99,12 +99,12 @@ impl From<service::Error> for Error {
    }
}

-
/// Wraps a [`Listener`] but tracks its origin.
+
/// Wraps a [`UnixListener`] but tracks its origin.
pub enum ControlSocket {
    /// The listener was created by binding to it.
-
    Bound(Listener, PathBuf),
+
    Bound(UnixListener, PathBuf),
    /// The listener was received via socket activation.
-
    Received(Listener),
+
    Received(UnixListener),
}

/// Holds join handles to the client threads, as well as a client handle.
@@ -323,7 +323,7 @@ impl Runtime {
    }

    #[cfg(all(feature = "systemd", target_os = "linux"))]
-
    fn receive_listener() -> Option<Listener> {
+
    fn receive_listener() -> Option<UnixListener> {
        let fd = match radicle_systemd::listen::fd("control") {
            Ok(Some(fd)) => fd,
            Ok(None) => return None,
@@ -348,7 +348,7 @@ impl Runtime {
            return None;
        }

-
        Some(Listener::from(socket))
+
        Some(UnixListener::from(socket))
    }

    fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
@@ -361,7 +361,7 @@ impl Runtime {
        }

        log::info!(target: "node", "Binding control socket {}..", &path.display());
-
        match Listener::bind(&path) {
+
        match UnixListener::bind(&path) {
            Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
            Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
            Err(err) => Err(err.into()),
modified crates/radicle-node/src/runtime/handle.rs
@@ -5,9 +5,9 @@ use std::sync::Arc;
use std::{fmt, io, time};

#[cfg(unix)]
-
use std::os::unix::net::UnixStream as Stream;
+
use std::os::unix::net::UnixStream;
#[cfg(windows)]
-
use winpipe::WinStream as Stream;
+
use uds_windows::UnixStream;

use crossbeam_channel as chan;
use radicle::crypto::PublicKey;
@@ -338,7 +338,7 @@ impl radicle::node::Handle for Handle {
        // Send a shutdown request to our own control socket. This is the only way to kill the
        // control thread gracefully. Since the control thread may have called this function,
        // the control socket may already be disconnected. Ignore errors.
-
        Stream::connect(self.home.socket())
+
        UnixStream::connect(self.home.socket())
            .and_then(|sock| Command::Shutdown.to_writer(sock))
            .ok();

modified crates/radicle/Cargo.toml
@@ -63,7 +63,7 @@ unicode-normalization = { version = "0.1" }
libc = { workspace = true }

[target.'cfg(windows)'.dependencies]
-
winpipe = { workspace = true }
+
uds_windows = { workspace = true }

[dev-dependencies]
emojis = "0.6"
modified crates/radicle/src/node.rs
@@ -25,9 +25,9 @@ use std::str::FromStr;
use std::{fmt, io, net, thread, time};

#[cfg(unix)]
-
use std::os::unix::net::UnixStream as Stream;
+
use std::os::unix::net::UnixStream;
#[cfg(windows)]
-
use winpipe::WinStream as Stream;
+
use uds_windows::UnixStream;

use amplify::WrapperMut;
use cyphernet::addr::NetAddr;
@@ -948,7 +948,7 @@ pub trait Handle: Clone + Sync + Send {
/// The iterator blocks for a `timeout` duration, returning [`Error::TimedOut`]
/// if the duration is reached.
pub struct LineIter<T> {
-
    stream: BufReader<Stream>,
+
    stream: BufReader<UnixStream>,
    timeout: time::Duration,
    witness: PhantomData<T>,
}
@@ -1009,7 +1009,7 @@ impl Node {
        cmd: Command,
        timeout: time::Duration,
    ) -> Result<LineIter<T>, Error> {
-
        let mut stream = Stream::connect(&self.socket)
+
        let mut stream = UnixStream::connect(&self.socket)
            .map_err(|e| Error::Connect(self.socket.clone(), e.kind()))?;
        cmd.to_writer(&mut stream)?;
        Ok(LineIter {
modified crates/radicle/src/profile.rs
@@ -603,32 +603,11 @@ impl Home {
    }

    pub fn socket(&self) -> PathBuf {
-
        use env::RAD_SOCKET;
-

-
        #[cfg(unix)]
        const DEFAULT_SOCKET_NAME: &str = "control.sock";

-
        #[cfg(windows)]
-
        const DEFAULT_SOCKET_NAME: &str = r#"\\.\pipe\radicle-node"#;
-

-
        match env::var_os(RAD_SOCKET).map(PathBuf::from) {
-
            None => {
-
                #[cfg(unix)]
-
                return self.node().join(DEFAULT_SOCKET_NAME);
-

-
                #[cfg(windows)]
-
                return PathBuf::from(DEFAULT_SOCKET_NAME);
-
            }
-
            Some(path) => {
-
                #[cfg(windows)]
-
                {
-
                    const PIPE_PREFIX: &str = r#"\\.\pipe\"#;
-
                    assert!(path.starts_with(PIPE_PREFIX), "The value of the environment variable {RAD_SOCKET} ('{}') must start with {PIPE_PREFIX}. This restriction might be relaxed in the future.", path.display());
-
                }
-

-
                path
-
            }
-
        }
+
        env::var_os(env::RAD_SOCKET)
+
            .map(PathBuf::from)
+
            .unwrap_or_else(|| self.node().join(DEFAULT_SOCKET_NAME))
    }

    /// Return a read-write handle to the notifications database.