use std::io;
use std::io::{Read, Write};
use std::process::{Command, ExitStatus, Stdio};
use std::time::{Duration, Instant};
use radicle_fetch::{ByteSlice as _, RemoteProgress};
use radicle::Storage;
use radicle::identity::RepoId;
use radicle::node::events;
use radicle::node::events::Emitter;
use radicle::node::{Event, NodeId};
use radicle::storage::git::paths;
use crate::runtime::thread;
/// Perform the Git upload-pack process, given that the Git request
/// `header` has already been read and parsed.
///
/// A `git upload-pack` child process is spawned inside the repository
/// directory resolved from `storage` and `header.repo`. Two scoped threads
/// then relay data: one copies the child's stdout to `send` through a
/// [`Reporter`] (which also emits progress events via `emitter`), the other
/// copies `recv` into the child's stdin. Once the thread reading from `recv`
/// finishes, the child is killed, its exit status is collected and an
/// `UploadPack::done` event is emitted for `remote`.
///
/// Only Git protocol version 2 is accepted. The version is taken from the
/// first `version=<n>` entry of `header.extra`; a missing or unrecognised
/// value is treated as version 0 and rejected.
///
/// `timeout` is passed to `git upload-pack --timeout` in whole seconds and is
/// also used to decide whether a timed-out read on `recv` ends the transfer.
///
/// N.b. The upload-pack process itself is strict, i.e. it will read
/// requests from the client indefinitely, and so the client side MUST
/// signal EOF once it is done sending.
///
/// # Errors
///
/// Returns an [`io::Error`] if the requested protocol version is not 2
/// (`InvalidData`), if the child process cannot be spawned, if killing
/// (or, on Windows, terminating) the child fails, or if waiting for the
/// child's exit status fails.
///
/// # Panics
///
/// Panics if the child's stdin or stdout handle is missing after spawning;
/// both are configured as piped, so this is not expected to happen. The
/// worker threads panic if the reporter mutex is poisoned.
pub fn upload_pack<R, W>(
nid: &NodeId,
remote: NodeId,
storage: &Storage,
emitter: &Emitter<Event>,
header: &GitRequest,
mut recv: R,
send: W,
timeout: Duration,
) -> io::Result<ExitStatus>
where
R: io::Read + Send,
W: io::Write + Send,
{
// Used only to log the total duration at the end.
let timer = Instant::now();
// Look for the first `version=<n>` entry; anything other than "1" or "2"
// (including a missing entry) is mapped to 0.
let protocol_version = header
.extra
.iter()
.find_map(|kv| match kv {
(k, Some(v)) if k == "version" => {
let version = match v.as_str() {
"2" => 2,
"1" => 1,
_ => 0,
};
Some(version)
}
_ => None,
})
.unwrap_or(0);
if protocol_version != 2 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"only Git protocol version 2 is supported",
));
}
let git_dir = paths::repository(storage, &header.repo);
let mut child = {
let mut cmd = Command::new("git");
// Start from an empty environment and only forward `PATH` and any
// `GIT_TRACE*` variables from the current process.
cmd.current_dir(git_dir)
.env_clear()
.envs(std::env::vars().filter(|(key, _)| key == "PATH" || key.starts_with("GIT_TRACE")))
.env("GIT_PROTOCOL", format!("version={protocol_version}"))
.args([
"-c",
"uploadpack.allowAnySha1InWant=true",
"-c",
"uploadpack.allowRefInWant=true",
"-c",
"lsrefs.unborn=ignore",
"upload-pack",
"--strict",
format!("--timeout={}", timeout.as_secs()).as_str(),
".",
])
// stdout and stdin are relayed by the threads below; stderr goes to
// this process's stderr.
.stdout(Stdio::piped())
.stdin(Stdio::piped())
.stderr(Stdio::inherit());
cmd.spawn()?
};
// NOTE(review): presumably a Windows job object wrapping the child so that
// it can be terminated below — confirm against `radicle_windows`.
#[cfg(windows)]
let job = radicle_windows::jobs::Job::for_child(&child)?;
let mut stdin = child.stdin.take().unwrap();
let mut stdout = io::BufReader::new(child.stdout.take().unwrap());
// Shared between both threads: the writer thread sends through it, the
// reader thread consults it to see when data was last sent.
let reporter = std::sync::Mutex::new(Reporter::new(header.repo, remote, emitter.clone(), send));
thread::scope(|s| {
// Writer thread: child stdout -> `send` (via the reporter).
thread::spawn_scoped(nid, "upload-pack", s, || {
// 65536-byte relay buffer.
let mut buffer = [0; u16::MAX as usize + 1];
loop {
match stdout.read(&mut buffer) {
// The child closed its stdout.
Ok(0) => break,
Ok(n) => {
let mut lock = reporter.lock().expect("FATAL: upload_pack poisoned lock");
if let Err(e) = lock.write_all(&buffer[..n]) {
log::debug!(target: "worker", "Failed to write buffer to upload-pack reporter: {e}");
emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
break;
}
// Release the lock before blocking on the next read.
drop(lock);
}
Err(e) => {
log::debug!(target: "worker", "Exiting upload-pack writer thread for {}: {e}", header.repo);
emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
break;
}
}
}
});
// Reader thread: `recv` -> child stdin.
let reader = thread::spawn_scoped(nid, "upload-pack", s, || {
// 65536-byte relay buffer.
let mut buffer = [0; u16::MAX as usize + 1];
loop {
match recv.read(&mut buffer) {
// The remote side reached end of stream.
Ok(0) => break,
Ok(n) => {
if let Err(e) = stdin.write_all(&buffer[..n]) {
log::debug!(target: "worker", "Failed to write to upload-pack stdin: {e}");
break;
}
}
// An unexpected EOF ends the thread without emitting an error event.
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
log::debug!(target: "worker", "Exiting upload-pack reader thread for {}", header.repo);
break;
}
Err(e) if e.kind() == io::ErrorKind::TimedOut => {
log::debug!(target: "worker", "Read channel timed out for upload-pack {}", header.repo);
// N.b. a read timeout alone does not end the transfer: only stop
// if the sender has also not sent anything for longer than
// `timeout`. Otherwise keep reading.
let lock = reporter.lock().expect("FATAL: upload_pack poisoned lock");
if lock.is_timeout(timeout) {
break;
}
}
Err(e) => {
log::debug!(target: "worker", "Failure on upload-pack channel read for {}: {e}", header.repo);
emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
break;
}
}
}
});
// N.b. we only care if the `reader` is finished. We then kill
// the child which will end the thread for the sender.
if let Err(e) = reader.join() {
log::warn!(target: "worker", "Upload pack thread panicked: {e:?}");
}
// The result of stopping the child becomes the result of the scope and
// is propagated to the caller with `?` below.
#[cfg(unix)]
return child.kill();
#[cfg(windows)]
return job.terminate(3);
})?;
// Collect the exit status of the (killed) child and report completion.
let status = child.wait()?;
emitter.emit(events::UploadPack::done(header.repo, remote, status).into());
log::debug!(target: "worker", "Upload pack finished ({}ms)", timer.elapsed().as_millis());
Ok(status)
}
/// A combination of the upload-pack sender with an [`Emitter`] for reporting
/// the progress events to subscribers.
///
/// Data is sent through the `io::Write` implementation, which forwards it to
/// `send`, emits a matching event and records when the write happened.
struct Reporter<W> {
/// Repository the emitted upload-pack events refer to.
rid: RepoId,
/// Remote node the emitted upload-pack events refer to.
remote: NodeId,
/// Emitter used to publish upload-pack events.
emitter: Emitter<Event>,
/// Underlying writer that the data is forwarded to.
send: W,
/// Running byte count of buffers that were not recognised as Git progress
/// messages; reported with each pack-progress event.
total: usize,
/// Time of construction, updated after every successful write; used by
/// `is_timeout`.
last_sent: Instant,
}
impl<W> Reporter<W> {
    /// Wrap `send` so that data written to it is reported on `emitter` as
    /// upload-pack events for the repository `rid` and the peer `remote`.
    ///
    /// The byte counter starts at zero and the "last sent" timestamp starts
    /// at the moment of construction.
    fn new(rid: RepoId, remote: NodeId, emitter: Emitter<Event>, send: W) -> Self {
        let last_sent = Instant::now();
        Self {
            rid,
            remote,
            emitter,
            send,
            total: 0,
            last_sent,
        }
    }

