rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Improve Upload Experience: Part II
Merged fintohaps opened 1 year ago

Further improvements to the upload experience. See commits inlined below.

cli: upload-pack events and spinners

Introduce a set of types for initialising and updating a set of spinners for upload-pack events. The spinners react to the events that are emitted from the upload-pack process in the worker.

Each set of spinners is scoped to a given remote, since the protocol may interact with multiple remotes at the same time when announcing.

The spinners are lazy: a remote's set is only constructed once an upload-pack event is received for that remote.

The events expected for the spinners are: enumerating objects, counting objects, compressing objects, and the throughput of sending the bytes to the receiving end.
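The lazy, per-remote bookkeeping described above can be sketched roughly as follows (the `PackEvent` and `Spinners` names are illustrative, not the actual types introduced by this patch):

```rust
use std::collections::BTreeMap;

// Hypothetical event mirroring the upload-pack phases listed above.
#[derive(Debug, Clone)]
enum PackEvent {
    Enumerating(usize),
    Counting(usize),
    Compressing(usize),
    Transmitted(usize), // total bytes sent so far
}

/// Per-remote spinner state; an entry for a remote is only created
/// once the first event for that remote arrives.
#[derive(Default)]
struct Spinners {
    remotes: BTreeMap<String, Vec<PackEvent>>,
}

impl Spinners {
    /// Lazily allocates state for `remote` and records the event,
    /// returning the line a spinner would display.
    fn update(&mut self, remote: &str, event: PackEvent) -> String {
        let history = self.remotes.entry(remote.to_string()).or_default();
        history.push(event.clone());
        format!("{remote}: {event:?}")
    }
}

fn main() {
    let mut spinners = Spinners::default();
    assert!(spinners.remotes.is_empty()); // lazy: nothing until an event
    let line = spinners.update("alice", PackEvent::Transmitted(4096));
    assert_eq!(spinners.remotes.len(), 1);
    println!("{line}");
}
```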

node: simplify upload-pack reader joining

The `join` method is equivalent to the custom polling loop that was previously used for joining the reader thread in upload-pack.
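As an illustration of the simplification (the sleep duration and return value are placeholders, not the node's actual code): a blocking `join` replaces a loop that polls `is_finished()` and sleeps, and additionally surfaces panics from the thread:

```rust
use std::thread;
use std::time::Duration;

fn main() {
    let reader = thread::spawn(|| {
        // Stand-in for the upload-pack reader loop.
        thread::sleep(Duration::from_millis(50));
        42usize
    });

    // Before: `while !reader.is_finished() { sleep(..) }`.
    // After: `join` blocks until the thread exits and reports panics.
    match reader.join() {
        Ok(bytes) => println!("reader finished after {bytes} bytes"),
        Err(e) => eprintln!("reader panicked: {e:?}"),
    }
}
```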

node: reduce default upload-pack timeout

The default upload-pack timeout can be reduced by observing that, as long as the stdout stream is still sending data, the reader thread can stay alive.

This is achieved by keeping a timer that gets reset every time the upload-pack writer sends data. If the reader then runs into a timeout, it checks whether this timer has also exceeded the timeout; otherwise it loops back and reads again. This either leads to waiting long enough that the EOF message is received, or to both streams timing out.

From empirical testing, the timeout can be exceeded without the EOF message being received when the pack data is large enough that the receiving side takes more time to write it to disk. In that case, however, the upload should, in theory, have completed successfully anyway.

The `--timeout` option is also passed to the git-upload-pack process for good measure.
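A minimal sketch of the write-timer idea described above, assuming a shared timestamp behind a mutex (the `WriteTimer` name and its API are hypothetical, not the patch's actual implementation):

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Shared timestamp of the writer's last activity.
#[derive(Clone)]
struct WriteTimer(Arc<Mutex<Instant>>);

impl WriteTimer {
    fn new() -> Self {
        Self(Arc::new(Mutex::new(Instant::now())))
    }

    /// Called by the writer every time it sends data.
    fn touch(&self) {
        *self.0.lock().unwrap() = Instant::now();
    }

    /// Called by the reader when its own read times out: if the writer
    /// was active within `timeout`, the reader loops and reads again
    /// instead of giving up.
    fn elapsed(&self, timeout: Duration) -> bool {
        self.0.lock().unwrap().elapsed() >= timeout
    }
}

fn main() {
    let timer = WriteTimer::new();
    timer.touch();
    // Immediately after a write the timeout has not elapsed,
    // so the reader would retry rather than abort.
    assert!(!timer.elapsed(Duration::from_secs(3)));
}
```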

9 files changed +149 -24 3fe6bbe1 064ece32
modified radicle-cli/src/commands/init.rs
@@ -395,6 +395,8 @@ fn sync(
    spinner.message("Syncing..");

    let mut replicas = HashSet::new();
+    // Start upload pack as None and set it if we encounter an event
+    let mut upload_pack = term::upload_pack::UploadPack::new();

    for e in events {
        match e {
@@ -413,21 +415,27 @@ fn sync(
                remote,
                progress,
            })) if rid == rid_ => {
-                spinner.message(format!("Uploading {rid} to {remote} {progress}"));
+                log::debug!("Upload progress for {remote}: {progress}");
            }
+            Ok(Event::UploadPack(UploadPack::PackProgress {
+                rid: rid_,
+                remote,
+                transmitted,
+            })) if rid == rid_ => spinner.message(upload_pack.transmitted(remote, transmitted)),
            Ok(Event::UploadPack(UploadPack::Done {
                rid: rid_,
                remote,
                status,
            })) if rid == rid_ => {
-                spinner.message(format!("Upload done for {rid} to {remote}: {status}"));
+                log::debug!("Upload done for {rid} to {remote} with status: {status}");
+                spinner.message(upload_pack.done(&remote));
            }
            Ok(Event::UploadPack(UploadPack::Error {
                rid: rid_,
                remote,
                err,
            })) if rid == rid_ => {
-                spinner.message(format!("Upload error for {rid} to {remote}: {err}"));
+                term::warning(format!("Upload error for {rid} to {remote}: {err}"));
            }
            Ok(_) => {
                // Some other irrelevant event received.
modified radicle-cli/src/terminal.rs
@@ -8,6 +8,7 @@ pub mod highlight;
pub mod issue;
pub mod json;
pub mod patch;
+pub mod upload_pack;

use std::ffi::OsString;
use std::process;
modified radicle-cli/src/terminal/format.rs
@@ -82,6 +82,22 @@ pub fn timestamp(time: impl Into<LocalTime>) -> Paint<String> {
    Paint::new(fmt.convert(duration.into()))
}

+pub fn bytes(size: usize) -> Paint<String> {
+    const KB: usize = 1024;
+    const MB: usize = 1024usize.pow(2);
+    const GB: usize = 1024usize.pow(3);
+    let size = if size < KB {
+        format!("{size} B")
+    } else if size < MB {
+        format!("{} KiB", size / KB)
+    } else if size < GB {
+        format!("{} MiB", size / MB)
+    } else {
+        format!("{} GiB", size / GB)
+    };
+    Paint::new(size)
+}
+

/// Format a ref update.
pub fn ref_update(update: &RefUpdate) -> Paint<&'static str> {
    match update {
@@ -358,4 +374,14 @@ mod test {
        let res = strip_comments(test);
        assert_eq!(exp, res);
    }
+
+    #[test]
+    fn test_bytes() {
+        assert_eq!(bytes(1023).to_string(), "1023 B");
+        assert_eq!(bytes(1024).to_string(), "1 KiB");
+        assert_eq!(bytes(1024 * 9).to_string(), "9 KiB");
+        assert_eq!(bytes(1024usize.pow(2)).to_string(), "1 MiB");
+        assert_eq!(bytes(1024usize.pow(2) * 56).to_string(), "56 MiB");
+        assert_eq!(bytes(1024usize.pow(3) * 1024).to_string(), "1024 GiB");
+    }
}
added radicle-cli/src/terminal/upload_pack.rs
@@ -0,0 +1,50 @@
+use std::collections::BTreeSet;
+use std::time::Instant;
+
+use crate::terminal::format;
+use radicle::node::NodeId;
+
+/// Keeps track of upload-pack progress for displaying to the terminal.
+pub struct UploadPack {
+    /// Keep track of which remotes are being uploaded to, removing any that
+    /// have completed.
+    remotes: BTreeSet<NodeId>,
+    /// Keep track of how long we've been transmitting for to calculate
+    /// throughput.
+    timer: Instant,
+}
+
+impl Default for UploadPack {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl UploadPack {
+    /// Construct an empty set of spinners.
+    pub fn new() -> Self {
+        Self {
+            remotes: BTreeSet::new(),
+            timer: Instant::now(),
+        }
+    }
+
+    /// Display the number of peers, the total transmitted bytes, and the
+    /// throughput.
+    pub fn transmitted(&mut self, remote: NodeId, transmitted: usize) -> String {
+        self.remotes.insert(remote);
+        let throughput = transmitted as f64 / self.timer.elapsed().as_secs_f64();
+        let throughput = format::bytes(throughput.floor() as usize);
+        let n = self.remotes.len();
+        let transmitted = format::bytes(transmitted);
+        format!("Uploading to {n} peers ({transmitted} | {throughput:.2}/s)")
+    }
+
+    /// Display which remote has completed upload-pack and how many are
+    /// remaining.
+    pub fn done(&mut self, remote: &NodeId) -> String {
+        self.remotes.remove(remote);
+        let n = self.remotes.len();
+        format!("Uploaded to {remote}, {n} peers remaining..")
+    }
+}
modified radicle-node/src/service.rs
@@ -98,8 +98,8 @@ pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
-/// How long to wait for a fetch to stall before aborting, default is 1 hour.
-pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3600);
+/// How long to wait for a fetch to stall before aborting, default is 3s.
+pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
modified radicle-node/src/worker.rs
@@ -238,6 +238,7 @@ impl Worker {
            FetchRequest::Responder { remote, emitter } => {
                log::debug!(target: "worker", "Worker processing incoming fetch for {remote} on stream {stream}..");

+                let timeout = channels.timeout();
                let (mut stream_r, stream_w) = channels.split();
                let header = match upload_pack::pktline::git_request(&mut stream_r) {
                    Ok(header) => header,
@@ -265,6 +266,7 @@ impl Worker {
                    &header,
                    stream_r,
                    stream_w,
+                    timeout,
                )
                .map(|_| ())
                .map_err(|e| e.into());
modified radicle-node/src/worker/channels.rs
@@ -34,6 +34,10 @@ impl ChannelsFlush {
    pub fn split(&mut self) -> (&mut ChannelReader, &mut ChannelFlushWriter) {
        (&mut self.receiver, &mut self.sender)
    }
+
+    pub fn timeout(&self) -> time::Duration {
+        self.sender.writer.timeout.max(self.receiver.timeout)
+    }
}

impl radicle_fetch::transport::ConnectionStream for ChannelsFlush {
modified radicle-node/src/worker/upload_pack.rs
@@ -1,7 +1,7 @@
use std::io;
use std::io::Write;
use std::process::{Command, ExitStatus, Stdio};
-use std::time::Instant;
+use std::time::{Duration, Instant};

use gix_protocol::transport::bstr::ByteSlice;
use radicle::identity::RepoId;
@@ -27,6 +27,7 @@ pub fn upload_pack<R, W>(
    header: &pktline::GitRequest,
    mut recv: R,
    send: W,
+    timeout: Duration,
) -> io::Result<ExitStatus>
where
    R: io::Read + Send,
@@ -72,6 +73,7 @@ where
                "lsrefs.unborn=ignore",
                "upload-pack",
                "--strict",
+                format!("--timeout={}", timeout.as_secs()).as_str(),
                ".",
            ])
            .stdout(Stdio::piped())
@@ -83,12 +85,8 @@ where

    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = io::BufReader::new(child.stdout.take().unwrap());
-    let mut reporter = Reporter {
-        rid: header.repo,
-        remote,
-        emitter: emitter.clone(),
-        send,
-    };
+    let mut reporter = Reporter::new(header.repo, remote, emitter.clone(), send);
+

    thread::scope(|s| {
        thread::spawn_scoped(nid, "upload-pack", s, || {
            // N.b. we indefinitely copy stdout to the sender,
@@ -117,6 +115,12 @@ where
                        log::debug!(target: "worker", "Exiting upload-pack reader thread for {}", header.repo);
                        break;
                    }
+                    // N.b. if the read timed out, ensure that the sender isn't
+                    // still sending messages.
+                    Err(e) if e.kind() == io::ErrorKind::TimedOut => {
+                        log::warn!(target: "worker", "Read channel timed out for upload-pack {}", header.repo);
+                        break;
+                    }
                    Err(e) => {
                        log::error!(target: "worker", "Error on upload-pack channel read for {}: {e}", header.repo);
                        emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
@@ -128,14 +132,10 @@ where

        // N.b. we only care if the `reader` is finished. We then kill
        // the child which will end the thread for the sender.
-        loop {
-            if reader.is_finished() {
-                child.kill()?;
-                break;
-            } else {
-                std::thread::sleep(std::time::Duration::from_millis(100));
-            }
+        if let Err(e) = reader.join() {
+            log::warn!(target: "worker", "Upload pack thread panicked: {e:?}");
        }
+        child.kill()?;
        Ok::<_, io::Error>(())
    })?;

@@ -152,17 +152,32 @@ struct Reporter<W> {
    remote: NodeId,
    emitter: Emitter<Event>,
    send: W,
+    total: usize,
}

impl<W> Reporter<W> {
-    fn emit(&self, buf: &[u8]) {
-        if let Some(progress) = Self::as_upload_pack_progress(buf) {
-            log::trace!(target: "worker", "upload-pack progress: {progress}");
-            self.emitter
-                .emit(events::UploadPack::write(self.rid, self.remote, progress).into());
+    fn new(rid: RepoId, remote: NodeId, emitter: Emitter<Event>, send: W) -> Self {
+        Self {
+            rid,
+            remote,
+            emitter,
+            send,
+            total: 0,
        }
    }

+    fn emit(&mut self, buf: &[u8]) {
+        let event = match Self::as_upload_pack_progress(buf) {
+            Some(progress) => events::UploadPack::write(self.rid, self.remote, progress),
+            None => {
+                self.total += buf.len();
+                events::UploadPack::pack_progress(self.rid, self.remote, self.total)
+            }
+        };
+        log::trace!(target: "worker", "upload-pack progress: {event:?}");
+        self.emitter.emit(event.into());
+    }
+

    fn as_upload_pack_progress(buf: &[u8]) -> Option<events::upload_pack::Progress> {
        use events::upload_pack::Progress::*;
        let gix_protocol::RemoteProgress {
modified radicle/src/node/events/upload_pack.rs
@@ -29,6 +29,7 @@ pub enum UploadPack {
        /// The progress metadata of the upload-pack.
        progress: Progress,
    },
+    /// An error occurred during the upload-pack process.
    Error {
        /// The repository being fetched.
        rid: RepoId,
@@ -37,9 +38,27 @@ pub enum UploadPack {
        /// The error that occurred during the upload-pack.
        err: String,
    },
+    /// The upload-pack packfile transmission progress.
+    PackProgress {
+        /// The repository being fetched.
+        rid: RepoId,
+        /// The node being fetched from.
+        remote: NodeId,
+        /// The total number of bytes transmitted.
+        transmitted: usize,
+    },
}

impl UploadPack {
+    /// Construct a `UploadPack::PackProgress` event.
+    pub fn pack_progress(rid: RepoId, remote: NodeId, transmitted: usize) -> Self {
+        Self::PackProgress {
+            rid,
+            remote,
+            transmitted,
+        }
+    }
+
    /// Construct a `UploadPack::Write` event.
    pub fn write(rid: RepoId, remote: NodeId, progress: Progress) -> Self {
        Self::Write {