Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Use `git-daemon` as backend
Alexis Sellier committed 3 years ago
commit 02be3341441eb9dac2df3b6e6945f136ff81423a
parent d9e1055067429ab8ba2650c9874dd5fabe8d97f5
5 files changed +173 -72
modified radicle-node/src/client.rs
@@ -1,3 +1,4 @@
+
use std::io::{BufRead, BufReader};
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::{fs, io, net, thread, time};
@@ -66,13 +67,15 @@ pub struct Runtime<G: Signer + EcSign> {
    pub home: Home,
    pub handle: Handle<G>,
    pub control: thread::JoinHandle<Result<(), control::Error>>,
+
    pub storage: Storage,
    pub reactor: Reactor<wire::Control<G>>,
+
    pub daemon: net::SocketAddr,
    pub pool: WorkerPool,
    pub local_addrs: Vec<net::SocketAddr>,
}

impl<G: Signer + EcSign> Runtime<G> {
-
    /// Run the client.
+
    /// Initialize the runtime.
    ///
    /// This function spawns threads.
    pub fn with(
@@ -80,6 +83,7 @@ impl<G: Signer + EcSign> Runtime<G> {
        config: service::Config,
        listen: Vec<net::SocketAddr>,
        proxy: net::SocketAddr,
+
        daemon: net::SocketAddr,
        signer: G,
    ) -> Result<Runtime<G>, Error>
    where
@@ -140,6 +144,8 @@ impl<G: Signer + EcSign> Runtime<G> {

        log::info!(target: "node", "Binding control socket {}..", node_sock.display());

+
        // TODO: Move this stuff to `run` function.
+

        let listener = match UnixListener::bind(&node_sock) {
            Ok(sock) => sock,
            Err(err) if err.kind() == io::ErrorKind::AddrInUse => {
@@ -157,7 +163,8 @@ impl<G: Signer + EcSign> Runtime<G> {
        let pool = WorkerPool::with(
            8,
            time::Duration::from_secs(9),
-
            storage,
+
            storage.clone(),
+
            daemon,
            worker_recv,
            handle.clone(),
            id.to_human(),
@@ -167,7 +174,9 @@ impl<G: Signer + EcSign> Runtime<G> {
            id,
            home,
            control,
+
            storage,
            reactor,
+
            daemon,
            handle,
            pool,
            local_addrs,
@@ -176,11 +185,25 @@ impl<G: Signer + EcSign> Runtime<G> {

    pub fn run(self) -> Result<(), Error> {
        log::info!(target: "node", "Running node {}..", self.id);
+
        log::info!(target: "node", "Spawning git daemon at {}..", self.storage.path().display());
+

+
        let mut daemon = daemon::spawn(self.storage.path(), self.daemon)?;
+
        thread::Builder::new().name(self.id.to_human()).spawn({
+
            let stderr = daemon.stderr.take().unwrap();
+
            || {
+
                for line in BufReader::new(stderr).lines().flatten() {
+
                    log::debug!(target: "daemon", "{line}");
+
                }
+
            }
+
        })?;

        self.pool.run().unwrap();
        self.reactor.join().unwrap();
        self.control.join().unwrap()?;

+
        daemon.kill().ok(); // Ignore error if daemon has already exited, for whatever reason.
+
        daemon.wait()?;
+

        fs::remove_file(self.home.socket()).ok();

        log::debug!(target: "node", "Node shutdown completed for {}", self.id);
@@ -188,3 +211,41 @@ impl<G: Signer + EcSign> Runtime<G> {
        Ok(())
    }
}
+

+
/// Helpers for running `git daemon` as a child process.
pub mod daemon {
+
    use std::path::Path;
+
    use std::process::{Child, Command, Stdio};
+
    use std::{env, io, net};
+

+
    /// Spawn a `git daemon` child process serving the repositories under `storage`.
    ///
    /// The storage path is canonicalized and used as the child's working directory,
    /// which is why `--base-path=.` resolves to the storage root. The daemon is told
    /// to listen on the IP and port of `addr`.
    ///
    /// The child's standard error is piped, and the daemon is asked to log there
    /// (`--log-destination=stderr`), so the caller can read its log output through
    /// the returned [`Child`].
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] if `storage` cannot be canonicalized or if the `git`
    /// process cannot be spawned.
    pub fn spawn(storage: &Path, addr: net::SocketAddr) -> io::Result<Child> {
+
        let storage = storage.canonicalize()?;
+
        let listen = format!("--listen={}", addr.ip());
+
        let port = format!("--port={}", addr.port());
+
        let child = Command::new("git")
+
            // Start from an empty environment and pass through only `PATH` and
            // variables whose name starts with `GIT`.
            .env_clear()
+
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT")))
+
            // Set after the pass-through above, so this value wins over any
            // inherited `GIT_PROTOCOL`.
            .env("GIT_PROTOCOL", "version=2")
+
            .current_dir(storage)
+
            .arg("daemon")
+
            // Make all git directories available.
+
            .arg("--export-all")
+
            .arg("--verbose")
+
            // The git "root". Should be our storage path.
+
            .arg("--base-path=.")
+
            // Timeout (in seconds) between the moment the connection is established
+
            // and the client request is received (typically a rather low value,
+
            // since that should be basically immediate).
+
            .arg("--init-timeout=3")
+
            // Timeout (in seconds) for specific client sub-requests.
+
            // This includes the time it takes for the server to process the sub-request
+
            // and the time spent waiting for the next client’s request.
+
            .arg("--timeout=9")
+
            .arg("--log-destination=stderr")
+
            .arg(listen)
+
            .arg(port)
+
            // Pipe stderr so the daemon's log lines can be read via `Child::stderr`.
            .stderr(Stdio::piped())
+
            .spawn()?;
+

+
        Ok(child)
+
    }
+
}
modified radicle-node/src/control.rs
@@ -35,8 +35,6 @@ pub fn listen<H: Handle<Error = client::handle::Error>>(
                log::debug!(target: "control", "Accepted new client on control socket..");

                if let Err(e) = drain(&stream, &mut handle) {
-
                    log::debug!(target: "control", "Received {} on control socket", e);
-

                    if let DrainError::Shutdown = e {
                        log::debug!(target: "control", "Shutdown requested..");
                        // Channel might already be disconnected if shutdown
modified radicle-node/src/main.rs
@@ -90,7 +90,8 @@ fn execute() -> anyhow::Result<()> {
        ..service::Config::default()
    };
    let proxy = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
-
    let runtime = Runtime::with(home, config, options.listen, proxy, signer)?;
+
    let daemon = ([0, 0, 0, 0], 9418).into();
+
    let runtime = Runtime::with(home, config, options.listen, proxy, daemon, signer)?;

    runtime.run()?;

modified radicle-node/src/tests/e2e.rs
@@ -104,7 +104,16 @@ impl Node {
    fn spawn(self, config: service::Config) -> NodeHandle {
        let listen = vec![([0, 0, 0, 0], 0).into()];
        let proxy = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
-
        let rt = Runtime::with(self.home, config, listen, proxy, self.signer.clone()).unwrap();
+
        let daemon = ([0, 0, 0, 0], fastrand::u16(1025..)).into();
+
        let rt = Runtime::with(
+
            self.home,
+
            config,
+
            listen,
+
            proxy,
+
            daemon,
+
            self.signer.clone(),
+
        )
+
        .unwrap();
        let addr = *rt.local_addrs.first().unwrap();
        let id = *self.signer.public_key();
        let handle = ManuallyDrop::new(rt.handle.clone());
@@ -400,9 +409,7 @@ fn test_clone() {
    match lookup {
        // Drain the channel.
        FetchLookup::Found { seeds, results } => for _ in results.iter().take(seeds.len()) {},
-
        other => {
-
            panic!("Unexpected fetch lookup: {:?}", other);
-
        }
+
        other => panic!("Unexpected fetch lookup: {:?}", other),
    }
    rad::fork(acme, &alice.signer, &alice.storage).unwrap();

@@ -429,3 +436,34 @@ fn test_clone() {

    assert_eq!(oid, *canonical);
}
+

+
/// Fetching the same repository twice in a row must succeed both times.
///
/// Bob creates the `acme` project; alice connects to bob, tracks the project and
/// fetches it. A second fetch is then issued when there is nothing new to
/// download, and it must still return `FetchLookup::Found` rather than any other
/// variant (which makes the test panic).
#[test]
+
fn test_fetch_up_to_date() {
+
    logger::init(log::Level::Debug);
+

+
    // Both nodes are created from the same temporary directory path.
    let tmp = tempfile::tempdir().unwrap();
+
    let alice = Node::new(tmp.path());
+
    let mut bob = Node::new(tmp.path());
+
    let acme = bob.project("acme");
+

+
    let mut alice = alice.spawn(service::Config::default());
+
    let bob = bob.spawn(service::Config::default());
+

+
    alice.connect(&bob);
+
    converge([&alice, &bob]);
+

+
    transport::local::register(alice.storage.clone());
+

+
    let _ = alice.handle.track_repo(acme).unwrap();
+

+
    // First fetch: take at most one result per seed from `results`, discarding them.
    match alice.handle.fetch(acme).unwrap() {
+
        FetchLookup::Found { seeds, results } => for _ in results.iter().take(seeds.len()) {},
+
        other => panic!("Unexpected fetch lookup: {:?}", other),
+
    }
+

+
    // Fetch again! This time, everything's up to date.
+
    match alice.handle.fetch(acme).unwrap() {
+
        FetchLookup::Found { seeds, results } => for _ in results.iter().take(seeds.len()) {},
+
        other => panic!("Unexpected fetch lookup: {:?}", other),
+
    }
+
}
modified radicle-node/src/worker.rs
@@ -35,6 +35,7 @@ pub struct WorkerResp<G: Signer + EcSign> {
struct Worker<G: Signer + EcSign> {
    storage: Storage,
    tasks: chan::Receiver<WorkerReq<G>>,
+
    daemon: net::SocketAddr,
    timeout: time::Duration,
    handle: Handle<G>,
    name: String,
@@ -89,8 +90,11 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
                Err((session, err)) => return (session, Err(err.into())),
            };
            let result = self.fetch(fetch, &mut tunnel);
-
            let session = tunnel.into_session();
+
            let mut session = tunnel.into_session();

+
            if let Err(err) = pktline::flush(&mut session) {
+
                log::error!(target: "worker", "Fetch error: {err}");
+
            }
            if let Err(err) = &result {
                log::error!(target: "worker", "Fetch error: {err}");
            }
@@ -132,7 +136,7 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
            .arg("fetch")
            .arg("--atomic") // FIXME: Not available on 2.30 (debian standard)
            .arg("--verbose")
-
            .arg(format!("git://{tunnel_addr}/{}", repo.id))
+
            .arg(format!("git://{tunnel_addr}/{}", repo.id.canonical()))
            // FIXME: We need to omit our own namespace from this refspec in case we're fetching '*'.
            .arg(fetch.namespaces.as_fetchspec())
            .stdout(process::Stdio::piped())
@@ -172,77 +176,52 @@ impl<G: Signer + EcSign + 'static> Worker<G> {
        stream_r: &mut WireReader,
        stream_w: &mut WireWriter<G>,
    ) -> Result<Vec<RefUpdate>, FetchError> {
-
        let repo = self.storage.repository(fetch.repo)?;
-
        let mut child = process::Command::new("git")
-
            .current_dir(repo.path())
-
            .env_clear()
-
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
-
            .args(["-c", "protocol.version=2"])
-
            .arg("upload-pack")
-
            .arg("--strict") // The path to the git repo must be exact.
-
            .arg(".")
-
            .stdout(process::Stdio::piped())
-
            .stderr(process::Stdio::piped())
-
            .stdin(process::Stdio::piped())
-
            .spawn()?;
-

-
        let mut stdin = child.stdin.take().unwrap();
-
        let mut stdout = child.stdout.take().unwrap();
-
        let mut reader = pktline::GitReader::new(drain, stream_r);
-
        let stderr = child.stderr.take().unwrap();
-

-
        thread::Builder::new().name(self.name.clone()).spawn(|| {
-
            for line in BufReader::new(stderr).lines().flatten() {
-
                log::error!(target: "worker", "Git: {}", line);
-
            }
-
        })?;
-

-
        match reader.read_command_pkt_line() {
-
            Ok((cmd, _pktline)) => {
+
        let daemon = net::TcpStream::connect_timeout(&self.daemon, self.timeout)?;
+
        let (mut daemon_r, mut daemon_w) = (daemon.try_clone().unwrap(), daemon);
+
        let mut stream_reader = pktline::GitReader::new(drain, stream_r);
+
        let mut daemon_reader = pktline::GitReader::new(vec![], &mut daemon_r);
+
        let mut buffer = [0; u16::MAX as usize + 1];
+

+
        let request = match stream_reader.read_request_pkt_line() {
+
            Ok((req, pktline)) => {
                log::debug!(
                    target: "worker",
-
                    "Parsed git command packet-line for {}: {:?}", fetch.repo, cmd
+
                    "Parsed git command packet-line for {}: {:?}", fetch.repo, req
                );
-
                if cmd.repo != fetch.repo {
+
                if req.repo != fetch.repo {
                    return Err(FetchError::Git(git::raw::Error::from_str(
                        "git pkt-line command does not match fetch request",
                    )));
                }
+
                pktline
            }
            Err(err) => {
                return Err(FetchError::Git(git::raw::Error::from_str(&format!(
                    "error parsing git command packet-line: {err}"
                ))));
            }
-
        }
-

-
        thread::scope::<_, Result<Vec<RefUpdate>, FetchError>>(|scope| {
-
            // Output of `upload-pack` is sent back to the remote peer.
-
            let outgoing = scope.spawn(move || io::copy(&mut stdout, stream_w));
-

-
            let mut buf = [0; 65536];
-
            // Data coming from the remote peer is written to the standard input of the
-
            // `upload-pack` process.
-
            while !outgoing.is_finished() {
-
                let n = reader.read_pkt_line(&mut buf)?;
-

-
                stdin.write_all(&buf[..n]).unwrap();
-
                log::trace!(target: "worker", "Received {:?}", String::from_utf8_lossy(&buf[..n]));
+
        };
+
        daemon_w.write_all(&request)?;

-
                if &buf[..n] == pktline::DONE {
+
        loop {
+
            if let Err(e) = daemon_reader.read_pkt_lines(stream_w, &mut buffer) {
+
                // This is the expected error when the remote disconnects.
+
                if e.kind() == io::ErrorKind::UnexpectedEof {
                    break;
                }
-
            }
-
            // SAFETY: The thread should not panic, but if it does, we bubble up the panic.
-
            outgoing.join().unwrap()?;
+
                log::debug!(target: "worker", "Upload of {} to {} returned error: {e}", fetch.repo, fetch.remote);

-
            if child.wait()?.success() {
-
                log::debug!(target: "worker", "Upload-pack for {} exited successfully", fetch.repo);
-
            } else {
-
                log::error!(target: "worker", "Upload-pack for {} exited with error", fetch.repo);
+
                return Err(e.into());
+
            }
+
            if let Err(e) = stream_reader.read_pkt_lines(&mut daemon_w, &mut buffer) {
+
                log::debug!(target: "worker", "Remote returned error: {e}");
+
                break;
            }
-
            Ok(vec![])
-
        })
+
        }
+
        log::debug!(target: "worker", "Upload of {} to {} exited successfully", fetch.repo, fetch.remote);
+

+
        // TODO: Don't return anything when uploading.
+
        Ok(vec![])
    }
}

@@ -257,6 +236,7 @@ impl WorkerPool {
        capacity: usize,
        timeout: time::Duration,
        storage: Storage,
+
        daemon: net::SocketAddr,
        tasks: chan::Receiver<WorkerReq<G>>,
        handle: Handle<G>,
        name: String,
@@ -266,6 +246,7 @@ impl WorkerPool {
            let worker = Worker {
                tasks: tasks.clone(),
                storage: storage.clone(),
+
                daemon,
                handle: handle.clone(),
                timeout,
                name: name.clone(),
@@ -306,7 +287,10 @@ mod pktline {
    pub const FLUSH_PKT: &[u8; HEADER_LEN] = b"0000";
    pub const DELIM_PKT: &[u8; HEADER_LEN] = b"0001";
    pub const RESPONSE_END_PKT: &[u8; HEADER_LEN] = b"0002";
-
    pub const DONE: &[u8] = b"0009done\n";
+

+
    /// Write a flush packet (the four ASCII bytes `0000`, same as `FLUSH_PKT`) to `w`.
    ///
    /// Any error from the underlying writer is returned to the caller.
    pub fn flush<W: io::Write>(w: &mut W) -> io::Result<()> {
+
        write!(w, "0000")
+
    }

    pub struct GitReader<'a, R> {
        drain: Vec<u8>,
@@ -318,16 +302,16 @@ mod pktline {
            Self { drain, stream }
        }

-
        /// Parse a Git command packet-line.
+
        /// Parse a Git request packet-line.
        ///
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
        ///
-
        pub fn read_command_pkt_line(&mut self) -> io::Result<(GitCommand, Vec<u8>)> {
+
        pub fn read_request_pkt_line(&mut self) -> io::Result<(GitRequest, Vec<u8>)> {
            let mut pktline = [0u8; 1024];
            let length = self.read_pkt_line(&mut pktline)?;
-
            let Some(cmd) = GitCommand::parse(&pktline[4..length]) else {
-
            return Err(io::ErrorKind::InvalidInput.into());
-
        };
+
            let Some(cmd) = GitRequest::parse(&pktline[4..length]) else {
+
                return Err(io::ErrorKind::InvalidInput.into());
+
            };
            Ok((cmd, Vec::from(&pktline[..length])))
        }

@@ -350,6 +334,25 @@ mod pktline {

            Ok(length)
        }
+

+
        /// Forward packet-lines from this reader to `w`, one at a time.
        ///
        /// Each packet-line is read into `buf` with `read_pkt_line` and the bytes that
        /// were read are written to `w` unchanged. Forwarding stops successfully when
        /// either:
        ///
        /// * a read reports a length of zero (nothing is written for it), or
        /// * the packet-line that was just forwarded is a flush packet (`FLUSH_PKT`).
        ///
        /// Errors from reading or writing are returned immediately.
        ///
        /// NOTE(review): `buf` presumably has to be large enough for the biggest
        /// packet-line the peer can send; confirm against `read_pkt_line`.
        pub fn read_pkt_lines<W: io::Write>(
+
            &mut self,
+
            w: &mut W,
+
            buf: &mut [u8],
+
        ) -> io::Result<()> {
+
            loop {
+
                let n = self.read_pkt_line(buf)?;
+
                if n == 0 {
+
                    break;
+
                }
+
                w.write_all(&buf[..n])?;
+

+
                // The flush packet itself is forwarded before we stop.
                if &buf[..n] == FLUSH_PKT {
+
                    return Ok(());
+
                }
+
            }
+
            Ok(())
+
        }
    }

    impl<'a, R: io::Read> io::Read for GitReader<'a, R> {
@@ -366,14 +369,14 @@ mod pktline {
    }

    #[derive(Debug)]
-
    pub struct GitCommand {
+
    pub struct GitRequest {
        pub repo: Id,
        pub path: String,
        pub host: Option<(String, Option<u16>)>,
        pub extra: Vec<(String, Option<String>)>,
    }

-
    impl GitCommand {
+
    impl GitRequest {
        /// Parse a Git request from a packet-line.
        fn parse(input: &[u8]) -> Option<Self> {
            let input = str::from_utf8(input).ok()?;