    /// Returns `true` when strictly more than `timeout` has passed since the
    /// last recorded send (or since construction, if nothing was sent yet).
    fn is_timeout(&self, timeout: Duration) -> bool {
        self.last_sent.elapsed() > timeout
    }

    /// Emit an upload-pack event describing `buf`.
    ///
    /// If `buf` is recognised as one of the known Git progress messages, a
    /// `write` event carrying that progress is emitted. Otherwise the length
    /// of `buf` is added to the running total and a `pack_progress` event
    /// with the new total is emitted.
    fn emit(&mut self, buf: &[u8]) {
        let event = if let Some(progress) = Self::as_upload_pack_progress(buf) {
            events::UploadPack::write(self.rid, self.remote, progress)
        } else {
            self.total += buf.len();
            events::UploadPack::pack_progress(self.rid, self.remote, self.total)
        };
        log::trace!(target: "worker", "upload-pack progress: {event:?}");
        self.emitter.emit(event.into());
    }

    /// Try to interpret `buf` as a Git remote progress message.
    ///
    /// Returns `None` when `buf` does not parse as a progress message, when
    /// its action is not one of "Counting objects", "Compressing objects" or
    /// "Enumerating objects", or when the numbers required for that action
    /// are missing.
    fn as_upload_pack_progress(buf: &[u8]) -> Option<events::upload_pack::Progress> {
        use events::upload_pack::Progress;

        let RemoteProgress {
            action, step, max, ..
        } = RemoteProgress::from_bytes(buf)?;

        if action.contains_str("Counting objects") {
            // Both the current step and the maximum are required.
            let (processed, total) = step.zip(max)?;
            Some(Progress::Counting { processed, total })
        } else if action.contains_str("Compressing objects") {
            let (processed, total) = step.zip(max)?;
            Some(Progress::Compressing { processed, total })
        } else if action.contains_str("Enumerating objects") {
            // Only the maximum is reported for enumeration.
            Some(Progress::Enumerating { total: max? })
        } else {
            None
        }
    }
}
/// Forwards writes to the wrapped sender while reporting progress.
impl<W> io::Write for Reporter<W>
where
    W: io::Write,
{
    /// Write `buf` to the underlying sender, emit an event for the bytes
    /// that were accepted, and record the time of the write.
    ///
    /// Returns the number of bytes the underlying sender accepted, which may
    /// be fewer than `buf.len()`.
    ///
    /// # Errors
    ///
    /// Returns any error from the underlying sender; in that case no event is
    /// emitted and the "last sent" timestamp is left untouched.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.send.write(buf)?;
        // A writer may accept only a prefix of `buf`; callers such as
        // `write_all` then offer the remainder again. Report only the
        // accepted prefix so those bytes are not counted twice. The fallback
        // to `buf` guards against a writer that reports more than it was given.
        let written = buf.get(..n).unwrap_or(buf);
        self.emit(written);
        self.last_sent = Instant::now();
        Ok(n)
    }

    /// Flush the underlying sender.
    fn flush(&mut self) -> io::Result<()> {
        self.send.flush()
    }
}
/// The Git request packet-line for a repository.
///
/// See <https://git-scm.com/docs/pack-protocol.html#_git_transport>.
///
/// Example: `0032git-upload-pack /rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5.git\0host=myserver.com\0`
#[derive(Debug)]
pub struct GitRequest {
/// Repository identifier, parsed from `path` with its leading `/` removed.
pub repo: RepoId,
/// Request path exactly as received: the first NUL-separated field after
/// the `git-upload-pack ` command.
// Kept as part of the parsed request even though nothing in this file reads it.
#[allow(dead_code)]
pub path: String,
/// Value of the optional `host=` parameter: the host name and, if one
/// was given after a `:`, the port.
// Kept as part of the parsed request even though nothing in this file reads it.
#[allow(dead_code)]
pub host: Option<(String, Option<u16>)>,
/// Remaining parameters as `(key, value)` pairs; the value is `None` when
/// the parameter contains no `=`. `upload_pack` reads `version` from here.
pub extra: Vec<(String, Option<String>)>,
}
impl GitRequest {
    /// Build a [`GitRequest`] from a packet-line.
    ///
    /// Returns `None` if the packet-line has no data slice or if its
    /// contents are not a valid `git-upload-pack` request.
    pub(super) fn from_packetline(
        packet_line: gix_packetline::PacketLineRef<'_>,
    ) -> Option<GitRequest> {
        let payload = packet_line.as_slice()?;
        Self::parse(payload)
    }

