Radish alpha
r
rad:z4V1sjrXqjvFdnCUbxPFqd5p4DtH5
Radicle web interface
Radicle
Git
httpd: Make radicle-httpd work with unix sockets
Rūdolfs Ošiņš committed 4 months ago
commit 97ad0a865ec5c40576b457999495c54e66e5e0be
parent 97dac7e
5 files changed +148 -15
modified radicle-httpd/Cargo.lock
@@ -2026,6 +2026,7 @@ dependencies = [
 "chrono",
 "flate2",
 "hyper",
+
 "hyper-util",
 "infer",
 "lexopt",
 "lru",
@@ -2041,6 +2042,7 @@ dependencies = [
 "tempfile",
 "thiserror 2.0.14",
 "tokio",
+
 "tower 0.4.13",
 "tower 0.5.2",
 "tower-http",
 "tracing",
@@ -2828,6 +2830,7 @@ dependencies = [
 "tokio",
 "tower-layer",
 "tower-service",
+
 "tracing",
]

[[package]]
modified radicle-httpd/Cargo.toml
@@ -26,7 +26,8 @@ axum = { version = "0.8.4", default-features = false, features = ["json", "query
base64 = { version = "0.22.1" }
chrono = { version = "0.4.41", default-features = false, features = ["clock"] }
flate2 = { version = "1" }
-
hyper = { version = "1.6.0", default-features = false }
+
hyper = { version = "1.6.0", default-features = false, features = ["server", "http1"] }
+
hyper-util = { version = "0.1.7", features = ["tokio", "server", "service"] }
infer = { version = "0.19.0" }
lexopt = { version = "0.3.1" }
lru = { version = "0.16.0" }
@@ -39,6 +40,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = ["preserve_order"] }
thiserror = { version = "2" }
tokio = { version = "1.47.1", default-features = false, features = ["macros", "rt-multi-thread"] }
+
tower = { version = "0.4.13", features = ["util"] }
tower-http = { version = "0.6.6", default-features = false, features = ["trace", "cors", "set-header"] }
tracing = { version = "0.1.41", default-features = false, features = ["std", "log"] }
tracing-logfmt = { version = "0.3.5", optional = true }
modified radicle-httpd/src/lib.rs
@@ -6,6 +6,7 @@ pub mod error;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
+
use std::path::PathBuf;
use std::process::Command;
use std::str;
use std::sync::Arc;
@@ -19,7 +20,10 @@ use axum::routing::get;
use axum::{middleware, Json, Router};
use hyper::header::CONTENT_TYPE;
use hyper::Method;
-
use tokio::net::TcpListener;
+
use hyper_util::rt::TokioIo;
+
use hyper_util::service::TowerToHyperService;
+
use tokio::net::{TcpListener, UnixListener};
+
use tower::Service;
use tower_http::cors;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
@@ -45,9 +49,24 @@ mod tracing_extra;
pub const DEFAULT_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

#[derive(Debug, Clone)]
+
pub enum ListenAddress {
+
    Tcp(SocketAddr),
+
    Unix(PathBuf),
+
}
+

+
impl std::fmt::Display for ListenAddress {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        match self {
+
            ListenAddress::Tcp(addr) => write!(f, "http://{}", addr),
+
            ListenAddress::Unix(path) => write!(f, "unix://{}", path.display()),
+
        }
+
    }
+
}
+

+
#[derive(Debug, Clone)]
pub struct Options {
    pub aliases: HashMap<String, RepoId>,
-
    pub listen: SocketAddr,
+
    pub listen: ListenAddress,
    pub cache: Option<NonZeroUsize>,
}

@@ -61,10 +80,25 @@ pub async fn run(options: Options) -> anyhow::Result<()> {

    tracing::info!("{}", str::from_utf8(&git_version)?.trim());

-
    let listener = TcpListener::bind(options.listen).await?;
-

-
    tracing::info!("listening on http://{}", options.listen);
+
    match &options.listen {
+
        ListenAddress::Tcp(addr) => {
+
            let listener = TcpListener::bind(addr).await?;
+
            tracing::info!("listening on {}", options.listen);
+
            run_tcp_server(listener, options).await
+
        }
+
        ListenAddress::Unix(path) => {
+
            // Remove existing socket file if it exists
+
            if path.exists() {
+
                std::fs::remove_file(path)?;
+
            }
+
            let listener = UnixListener::bind(path)?;
+
            tracing::info!("listening on {}", options.listen);
+
            run_unix_server(listener, options).await
+
        }
+
    }
+
}

+
async fn run_tcp_server(listener: TcpListener, options: Options) -> anyhow::Result<()> {
    let profile = Profile::load()?;
    let request_id = RequestId::new();

@@ -81,9 +115,12 @@ pub async fn run(options: Options) -> anyhow::Result<()> {
                .on_response(
                    |response: &Response<Body>, latency: Duration, _span: &Span| {
                        if let Some(info) = response.extensions().get::<TracingInfo>() {
+
                            let client_addr = info.connect_info
+
                                .map(|ci| ci.0.to_string())
+
                                .unwrap_or_else(|| "unknown".to_string());
                            tracing::info!(
                                "{} \"{} {} {:?}\" {} {:?} {}",
-
                                info.connect_info.0,
+
                                client_addr,
                                info.method,
                                info.uri,
                                info.version,
@@ -112,6 +149,77 @@ pub async fn run(options: Options) -> anyhow::Result<()> {
        .map_err(anyhow::Error::from)
}

+
async fn run_unix_server(listener: UnixListener, options: Options) -> anyhow::Result<()> {
+
    let profile = Profile::load()?;
+
    let request_id = RequestId::new();
+

+
    tracing::info!("using radicle home at {}", profile.home().path().display());
+

+
    let app =
+
        router(options, profile)?
+
        .layer(middleware::from_fn(tracing_middleware))
+
        .layer(
+
            TraceLayer::new_for_http()
+
                .make_span_with(move |_request: &Request<Body>| {
+
                    tracing::info_span!("request", id = %request_id.clone().next())
+
                })
+
                .on_response(
+
                    |response: &Response<Body>, latency: Duration, _span: &Span| {
+
                        tracing::info!(
+
                            "unix-socket \"{} {} {:?}\" {} {:?} {}",
+
                            response.extensions().get::<TracingInfo>()
+
                                .map(|info| info.method.as_str())
+
                                .unwrap_or("UNKNOWN"),
+
                            response.extensions().get::<TracingInfo>()
+
                                .map(|info| info.uri.path())
+
                                .unwrap_or("/"),
+
                            response.extensions().get::<TracingInfo>()
+
                                .map(|info| info.version)
+
                                .unwrap_or(hyper::Version::HTTP_11),
+
                            ColoredStatus(response.status()),
+
                            latency,
+
                            Paint::dim(
+
                                response
+
                                    .body()
+
                                    .size_hint()
+
                                    .exact()
+
                                    .map(|n| n.to_string())
+
                                    .unwrap_or("0".to_string())
+
                                    .into()
+
                            ),
+
                        );
+
                    },
+
                ),
+
        )
+
        .into_make_service();
+

+
    // Manual Unix socket serving since axum doesn't directly support Unix listeners
+
    loop {
+
        let (stream, _) = listener.accept().await?;
+
        let mut app_service = app.clone();
+

+
        tokio::spawn(async move {
+
            let stream = TokioIo::new(stream);
+

+
            // Create a service from the service factory for this connection
+
            match app_service.call(()).await {
+
                Ok(service) => {
+
                    let hyper_service = TowerToHyperService::new(service);
+
                    if let Err(err) = hyper::server::conn::http1::Builder::new()
+
                        .serve_connection(stream, hyper_service)
+
                        .await
+
                    {
+
                        tracing::error!("Error serving Unix socket connection: {}", err);
+
                    }
+
                }
+
                Err(err) => {
+
                    tracing::error!("Failed to create service: {:?}", err);
+
                }
+
            }
+
        });
+
    }
+
}
+

/// Create a router consisting of other sub-routers.
fn router(options: Options, profile: Profile) -> anyhow::Result<Router> {
    let profile = Arc::new(profile);
@@ -216,7 +324,7 @@ mod routes {
        let app = super::router(
            super::Options {
                aliases: HashMap::new(),
-
                listen: SocketAddr::from(([0, 0, 0, 0], 8080)),
+
                listen: super::ListenAddress::Tcp(SocketAddr::from(([0, 0, 0, 0], 8080))),
                cache: None,
            },
            test::profile(tmp.path(), [0xff; 32]),
modified radicle-httpd/src/main.rs
@@ -1,4 +1,5 @@
use std::num::NonZeroUsize;
+
use std::path::PathBuf;
use std::{collections::HashMap, process};

use radicle::prelude::RepoId;
@@ -16,10 +17,12 @@ pub const HELP_MSG: &str = r#"
Usage

   radicle-httpd [<option>...]
-
   
+

Options

-
    --listen       <address>         Address to listen on (default: 0.0.0.0:8080)
+
    --listen       <address>         Address to listen on: TCP address (e.g., 127.0.0.1:8080)
+
                                     or Unix socket path (e.g., /tmp/radicle.sock)
+
                                     (default: 0.0.0.0:8080)
    --alias, -a    <alias> <rid>     Provide alias and RID pairs to shorten git clone commands for repositories,
                                     e.g. heartwood and rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5 to produce https://seed.radicle.xyz/heartwood.git
    --cache        <number>          Max amount of items in cache for /tree endpoints (default: 100)
@@ -58,7 +61,9 @@ fn parse_options() -> Result<httpd::Options, lexopt::Error> {
    while let Some(arg) = parser.next()? {
        match arg {
            Long("listen") => {
-
                let addr = parser.value()?.parse()?;
+
                let value = parser.value()?;
+
                let addr = parse_listen_address(&value.to_string_lossy())
+
                    .map_err(|e| lexopt::Error::from(e.to_string()))?;
                listen = Some(addr);
            }
            Long("alias") | Short('a') => {
@@ -87,7 +92,22 @@ fn parse_options() -> Result<httpd::Options, lexopt::Error> {
    }
    Ok(httpd::Options {
        aliases,
-
        listen: listen.unwrap_or_else(|| ([0, 0, 0, 0], 8080).into()),
+
        listen: listen.unwrap_or_else(|| httpd::ListenAddress::Tcp(([0, 0, 0, 0], 8080).into())),
        cache,
    })
}
+

+
fn parse_listen_address(value: &str) -> Result<httpd::ListenAddress, Box<dyn std::error::Error>> {
+
    // Check if it's a Unix socket path (contains '/' and no ':' port separator)
+
    if value.contains('/') && !value.contains(':') {
+
        Ok(httpd::ListenAddress::Unix(PathBuf::from(value)))
+
    } else if value.starts_with("unix:") {
+
        // Support explicit unix: prefix
+
        let path = value.strip_prefix("unix:").unwrap();
+
        Ok(httpd::ListenAddress::Unix(PathBuf::from(path)))
+
    } else {
+
        // Try to parse as TCP address
+
        let addr = value.parse()?;
+
        Ok(httpd::ListenAddress::Tcp(addr))
+
    }
+
}
modified radicle-httpd/src/tracing_extra.rs
@@ -28,7 +28,7 @@ impl RequestId {

#[derive(Clone)]
pub struct TracingInfo {
-
    pub connect_info: ConnectInfo<SocketAddr>,
+
    pub connect_info: Option<ConnectInfo<SocketAddr>>,
    pub method: Method,
    pub version: Version,
    pub uri: Uri,
@@ -48,10 +48,10 @@ impl fmt::Display for ColoredStatus {
}

pub async fn tracing_middleware(request: Request<Body>, next: Next) -> impl IntoResponse {
-
    let connect_info = *request
+
    let connect_info = request
        .extensions()
        .get::<ConnectInfo<std::net::SocketAddr>>()
-
        .unwrap();
+
        .copied();

    let method = request.method().clone();
    let version = request.version();