Radish alpha
r
rad:z4V1sjrXqjvFdnCUbxPFqd5p4DtH5
Radicle web interface
Radicle
Git
httpd: Add `axum-listener` and use axum for unix-sockets directly
Sebastian Martinez committed 4 months ago
commit 4d062d83479d2cffead09416d31319254aaef418
parent 97ad0a8
6 files changed +126 -165
modified radicle-httpd/Cargo.lock
@@ -212,6 +212,18 @@ dependencies = [
]

[[package]]
+
name = "axum-listener"
+
version = "0.2.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b0cd6458a1428aec99bace599e09140ecd60479f81a33f866ec628b2f5d3a793"
+
dependencies = [
+
 "axum",
+
 "futures",
+
 "tokio",
+
 "tracing",
+
]
+

+
[[package]]
name = "backtrace"
version = "0.3.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -838,12 +850,28 @@ dependencies = [
]

[[package]]
+
name = "futures"
+
version = "0.3.30"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
+
dependencies = [
+
 "futures-channel",
+
 "futures-core",
+
 "futures-executor",
+
 "futures-io",
+
 "futures-sink",
+
 "futures-task",
+
 "futures-util",
+
]
+

+
[[package]]
name = "futures-channel"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
 "futures-core",
+
 "futures-sink",
]

[[package]]
@@ -853,6 +881,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"

[[package]]
+
name = "futures-executor"
+
version = "0.3.30"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
+
dependencies = [
+
 "futures-core",
+
 "futures-task",
+
 "futures-util",
+
]
+

+
[[package]]
+
name = "futures-io"
+
version = "0.3.31"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+

+
[[package]]
+
name = "futures-macro"
+
version = "0.3.30"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
+
dependencies = [
+
 "proc-macro2",
+
 "quote",
+
 "syn 2.0.87",
+
]
+

+
[[package]]
+
name = "futures-sink"
+
version = "0.3.31"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
+

+
[[package]]
name = "futures-task"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -864,10 +926,16 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
+
 "futures-channel",
 "futures-core",
+
 "futures-io",
+
 "futures-macro",
+
 "futures-sink",
 "futures-task",
+
 "memchr",
 "pin-project-lite",
 "pin-utils",
+
 "slab",
]

[[package]]
@@ -2022,11 +2090,11 @@ version = "0.20.0"
dependencies = [
 "anyhow",
 "axum",
+
 "axum-listener",
 "base64 0.22.1",
 "chrono",
 "flate2",
 "hyper",
-
 "hyper-util",
 "infer",
 "lexopt",
 "lru",
@@ -2042,7 +2110,6 @@ dependencies = [
 "tempfile",
 "thiserror 2.0.14",
 "tokio",
-
 "tower 0.4.13",
 "tower 0.5.2",
 "tower-http",
 "tracing",
@@ -2830,7 +2897,6 @@ dependencies = [
 "tokio",
 "tower-layer",
 "tower-service",
-
 "tracing",
]

[[package]]
modified radicle-httpd/Cargo.toml
@@ -23,11 +23,11 @@ path = "src/main.rs"
[dependencies]
anyhow = { version = "1" }
axum = { version = "0.8.4", default-features = false, features = ["json", "query", "tokio", "http1"] }
+
axum-listener = { version = "0.2.0" }
base64 = { version = "0.22.1" }
chrono = { version = "0.4.41", default-features = false, features = ["clock"] }
flate2 = { version = "1" }
-
hyper = { version = "1.6.0", default-features = false, features = ["server", "http1"] }
-
hyper-util = { version = "0.1.7", features = ["tokio", "server", "service"] }
+
hyper = { version = "1.6.0", default-features = false }
infer = { version = "0.19.0" }
lexopt = { version = "0.3.1" }
lru = { version = "0.16.0" }
@@ -40,7 +40,6 @@ serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = ["preserve_order"] }
thiserror = { version = "2" }
tokio = { version = "1.47.1", default-features = false, features = ["macros", "rt-multi-thread"] }
-
tower = { version = "0.4.13", features = ["util"] }
tower-http = { version = "0.6.6", default-features = false, features = ["trace", "cors", "set-header"] }
tracing = { version = "0.1.41", default-features = false, features = ["std", "log"] }
tracing-logfmt = { version = "0.3.5", optional = true }
modified radicle-httpd/src/lib.rs
@@ -4,26 +4,22 @@
pub mod error;

use std::collections::HashMap;
-
use std::net::SocketAddr;
use std::num::NonZeroUsize;
-
use std::path::PathBuf;
use std::process::Command;
use std::str;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context as _;
-
use axum::body::{Body, HttpBody};
-
use axum::http::{Request, Response};
+
use axum::body::Body;
+
use axum::http::Request;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::{middleware, Json, Router};
-
use hyper::header::CONTENT_TYPE;
+
use axum_listener::{DualAddr, DualListener};
+
use hyper::body::Body as _;
+
use hyper::header::{CONTENT_TYPE, FORWARDED};
use hyper::Method;
-
use hyper_util::rt::TokioIo;
-
use hyper_util::service::TowerToHyperService;
-
use tokio::net::{TcpListener, UnixListener};
-
use tower::Service;
use tower_http::cors;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
@@ -32,9 +28,8 @@ use tracing::Span;
use radicle::identity::RepoId;
use radicle::Profile;

-
use tracing_extra::{tracing_middleware, ColoredStatus, Paint, RequestId, TracingInfo};
-

use crate::api::RADICLE_VERSION;
+
use crate::tracing_extra::{tracing_middleware, ColoredStatus, Paint, RequestId, TracingInfo};

mod api;
mod axum_extra;
@@ -49,24 +44,9 @@ mod tracing_extra;
pub const DEFAULT_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();

#[derive(Debug, Clone)]
-
pub enum ListenAddress {
-
    Tcp(SocketAddr),
-
    Unix(PathBuf),
-
}
-

-
impl std::fmt::Display for ListenAddress {
-
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-
        match self {
-
            ListenAddress::Tcp(addr) => write!(f, "http://{}", addr),
-
            ListenAddress::Unix(path) => write!(f, "unix://{}", path.display()),
-
        }
-
    }
-
}
-