    /// Parse a Git command from a packet-line.
    ///
    /// The expected layout is `git-upload-pack <path>` followed by
    /// NUL-terminated fields: an optional `host=<name>[:<port>]` field and
    /// then any number of `key[=value]` parameters.
    ///
    /// Returns `None` when the input is not UTF-8, does not start with
    /// `git-upload-pack `, has no path, the path does not start with `/` or
    /// does not parse as a repository identifier, the second field is
    /// non-empty but lacks the `host=` prefix, or the port is not a `u16`.
    fn parse(input: &[u8]) -> Option<Self> {
        let text = std::str::from_utf8(input).ok()?;
        let arguments = text.strip_prefix("git-upload-pack ")?;
        let mut fields = arguments.split_terminator('\0');

        let path = fields.next()?.to_owned();
        let repo = path.strip_prefix('/')?.parse().ok()?;

        // A missing or empty second field means no host was supplied.
        let host = match fields.next().filter(|field| !field.is_empty()) {
            Some(field) => {
                let authority = field.strip_prefix("host=")?;
                let (name, port) = match authority.split_once(':') {
                    Some((name, port)) => (name, Some(port.parse::<u16>().ok()?)),
                    None => (authority, None),
                };
                Some((name.to_owned(), port))
            }
            None => None,
        };

        // Skip the empty separator field(s) before the extra parameters.
        let mut extra = Vec::new();
        for field in fields.skip_while(|field| field.is_empty()) {
            let entry = match field.split_once('=') {
                Some((key, value)) => (key.to_owned(), Some(value.to_owned())),
                None => (field.to_owned(), None),
            };
            extra.push(entry);
        }

        Some(Self {
            repo,
            path,
            host,
            extra,
        })
    }
}