Radish alpha
r
Radicle web interface
Radicle
Git (anonymous pull)
Log in to clone via SSH
httpd: implement request and response streaming in git handler
Defelo committed 14 days ago
commit c39af0a8e232b80eecc57a129c4245cb1048c740
parent 0b89857f21070c475212bd1e760c7098ef695299
3 files changed +84 -101
modified radicle-httpd/Cargo.lock
@@ -955,7 +955,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
dependencies = [
 "libc",
-
 "windows-sys 0.59.0",
+
 "windows-sys 0.60.2",
]

[[package]]
@@ -1040,9 +1040,9 @@ dependencies = [

[[package]]
name = "futures-channel"
-
version = "0.3.30"
+
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
+
checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
dependencies = [
 "futures-core",
 "futures-sink",
@@ -1050,9 +1050,9 @@ dependencies = [

[[package]]
name = "futures-core"
-
version = "0.3.30"
+
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
+
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"

[[package]]
name = "futures-executor"
@@ -1067,15 +1067,15 @@ dependencies = [

[[package]]
name = "futures-io"
-
version = "0.3.31"
+
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"

[[package]]
name = "futures-macro"
-
version = "0.3.30"
+
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
+
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
 "proc-macro2",
 "quote",
@@ -1084,21 +1084,21 @@ dependencies = [

[[package]]
name = "futures-sink"
-
version = "0.3.31"
+
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
+
checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"

[[package]]
name = "futures-task"
-
version = "0.3.30"
+
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
+
checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393"

[[package]]
name = "futures-util"
-
version = "0.3.30"
+
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
+
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
 "futures-channel",
 "futures-core",
@@ -1108,7 +1108,6 @@ dependencies = [
 "futures-task",
 "memchr",
 "pin-project-lite",
-
 "pin-utils",
 "slab",
]

@@ -2068,12 +2067,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"

[[package]]
-
name = "pin-utils"
-
version = "0.1.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
-

-
[[package]]
name = "pkcs1"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2392,6 +2385,7 @@ dependencies = [
 "axum-listener",
 "base64 0.22.1",
 "chrono",
+
 "futures-util",
 "hyper",
 "infer",
 "lexopt",
@@ -2409,6 +2403,7 @@ dependencies = [
 "tempfile",
 "thiserror 2.0.14",
 "tokio",
+
 "tokio-util",
 "tower 0.5.2",
 "tower-http",
 "tracing",
@@ -2689,7 +2684,7 @@ dependencies = [
 "errno",
 "libc",
 "linux-raw-sys 0.9.4",
-
 "windows-sys 0.59.0",
+
 "windows-sys 0.60.2",
]

[[package]]
@@ -3299,6 +3294,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038"
dependencies = [
 "backtrace",
+
 "bytes",
 "io-uring",
 "libc",
 "mio 1.0.2",
modified radicle-httpd/Cargo.toml
@@ -26,6 +26,7 @@ axum = { version = "0.8.4", default-features = false, features = ["json", "query
axum-listener = { version = "0.2.2" }
base64 = { version = "0.22.1" }
chrono = { version = "0.4.41", default-features = false, features = ["clock"] }
+
futures-util = { version = "0.3.32", default-features = false }
hyper = { version = "1.6.0", default-features = false }
infer = { version = "0.19.0" }
lexopt = { version = "0.3.1" }
@@ -39,7 +40,8 @@ radicle-term = { version = "0.17.0", default-features = false }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = ["preserve_order"] }
thiserror = { version = "2" }
-
tokio = { version = "1.47.1", default-features = false, features = ["macros", "rt-multi-thread", "signal"] }
+
tokio = { version = "1.47.1", default-features = false, features = ["macros", "rt-multi-thread", "signal", "process", "io-util"] }
+
tokio-util = { version = "0.7.18", default-features = false, features = ["io"] }
tower-http = { version = "0.6.6", default-features = false, features = ["trace", "cors", "set-header", "decompression-full"] }
tracing = { version = "0.1.41", default-features = false, features = ["std", "log"] }
tracing-logfmt = { version = "0.3.5", optional = true }
modified radicle-httpd/src/git.rs
@@ -1,12 +1,11 @@
use std::collections::HashMap;
-
use std::io::prelude::*;
use std::path::Path;
-
use std::process::{Command, Stdio};
+
use std::process::Stdio;
+
use std::str;
use std::sync::Arc;
-
use std::{io, str};

-
use axum::body::Bytes;
-
use axum::extract::{ConnectInfo, Path as AxumPath, RawQuery, State};
+
use axum::body::Body;
+
use axum::extract::{ConnectInfo, Path as AxumPath, RawQuery, Request, State};
use axum::http::header::HeaderName;
use axum::http::{HeaderMap, Method, StatusCode};
use axum::response::IntoResponse;
@@ -14,10 +13,14 @@ use axum::routing::any;
use axum::Router;
use axum_listener::DualAddr;

+
use futures_util::TryStreamExt;
use radicle::identity::RepoId;
use radicle::node::NodeId;
use radicle::profile::Profile;
use radicle::storage::{ReadRepository, ReadStorage};
+
use tokio::io::{AsyncBufReadExt, BufReader};
+
use tokio::process::Command;
+
use tokio_util::io::{ReaderStream, StreamReader};
use tower_http::decompression::RequestDecompressionLayer;

use crate::error::GitError as Error;
@@ -25,7 +28,7 @@ use crate::error::GitError as Error;
pub fn router(profile: Arc<Profile>, aliases: HashMap<String, RepoId>) -> Router {
    Router::new()
        .route(
-
            "/{rid}/{*request}",
+
            "/{rid}/{*path}",
            any(git_handler).layer(RequestDecompressionLayer::new()),
        )
        .with_state((profile, aliases))
@@ -33,12 +36,12 @@ pub fn router(profile: Arc<Profile>, aliases: HashMap<String, RepoId>) -> Router

async fn git_handler(
    State((profile, aliases)): State<(Arc<Profile>, HashMap<String, RepoId>)>,
-
    AxumPath((repository, request)): AxumPath<(String, String)>,
+
    AxumPath((repository, path)): AxumPath<(String, String)>,
    method: Method,
    headers: HeaderMap,
    ConnectInfo(remote): ConnectInfo<DualAddr>,
    query: RawQuery,
-
    body: Bytes,
+
    request: Request,
) -> impl IntoResponse {
    let query = query.0.unwrap_or_default();
    let name = repository.strip_suffix(".git").unwrap_or(&repository);
@@ -52,16 +55,16 @@ async fn git_handler(
        }
    };

-
    let (nid, request): (Option<NodeId>, &str) = {
-
        let (first, rest) = request.split_once('/').unwrap_or((&request, ""));
+
    let (nid, path): (Option<NodeId>, &str) = {
+
        let (first, rest) = path.split_once('/').unwrap_or((&path, ""));
        match first.parse::<NodeId>() {
            Ok(nid) => (Some(nid), rest),
-
            Err(_) => (None, &request),
+
            Err(_) => (None, &path),
        }
    };

    let (status, headers, body) = git_http_backend(
-
        &profile, method, headers, body, remote, rid, nid, request, query,
+
        &profile, method, headers, request, remote, rid, nid, path, query,
    )
    .await?;

@@ -80,20 +83,18 @@ async fn git_http_backend(
    profile: &Profile,
    method: Method,
    headers: HeaderMap,
-
    body: Bytes,
+
    request: Request,
    remote: DualAddr,
    id: RepoId,
    nid: Option<NodeId>,
    path: &str,
    query: String,
-
) -> Result<(StatusCode, HashMap<String, Vec<String>>, Vec<u8>), Error> {
+
) -> Result<(StatusCode, HashMap<String, Vec<String>>, Body), Error> {
    let git_dir = radicle::storage::git::paths::repository(&profile.storage, &id);
-
    let content_type =
-
        if let Some(Ok(content_type)) = headers.get("Content-Type").map(|h| h.to_str()) {
-
            content_type
-
        } else {
-
            ""
-
        };
+
    let content_type = headers
+
        .get("Content-Type")
+
        .and_then(|x| x.to_str().ok())
+
        .unwrap_or_default();

    // Don't allow cloning of private repositories.
    let doc = profile.storage.repository(id)?.identity_doc()?;
@@ -139,71 +140,55 @@ async fn git_http_backend(
        .stdin(Stdio::piped())
        .spawn()?;

-
    {
-
        // This is safe because we captured the child's stdin.
-
        let mut stdin = child.stdin.take().unwrap();
-
        stdin.write_all(&body)?;
-
    }
-

-
    match child.wait_with_output() {
-
        Ok(output) if output.status.success() => {
-
            tracing::info!("git-http-backend: exited successfully for {}", id);
-

-
            let mut reader = std::io::Cursor::new(output.stdout);
-
            let mut headers = HashMap::new();
-

-
            // Parse headers returned by git so that we can use them in the client response.
-
            for line in io::Read::by_ref(&mut reader).lines() {
-
                let line = line?;
-

-
                if line.is_empty() || line == "\r" {
-
                    break;
-
                }
+
    let mut stdin = child.stdin.take().expect("stdin was captured");
+
    let body = request
+
        .into_body()
+
        .into_data_stream()
+
        .map_err(std::io::Error::other);
+
    let mut body = StreamReader::new(body);
+
    tokio::spawn(async move {
+
        let _ = tokio::io::copy(&mut body, &mut stdin).await;
+
    });
+

+
    let stderr = BufReader::new(child.stderr.take().expect("stderr was captured"));
+
    tokio::spawn(async move {
+
        let mut lines = stderr.lines();
+
        while let Ok(Some(line)) = lines.next_line().await {
+
            tracing::error!("git-http-backend: stderr: {}", line.trim_end());
+
        }
+
    });

-
                let mut parts = line.splitn(2, ':');
-
                let key = parts.next();
-
                let value = parts.next();
+
    let mut stdout = BufReader::new(child.stdout.take().expect("stdout was captured"));

-
                if let (Some(key), Some(value)) = (key, value) {
-
                    let value = &value[1..];
+
    let mut headers = HashMap::<String, Vec<String>>::new();

-
                    headers
-
                        .entry(key.to_string())
-
                        .or_insert_with(Vec::new)
-
                        .push(value.to_string());
-
                } else {
-
                    return Err(Error::BackendHeader(line));
-
                }
-
            }
+
    let mut line = String::new();
+
    while stdout.read_line(&mut line).await? != 0 {
+
        if line.ends_with("\r\n") {
+
            line.truncate(line.len() - 2);
+
        }
+
        if line.is_empty() {
+
            break;
+
        }

-
            let status = {
-
                tracing::debug!("git-http-backend: {:?}", &headers);
+
        let Some((key, value)) = line.split_once(": ") else {
+
            return Err(Error::BackendHeader(line));
+
        };
+
        headers.entry(key.into()).or_default().push(value.into());
+
        line.clear();
+
    }

-
                let line = headers.remove("Status").unwrap_or_default();
-
                let line = line.into_iter().next().unwrap_or_default();
-
                let mut parts = line.split(' ');
+
    tracing::debug!("git-http-backend: {:?}", &headers);

-
                parts
-
                    .next()
-
                    .and_then(|p| p.parse().ok())
-
                    .unwrap_or(StatusCode::OK)
-
            };
+
    let status = headers
+
        .remove("Status")
+
        .as_ref()
+
        .and_then(|values| values.first()?.split_whitespace().next()?.parse().ok())
+
        .unwrap_or(StatusCode::OK);

-
            let position = reader.position() as usize;
-
            let body = reader.into_inner().split_off(position);
+
    let body = Body::from_stream(ReaderStream::new(stdout));

-
            Ok((status, headers, body))
-
        }
-
        Ok(output) => {
-
            if let Ok(output) = std::str::from_utf8(&output.stderr) {
-
                tracing::error!("git-http-backend: stderr: {}", output.trim_end());
-
            }
-
            Err(Error::BackendExited(output.status))
-
        }
-
        Err(err) => {
-
            panic!("failed to wait for git-http-backend: {err}");
-
        }
-
    }
+
    Ok((status, headers, body))
}

#[cfg(test)]