-
#[derive(Debug, Clone)]
pub struct Options {
    pub aliases: HashMap<String, RepoId>,
-
    pub listen: ListenAddress,
+
    pub listen: DualAddr,
    pub cache: Option<NonZeroUsize>,
}

@@ -80,47 +60,34 @@ pub async fn run(options: Options) -> anyhow::Result<()> {

    tracing::info!("{}", str::from_utf8(&git_version)?.trim());

-
    match &options.listen {
-
        ListenAddress::Tcp(addr) => {
-
            let listener = TcpListener::bind(addr).await?;
-
            tracing::info!("listening on {}", options.listen);
-
            run_tcp_server(listener, options).await
-
        }
-
        ListenAddress::Unix(path) => {
-
            // Remove existing socket file if it exists
-
            if path.exists() {
-
                std::fs::remove_file(path)?;
-
            }
-
            let listener = UnixListener::bind(path)?;
-
            tracing::info!("listening on {}", options.listen);
-
            run_unix_server(listener, options).await
-
        }
-
    }
-
}
+
    let listener = DualListener::bind(&options.listen).await?;
+
    tracing::info!("listening on {:?}", &options.listen);

-
async fn run_tcp_server(listener: TcpListener, options: Options) -> anyhow::Result<()> {
    let profile = Profile::load()?;
    let request_id = RequestId::new();

    tracing::info!("using radicle home at {}", profile.home().path().display());

-
    let app =
-
        router(options, profile)?
+
    let app = router(options, profile)?
        .layer(middleware::from_fn(tracing_middleware))
        .layer(
            TraceLayer::new_for_http()
-
                .make_span_with(move |_request: &Request<Body>| {
-
                    tracing::info_span!("request", id = %request_id.clone().next())
+
                .make_span_with(move |request: &Request<Body>| {
+
                    if let Some(forwarded) = request.headers().get("X-Forwarded-For").and_then(|s| s.to_str().ok()) {
+
                        tracing::info_span!("request", id = %request_id.clone().next(), "X-Forwarded-For" = forwarded)
+
                    } else {
+
                        tracing::info_span!("request", id = %request_id.clone().next())
+
                    }
                })
                .on_response(
-
                    |response: &Response<Body>, latency: Duration, _span: &Span| {
+
                    |response: &hyper::Response<Body>, latency: Duration, _span: &Span| {
                        if let Some(info) = response.extensions().get::<TracingInfo>() {
-
                            let client_addr = info.connect_info
-
                                .map(|ci| ci.0.to_string())
-
                                .unwrap_or_else(|| "unknown".to_string());
                            tracing::info!(
                                "{} \"{} {} {:?}\" {} {:?} {}",
-
                                client_addr,
+
                                match info.connect_info.0 {
+
                                    DualAddr::Tcp(c) => c.to_string(),
+
                                    DualAddr::Uds(_) => "unix-socket".into()
+
                                },
                                info.method,
                                info.uri,
                                info.version,
@@ -140,86 +107,14 @@ async fn run_tcp_server(listener: TcpListener, options: Options) -> anyhow::Resu
                            tracing::info!("Processed");
                        }
                    },
-
                ),
-
        )
-
        .into_make_service_with_connect_info::<SocketAddr>();
+
                )
+
        ).into_make_service_with_connect_info::<DualAddr>();

    axum::serve(listener, app)
        .await
        .map_err(anyhow::Error::from)
}

-
async fn run_unix_server(listener: UnixListener, options: Options) -> anyhow::Result<()> {
-
    let profile = Profile::load()?;
-
    let request_id = RequestId::new();
-

-
    tracing::info!("using radicle home at {}", profile.home().path().display());
-

-
    let app =
-
        router(options, profile)?
-
        .layer(middleware::from_fn(tracing_middleware))
-
        .layer(
-
            TraceLayer::new_for_http()
-
                .make_span_with(move |_request: &Request<Body>| {
-
                    tracing::info_span!("request", id = %request_id.clone().next())
-
                })
-
                .on_response(
-
                    |response: &Response<Body>, latency: Duration, _span: &Span| {
-
                        tracing::info!(
-
                            "unix-socket \"{} {} {:?}\" {} {:?} {}",
-
                            response.extensions().get::<TracingInfo>()
-
                                .map(|info| info.method.as_str())
-
                                .unwrap_or("UNKNOWN"),
-
                            response.extensions().get::<TracingInfo>()
-
                                .map(|info| info.uri.path())
-
                                .unwrap_or("/"),
-
                            response.extensions().get::<TracingInfo>()
-
                                .map(|info| info.version)
-
                                .unwrap_or(hyper::Version::HTTP_11),
-
                            ColoredStatus(response.status()),
-
                            latency,
-
                            Paint::dim(
-
                                response
-
                                    .body()
-
                                    .size_hint()
-
                                    .exact()
-
                                    .map(|n| n.to_string())
-
                                    .unwrap_or("0".to_string())
-
                                    .into()
-
                            ),
-
                        );
-
                    },
-
                ),
-
        )
-
        .into_make_service();
-

-
    // Manual Unix socket serving since axum doesn't directly support Unix listeners
-
    loop {
-
        let (stream, _) = listener.accept().await?;
-
        let mut app_service = app.clone();
-

-
        tokio::spawn(async move {
-
            let stream = TokioIo::new(stream);
-

-
            // Create a service from the service factory for this connection
-
            match app_service.call(()).await {
-
                Ok(service) => {
-
                    let hyper_service = TowerToHyperService::new(service);
-
                    if let Err(err) = hyper::server::conn::http1::Builder::new()
-
                        .serve_connection(stream, hyper_service)
-
                        .await
-
                    {
-
                        tracing::error!("Error serving Unix socket connection: {}", err);
-
                    }
-
                }
-
                Err(err) => {
-
                    tracing::error!("Failed to create service: {:?}", err);
-
                }
-
            }
-
        });
-
    }
-
}
-

/// Create a router consisting of other sub-routers.
fn router(options: Options, profile: Profile) -> anyhow::Result<Router> {
    let profile = Arc::new(profile);
@@ -315,8 +210,9 @@ mod routes {

    use axum::extract::connect_info::MockConnectInfo;
    use axum::http::StatusCode;
+
    use axum_listener::DualAddr;

-
    use crate::test::{self, get};
+
    use crate::test;

    #[tokio::test]
    async fn test_invalid_route_returns_404() {
@@ -324,7 +220,7 @@ mod routes {
        let app = super::router(
            super::Options {
                aliases: HashMap::new(),
-
                listen: super::ListenAddress::Tcp(SocketAddr::from(([0, 0, 0, 0], 8080))),
+
                listen: DualAddr::Tcp(SocketAddr::from(([0, 0, 0, 0], 8080))),
                cache: None,
            },
            test::profile(tmp.path(), [0xff; 32]),
@@ -332,7 +228,7 @@ mod routes {
        .unwrap()
        .layer(MockConnectInfo(SocketAddr::from(([0, 0, 0, 0], 8080))));

-
        let response = get(&app, "/aa/a").await;
+
        let response = test::get(&app, "/aa/a").await;

        assert_eq!(response.status(), StatusCode::NOT_FOUND);
    }
modified radicle-httpd/src/main.rs
@@ -1,7 +1,7 @@
use std::num::NonZeroUsize;
-
use std::path::PathBuf;
use std::{collections::HashMap, process};

+
use axum_listener::DualAddr;
use radicle::prelude::RepoId;
use radicle::version::Version;
use radicle_httpd as httpd;
@@ -32,13 +32,13 @@ Options

#[tokio::main]
async fn main() -> anyhow::Result<()> {
-
    let options = parse_options()?;
-

    // SAFETY: The logger is only initialized once.
    httpd::logger::init().unwrap();
    tracing::info!("starting http daemon..");
    tracing::info!("version {} ({})", env!("RADICLE_VERSION"), env!("GIT_HEAD"));

+
    let options = parse_options()?;
+

    match httpd::run(options).await {
        Ok(()) => {}
        Err(err) => {
@@ -61,9 +61,23 @@ fn parse_options() -> Result<httpd::Options, lexopt::Error> {
    while let Some(arg) = parser.next()? {
        match arg {
            Long("listen") => {
-
                let value = parser.value()?;
-
                let addr = parse_listen_address(&value.to_string_lossy())
-
                    .map_err(|e| lexopt::Error::from(e.to_string()))?;
+
                let addr: DualAddr = parser.value()?.parse()?;
+

+
                // Get socket path and remove it if existing
+
                if let DualAddr::Uds(socket_path) = &addr {
+
                    if let Some(path) = socket_path.as_pathname() {
+
                        if path.exists() {
+
                            tracing::info!("Removing existing socket path at {}", path.display());
+
                            if let Err(e) = std::fs::remove_file(path) {
+
                                tracing::error!("{e}");
+
                            }
+
                        }
+
                    } else {
+
                        tracing::error!("Provided socket address isn't a valid path.");
+
                        process::exit(0);
+
                    }
+
                }
+

                listen = Some(addr);
            }
            Long("alias") | Short('a') => {
@@ -74,7 +88,7 @@ fn parse_options() -> Result<httpd::Options, lexopt::Error> {
            }
            Long("version") | Short('v') => {
                if let Err(e) = VERSION.write(std::io::stdout()) {
-
                    eprintln!("error: {e}");
+
                    tracing::error!("{e}");
                    process::exit(1);
                };
                process::exit(0);
@@ -92,22 +106,7 @@ fn parse_options() -> Result<httpd::Options, lexopt::Error> {
    }
    Ok(httpd::Options {
        aliases,
-
        listen: listen.unwrap_or_else(|| httpd::ListenAddress::Tcp(([0, 0, 0, 0], 8080).into())),
+
        listen: listen.unwrap_or_else(|| DualAddr::Tcp(([0, 0, 0, 0], 8080).into())),
        cache,
    })
}
-

-
fn parse_listen_address(value: &str) -> Result<httpd::ListenAddress, Box<dyn std::error::Error>> {
-
    // Check if it's a Unix socket path (contains '/' and no ':' port separator)
-
    if value.contains('/') && !value.contains(':') {
-
        Ok(httpd::ListenAddress::Unix(PathBuf::from(value)))
-
    } else if value.starts_with("unix:") {
-
        // Support explicit unix: prefix
-
        let path = value.strip_prefix("unix:").unwrap();
-
        Ok(httpd::ListenAddress::Unix(PathBuf::from(path)))
-
    } else {
-
        // Try to parse as TCP address
-
        let addr = value.parse()?;
-
        Ok(httpd::ListenAddress::Tcp(addr))
-
    }
-
}
modified radicle-httpd/src/test.rs
@@ -326,7 +326,7 @@ fn seed_with_signer<G: Signer<Signature>>(

    let options = crate::Options {
        aliases: std::collections::HashMap::new(),
-
        listen: std::net::SocketAddr::from(([0, 0, 0, 0], 8080)),
+
        listen: axum_listener::DualAddr::Tcp(std::net::SocketAddr::from(([0, 0, 0, 0], 8080))),
        cache: Some(crate::DEFAULT_CACHE_SIZE),
    };

modified radicle-httpd/src/tracing_extra.rs
@@ -1,5 +1,4 @@
use std::fmt;
-
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

@@ -9,6 +8,7 @@ use axum::http::Request;
use axum::middleware::Next;
use axum::response::IntoResponse;
use axum::Extension;
+
use axum_listener::DualAddr;
use hyper::{Method, StatusCode, Uri, Version};

pub use radicle_term::ansi::Paint;
@@ -28,7 +28,7 @@ impl RequestId {

#[derive(Clone)]
pub struct TracingInfo {
-
    pub connect_info: Option<ConnectInfo<SocketAddr>>,
+
    pub connect_info: ConnectInfo<DualAddr>,
    pub method: Method,
    pub version: Version,
    pub uri: Uri,
@@ -50,8 +50,9 @@ impl fmt::Display for ColoredStatus {
pub async fn tracing_middleware(request: Request<Body>, next: Next) -> impl IntoResponse {
    let connect_info = request
        .extensions()
-
        .get::<ConnectInfo<std::net::SocketAddr>>()
-
        .copied();
+
        .get::<ConnectInfo<DualAddr>>()
+
        .unwrap()
+
        .clone();

    let method = request.method().clone();
    let version = request.version();