Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
Improve Uploading Experience
Merged fintohaps opened 2 years ago

The main goal of the patch is to improve the upload experience of repositories that are larger in size, e.g. 300MB and 3GB.

This is achieved by a series of changes, outlined in each commit, and summarised here:

  1. Dynamic timeouts for fetching by passing the timeout through the use of `Io::Fetch` and `Control::Open`. This removes the `DEFAULT_CHANNEL_TIMEOUT` and we now default to `FETCH_TIMEOUT` when it’s not specified.
  2. The Emitter is moved to the radicle crate for re-use in radicle-fetch and radicle-node.
  3. Introduce events for the upload-pack process.
  4. Change the call method for the socket stream to use crossbeam-channel so that events can be subscribed to indefinitely but received with a timeout.
  5. The default value of FETCH_TIMEOUT is increased to 1 hour to allow the larger repositories to finish announcing and being cloned by seeds when initialised.
  6. Improve rad init to report the upload-pack events.
17 files changed +426 -152 cadd996a 72c71501
modified Cargo.lock
@@ -2537,6 +2537,7 @@ dependencies = [
 "crossbeam-channel",
 "cyphernet",
 "fastrand",
+ "gix-protocol",
 "io-reactor",
 "lexopt",
 "libc",
modified radicle-cli/examples/rad-init-sync-preferred.md
@@ -10,6 +10,7 @@ Initializing public radicle 👾 repository in [..]
Your Repository ID (RID) is rad:z3Rry7rpdWuGpfjPYGzdJKQADsoNW.
You can show it any time by running `rad .` from this directory.

+✓ Repository successfully synced to z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
✓ Repository successfully synced to 1 node(s).

Your repository has been synced to the network and is now discoverable by peers.
modified radicle-cli/src/commands/init.rs
@@ -2,10 +2,10 @@
#![allow(clippy::collapsible_else_if)]
use std::collections::HashSet;
use std::convert::TryFrom;
+use std::env;
use std::ffi::OsString;
use std::path::PathBuf;
use std::str::FromStr;
-use std::{env, time};

use anyhow::{anyhow, bail, Context as _};
use serde_json as json;
@@ -14,8 +14,9 @@ use radicle::crypto::{ssh, Verified};
use radicle::explorer::ExplorerUrl;
use radicle::git::RefString;
use radicle::identity::{RepoId, Visibility};
+
use radicle::node::events::UploadPack;
use radicle::node::policy::Scope;
-
use radicle::node::{Event, Handle, NodeId};
+
use radicle::node::{Event, Handle, NodeId, DEFAULT_SUBSCRIBE_TIMEOUT};
use radicle::prelude::Doc;
use radicle::{profile, Node};

@@ -364,7 +365,9 @@ fn sync(
        return Ok(SyncResult::NodeStopped);
    }
    let mut spinner = term::spinner("Updating inventory..");
-
    let events = node.subscribe(time::Duration::from_secs(3))?;
+
    // N.b. indefinitely subscribe to events and set a lower timeout on events
+
    // below.
+
    let events = node.subscribe(DEFAULT_SUBSCRIBE_TIMEOUT)?;
    let sessions = node.sessions()?;

    node.update_inventory(rid)?;
@@ -392,17 +395,40 @@ fn sync(
    spinner.message("Syncing..");

    let mut replicas = HashSet::new();
+

    for e in events {
        match e {
            Ok(Event::RefsSynced {
                remote, rid: rid_, ..
            }) if rid == rid_ => {
+
                term::success!("Repository successfully synced to {remote}");
                replicas.insert(remote);
                // If we manage to replicate to one of our preferred seeds, we can stop waiting.
                if config.preferred_seeds.iter().any(|s| s.id == remote) {
                    break;
                }
            }
+
            Ok(Event::UploadPack(UploadPack::Write {
+
                rid: rid_,
+
                remote,
+
                progress,
+
            })) if rid == rid_ => {
+
                spinner.message(format!("Uploading {rid} to {remote} {progress}"));
+
            }
+
            Ok(Event::UploadPack(UploadPack::Done {
+
                rid: rid_,
+
                remote,
+
                status,
+
            })) if rid == rid_ => {
+
                spinner.message(format!("Upload done for {rid} to {remote}: {status}"));
+
            }
+
            Ok(Event::UploadPack(UploadPack::Error {
+
                rid: rid_,
+
                remote,
+
                err,
+
            })) if rid == rid_ => {
+
                spinner.message(format!("Upload error for {rid} to {remote}: {err}"));
+
            }
            Ok(_) => {
                // Some other irrelevant event received.
            }
modified radicle-cli/src/commands/node/events.rs
@@ -1,10 +1,13 @@
use std::time;

-
use radicle::node::Handle;
+
use radicle::node::{Event, Handle};

-
pub fn run(node: impl Handle, count: usize, timeout: time::Duration) -> anyhow::Result<()> {
+
pub fn run<H>(node: H, count: usize, timeout: time::Duration) -> anyhow::Result<()>
+
where
+
    H: Handle<Event = Result<Event, <H as Handle>::Error>>,
+
{
    let events = node.subscribe(timeout)?;
-
    for (i, event) in events.enumerate() {
+
    for (i, event) in events.into_iter().enumerate() {
        let event = event?;
        let obj = serde_json::to_string(&event)?;

modified radicle-fetch/src/transport/fetch.rs
@@ -1,23 +1,21 @@
-
use std::{
-
    borrow::Cow,
-
    io::{self, BufRead},
-
    path::PathBuf,
-
    sync::{atomic::AtomicBool, Arc},
-
};
+
use std::borrow::Cow;
+
use std::io;
+
use std::io::BufRead;
+
use std::path::PathBuf;
+
use std::sync::{atomic::AtomicBool, Arc};

use gix_features::progress::NestedProgress;
use gix_pack as pack;
-
use gix_protocol::{
-
    fetch::{self, Delegate, DelegateBlocking},
-
    handshake::{self, Ref},
-
    ls_refs, FetchConnection,
-
};
-
use gix_transport::{
-
    bstr::BString,
-
    client,
-
    client::{ExtendedBufRead, MessageKind},
-
    Protocol,
-
};
+
use gix_protocol::fetch;
+
use gix_protocol::fetch::{Delegate, DelegateBlocking};
+
use gix_protocol::handshake;
+
use gix_protocol::handshake::Ref;
+
use gix_protocol::ls_refs;
+
use gix_protocol::FetchConnection;
+
use gix_transport::bstr::BString;
+
use gix_transport::client;
+
use gix_transport::client::{ExtendedBufRead, MessageKind};
+
use gix_transport::Protocol;

use super::{agent_name, indicate_end_of_interaction, Connection, WantsHaves};

@@ -253,6 +251,7 @@ where
            if !sideband_all {
                setup_remote_progress(progress, &mut reader);
            }
+
            let timer = std::time::Instant::now();
            // TODO: remove delegate in favor of functional style to fix progress-hack,
            //       needed as it needs `'static`. As the top-level seems to pass `Discard`,
            //       there should be no repercussions right now.
@@ -262,6 +261,7 @@ where
                &[],
                &response,
            )?;
+
            log::trace!(target: "fetch", "Received pack ({}ms)", timer.elapsed().as_millis());
            assert_eq!(
                reader.stopped_at(),
                None,
@@ -286,6 +286,7 @@ where
    if matches!(protocol, Protocol::V2)
        && matches!(conn.mode, FetchConnection::TerminateOnSuccessfulCompletion)
    {
+
        log::trace!(target: "fetch", "Indicating end of interaction");
        indicate_end_of_interaction(&mut conn)?;
    }

modified radicle-node/Cargo.toml
@@ -21,6 +21,7 @@ colored = { version = "2.1.0" }
crossbeam-channel = { version = "0.5.6" }
cyphernet = { version = "0.5.0", features = ["tor", "dns", "ed25519", "p2p-ed25519"] }
fastrand = { version = "2.0.0" }
+
gix-protocol = { version = "0.41.1", features = ["blocking-client"] }
io-reactor = { version = "0.5.1", features = ["popol"] }
lexopt = { version = "0.3.0" }
libc = { version = "0.2.137" }
modified radicle-node/src/control.rs
@@ -30,12 +30,12 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<H: Handle<Error = runtime::HandleError> + 'static>(
-
    listener: UnixListener,
-
    handle: H,
-
) -> Result<(), Error>
+
pub fn listen<E, H>(listener: UnixListener, handle: H) -> Result<(), Error>
where
+
    H: Handle<Error = runtime::HandleError> + 'static,
    H::Sessions: serde::Serialize,
+
    CommandResult<E>: From<H::Event>,
+
    E: serde::Serialize,
{
    log::debug!(target: "control", "Control thread listening on socket..");
    let nid = handle.nid()?;
@@ -74,12 +74,12 @@ enum CommandError {
    Io(#[from] io::Error),
}

-
fn command<H: Handle<Error = runtime::HandleError> + 'static>(
-
    stream: &UnixStream,
-
    mut handle: H,
-
) -> Result<(), CommandError>
+
fn command<E, H>(stream: &UnixStream, mut handle: H) -> Result<(), CommandError>
where
+
    H: Handle<Error = runtime::HandleError> + 'static,
    H::Sessions: serde::Serialize,
+
    CommandResult<E>: From<H::Event>,
+
    E: serde::Serialize,
{
    let mut reader = BufReader::new(stream);
    let mut writer = LineWriter::new(stream);
@@ -185,8 +185,7 @@ where
        Command::Subscribe => match handle.subscribe(MAX_TIMEOUT) {
            Ok(events) => {
                for e in events {
-
                    let event = e?;
-
                    CommandResult::Okay(event).to_writer(&mut writer)?;
+
                    CommandResult::from(e).to_writer(&mut writer)?;
                }
            }
            Err(e) => return Err(CommandError::Runtime(e)),
modified radicle-node/src/runtime.rs
@@ -3,7 +3,6 @@ pub mod thread;

use std::os::unix::net::UnixListener;
use std::path::PathBuf;
-
use std::sync::{Arc, Mutex};
use std::{fs, io, net};

use crossbeam_channel as chan;
@@ -35,6 +34,7 @@ use crate::{service, LocalTime};

pub use handle::Error as HandleError;
pub use handle::Handle;
+
pub use node::events::Emitter;

/// A client error.
#[derive(Error, Debug)]
@@ -84,39 +84,6 @@ pub enum Error {
    GitVersion(#[from] git::VersionError),
}

-
/// Publishes events to subscribers.
-
#[derive(Debug, Clone)]
-
pub struct Emitter<T> {
-
    subscribers: Arc<Mutex<Vec<chan::Sender<T>>>>,
-
}
-

-
impl<T> Default for Emitter<T> {
-
    fn default() -> Emitter<T> {
-
        Emitter {
-
            subscribers: Default::default(),
-
        }
-
    }
-
}
-

-
impl<T: Clone> Emitter<T> {
-
    /// Emit event to subscribers and drop those who can't receive it.
-
    pub(crate) fn emit(&self, event: T) {
-
        self.subscribers
-
            .lock()
-
            .unwrap()
-
            .retain(|s| s.try_send(event.clone()).is_ok());
-
    }
-

-
    /// Subscribe to events stream.
-
    pub fn subscribe(&self) -> chan::Receiver<T> {
-
        let (sender, receiver) = chan::unbounded();
-
        let mut subs = self.subscribers.lock().unwrap();
-
        subs.push(sender);
-

-
        receiver
-
    }
-
}
-

/// Holds join handles to the client threads, as well as a client handle.
pub struct Runtime {
    pub id: NodeId,
modified radicle-node/src/runtime/handle.rs
@@ -124,6 +124,8 @@ impl Handle {

impl radicle::node::Handle for Handle {
    type Sessions = Vec<radicle::node::Session>;
+
    type Events = Events;
+
    type Event = Event;
    type Error = Error;

    fn nid(&self) -> Result<NodeId, Self::Error> {
@@ -263,11 +265,8 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

-
    fn subscribe(
-
        &self,
-
        _timeout: time::Duration,
-
    ) -> Result<Box<dyn Iterator<Item = Result<Event, Error>>>, Error> {
-
        Ok(Box::new(self.events().into_iter().map(Ok)))
+
    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
+
        Ok(self.events())
    }

    fn sessions(&self) -> Result<Self::Sessions, Error> {
modified radicle-node/src/service.rs
@@ -97,8 +97,8 @@ pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
-/// How long to wait for a fetch to stall before aborting.
-pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(9);
+/// How long to wait for a fetch to stall before aborting, default is 1 hour.
+pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3600);

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
@@ -280,6 +280,8 @@ struct QueuedFetch {
    from: NodeId,
    /// Refs being fetched.
    refs_at: Vec<RefsAt>,
+    /// The timeout given for the fetch request.
+    timeout: time::Duration,
    /// Result channel.
    channel: Option<chan::Sender<FetchResult>>,
}
@@ -423,6 +425,10 @@ where
    pub fn local_time(&self) -> LocalTime {
        self.clock
    }
+

+
    pub fn emitter(&self) -> Emitter<Event> {
+
        self.emitter.clone()
+
    }
}

impl<D, S, G> Service<D, S, G>
@@ -948,6 +954,7 @@ where
                        rid,
                        refs_at,
                        from,
+
                        timeout,
                        channel,
                    };
                    if self.queue.contains(&fetch) {
@@ -964,6 +971,7 @@ where
                    rid,
                    refs_at,
                    from,
+
                    timeout,
                    channel,
                });
            }
@@ -1128,6 +1136,7 @@ where
            rid,
            from,
            refs_at,
+
            timeout,
            channel,
        }) = self.queue.pop_front()
        {
@@ -1140,12 +1149,12 @@ where
                    .expect("Service::dequeue_fetch: error accessing repo seeding configuration");

                // Keep dequeueing if there was nothing to fetch, otherwise break.
-
                if self.fetch_refs_at(rid, from, refs, repo_entry.scope, FETCH_TIMEOUT, channel) {
+
                if self.fetch_refs_at(rid, from, refs, repo_entry.scope, timeout, channel) {
                    break;
                }
            } else {
                // If no refs are specified, always do a full fetch.
-
                self.fetch(rid, from, FETCH_TIMEOUT, channel);
+
                self.fetch(rid, from, timeout, channel);
                break;
            }
        }
modified radicle-node/src/test/handle.rs
@@ -22,6 +22,8 @@ pub struct Handle {
impl radicle::node::Handle for Handle {
    type Error = HandleError;
    type Sessions = Vec<radicle::node::Session>;
+
    type Events = Vec<Self::Event>;
+
    type Event = Result<Event, Self::Error>;

    fn nid(&self) -> Result<NodeId, Self::Error> {
        Ok(NodeId::from_str("z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK").unwrap())
@@ -81,11 +83,8 @@ impl radicle::node::Handle for Handle {
        Ok(self.following.lock().unwrap().insert(id))
    }

-
    fn subscribe(
-
        &self,
-
        _timeout: time::Duration,
-
    ) -> Result<Box<dyn Iterator<Item = Result<Event, Self::Error>>>, Self::Error> {
-
        Ok(Box::new(std::iter::empty()))
+
    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
+
        Ok(vec![])
    }

    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error> {
modified radicle-node/src/wire/protocol.rs
@@ -29,6 +29,7 @@ use crate::crypto::Signer;
use crate::prelude::Deserializer;
use crate::service;
use crate::service::io::Io;
+
use crate::service::FETCH_TIMEOUT;
use crate::service::{session, DisconnectReason, Service};
use crate::wire::frame;
use crate::wire::frame::{Frame, FrameData, StreamId};
@@ -43,10 +44,6 @@ pub const NOISE_XK: HandshakePattern = HandshakePattern {
    responder: cyphernet::encrypt::noise::OneWayPattern::Known,
};

-
/// Default time to wait to receive something from a worker channel. Applies to
-
/// workers waiting for data from remotes as well.
-
pub const DEFAULT_CHANNEL_TIMEOUT: time::Duration = time::Duration::from_secs(30);
-

/// Default time to wait until a network connection is considered inactive.
pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(6);

@@ -131,22 +128,22 @@ impl Streams {
    }

    /// Open a new stream.
-
    fn open(&mut self) -> (StreamId, worker::Channels) {
+
    fn open(&mut self, timeout: time::Duration) -> (StreamId, worker::Channels) {
        self.seq += 1;

        let id = StreamId::git(self.link)
            .nth(self.seq)
            .expect("Streams::open: too many streams");
        let channels = self
-
            .register(id)
+
            .register(id, timeout)
            .expect("Streams::open: stream was already open");

        (id, channels)
    }

    /// Register an open stream.
-
    fn register(&mut self, stream: StreamId) -> Option<worker::Channels> {
-
        let (wire, worker) = worker::Channels::pair(DEFAULT_CHANNEL_TIMEOUT)
+
    fn register(&mut self, stream: StreamId, timeout: time::Duration) -> Option<worker::Channels> {
+
        let (wire, worker) = worker::Channels::pair(timeout)
            .expect("Streams::register: fatal: unable to create channels");

        match self.streams.entry(stream) {
@@ -718,13 +715,16 @@ where
                            })) => {
                                log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");

-
                                let Some(channels) = streams.register(stream) else {
+
                                let Some(channels) = streams.register(stream, FETCH_TIMEOUT) else {
                                    log::warn!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
                                    continue;
                                };

                                let task = Task {
-
                                    fetch: FetchRequest::Responder { remote: *nid },
+
                                    fetch: FetchRequest::Responder {
+
                                        remote: *nid,
+
                                        emitter: self.service.emitter(),
+
                                    },
                                    stream,
                                    channels,
                                };
@@ -1017,7 +1017,7 @@ where
                        log::error!(target: "wire", "Peer {remote} is not connected: dropping fetch");
                        continue;
                    };
-
                    let (stream, channels) = streams.open();
+
                    let (stream, channels) = streams.open(timeout);

                    log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");

@@ -1027,7 +1027,6 @@ where
                            rid,
                            remote,
                            refs_at,
-
                            timeout,
                        },
                        stream,
                        channels,
modified radicle-node/src/worker.rs
@@ -5,20 +5,20 @@ mod upload_pack;
pub mod fetch;
pub mod garbage;

+
use std::io;
use std::path::PathBuf;
-
use std::{io, time};

use crossbeam_channel as chan;

use radicle::identity::RepoId;
-
use radicle::node::notifications;
+
use radicle::node::{notifications, Event};
use radicle::prelude::NodeId;
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadRepository, ReadStorage};
use radicle::{cob, crypto, Storage};
use radicle_fetch::FetchLimit;

-
use crate::runtime::{thread, Handle};
+
use crate::runtime::{thread, Emitter, Handle};
use crate::service::policy;
use crate::service::policy::Policy;
use crate::wire::StreamId;
@@ -107,21 +107,21 @@ pub enum FetchRequest {
        remote: NodeId,
        /// If this fetch is for a particular set of `rad/sigrefs`.
        refs_at: Option<Vec<RefsAt>>,
-
        /// Fetch timeout.
-
        timeout: time::Duration,
    },
    /// Server is responding to a fetch request by uploading the
    /// specified `refspecs` sent by the client.
    Responder {
        /// Remote peer we are interacting with.
        remote: NodeId,
+
        /// Reporter for upload-pack progress.
+
        emitter: Emitter<Event>,
    },
}

impl FetchRequest {
    pub fn remote(&self) -> NodeId {
        match self {
-
            Self::Initiator { remote, .. } | Self::Responder { remote } => *remote,
+
            Self::Initiator { remote, .. } | Self::Responder { remote, .. } => *remote,
        }
    }
}
@@ -230,14 +230,12 @@ impl Worker {
                rid,
                remote,
                refs_at,
-
                // TODO: nowhere to use this currently
-
                timeout: _timeout,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
                let result = self.fetch(rid, remote, refs_at, channels, notifs);
                FetchResult::Initiator { rid, result }
            }
-
            FetchRequest::Responder { remote } => {
+
            FetchRequest::Responder { remote, emitter } => {
                log::debug!(target: "worker", "Worker processing incoming fetch for {remote} on stream {stream}..");

                let (mut stream_r, stream_w) = channels.split();
@@ -259,10 +257,17 @@ impl Worker {
                    };
                }

-
                let result =
-
                    upload_pack::upload_pack(&self.nid, &self.storage, &header, stream_r, stream_w)
-
                        .map(|_| ())
-
                        .map_err(|e| e.into());
+
                let result = upload_pack::upload_pack(
+
                    &self.nid,
+
                    remote,
+
                    &self.storage,
+
                    &emitter,
+
                    &header,
+
                    stream_r,
+
                    stream_w,
+
                )
+
                .map(|_| ())
+
                .map_err(|e| e.into());
                log::debug!(target: "worker", "Upload process on stream {stream} exited with result {result:?}");

                FetchResult::Responder {
modified radicle-node/src/worker/upload_pack.rs
@@ -1,8 +1,13 @@
use std::io;
use std::io::Write;
use std::process::{Command, ExitStatus, Stdio};
+
use std::time::Instant;

-
use radicle::node::NodeId;
+
use gix_protocol::transport::bstr::ByteSlice;
+
use radicle::identity::RepoId;
+
use radicle::node::events;
+
use radicle::node::events::Emitter;
+
use radicle::node::{Event, NodeId};
use radicle::storage::git::paths;
use radicle::Storage;

@@ -16,15 +21,18 @@ use crate::runtime::thread;
/// send the EOF file message.
pub fn upload_pack<R, W>(
    nid: &NodeId,
+
    remote: NodeId,
    storage: &Storage,
+
    emitter: &Emitter<Event>,
    header: &pktline::GitRequest,
    mut recv: R,
-
    mut send: W,
+
    send: W,
) -> io::Result<ExitStatus>
where
    R: io::Read + Send,
    W: io::Write + Send,
{
+
    let timer = Instant::now();
    let protocol_version = header
        .extra
        .iter()
@@ -75,14 +83,21 @@ where

    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = io::BufReader::new(child.stdout.take().unwrap());
+
    let mut reporter = Reporter {
+
        rid: header.repo,
+
        remote,
+
        emitter: emitter.clone(),
+
        send,
+
    };
    thread::scope(|s| {
        thread::spawn_scoped(nid, "upload-pack", s, || {
            // N.b. we indefinitely copy stdout to the sender,
            // i.e. there's no need for a loop.
-
            match io::copy(&mut stdout, &mut send) {
+
            match io::copy(&mut stdout, &mut reporter) {
                Ok(_) => {}
                Err(e) => {
                    log::error!(target: "worker", "Worker channel disconnected for {}; aborting: {e}", header.repo);
+
                    emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
                }
            }
        });
@@ -104,6 +119,7 @@ where
                    }
                    Err(e) => {
                        log::error!(target: "worker", "Error on upload-pack channel read for {}: {e}", header.repo);
+
                        emitter.emit(events::UploadPack::error(header.repo, remote, e).into());
                        break;
                    }
                }
@@ -124,9 +140,62 @@ where
    })?;

    let status = child.wait()?;
+
    emitter.emit(events::UploadPack::done(header.repo, remote, status).into());
+
    log::debug!(target: "worker", "Upload pack finished ({}ms)", timer.elapsed().as_millis());
    Ok(status)
}

+
/// A combination of the upload-pack sender with an [`Emitter`] for reporting
+
/// the progress events to subscribers.
+
struct Reporter<W> {
+
    rid: RepoId,
+
    remote: NodeId,
+
    emitter: Emitter<Event>,
+
    send: W,
+
}
+

+
impl<W> Reporter<W> {
+
    fn emit(&self, buf: &[u8]) {
+
        if let Some(progress) = Self::as_upload_pack_progress(buf) {
+
            log::trace!(target: "worker", "upload-pack progress: {progress}");
+
            self.emitter
+
                .emit(events::UploadPack::write(self.rid, self.remote, progress).into());
+
        }
+
    }
+

+
    fn as_upload_pack_progress(buf: &[u8]) -> Option<events::upload_pack::Progress> {
+
        use events::upload_pack::Progress::*;
+
        let gix_protocol::RemoteProgress {
+
            action, step, max, ..
+
        } = gix_protocol::RemoteProgress::from_bytes(buf)?;
+
        if action.contains_str("Counting objects") {
+
            step.and_then(|processed| max.map(|total| Counting { processed, total }))
+
        } else if action.contains_str("Compressing objects") {
+
            step.and_then(|processed| max.map(|total| Compressing { processed, total }))
+
        } else if action.contains_str("Enumerating objects") {
+
            max.map(|total| Enumerating { total })
+
        } else {
+
            None
+
        }
+
    }
+
}
+

+
impl<W> io::Write for Reporter<W>
+
where
+
    W: io::Write,
+
{
+
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+
        let n = self.send.write(buf)?;
+
        self.send.flush()?;
+
        self.emit(buf);
+
        Ok(n)
+
    }
+

+
    fn flush(&mut self) -> io::Result<()> {
+
        self.send.flush()
+
    }
+
}
+

pub(super) mod pktline {
    use std::io;
    use std::io::Read;
modified radicle/src/node.rs
@@ -15,6 +15,7 @@ pub mod timestamp;

use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
use std::io::{BufRead, BufReader};
+
use std::marker::PhantomData;
use std::ops::{ControlFlow, Deref};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
@@ -50,6 +51,9 @@ pub const DEFAULT_SOCKET_NAME: &str = "control.sock";
pub const DEFAULT_PORT: u16 = 8776;
/// Default timeout when waiting for the node to respond with data.
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(30);
+
/// Default timeout when waiting for an event to be received on the
+
/// [`Handle::subscribe`] channel.
+
pub const DEFAULT_SUBSCRIBE_TIMEOUT: time::Duration = time::Duration::from_secs(5);
/// Maximum length in bytes of a node alias.
pub const MAX_ALIAS_LENGTH: usize = 32;
/// Penalty threshold at which point we avoid connecting to this node.
@@ -338,6 +342,26 @@ pub enum CommandResult<T> {
    },
}

+
impl<T, E> From<Result<T, E>> for CommandResult<T>
+
where
+
    E: std::error::Error,
+
{
+
    fn from(result: Result<T, E>) -> Self {
+
        match result {
+
            Ok(t) => Self::Okay(t),
+
            Err(e) => Self::Error {
+
                reason: e.to_string(),
+
            },
+
        }
+
    }
+
}
+

+
impl From<Event> for CommandResult<Event> {
+
    fn from(event: Event) -> Self {
+
        Self::Okay(event)
+
    }
+
}
+

/// A success response.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Success {
@@ -856,6 +880,8 @@ pub enum ConnectResult {
pub trait Handle: Clone + Sync + Send {
    /// The peer sessions type.
    type Sessions;
+
    type Events: IntoIterator<Item = Self::Event>;
+
    type Event;
    /// The error returned by all methods.
    type Error: std::error::Error + Send + Sync + 'static;

@@ -905,10 +931,53 @@ pub trait Handle: Clone + Sync + Send {
    /// Query the peer session state.
    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
    /// Subscribe to node events.
-
    fn subscribe(
-
        &self,
-
        timeout: time::Duration,
-
    ) -> Result<Box<dyn Iterator<Item = Result<Event, Self::Error>>>, Self::Error>;
+
    fn subscribe(&self, timeout: time::Duration) -> Result<Self::Events, Self::Error>;
+
}
+

+
/// Iterator of results `T` when passing a [`Command`] to [`Node::call`].
+
///
+
/// The iterator blocks for a `timeout` duration, returning [`Error::TimedOut`]
+
/// if the duration is reached.
+
pub struct LineIter<T> {
+
    stream: BufReader<UnixStream>,
+
    timeout: time::Duration,
+
    witness: PhantomData<T>,
+
}
+

+
impl<T: DeserializeOwned> Iterator for LineIter<T> {
+
    type Item = Result<T, Error>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        let mut l = String::new();
+

+
        self.stream
+
            .get_ref()
+
            .set_read_timeout(Some(self.timeout))
+
            .ok();
+

+
        match self.stream.read_line(&mut l) {
+
            Ok(0) => None,
+
            Ok(_) => {
+
                let result: CommandResult<T> = match json::from_str(&l) {
+
                    Err(e) => {
+
                        return Some(Err(Error::InvalidJson {
+
                            response: l.clone(),
+
                            error: e,
+
                        }))
+
                    }
+
                    Ok(result) => result,
+
                };
+
                match result {
+
                    CommandResult::Okay(result) => Some(Ok(result)),
+
                    CommandResult::Error { reason } => Some(Err(Error::Command { reason })),
+
                }
+
            }
+
            Err(e) => match e.kind() {
+
                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Some(Err(Error::TimedOut)),
+
                _ => Some(Err(Error::Io(e))),
+
            },
+
        }
+
    }
}

/// Public node & device identifier.
@@ -929,33 +998,19 @@ impl Node {
    }

    /// Call a command on the node.
-
    pub fn call<T: DeserializeOwned>(
+
    pub fn call<T: DeserializeOwned + Send + 'static>(
        &self,
        cmd: Command,
        timeout: time::Duration,
-
    ) -> Result<impl Iterator<Item = Result<T, Error>>, Error> {
+
    ) -> Result<LineIter<T>, Error> {
        let stream = UnixStream::connect(&self.socket)
            .map_err(|e| Error::Connect(self.socket.clone(), e.kind()))?;
        cmd.to_writer(&stream)?;
-

-
        stream.set_read_timeout(Some(timeout))?;
-

-
        Ok(BufReader::new(stream).lines().map(move |l| {
-
            let l = l.map_err(|e| match e.kind() {
-
                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Error::TimedOut,
-
                _ => Error::Io(e),
-
            })?;
-

-
            let result: CommandResult<T> = json::from_str(&l).map_err(|e| Error::InvalidJson {
-
                response: l.clone(),
-
                error: e,
-
            })?;
-

-
            match result {
-
                CommandResult::Okay(result) => Ok(result),
-
                CommandResult::Error { reason } => Err(Error::Command { reason }),
-
            }
-
        }))
+
        Ok(LineIter {
+
            stream: BufReader::new(stream),
+
            timeout,
+
            witness: PhantomData,
+
        })
    }

    /// Announce refs of the given `rid` to the given seeds.
@@ -1029,6 +1084,8 @@ impl Node {
// attempt to return iterators instead of allocating vecs.
impl Handle for Node {
    type Sessions = Vec<Session>;
+
    type Events = LineIter<Event>;
+
    type Event = Result<Event, Error>;
    type Error = Error;

    fn nid(&self) -> Result<NodeId, Error> {
@@ -1049,6 +1106,7 @@ impl Handle for Node {
        let Ok(mut lines) = self.call::<Success>(Command::Status, DEFAULT_TIMEOUT) else {
            return false;
        };
+

        let Some(Ok(_)) = lines.next() else {
            return false;
        };
@@ -1122,29 +1180,29 @@ impl Handle for Node {
    }

    fn follow(&mut self, nid: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::Follow { nid, alias }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse)??;
+
        let mut lines = self.call::<Success>(Command::Follow { nid, alias }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }

    fn seed(&mut self, rid: RepoId, scope: policy::Scope) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse)??;
+
        let mut lines = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }

    fn unfollow(&mut self, nid: NodeId) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::Unfollow { nid }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse)??;
+
        let mut lines = self.call::<Success>(Command::Unfollow { nid }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }

    fn unseed(&mut self, rid: RepoId) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::Unseed { rid }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse {})??;
+
        let mut lines = self.call::<Success>(Command::Unseed { rid }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }
@@ -1166,26 +1224,21 @@ impl Handle for Node {
    }

    fn update_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::UpdateInventory { rid }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse {})??;
+
        let mut lines = self.call::<Success>(Command::UpdateInventory { rid }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }

-
    fn subscribe(
-
        &self,
-
        timeout: time::Duration,
-
    ) -> Result<Box<dyn Iterator<Item = Result<Event, Error>>>, Error> {
-
        let events = self.call(Command::Subscribe, timeout)?;
-

-
        Ok(Box::new(events))
+
    fn subscribe(&self, timeout: time::Duration) -> Result<LineIter<Event>, Error> {
+
        self.call(Command::Subscribe, timeout)
    }

    fn sessions(&self) -> Result<Self::Sessions, Error> {
        let sessions = self
            .call::<Vec<Session>>(Command::Sessions, DEFAULT_TIMEOUT)?
            .next()
-
            .ok_or(Error::EmptyResponse {})??;
+
            .ok_or(Error::EmptyResponse)??;

        Ok(sessions)
    }
modified radicle/src/node/events.rs
@@ -1,4 +1,10 @@
+
//! Events for `upload-pack` processes.
+
pub mod upload_pack;
+
pub use upload_pack::UploadPack;
+

use std::ops::Deref;
+
use std::sync::Arc;
+
use std::sync::Mutex;
use std::time;

use crossbeam_channel as chan;
@@ -60,6 +66,13 @@ pub enum Event {
        features: node::Features,
        addresses: Vec<node::Address>,
    },
+
    UploadPack(upload_pack::UploadPack),
+
}
+

+
/// Lift an upload-pack event into the general [`Event`] type.
impl From<upload_pack::UploadPack> for Event {
    fn from(event: upload_pack::UploadPack) -> Self {
        Self::UploadPack(event)
    }
}

/// Events feed.
@@ -119,3 +132,36 @@ impl Events {
        }
    }
}
+

+
/// Publishes events to subscribers.
+
#[derive(Debug, Clone)]
+
pub struct Emitter<T> {
+
    subscribers: Arc<Mutex<Vec<chan::Sender<T>>>>,
+
}
+

+
impl<T> Default for Emitter<T> {
+
    fn default() -> Emitter<T> {
+
        Emitter {
+
            subscribers: Default::default(),
+
        }
+
    }
+
}
+

+
impl<T: Clone> Emitter<T> {
+
    /// Emit event to subscribers and drop those who can't receive it.
+
    pub fn emit(&self, event: T) {
+
        self.subscribers
+
            .lock()
+
            .unwrap()
+
            .retain(|s| s.try_send(event.clone()).is_ok());
+
    }
+

+
    /// Subscribe to events stream.
+
    pub fn subscribe(&self) -> chan::Receiver<T> {
+
        let (sender, receiver) = chan::unbounded();
+
        let mut subs = self.subscribers.lock().unwrap();
+
        subs.push(sender);
+

+
        receiver
+
    }
+
}
added radicle/src/node/events/upload_pack.rs
@@ -0,0 +1,96 @@
+
use std::fmt;
+
use std::io;
+
use std::process::ExitStatus;
+

+
use crate::node::NodeId;
+
use crate::prelude::RepoId;
+

+
/// Events that can occur when an upload-pack process is running.
+
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+
pub enum UploadPack {
+
    /// The upload-pack process finished with `status`.
+
    Done {
+
        /// The repository being fetched.
+
        rid: RepoId,
+
        /// The node being fetched from.
+
        remote: NodeId,
+
        /// The status code of the upload-pack process.
+
        ///
+
        /// N.b. `ExitStatus` can not be de/serialized, so the `Display` of the
+
        /// status is used instead.
+
        status: String,
+
    },
+
    /// The upload-pack process emitted some [`Progress`] data.
+
    Write {
+
        /// The repository being fetched.
+
        rid: RepoId,
+
        /// The node being fetched from.
+
        remote: NodeId,
+
        /// The progress metadata of the upload-pack.
+
        progress: Progress,
+
    },
+
    Error {
+
        /// The repository being fetched.
+
        rid: RepoId,
+
        /// The node being fetched from.
+
        remote: NodeId,
+
        /// The error that occurred during the upload-pack.
+
        err: String,
+
    },
+
}
+

+
impl UploadPack {
+
    /// Construct a `UploadPack::Write` event.
+
    pub fn write(rid: RepoId, remote: NodeId, progress: Progress) -> Self {
+
        Self::Write {
+
            rid,
+
            remote,
+
            progress,
+
        }
+
    }
+

+
    /// Construct a `UploadPack::Done` event.
+
    ///
+
    /// If `error` is `None` the process finished successfully, otherwise it
+
    /// finished with an error.
+
    pub fn done(rid: RepoId, remote: NodeId, status: ExitStatus) -> Self {
+
        Self::Done {
+
            rid,
+
            remote,
+
            status: status.to_string(),
+
        }
+
    }
+

+
    pub fn error(rid: RepoId, remote: NodeId, err: io::Error) -> Self {
+
        Self::Error {
+
            rid,
+
            remote,
+
            err: err.to_string(),
+
        }
+
    }
+
}
+

+
/// Progress updates emitted from the `git-upload-pack` process.
+
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
+
#[serde(rename_all = "camelCase")]
+
pub enum Progress {
+
    Enumerating { total: usize },
+
    Counting { processed: usize, total: usize },
+
    Compressing { processed: usize, total: usize },
+
}
+

+
impl fmt::Display for Progress {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match self {
+
            Progress::Enumerating { total } => write!(f, "Enumerating objects: {total}"),
+
            Progress::Counting { processed, total } => {
+
                let percent = (processed / total) * 100;
+
                write!(f, "Counting objects: {percent}% ({processed}/{total})")
+
            }
+
            Progress::Compressing { processed, total } => {
+
                let percent = (processed / total) * 100;
+
                write!(f, "Compressing objects: {percent}% ({processed}/{total})")
+
            }
+
        }
+
    }
+
}