Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
cli: upload-pack events
Fintan Halpenny committed 2 years ago
commit 740106d7595c4e2722ca2c8d0f6ee34f751f06d2
parent 3fe6bbe1cfe220295c3cb376b504089af4715f31
6 files changed +129 -14
modified radicle-cli/src/commands/init.rs
@@ -395,6 +395,8 @@ fn sync(
    spinner.message("Syncing..");

    let mut replicas = HashSet::new();
+
    // Start upload pack as None and set it if we encounter an event
+
    let mut upload_pack = term::upload_pack::UploadPack::new();

    for e in events {
        match e {
@@ -413,21 +415,27 @@ fn sync(
                remote,
                progress,
            })) if rid == rid_ => {
-
                spinner.message(format!("Uploading {rid} to {remote} {progress}"));
+
                log::debug!("Upload progress for {remote}: {progress}");
            }
+
            Ok(Event::UploadPack(UploadPack::PackProgress {
+
                rid: rid_,
+
                remote,
+
                transmitted,
+
            })) if rid == rid_ => spinner.message(upload_pack.transmitted(remote, transmitted)),
            Ok(Event::UploadPack(UploadPack::Done {
                rid: rid_,
                remote,
                status,
            })) if rid == rid_ => {
-
                spinner.message(format!("Upload done for {rid} to {remote}: {status}"));
+
                log::debug!("Upload done for {rid} to {remote} with status: {status}");
+
                spinner.message(upload_pack.done(&remote));
            }
            Ok(Event::UploadPack(UploadPack::Error {
                rid: rid_,
                remote,
                err,
            })) if rid == rid_ => {
-
                spinner.message(format!("Upload error for {rid} to {remote}: {err}"));
+
                term::warning(format!("Upload error for {rid} to {remote}: {err}"));
            }
            Ok(_) => {
                // Some other irrelevant event received.
modified radicle-cli/src/terminal.rs
@@ -8,6 +8,7 @@ pub mod highlight;
pub mod issue;
pub mod json;
pub mod patch;
+
pub mod upload_pack;

use std::ffi::OsString;
use std::process;
modified radicle-cli/src/terminal/format.rs
@@ -82,6 +82,22 @@ pub fn timestamp(time: impl Into<LocalTime>) -> Paint<String> {
    Paint::new(fmt.convert(duration.into()))
}

+
pub fn bytes(size: usize) -> Paint<String> {
+
    const KB: usize = 1024;
+
    const MB: usize = 1024usize.pow(2);
+
    const GB: usize = 1024usize.pow(3);
+
    let size = if size < KB {
+
        format!("{size} B")
+
    } else if size < MB {
+
        format!("{} KiB", size / KB)
+
    } else if size < GB {
+
        format!("{} MiB", size / MB)
+
    } else {
+
        format!("{} GiB", size / GB)
+
    };
+
    Paint::new(size)
+
}
+

/// Format a ref update.
pub fn ref_update(update: &RefUpdate) -> Paint<&'static str> {
    match update {
@@ -358,4 +374,14 @@ mod test {
        let res = strip_comments(test);
        assert_eq!(exp, res);
    }
+

+
    #[test]
+
    fn test_bytes() {
+
        assert_eq!(bytes(1023).to_string(), "1023 B");
+
        assert_eq!(bytes(1024).to_string(), "1 KiB");
+
        assert_eq!(bytes(1024 * 9).to_string(), "9 KiB");
+
        assert_eq!(bytes(1024usize.pow(2)).to_string(), "1 MiB");
+
        assert_eq!(bytes(1024usize.pow(2) * 56).to_string(), "56 MiB");
+
        assert_eq!(bytes(1024usize.pow(3) * 1024).to_string(), "1024 GiB");
+
    }
}
added radicle-cli/src/terminal/upload_pack.rs
@@ -0,0 +1,50 @@
+
use std::collections::BTreeSet;
+
use std::time::Instant;
+

+
use crate::terminal::format;
+
use radicle::node::NodeId;
+

+
/// Keeps track of upload-pack progress for displaying to the terminal.
+
pub struct UploadPack {
+
    /// Keep track of which remotes are being uploaded to, removing any that
+
    /// have completed.
+
    remotes: BTreeSet<NodeId>,
+
    /// Keep track of how long we've been transmitting for to calculate
+
    /// throughput.
+
    timer: Instant,
+
}
+

+
impl Default for UploadPack {
+
    fn default() -> Self {
+
        Self::new()
+
    }
+
}
+

+
impl UploadPack {
+
    /// Construct an empty set of spinners.
+
    pub fn new() -> Self {
+
        Self {
+
            remotes: BTreeSet::new(),
+
            timer: Instant::now(),
+
        }
+
    }
+

+
    /// Display the number of peers, the total transmitted bytes, and the
+
    /// throughput.
+
    pub fn transmitted(&mut self, remote: NodeId, transmitted: usize) -> String {
+
        self.remotes.insert(remote);
+
        let throughput = transmitted as f64 / self.timer.elapsed().as_secs_f64();
+
        let throughput = format::bytes(throughput.floor() as usize);
+
        let n = self.remotes.len();
+
        let transmitted = format::bytes(transmitted);
+
        format!("Uploading to {n} peer(s) ({transmitted} | {throughput:.2}/s)")
+
    }
+

+
    /// Display which remote has completed upload-pack and how many are
+
    /// remaining.
+
    pub fn done(&mut self, remote: &NodeId) -> String {
+
        self.remotes.remove(remote);
+
        let n = self.remotes.len();
+
        format!("Uploaded to {remote}, {n} peer(s) remaining..")
+
    }
+
}
modified radicle-node/src/worker/upload_pack.rs
@@ -83,12 +83,8 @@ where

    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = io::BufReader::new(child.stdout.take().unwrap());
-
    let mut reporter = Reporter {
-
        rid: header.repo,
-
        remote,
-
        emitter: emitter.clone(),
-
        send,
-
    };
+
    let mut reporter = Reporter::new(header.repo, remote, emitter.clone(), send);
+

    thread::scope(|s| {
        thread::spawn_scoped(nid, "upload-pack", s, || {
            // N.b. we indefinitely copy stdout to the sender,
@@ -152,17 +148,32 @@ struct Reporter<W> {
    remote: NodeId,
    emitter: Emitter<Event>,
    send: W,
+
    total: usize,
}

impl<W> Reporter<W> {
-
    fn emit(&self, buf: &[u8]) {
-
        if let Some(progress) = Self::as_upload_pack_progress(buf) {
-
            log::trace!(target: "worker", "upload-pack progress: {progress}");
-
            self.emitter
-
                .emit(events::UploadPack::write(self.rid, self.remote, progress).into());
+
    fn new(rid: RepoId, remote: NodeId, emitter: Emitter<Event>, send: W) -> Self {
+
        Self {
+
            rid,
+
            remote,
+
            emitter,
+
            send,
+
            total: 0,
        }
    }

+
    fn emit(&mut self, buf: &[u8]) {
+
        let event = match Self::as_upload_pack_progress(buf) {
+
            Some(progress) => events::UploadPack::write(self.rid, self.remote, progress),
+
            None => {
+
                self.total += buf.len();
+
                events::UploadPack::pack_progress(self.rid, self.remote, self.total)
+
            }
+
        };
+
        log::trace!(target: "worker", "upload-pack progress: {event:?}");
+
        self.emitter.emit(event.into())
+
    }
+

    fn as_upload_pack_progress(buf: &[u8]) -> Option<events::upload_pack::Progress> {
        use events::upload_pack::Progress::*;
        let gix_protocol::RemoteProgress {
modified radicle/src/node/events/upload_pack.rs
@@ -29,6 +29,7 @@ pub enum UploadPack {
        /// The progress metadata of the upload-pack.
        progress: Progress,
    },
+
    /// An error occurred during the upload-pack process.
    Error {
        /// The repository being fetched.
        rid: RepoId,
@@ -37,9 +38,27 @@ pub enum UploadPack {
        /// The error that occurred during the upload-pack.
        err: String,
    },
+
    /// The upload-pack packfile transmission progress.
+
    PackProgress {
+
        /// The repository being fetched.
+
        rid: RepoId,
+
        /// The node being fetched from.
+
        remote: NodeId,
+
        /// The total number of bytes transmitted.
+
        transmitted: usize,
+
    },
}

impl UploadPack {
+
    /// Construct a `UploadPack::PackProgress` event.
+
    pub fn pack_progress(rid: RepoId, remote: NodeId, transmitted: usize) -> Self {
+
        Self::PackProgress {
+
            rid,
+
            remote,
+
            transmitted,
+
        }
+
    }
+

    /// Construct a `UploadPack::Write` event.
    pub fn write(rid: RepoId, remote: NodeId, progress: Progress) -> Self {
        Self::Write {