Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: report upload-pack progress
Fintan Halpenny committed 2 years ago
commit c8a24d558416661fc3b39e9dffd323b71a23f224
parent 89627a8b6a58cad55267296c666d7c4b246e6c53
6 files changed +99 -12
modified Cargo.lock
@@ -2537,6 +2537,7 @@ dependencies = [
 "crossbeam-channel",
 "cyphernet",
 "fastrand",
+
 "gix-protocol",
 "io-reactor",
 "lexopt",
 "libc",
modified radicle-node/Cargo.toml
@@ -21,6 +21,7 @@ colored = { version = "2.1.0" }
crossbeam-channel = { version = "0.5.6" }
cyphernet = { version = "0.5.0", features = ["tor", "dns", "ed25519", "p2p-ed25519"] }
fastrand = { version = "2.0.0" }
+
gix-protocol = { version = "0.41.1", features = ["blocking-client"] }
io-reactor = { version = "0.5.1", features = ["popol"] }
lexopt = { version = "0.3.0" }
libc = { version = "0.2.137" }
modified radicle-node/src/service.rs
@@ -425,6 +425,10 @@ where
    pub fn local_time(&self) -> LocalTime {
        self.clock
    }
+

+
    pub fn emitter(&self) -> Emitter<Event> {
+
        self.emitter.clone()
+
    }
}

impl<D, S, G> Service<D, S, G>
modified radicle-node/src/wire/protocol.rs
@@ -721,7 +721,10 @@ where
                                };

                                let task = Task {
-
                                    fetch: FetchRequest::Responder { remote: *nid },
+
                                    fetch: FetchRequest::Responder {
+
                                        remote: *nid,
+
                                        emitter: self.service.emitter(),
+
                                    },
                                    stream,
                                    channels,
                                };
modified radicle-node/src/worker.rs
@@ -11,14 +11,14 @@ use std::path::PathBuf;
use crossbeam_channel as chan;

use radicle::identity::RepoId;
-
use radicle::node::notifications;
+
use radicle::node::{notifications, Event};
use radicle::prelude::NodeId;
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadRepository, ReadStorage};
use radicle::{cob, crypto, Storage};
use radicle_fetch::FetchLimit;

-
use crate::runtime::{thread, Handle};
+
use crate::runtime::{thread, Emitter, Handle};
use crate::service::policy;
use crate::service::policy::Policy;
use crate::wire::StreamId;
@@ -113,13 +113,15 @@ pub enum FetchRequest {
    Responder {
        /// Remote peer we are interacting with.
        remote: NodeId,
+
        /// Reporter for upload-pack progress.
+
        emitter: Emitter<Event>,
    },
}

impl FetchRequest {
    pub fn remote(&self) -> NodeId {
        match self {
-
            Self::Initiator { remote, .. } | Self::Responder { remote } => *remote,
+
            Self::Initiator { remote, .. } | Self::Responder { remote, .. } => *remote,
        }
    }
}
@@ -233,7 +235,7 @@ impl Worker {
                let result = self.fetch(rid, remote, refs_at, channels, notifs);
                FetchResult::Initiator { rid, result }
            }
