Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix worker fetch deadlock
Alexis Sellier committed 2 years ago
commit 5fa677c5fbc1a79d2d712a82117f2ece85c1e786
parent 4bf15bf7ca867baa5e9d838d3fadd9036fb933b4
3 files changed +48 -34
modified radicle-node/src/wire/protocol.rs
@@ -560,6 +560,8 @@ where
                                ..
                            })) => {
                                if let Some(channels) = streams.get(&stream) {
+
                                    log::debug!(target: "wire", "Received end-of-file on id={stream} from {nid}");
+

                                    if channels.send(ChannelEvent::Eof).is_err() {
                                        log::error!(target: "wire", "Worker is disconnected; cannot send `EOF`");
                                    }
modified radicle-node/src/worker.rs
@@ -400,7 +400,7 @@ impl Worker {
        stream: StreamId,
        channels: &mut Channels,
    ) -> Result<BTreeSet<git::Namespaced<'static>>, FetchError> {
-
        let mut tunnel = Tunnel::with(channels, stream, self.nid, remote, self.handle.clone())?;
+
        let tunnel = Tunnel::with(channels, stream, self.nid, remote, self.handle.clone())?;
        let tunnel_addr = tunnel.local_addr();
        let mut cmd = process::Command::new("git");
        cmd.current_dir(repo.path())
@@ -421,45 +421,57 @@ impl Worker {

        log::debug!(target: "worker", "Running command: {:?}", cmd);

+
        let mut refs = BTreeSet::new();
        let mut child = cmd.spawn()?;
        let stderr = child.stderr.take().unwrap();
        let stdout = child.stdout.take().unwrap();

-
        thread::Builder::new().name(self.name.clone()).spawn(|| {
-
            for line in BufReader::new(stderr).lines().flatten() {
-
                log::debug!(target: "worker", "Git: {}", line);
-
            }
-
        })?;
-

-
        tunnel.run(self.timeout)?;
-

-
        let result = child.wait()?;
-
        if result.success() {
-
            let mut refs = BTreeSet::new();
-

-
            for line in BufReader::new(stdout).lines().flatten() {
-
                log::debug!(target: "worker", "Git: {line}");
-
                let r = match line.split_whitespace().next_back() {
-
                    Some(r) => r,
-
                    None => {
-
                        log::trace!(target: "worker", "Git: ls-remote returned unexpected format {line}");
-
                        continue;
+
        // Since `ls-remote` may return a lot of data, we read the child's stdout concurrently, to
+
        // prevent deadlocks that could arise if we fill the pipe buffer before the process exits.
+
        thread::scope(|s| {
+
            thread::Builder::new()
+
                .name(self.name.clone())
+
                .spawn_scoped(s, || {
+
                    for line in BufReader::new(stderr).lines().flatten() {
+
                        log::debug!(target: "worker", "Git: {}", line);
                    }
-
                };
-
                match git::RefString::try_from(r) {
-
                    Ok(r) => {
-
                        if let Some(ns) = r.to_namespaced() {
-
                            refs.insert(ns.to_owned());
-
                        } else {
-
                            log::debug!(target: "worker", "Git: non-namespaced ref '{r}'")
+
                })?;
+
            thread::Builder::new()
+
                .name(self.name.clone())
+
                .spawn_scoped(s, || {
+
                    for line in BufReader::new(stdout).lines().flatten() {
+
                        log::debug!(target: "worker", "Git: {}", line);
+

+
                        let r = match line.split_whitespace().next_back() {
+
                            Some(r) => r,
+
                            None => {
+
                                log::trace!(target: "worker", "Git: ls-remote returned unexpected format {line}");
+
                                continue;
+
                            }
+
                        };
+
                        match git::RefString::try_from(r) {
+
                            Ok(r) => {
+
                                if let Some(ns) = r.to_namespaced() {
+
                                    refs.insert(ns.to_owned());
+
                                } else {
+
                                    log::debug!(target: "worker", "Git: non-namespaced ref '{r}'")
+
                                }
+
                            }
+
                            Err(err) => {
+
                                log::warn!(target: "worker", "Git: invalid refname '{r}' {err}")
+
                            }
                        }
                    }
-
                    Err(err) => {
-
                        log::warn!(target: "worker", "Git: invalid refname '{r}' {err}")
-
                    }
-
                }
-
            }
+
                })?;

+
            tunnel.run(self.timeout)?;
+

+
            Ok::<_, FetchError>(())
+
        })?;
+

+
        let result = child.wait()?;
+

+
        if result.success() {
            Ok(refs)
        } else {
            Err(FetchError::CommandFailed {
@@ -480,7 +492,7 @@ impl Worker {
    where
        S: IntoIterator<Item = fetch::Refspec>,
    {
-
        let mut tunnel = Tunnel::with(channels, stream, self.nid, remote, self.handle.clone())?;
+
        let tunnel = Tunnel::with(channels, stream, self.nid, remote, self.handle.clone())?;
        let tunnel_addr = tunnel.local_addr();
        let mut cmd = process::Command::new("git");
        cmd.current_dir(repo.path())
modified radicle-node/src/worker/tunnel.rs
@@ -44,7 +44,7 @@ impl<'a> Tunnel<'a> {
    }

    /// Run the tunnel until the connection is closed.
-
    pub fn run(&mut self, timeout: time::Duration) -> io::Result<()> {
+
    pub fn run(mut self, timeout: time::Duration) -> io::Result<()> {
        let (remote_w, remote_r) = self.channels.split();
        let (local, _) = self.listener.accept()?;
        let (mut local_r, local_w) = (local.try_clone()?, local);