-
            FetchRequest::Responder { remote } => {
+
            FetchRequest::Responder { remote, emitter } => {
                log::debug!(target: "worker", "Worker processing incoming fetch for {remote} on stream {stream}..");

                let (mut stream_r, stream_w) = channels.split();
@@ -255,10 +257,17 @@ impl Worker {
                    };
                }

-
                let result =
-
                    upload_pack::upload_pack(&self.nid, &self.storage, &header, stream_r, stream_w)
-
                        .map(|_| ())
-
                        .map_err(|e| e.into());
+
                let result = upload_pack::upload_pack(
+
                    &self.nid,
+
                    remote,
+
                    &self.storage,
+
                    &emitter,
+
                    &header,
+
                    stream_r,
+
                    stream_w,
+
                )
+
                .map(|_| ())
+
                .map_err(|e| e.into());
                log::debug!(target: "worker", "Upload process on stream {stream} exited with result {result:?}");

                FetchResult::Responder {
modified radicle-node/src/worker/upload_pack.rs
@@ -1,8 +1,13 @@
use std::io;
use std::io::Write;
use std::process::{Command, ExitStatus, Stdio};
+
use std::time::Instant;

-
use radicle::node::NodeId;
+
use gix_protocol::transport::bstr::ByteSlice;
+
use radicle::identity::RepoId;
+
use radicle::node::events;
+
use radicle::node::events::Emitter;
+
use radicle::node::{Event, NodeId};
use radicle::storage::git::paths;
use radicle::Storage;

@@ -16,15 +21,18 @@ use crate::runtime::thread;
/// send the EOF file message.
pub fn upload_pack<R, W>(
    nid: &NodeId,
+
    remote: NodeId,
    storage: &Storage,
+
    emitter: &Emitter<Event>,
    header: &pktline::GitRequest,
    mut recv: R,
-
    mut send: W,
+
    send: W,
) -> io::Result<ExitStatus>
where
    R: io::Read + Send,
    W: io::Write + Send,
{
+
    let timer = Instant::now();
    let protocol_version = header
        .extra
        .iter()
@@ -75,14 +83,21 @@ where

    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = io::BufReader::new(child.stdout.take().unwrap());
+
    let mut reporter = Reporter {
+
        rid: header.repo,
+
        remote,
+
        emitter: emitter.clone(),
+
        send,
+
    };
    thread::scope(|s| {
        thread::spawn_scoped(nid, "upload-pack", s, || {
            // N.b. we indefinitely copy stdout to the sender,
            // i.e. there's no need for a loop.
-
            match io::copy(&mut stdout, &mut send) {
+
            match io::copy(&mut stdout, &mut reporter) {
                Ok(_) => {}
                Err(e) => {
                    log::error!(target: "worker", "Worker channel disconnected for {}; aborting: {e}", header.repo);
+
                    emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
                }
            }
        });
@@ -104,6 +119,7 @@ where
                    }
                    Err(e) => {
                        log::error!(target: "worker", "Error on upload-pack channel read for {}: {e}", header.repo);
+
                        emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
                        break;
                    }
                }
@@ -124,9 +140,62 @@ where
    })?;

    let status = child.wait()?;
+
    emitter.emit(events::UploadPack::done(header.repo, remote, status).into());
+
    log::debug!(target: "worker", "Upload pack finished ({}ms)", timer.elapsed().as_millis());
    Ok(status)
}

+
/// A combination of the upload-pack sender with an [`Emitter`] for reporting
+
/// the progress events to subscribers.
+
struct Reporter<W> {
+
    rid: RepoId,
+
    remote: NodeId,
+
    emitter: Emitter<Event>,
+
    send: W,
+
}
+

+
impl<W> Reporter<W> {
+
    fn emit(&self, buf: &[u8]) {
+
        if let Some(progress) = Self::as_upload_pack_progress(buf) {
+
            log::trace!(target: "worker", "upload-pack progress: {progress}");
+
            self.emitter
+
                .emit(events::UploadPack::write(self.rid, self.remote, progress).into());
+
        }
+
    }
+

+
    fn as_upload_pack_progress(buf: &[u8]) -> Option<events::upload_pack::Progress> {
+
        use events::upload_pack::Progress::*;
+
        let gix_protocol::RemoteProgress {
+
            action, step, max, ..
+
        } = gix_protocol::RemoteProgress::from_bytes(buf)?;
+
        if action.contains_str("Counting objects") {
+
            step.and_then(|processed| max.map(|total| Counting { processed, total }))
+
        } else if action.contains_str("Compressing objects") {
+
            step.and_then(|processed| max.map(|total| Compressing { processed, total }))
+
        } else if action.contains_str("Enumerating objects") {
+
            max.map(|total| Enumerating { total })
+
        } else {
+
            None
+
        }
+
    }
+
}
+

+
impl<W> io::Write for Reporter<W>
+
where
+
    W: io::Write,
+
{
+
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+
        let n = self.send.write(buf)?;
+
        self.send.flush()?;
+
        self.emit(buf);
+
        Ok(n)
+
    }
+

+
    fn flush(&mut self) -> io::Result<()> {
+
        self.send.flush()
+
    }
+
}
+

pub(super) mod pktline {
    use std::io;
    use std::io::Read;