Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement protocol multiplexing
Alexis Sellier committed 3 years ago
commit 89b9eb53b76adf9e38e0da5a15bf810f8d72a3c8
parent 0f47dc9057d79572414c94399f8ef8d19dd91b4d
20 files changed +1518 -1198
modified radicle-cli/tests/commands.rs
@@ -312,10 +312,12 @@ fn rad_clone() {
    let working = environment.tmp().join("working");

    // Setup a test project.
-
    let _ = alice.project("heartwood", "Radicle Heartwood Protocol & Stack");
+
    let acme = alice.project("heartwood", "Radicle Heartwood Protocol & Stack");

-
    let alice = alice.spawn(Config::default());
+
    let mut alice = alice.spawn(Config::default());
    let mut bob = bob.spawn(Config::default());
+
    // Prevent Alice from fetching Bob's fork, as we're not testing that and it may cause errors.
+
    alice.handle.track_repo(acme, Scope::Trusted).unwrap();

    bob.connect(&alice).converge([&alice]);

modified radicle-node/src/deserializer.rs
@@ -13,7 +13,7 @@ pub struct Deserializer<D = Message> {
    item: PhantomData<D>,
}

-
impl Default for Deserializer<Message> {
+
impl<D: wire::Decode> Default for Deserializer<D> {
    fn default() -> Self {
        Self::new(wire::Size::MAX as usize + 1)
    }
modified radicle-node/src/runtime.rs
@@ -96,24 +96,24 @@ impl<T: Clone> Emitter<T> {
}

/// Holds join handles to the client threads, as well as a client handle.
-
pub struct Runtime<G: Signer + Ecdh> {
+
pub struct Runtime {
    pub id: NodeId,
    pub home: Home,
    pub control: UnixListener,
-
    pub handle: Handle<G>,
+
    pub handle: Handle,
    pub storage: Storage,
-
    pub reactor: Reactor<wire::Control<G>>,
+
    pub reactor: Reactor<wire::Control>,
    pub daemon: net::SocketAddr,
    pub pool: worker::Pool,
    pub local_addrs: Vec<net::SocketAddr>,
    pub signals: chan::Receiver<()>,
}

-
impl<G: Signer + Ecdh + 'static> Runtime<G> {
+
impl Runtime {
    /// Initialize the runtime.
    ///
    /// This function spawns threads.
-
    pub fn init(
+
    pub fn init<G: Signer + Ecdh + 'static>(
        home: Home,
        config: service::Config,
        listen: Vec<net::SocketAddr>,
@@ -121,7 +121,7 @@ impl<G: Signer + Ecdh + 'static> Runtime<G> {
        daemon: net::SocketAddr,
        signals: chan::Receiver<()>,
        signer: G,
-
    ) -> Result<Runtime<G>, Error>
+
    ) -> Result<Runtime, Error>
    where
        G: Ecdh<Pk = NodeId> + Clone,
    {
@@ -160,7 +160,7 @@ impl<G: Signer + Ecdh + 'static> Runtime<G> {
            emitter.clone(),
        );

-
        let (worker_send, worker_recv) = chan::unbounded::<worker::Task<G>>();
+
        let (worker_send, worker_recv) = chan::unbounded::<worker::Task>();
        let mut wire = Wire::new(service, worker_send, signer, proxy, clock);
        let mut local_addrs = Vec::new();

modified radicle-node/src/runtime/handle.rs
@@ -5,11 +5,9 @@ use std::sync::Arc;
use std::{fmt, io, time};

use crossbeam_channel as chan;
-
use cyphernet::Ecdh;
use radicle::node::Seeds;
use thiserror::Error;

-
use crate::crypto::Signer;
use crate::identity::Id;
use crate::node::{Command, FetchResult};
use crate::profile::Home;
@@ -20,6 +18,7 @@ use crate::service::Event;
use crate::service::{CommandError, QueryState};
use crate::service::{NodeId, Sessions};
use crate::wire;
+
use crate::wire::StreamId;
use crate::worker::TaskResult;

/// An error resulting from a handle method.
@@ -27,7 +26,7 @@ use crate::worker::TaskResult;
pub enum Error {
    /// The command channel is no longer connected.
    #[error("command channel is not connected")]
-
    NotConnected,
+
    ChannelDisconnected,
    /// The command returned an error.
    #[error("command failed: {0}")]
    Command(#[from] CommandError),
@@ -41,7 +40,7 @@ pub enum Error {

impl From<chan::RecvError> for Error {
    fn from(_: chan::RecvError) -> Self {
-
        Self::NotConnected
+
        Self::ChannelDisconnected
    }
}

@@ -49,20 +48,20 @@ impl From<chan::RecvTimeoutError> for Error {
    fn from(err: chan::RecvTimeoutError) -> Self {
        match err {
            chan::RecvTimeoutError::Timeout => Self::Timeout,
-
            chan::RecvTimeoutError::Disconnected => Self::NotConnected,
+
            chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
        }
    }
}

impl<T> From<chan::SendError<T>> for Error {
    fn from(_: chan::SendError<T>) -> Self {
-
        Self::NotConnected
+
        Self::ChannelDisconnected
    }
}

-
pub struct Handle<G: Signer + Ecdh> {
+
pub struct Handle {
    pub(crate) home: Home,
-
    pub(crate) controller: reactor::Controller<wire::Control<G>>,
+
    pub(crate) controller: reactor::Controller<wire::Control>,

    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
    shutdown: Arc<AtomicBool>,
@@ -117,20 +116,20 @@ impl Events {
    }
}

-
impl<G: Signer + Ecdh> Handle<G> {
+
impl Handle {
    /// Subscribe to events stream.
    pub fn events(&self) -> Events {
        Events(self.emitter.subscribe())
    }
}

-
impl<G: Signer + Ecdh> fmt::Debug for Handle<G> {
+
impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Handle").field("home", &self.home).finish()
    }
}

-
impl<G: Signer + Ecdh> Clone for Handle<G> {
+
impl Clone for Handle {
    fn clone(&self) -> Self {
        Self {
            home: self.home.clone(),
@@ -141,10 +140,10 @@ impl<G: Signer + Ecdh> Clone for Handle<G> {
    }
}

-
impl<G: Signer + Ecdh + 'static> Handle<G> {
+
impl Handle {
    pub fn new(
        home: Home,
-
        controller: reactor::Controller<wire::Control<G>>,
+
        controller: reactor::Controller<wire::Control>,
        emitter: Emitter<Event>,
    ) -> Self {
        Self {
@@ -155,22 +154,20 @@ impl<G: Signer + Ecdh + 'static> Handle<G> {
        }
    }

-
    pub fn worker_result(&mut self, resp: TaskResult<G>) -> Result<(), Error> {
-
        match self.controller.cmd(wire::Control::Worker(resp)) {
-
            Ok(()) => {}
-
            Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Err(Error::NotConnected),
-
            Err(err) => return Err(err.into()),
-
        }
-
        Ok(())
+
    pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
+
        self.controller.cmd(wire::Control::Worker(result))
    }

-
    fn command(&self, cmd: service::Command) -> Result<(), Error> {
-
        self.controller.cmd(wire::Control::User(cmd))?;
-
        Ok(())
+
    pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
+
        self.controller.cmd(wire::Control::Flush { remote, stream })
+
    }
+

+
    fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
+
        self.controller.cmd(wire::Control::User(cmd))
    }
}

-
impl<G: Signer + Ecdh + 'static> radicle::node::Handle for Handle<G> {
+
impl radicle::node::Handle for Handle {
    type Sessions = Sessions;
    type Error = Error;

@@ -222,10 +219,12 @@ impl<G: Signer + Ecdh + 'static> radicle::node::Handle for Handle<G> {

    fn announce_refs(&mut self, id: Id) -> Result<(), Error> {
        self.command(service::Command::AnnounceRefs(id))
+
            .map_err(Error::from)
    }

    fn announce_inventory(&mut self) -> Result<(), Error> {
        self.command(service::Command::AnnounceInventory)
+
            .map_err(Error::from)
    }

    fn sync_inventory(&mut self) -> Result<bool, Error> {
@@ -265,6 +264,8 @@ impl<G: Signer + Ecdh + 'static> radicle::node::Handle for Handle<G> {
            .and_then(|sock| Command::SHUTDOWN.to_writer(sock))
            .ok();

-
        self.controller.shutdown().map_err(|_| Error::NotConnected)
+
        self.controller
+
            .shutdown()
+
            .map_err(|_| Error::ChannelDisconnected)
    }
}
modified radicle-node/src/service.rs
@@ -32,8 +32,6 @@ use crate::prelude::*;
use crate::runtime::Emitter;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
-
use crate::service::reactor::FetchDirection;
-
use crate::service::session::GossipState;
use crate::service::tracking::Scope;
use crate::storage;
use crate::storage::{Namespaces, ReadStorage};
@@ -44,7 +42,6 @@ use crate::Link;
pub use crate::node::NodeId;
pub use crate::service::config::{Config, Network};
pub use crate::service::message::{Message, ZeroBytes};
-
pub use crate::service::reactor::Fetch;
pub use crate::service::session::Session;

use self::gossip::Gossip;
@@ -551,13 +548,12 @@ where
        let seed = session.id;

        match session.fetch(rid) {
-
            session::FetchResult::Ready(fetch) => {
+
            session::FetchResult::Ready => {
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

                match self.tracking.namespaces_for(&self.storage, &rid) {
-
                    Ok(ns) => {
-
                        self.reactor.write(session, fetch);
-
                        session.to_requesting(rid, ns);
+
                    Ok(namespaces) => {
+
                        self.reactor.fetch(session, rid, namespaces);
                    }
                    Err(err) => {
                        error!(target: "service", "Error getting namespaces for {rid}: {err}");
@@ -571,19 +567,8 @@ where
                    }
                };
            }
-
            session::FetchResult::AlreadyFetching(other) => {
-
                if other == rid {
-
                    debug!(target: "service", "Ignoring redundant attempt to fetch {rid} from {from}");
-
                } else {
-
                    // If we can't fetch, it's because we're already fetching from
-
                    // this peer. So we need to queue the request.
-
                    // TODO: consider to find another peer.
-
                    debug!(
-
                        target: "service",
-
                        "Queueing fetch for {rid} from {from}: another fetch is ongoing"
-
                    );
-
                    session.queue_fetch(rid);
-
                }
+
            session::FetchResult::AlreadyFetching => {
+
                debug!(target: "service", "Ignoring redundant attempt to fetch {rid} from {from}");
            }
            session::FetchResult::NotConnected => {
                error!(target: "service", "Unable to fetch {rid} from peer {seed}: peer is not connected");
@@ -591,75 +576,74 @@ where
        }
    }

-
    pub fn fetched(&mut self, fetch: Fetch, result: Result<Vec<RefUpdate>, FetchError>) {
-
        let remote = fetch.remote;
-
        let rid = fetch.rid;
-

-
        match fetch.direction {
-
            FetchDirection::Initiator { namespaces } => {
-
                let result = match result {
-
                    Ok(updated) => {
-
                        log::debug!(target: "service", "Fetched {rid} from {remote}");
-

-
                        self.emitter.emit(Event::RefsFetched {
-
                            remote,
-
                            rid,
-
                            updated: updated.clone(),
-
                        });
+
    pub fn fetched(
+
        &mut self,
+
        rid: Id,
+
        namespaces: Namespaces,
+
        remote: NodeId,
+
        result: Result<Vec<RefUpdate>, FetchError>,
+
    ) {
+
        let result = match result {
+
            Ok(updated) => {
+
                log::debug!(target: "service", "Fetched {rid} from {remote}");

-
                        FetchResult::Success { updated }
-
                    }
-
                    Err(err) => {
-
                        let reason = err.to_string();
-
                        error!(target: "service", "Fetch failed for {rid} from {remote}: {reason}");
-

-
                        // For now, we only disconnect the remote in case of timeout. In the future,
-
                        // there may be other reasons to disconnect.
-
                        if err.is_timeout() {
-
                            self.reactor
-
                                .disconnect(remote, DisconnectReason::Fetch(err));
-
                        }
-
                        FetchResult::Failed { reason }
-
                    }
-
                };
+
                self.emitter.emit(Event::RefsFetched {
+
                    remote,
+
                    rid,
+
                    updated: updated.clone(),
+
                });

-
                if let Some(results) = self.fetch_reqs.remove(&(rid, remote)) {
-
                    log::debug!(target: "service", "Found existing fetch request, sending result..");
+
                FetchResult::Success { updated }
+
            }
+
            Err(err) => {
+
                let reason = err.to_string();
+
                error!(target: "service", "Fetch failed for {rid} from {remote}: {reason}");
+

+
                // For now, we only disconnect the remote in case of timeout. In the future,
+
                // there may be other reasons to disconnect.
+
                if err.is_timeout() {
+
                    self.reactor
+
                        .disconnect(remote, DisconnectReason::Fetch(err));
+
                }
+
                FetchResult::Failed { reason }
+
            }
+
        };

-
                    if results.send(result).is_err() {
-
                        log::error!(target: "service", "Error sending fetch result for {rid}..");
-
                    } else {
-
                        log::debug!(target: "service", "Sent fetch result for {rid}..");
-
                    }
-
                } else {
-
                    log::debug!(target: "service", "No fetch requests found for {rid}..");
+
        if let Some(results) = self.fetch_reqs.remove(&(rid, remote)) {
+
            log::debug!(target: "service", "Found existing fetch request, sending result..");

-
                    // We only announce refs here when the fetch wasn't user-requested. This is
-
                    // because the user might want to announce his fork, once he has created one,
-
                    // or may choose to not announce anything.
-
                    if let Err(e) = self.announce_refs(rid, &namespaces) {
-
                        error!(target: "service", "Failed to announce new refs: {e}");
-
                    }
-
                }
+
            if results.send(result).is_err() {
+
                log::error!(target: "service", "Error sending fetch result for {rid}..");
+
            } else {
+
                log::debug!(target: "service", "Sent fetch result for {rid}..");
+
            }
+
        } else {
+
            log::debug!(target: "service", "No fetch requests found for {rid}..");

-
                self.switch_to_gossip(remote);
-

-
                // TODO: Since this fetch could be either a full clone
-
                // or simply a ref update, we need to either announce
-
                // new inventory, or new refs. Right now, we announce
-
                // both in some cases.
-
                //
-
                // Announce the newly fetched repository to the
-
                // network, if necessary.
-
                //
-
                // Nb. This needs to be run after we've switched back
-
                // to the gossip protocol, otherwise the messages will
-
                // be queued.
-
                self.sync_and_announce();
-
                self.process_fetch_queue(&remote);
+
            // We only announce refs here when the fetch wasn't user-requested. This is
+
            // because the user might want to announce his fork, once he has created one,
+
            // or may choose to not announce anything.
+
            if let Err(e) = self.announce_refs(rid, &namespaces) {
+
                error!(target: "service", "Failed to announce new refs: {e}");
            }
-
            FetchDirection::Responder => self.switch_to_gossip(remote),
        }
+

+
        // Go back to "idle".
+
        if let Some(s) = self.sessions.get_mut(&remote) {
+
            s.fetched(rid);
+
        }
+
        // TODO: Since this fetch could be either a full clone
+
        // or simply a ref update, we need to either announce
+
        // new inventory, or new refs. Right now, we announce
+
        // both in some cases.
+
        //
+
        // Announce the newly fetched repository to the
+
        // network, if necessary.
+
        //
+
        // Nb. This needs to be run after we've switched back
+
        // to the gossip protocol, otherwise the messages will
+
        // be queued.
+
        self.sync_and_announce();
    }

    pub fn accepted(&mut self, _addr: net::SocketAddr) {
@@ -725,8 +709,8 @@ where

        // If the peer disconnected while we were waiting for a [`Message::FetchOk`],
        // return a failure to any potential fetcher.
-
        if let Some((requested, _)) = session.requesting() {
-
            if let Some(resp) = self.fetch_reqs.remove(&(*requested, remote)) {
+
        for rid in session.fetching() {
+
            if let Some(resp) = self.fetch_reqs.remove(&(rid, remote)) {
                resp.send(FetchResult::Failed {
                    reason: format!("disconnected: {reason}"),
                })
@@ -902,9 +886,7 @@ where
                    if self.sessions.is_connected(announcer) {
                        match self.should_fetch_refs_announcement(message, &repo_entry.scope) {
                            Ok(true) => self.fetch(message.rid, announcer),
-
                            Ok(false) => {
-
                                debug!(target: "service", "Skipping fetch from {announcer}")
-
                            }
+
                            Ok(false) => {}
                            Err(e) => {
                                error!(target: "service", "Failed to check refs announcement: {e}");
                                return Err(session::Error::Misbehavior);
@@ -1042,19 +1024,6 @@ where
        debug!(target: "service", "Received message {:?} from {}", &message, peer.id);

        match (&mut peer.state, message) {
-
            (
-
                session::State::Connected {
-
                    protocol: session::Protocol::Fetch { .. },
-
                    ..
-
                },
-
                _,
-
            ) => {
-
                // This should never happen if the service is properly configured, since all
-
                // incoming data is sent directly to the Git worker.
-
                log::error!(target: "service", "Received gossip message from {remote} during git fetch");
-

-
                return Err(session::Error::Misbehavior);
-
            }
            // Process a peer announcement.
            (session::State::Connected { .. }, Message::Announcement(ann)) => {
                let relayer = peer.id;
@@ -1107,72 +1076,6 @@ where
                    }
                }
            }
-
            (
-
                session::State::Connected {
-
                    protocol: session::Protocol::Gossip { state },
-
                    ..
-
                },
-
                Message::Fetch { rid },
-
            ) => {
-
                debug!(target: "service", "Fetch requested for {rid} from {remote}..");
-

-
                // TODO: Check that we have the repo first?
-

-
                // We got a fetch request right after sending our own. We have to decide on which
-
                // fetch to run: our own, or the remote's.
-
                if let GossipState::Requesting { rid, .. } = state {
-
                    debug!(target: "service", "Received fetch request from {remote} while attempting to fetch {rid}..");
-

-
                    // When fetch requests cross, the inbound peer takes precedence.
-
                    if peer.link.is_inbound() {
-
                        debug!(target: "service", "Cancelling fetch request to {remote}..");
-

-
                        // Cancel our own fetch request. This doesn't send anything to the remote,
-
                        // it simply updates the local session's state machine.
-
                        *state = GossipState::Idle;
-

-
                        // TODO: Queue the fetch request as if we tried to request twice from
-
                        // the same node.
-
                    } else {
-
                        // In this case, the remote node will cancel its request, so we don't
-
                        // want to handover the session to the worker here, we will do it when
-
                        // we get the `FetchOk` from the remote.
-
                        return Ok(());
-
                    }
-
                }
-

-
                // Accept the request and instruct the transport to handover the socket to the worker.
-
                self.reactor.write(peer, Message::FetchOk { rid });
-
                self.reactor.fetch(peer, rid, FetchDirection::Responder);
-
            }
-
            (session::State::Connected { protocol, .. }, Message::FetchOk { rid }) => {
-
                let session::Protocol::Gossip {
-
                    state: GossipState::Requesting { rid: requested, namespaces }
-
                } = protocol else {
-
                    // As long as we disconnect peers who don't respond to our fetch requests within
-
                    // the alloted time, this shouldn't happen by mistake.
-
                    error!(
-
                        "Received unexpected message `fetch-ok` from peer {}",
-
                        peer.id
-
                    );
-
                    return Err(session::Error::Misbehavior);
-
                };
-

-
                if *requested != rid {
-
                    error!(
-
                        "Received `fetch-ok` from {} for incorrect repository {rid}",
-
                        peer.id
-
                    );
-
                    return Err(session::Error::Misbehavior);
-
                }
-
                let namespaces = namespaces.clone();
-

-
                debug!(target: "service", "Fetch accepted for {rid} from {remote}..");
-

-
                // Instruct the transport to handover the socket to the worker.
-
                self.reactor
-
                    .fetch(peer, rid, FetchDirection::Initiator { namespaces });
-
            }
            (session::State::Attempted { .. } | session::State::Initial, msg) => {
                error!(target: "service", "Received {:?} from connecting peer {}", msg, peer.id);
            }
@@ -1299,19 +1202,6 @@ where
        Ok(())
    }

-
    fn switch_to_gossip(&mut self, remote: NodeId) {
-
        if let Some(session) = self.sessions.get_mut(&remote) {
-
            // Transition session back to gossip protocol.
-
            session.to_gossip();
-
            // Drain any messages in the session's outbox, which might
-
            // have accumulated during a fetch, and send them to the
-
            // peer.
-
            self.reactor.drain(session);
-
        } else {
-
            log::debug!(target: "service", "Session not found for {remote}");
-
        }
-
    }
-

    fn sync_and_announce(&mut self) {
        match self.sync_inventory() {
            Ok(updated) => {
@@ -1331,16 +1221,6 @@ where
        }
    }

-
    /// Execute the next pending fetch with `remote`, if any.
-
    fn process_fetch_queue(&mut self, remote: &NodeId) {
-
        if let Some(session) = self.sessions.get_mut(remote) {
-
            if let Some(rid) = session.dequeue_fetch() {
-
                debug!(target: "service", "Dequeued a pending fetch {rid} with {remote}");
-
                self.fetch(rid, remote);
-
            }
-
        }
-
    }
-

    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_initial();
@@ -1370,7 +1250,6 @@ where
        pub struct Stats {
            connected: usize,
            disconnected: usize,
-
            fetching: usize,
        }

        let (stats, seeds) = match self.routing.get(rid) {
@@ -1378,10 +1257,7 @@ where
                (Stats::default(), Seeds::default()),
                |(mut stats, mut seeds), node| {
                    if node != self.node_id() {
-
                        if self.sessions.is_fetching(&node) {
-
                            seeds.insert(Seed::Fetching(node));
-
                            stats.fetching += 1;
-
                        } else if self.sessions.is_connected(&node) {
+
                        if self.sessions.is_connected(&node) {
                            seeds.insert(Seed::Connected(node));
                            stats.connected += 1;
                        } else if self.sessions.is_disconnected(&node) {
@@ -1399,8 +1275,8 @@ where
        };
        debug!(
            target: "service",
-
            "Found {} connected seed(s), {} disconnected seed(s), and {} fetching seed(s) for {}",
-
            stats.connected, stats.disconnected, stats.fetching, rid
+
            "Found {} connected seed(s) and {} disconnected seed(s) for {}",
+
            stats.connected, stats.disconnected,  rid
        );

        Ok(seeds)
@@ -1788,10 +1664,6 @@ impl Sessions {
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
    }

-
    pub fn is_fetching(&self, id: &NodeId) -> bool {
-
        self.0.get(id).map(|s| s.is_fetching()).unwrap_or(false)
-
    }
-

    /// Return whether this node can be connected to.
    pub fn is_disconnected(&self, id: &NodeId) -> bool {
        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
modified radicle-node/src/service/message.rs
@@ -341,12 +341,6 @@ pub enum Message {
        /// The pong payload.
        zeroes: ZeroBytes,
    },
-

-
    /// Request a session upgrade to the Git protocol and fetch the given repository.
-
    Fetch { rid: Id },
-

-
    /// Accept a fetch request.
-
    FetchOk { rid: Id },
}

impl PartialOrd for Message {
@@ -442,8 +436,6 @@ impl fmt::Debug for Message {
            }
            Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {zeroes:?})"),
            Self::Pong { zeroes } => write!(f, "Pong({zeroes:?})"),
-
            Self::Fetch { rid } => write!(f, "Fetch({rid})"),
-
            Self::FetchOk { rid } => write!(f, "FetchOk({rid})"),
        }
    }
}
modified radicle-node/src/service/reactor.rs
@@ -1,5 +1,4 @@
-
use std::collections::{HashMap, VecDeque};
-
use std::mem;
+
use std::collections::VecDeque;

use log::*;

@@ -19,52 +18,16 @@ pub enum Io {
    /// Disconnect from a peer.
    Disconnect(NodeId, DisconnectReason),
    /// Fetch repository data from a peer.
-
    Fetch(Fetch),
-
    /// Ask for a wakeup in a specified amount of time.
-
    Wakeup(LocalDuration),
-
}
-

-
/// Fetch job sent to worker thread.
-
#[derive(Debug, Clone)]
-
pub struct Fetch {
-
    /// Repo to fetch.
-
    pub rid: Id,
-
    /// Indicates whether the fetch request was initiated or is a response.
-
    pub direction: FetchDirection,
-
    /// Remote peer we are interacting with.
-
    pub remote: NodeId,
-
}
-

-
impl Fetch {
-
    pub fn is_initiator(&self) -> bool {
-
        self.direction.is_initiator()
-
    }
-

-
    pub fn initiated(&self) -> Option<&Namespaces> {
-
        match &self.direction {
-
            FetchDirection::Initiator { namespaces } => Some(namespaces),
-
            FetchDirection::Responder => None,
-
        }
-
    }
-
}
-

-
#[derive(Debug, Clone)]
-
pub enum FetchDirection {
-
    /// Client is initiating a fetch in order to receive the specified
-
    /// `refspecs` determined by [`Namespaces`].
-
    Initiator {
-
        /// Namespaces to fetch.
+
    Fetch {
+
        /// Repo being fetched.
+
        rid: Id,
+
        /// Remote node being fetched from.
+
        remote: NodeId,
+
        /// Namespaces being fetched.
        namespaces: Namespaces,
    },
-
    /// Server is responding to a fetch request by uploading the
-
    /// specified `refspecs` sent by the client.
-
    Responder,
-
}
-

-
impl FetchDirection {
-
    pub fn is_initiator(&self) -> bool {
-
        matches!(self, Self::Initiator { .. })
-
    }
+
    /// Ask for a wakeup in a specified amount of time.
+
    Wakeup(LocalDuration),
}

/// Interface to the network reactor.
@@ -72,10 +35,6 @@ impl FetchDirection {
pub struct Reactor {
    /// Outgoing I/O queue.
    io: VecDeque<Io>,
-
    /// Message outbox for each node.
-
    /// If messages can't be sent to a node immediately, they are stored in the outbox.
-
    /// This can happen if for eg. a fetch is ongoing with that node.
-
    outbox: HashMap<NodeId, Vec<Message>>,
}

impl Reactor {
@@ -90,71 +49,36 @@ impl Reactor {
    }

    pub fn write(&mut self, remote: &Session, msg: Message) {
-
        // If we've requested a fetch or are currently fetching, any message to be written
-
        // to the remote peer should be queued.
-
        if remote.is_requesting() || remote.is_fetching() {
-
            debug!(target: "service", "Queue {:?} for {}", &msg, remote);
-
            self.outbox.entry(remote.id).or_default().push(msg);
-
        } else {
-
            debug!(target: "service", "Write {:?} to {}", &msg, remote);
-
            self.io.push_back(Io::Write(remote.id, vec![msg]));
-
        }
+
        debug!(target: "service", "Write {:?} to {}", &msg, remote);
+
        self.io.push_back(Io::Write(remote.id, vec![msg]));
    }

    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
        let msgs = msgs.into_iter().collect::<Vec<_>>();
-
        let queue = remote.is_fetching() || remote.is_requesting();

        for (ix, msg) in msgs.iter().enumerate() {
-
            if queue {
-
                debug!(
-
                    target: "service",
-
                    "Queue {:?} for {} ({}/{})",
-
                    msg,
-
                    remote,
-
                    ix + 1,
-
                    msgs.len()
-
                );
-
            } else {
-
                debug!(
-
                    target: "service",
-
                    "Write {:?} to {} ({}/{})",
-
                    msg,
-
                    remote,
-
                    ix + 1,
-
                    msgs.len()
-
                );
-
            }
-
        }
-
        if queue {
-
            self.outbox.entry(remote.id).or_default().extend(msgs);
-
        } else {
-
            self.io.push_back(Io::Write(remote.id, msgs));
-
        }
-
    }
-

-
    pub fn drain(&mut self, remote: &Session) {
-
        if let Some(outbox) = self.outbox.get_mut(&remote.id) {
-
            debug!(target: "service", "Draining outbox for session {} ({} message(s))", remote.id, outbox.len());
-

-
            let msgs = mem::take(outbox);
-
            self.write_all(remote, msgs);
+
            debug!(
+
                target: "service",
+
                "Write {:?} to {} ({}/{})",
+
                msg,
+
                remote,
+
                ix + 1,
+
                msgs.len()
+
            );
        }
+
        self.io.push_back(Io::Write(remote.id, msgs));
    }

    pub fn wakeup(&mut self, after: LocalDuration) {
        self.io.push_back(Io::Wakeup(after));
    }

-
    pub fn fetch(&mut self, remote: &mut Session, rid: Id, direction: FetchDirection) {
-
        // Transition the session state machine to "fetching".
-
        remote.to_fetching(rid);
-

-
        self.io.push_back(Io::Fetch(Fetch {
+
    pub fn fetch(&mut self, remote: &mut Session, rid: Id, namespaces: Namespaces) {
+
        self.io.push_back(Io::Fetch {
            rid,
-
            direction,
+
            namespaces,
            remote: remote.id,
-
        }));
+
        });
    }

    /// Broadcast a message to a list of peers.
modified radicle-node/src/service/session.rs
@@ -1,8 +1,6 @@
-
use std::collections::VecDeque;
+
use std::collections::HashSet;
use std::fmt;

-
use radicle::storage::Namespaces;
-

use crate::service::message;
use crate::service::message::Message;
use crate::service::{Id, LocalTime, NodeId, Reactor, Rng};
@@ -19,35 +17,6 @@ pub enum PingState {
    Ok,
}

-
/// Sub-state of the gossip protocol.
-
#[derive(Debug, Default, PartialEq, Eq, Clone)]
-
pub enum GossipState {
-
    /// Regular gossip, no pending fetch requests.
-
    #[default]
-
    Idle,
-
    /// Requesting a fetch for the given RID. Waiting for a [`Message::FetchOk`].
-
    Requesting { rid: Id, namespaces: Namespaces },
-
}
-

-
/// Session protocol.
-
#[derive(Debug, PartialEq, Eq, Clone)]
-
pub enum Protocol {
-
    /// The default message-based gossip protocol.
-
    Gossip { state: GossipState },
-
    /// Git smart protocol. Used for fetching repository data.
-
    /// This protocol is used after a connection upgrade via the
-
    /// [`Message::Fetch`] message.
-
    Fetch { rid: Id },
-
}
-

-
impl Default for Protocol {
-
    fn default() -> Self {
-
        Self::Gossip {
-
            state: GossipState::default(),
-
        }
-
    }
-
}
-

#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum State {
@@ -61,8 +30,8 @@ pub enum State {
        since: LocalTime,
        /// Ping state.
        ping: PingState,
-
        /// Session protocol.
-
        protocol: Protocol,
+
        /// Ongoing fetches.
+
        fetching: HashSet<Id>,
    },
    /// When a peer is disconnected.
    Disconnected {
@@ -82,23 +51,9 @@ impl fmt::Display for State {
            Self::Attempted => {
                write!(f, "attempted")
            }
-
            Self::Connected { protocol, .. } => match protocol {
-
                Protocol::Gossip {
-
                    state: GossipState::Idle,
-
                    ..
-
                } => {
-
                    write!(f, "connected <gossip>")
-
                }
-
                Protocol::Gossip {
-
                    state: GossipState::Requesting { rid, .. },
-
                    ..
-
                } => {
-
                    write!(f, "connected <gossip> requested={rid}")
-
                }
-
                Protocol::Fetch { .. } => {
-
                    write!(f, "connected <fetch>")
-
                }
-
            },
+
            Self::Connected { .. } => {
+
                write!(f, "connected")
+
            }
            Self::Disconnected { .. } => {
                write!(f, "disconnected")
            }
@@ -109,10 +64,10 @@ impl fmt::Display for State {
/// Return value of [`Session::fetch`].
#[derive(Debug)]
pub enum FetchResult {
-
    /// We are already fetching from this peer.
-
    AlreadyFetching(Id),
+
    /// We are already fetching the given repo from this peer.
+
    AlreadyFetching,
    /// Ok, ready to fetch.
-
    Ready(Message),
+
    Ready,
    /// This peer is not ready to fetch.
    NotConnected,
}
@@ -164,9 +119,6 @@ pub struct Session {
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,

-
    /// Fetches queued due to another ongoing fetch.
-
    pending_fetches: VecDeque<Id>,
-

    /// Connection attempts. For persistent peers, Tracks
    /// how many times we've attempted to connect. We reset this to zero
    /// upon successful connection.
@@ -205,7 +157,6 @@ impl Session {
            persistent,
            last_active: LocalTime::default(),
            attempts: 1,
-
            pending_fetches: VecDeque::new(),
            rng,
        }
    }
@@ -216,14 +167,13 @@ impl Session {
            state: State::Connected {
                since: time,
                ping: PingState::default(),
-
                protocol: Protocol::default(),
+
                fetching: HashSet::default(),
            },
            link: Link::Inbound,
            subscribe: None,
            persistent,
            last_active: LocalTime::default(),
            attempts: 0,
-
            pending_fetches: VecDeque::new(),
            rng,
        }
    }
@@ -236,16 +186,6 @@ impl Session {
        matches!(self.state, State::Connected { .. })
    }

-
    pub fn is_fetching(&self) -> bool {
-
        matches!(
-
            self.state,
-
            State::Connected {
-
                protocol: Protocol::Fetch { .. },
-
                ..
-
            }
-
        )
-
    }
-

    pub fn is_disconnected(&self) -> bool {
        matches!(self.state, State::Disconnected { .. })
    }
@@ -254,64 +194,26 @@ impl Session {
        matches!(self.state, State::Initial)
    }

-
    pub fn is_requesting(&self) -> bool {
-
        matches!(
-
            self.state,
-
            State::Connected {
-
                protocol: Protocol::Gossip {
-
                    state: GossipState::Requesting { .. }
-
                },
-
                ..
-
            }
-
        )
-
    }
-

    pub fn attempts(&self) -> usize {
        self.attempts
    }

-
    pub fn fetch(&self, rid: Id) -> FetchResult {
-
        if let State::Connected { protocol, .. } = &self.state {
-
            match protocol {
-
                Protocol::Gossip { state } => {
-
                    if let GossipState::Requesting { rid, .. } = state {
-
                        FetchResult::AlreadyFetching(*rid)
-
                    } else {
-
                        FetchResult::Ready(Message::Fetch { rid })
-
                    }
-
                }
-
                Protocol::Fetch { rid } => FetchResult::AlreadyFetching(*rid),
+
    pub fn fetch(&mut self, rid: Id) -> FetchResult {
+
        if let State::Connected { fetching, .. } = &mut self.state {
+
            if fetching.insert(rid) {
+
                FetchResult::Ready
+
            } else {
+
                FetchResult::AlreadyFetching
            }
        } else {
            FetchResult::NotConnected
        }
    }

-
    pub fn to_requesting(&mut self, rid: Id, namespaces: Namespaces) {
-
        let State::Connected { protocol, .. } = &mut self.state else {
-
            panic!("Session::to_requesting: cannot transition to 'requesting': session is not connected");
-
        };
-
        *protocol = Protocol::Gossip {
-
            state: GossipState::Requesting { rid, namespaces },
-
        };
-
    }
-

-
    pub fn to_fetching(&mut self, rid: Id) {
-
        let State::Connected { protocol, .. } = &mut self.state else {
-
            panic!("Session::to_fetching: cannot transition to 'fetching': session is not connected");
-
        };
-
        *protocol = Protocol::Fetch { rid };
-
    }
-

-
    pub fn to_gossip(&mut self) {
-
        if let State::Connected { protocol, .. } = &mut self.state {
-
            if let Protocol::Fetch { .. } = protocol {
-
                *protocol = Protocol::default();
-
            } else {
-
                panic!(
-
                    "Unexpected session state for {}: expected 'fetch' protocol, got 'gossip'",
-
                    self.id
-
                );
+
    pub fn fetched(&mut self, rid: Id) {
+
        if let State::Connected { fetching, .. } = &mut self.state {
+
            if !fetching.remove(&rid) {
+
                log::error!(target: "service", "Fetched unknown repository {rid}");
            }
        }
    }
@@ -334,7 +236,7 @@ impl Session {
        self.state = State::Connected {
            since,
            ping: PingState::default(),
-
            protocol: Protocol::default(),
+
            fetching: HashSet::default(),
        };
    }

@@ -354,18 +256,11 @@ impl Session {
        self.state = State::Initial;
    }

-
    pub fn requesting(&self) -> Option<(&Id, &Namespaces)> {
-
        if let State::Connected {
-
            protocol:
-
                Protocol::Gossip {
-
                    state: GossipState::Requesting { rid, namespaces },
-
                },
-
            ..
-
        } = &self.state
-
        {
-
            Some((rid, namespaces))
+
    pub fn fetching(&self) -> HashSet<Id> {
+
        if let State::Connected { fetching, .. } = &self.state {
+
            fetching.clone()
        } else {
-
            None
+
            HashSet::default()
        }
    }

@@ -378,12 +273,4 @@ impl Session {
        }
        Ok(())
    }
-

-
    pub(crate) fn queue_fetch(&mut self, rid: Id) {
-
        self.pending_fetches.push_back(rid);
-
    }
-

-
    pub(crate) fn dequeue_fetch(&mut self) -> Option<Id> {
-
        self.pending_fetches.pop_front()
-
    }
}
modified radicle-node/src/test/arbitrary.rs
@@ -93,7 +93,6 @@ impl Arbitrary for Message {
            MessageType::Pong => Self::Pong {
                zeroes: ZeroBytes::new(u16::arbitrary(g).min(Ping::MAX_PONG_ZEROES)),
            },
-
            _ => unreachable!(),
        }
    }
}
modified radicle-node/src/test/environment.rs
@@ -114,7 +114,7 @@ pub struct NodeHandle<G: Signer + cyphernet::Ecdh + 'static> {
    pub home: Home,
    pub addr: net::SocketAddr,
    pub thread: ManuallyDrop<thread::JoinHandle<Result<(), runtime::Error>>>,
-
    pub handle: ManuallyDrop<Handle<G>>,
+
    pub handle: ManuallyDrop<Handle>,
}

impl<G: Signer + cyphernet::Ecdh + 'static> Drop for NodeHandle<G> {
modified radicle-node/src/test/simulator.rs
@@ -14,9 +14,9 @@ use log::*;

use crate::crypto::Signer;
use crate::git::raw as git;
-
use crate::prelude::Address;
+
use crate::prelude::{Address, Id};
use crate::service::reactor::Io;
-
use crate::service::{DisconnectReason, Event, Fetch, Message, NodeId};
+
use crate::service::{DisconnectReason, Event, Message, NodeId};
use crate::storage::{Namespaces, RefUpdate};
use crate::storage::{WriteRepository, WriteStorage};
use crate::test::peer::Service;
@@ -63,7 +63,12 @@ pub enum Input {
    /// Received a message from a remote peer.
    Received(NodeId, Vec<Message>),
    /// Fetch completed for a node.
-
    Fetched(Fetch, Rc<Result<Vec<RefUpdate>, FetchError>>),
+
    Fetched(
+
        Id,
+
        Namespaces,
+
        NodeId,
+
        Rc<Result<Vec<RefUpdate>, FetchError>>,
+
    ),
    /// Used to advance the state machine after some wall time has passed.
    Wake,
}
@@ -105,15 +110,8 @@ impl fmt::Display for Scheduled {
            Input::Wake => {
                write!(f, "{}: Tock", self.node)
            }
-
            Input::Fetched(fetch, _) => {
-
                write!(
-
                    f,
-
                    "{} <<~ {} ({}): Fetched (initiated={})",
-
                    self.node,
-
                    fetch.remote,
-
                    fetch.rid,
-
                    fetch.is_initiator()
-
                )
+
            Input::Fetched(rid, _, nid, _) => {
+
                write!(f, "{} <<~ {} ({}): Fetched", self.node, nid, rid)
            }
        }
    }
@@ -411,17 +409,15 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            p.received_message(id, msg);
                        }
                    }
-
                    Input::Fetched(f, result) => {
+
                    Input::Fetched(rid, ns, nid, result) => {
                        let result = Rc::try_unwrap(result).unwrap();
-
                        if let Some(namespaces) = f.initiated() {
-
                            let mut repo = match p.storage().repository_mut(f.rid) {
-
                                Ok(repo) => repo,
-
                                Err(e) if e.is_not_found() => p.storage().create(f.rid).unwrap(),
-
                                Err(e) => panic!("Failed to open repository: {e}"),
-
                            };
-
                            fetch(&mut repo, &f.remote, namespaces.clone()).unwrap();
-
                        }
-
                        p.fetched(f, result);
+
                        let mut repo = match p.storage().repository_mut(rid) {
+
                            Ok(repo) => repo,
+
                            Err(e) if e.is_not_found() => p.storage().create(rid).unwrap(),
+
                            Err(e) => panic!("Failed to open repository: {e}"),
+
                        };
+
                        fetch(&mut repo, &nid, ns.clone()).unwrap();
+
                        p.fetched(rid, ns, nid, result);
                    }
                }
                while let Some(o) = p.next() {
@@ -606,22 +602,16 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    );
                }
            }
-
            Io::Fetch(fetch) => {
-
                let remote = fetch.remote;
-

-
                if fetch.is_initiator() {
-
                    log::info!(
-
                        target: "sim",
-
                        "{:05} {} ~> {} ({}): Fetch outgoing",
-
                        self.elapsed().as_millis(), node, remote, fetch.rid
-
                    );
-
                } else {
-
                    log::info!(
-
                        target: "sim",
-
                        "{:05} {} <~ {} ({}): Fetch incoming",
-
                        self.elapsed().as_millis(), node, remote, fetch.rid
-
                    );
-
                }
+
            Io::Fetch {
+
                rid,
+
                remote,
+
                namespaces,
+
            } => {
+
                log::info!(
+
                    target: "sim",
+
                    "{:05} {} ~> {} ({}): Fetch outgoing",
+
                    self.elapsed().as_millis(), node, remote, rid
+
                );

                if self.is_fallible() {
                    self.inbox.insert(
@@ -630,7 +620,9 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            node,
                            remote,
                            input: Input::Fetched(
-
                                fetch,
+
                                rid,
+
                                namespaces,
+
                                remote,
                                Rc::new(Err(FetchError::Io(io::ErrorKind::Other.into()))),
                            ),
                        },
@@ -641,7 +633,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        Scheduled {
                            node,
                            remote,
-
                            input: Input::Fetched(fetch, Rc::new(Ok(vec![]))),
+
                            input: Input::Fetched(rid, namespaces, remote, Rc::new(Ok(vec![]))),
                        },
                    );
                }
modified radicle-node/src/tests.rs
@@ -17,13 +17,12 @@ use crate::prelude::{LocalDuration, Timestamp};
use crate::service::config::*;
use crate::service::filter::Filter;
use crate::service::message::*;
-
use crate::service::reactor::FetchDirection;
use crate::service::reactor::Io;
use crate::service::ServiceState as _;
use crate::service::*;
use crate::storage::git::transport::{local, remote};
use crate::storage::git::Storage;
-
use crate::storage::{Namespaces, ReadStorage};
+
use crate::storage::ReadStorage;
use crate::test::arbitrary;
use crate::test::assert_matches;
use crate::test::fixtures;
@@ -707,10 +706,7 @@ fn test_refs_announcement_fetch_trusted_no_inventory() {
    alice.receive(bob.id(), bob.refs_announcement(rid));

    // Alice fetches Bob's refs as this is a new repo.
-
    assert_eq!(
-
        alice.messages(bob.id()).next(),
-
        Some(Message::Fetch { rid })
-
    );
+
    assert_matches!(alice.outbox().next(), Some(Io::Fetch { .. }));
}

/// Alice and Bob both have the same repo.
@@ -758,7 +754,7 @@ fn test_refs_announcement_trusted() {
    // Bob announces refs again.
    bob.elapse(LocalDuration::from_mins(1)); // Make sure our announcement is fresh.
    alice.receive(bob.id(), bob.refs_announcement(rid));
-
    assert_matches!(alice.messages(bob.id()).next(), Some(Message::Fetch { .. }));
+
    assert_matches!(alice.outbox().next(), Some(Io::Fetch { .. }));
}

#[test]
@@ -779,86 +775,6 @@ fn test_refs_announcement_no_subscribe() {
}

#[test]
-
fn test_gossip_during_fetch() {
-
    let storage = arbitrary::nonempty_storage(1);
-
    let rid = *storage.inventory.keys().next().unwrap();
-
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
-
    let bob = Peer::new("bob", [8, 8, 8, 8]);
-
    let eve = Peer::new("eve", [9, 9, 9, 9]);
-
    let now = LocalTime::now().as_millis();
-
    let (send, _recv) = chan::bounded::<node::FetchResult>(1);
-
    let inventory1 = BoundedVec::try_from(arbitrary::vec(1)).unwrap();
-
    let inventory2 = BoundedVec::try_from(arbitrary::vec(1)).unwrap();
-

-
    alice.connect_to(&bob);
-
    alice.connect_to(&eve);
-
    alice.command(Command::Fetch(rid, bob.id, send));
-

-
    assert_matches!(alice.messages(bob.id).next(), Some(Message::Fetch { .. }));
-

-
    logger::init(log::Level::Debug);
-

-
    alice.receive(
-
        eve.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inventory1.clone(),
-
                timestamp: now + 1,
-
            },
-
            eve.signer(),
-
        ),
-
    );
-
    // We shouldn't relay to Bob while we're fetching from him.
-
    assert_matches!(alice.messages(bob.id).next(), None);
-

-
    alice.receive(bob.id(), Message::FetchOk { rid });
-
    alice.receive(
-
        eve.id(),
-
        Message::inventory(
-
            InventoryAnnouncement {
-
                inventory: inventory2.clone(),
-
                timestamp: now + 2,
-
            },
-
            eve.signer(),
-
        ),
-
    );
-
    // We shouldn't relay to Bob while we're fetching from him.
-
    assert_matches!(alice.messages(bob.id).next(), None);
-

-
    // Have enough time pass that Alice sends a "ping" to Bob.
-
    alice.elapse(KEEP_ALIVE_DELTA);
-

-
    // Now that the fetch is done, the messages Bob missed should be relayed to him.
-
    alice.fetched(
-
        Fetch {
-
            rid,
-
            direction: FetchDirection::Initiator {
-
                namespaces: Namespaces::All,
-
            },
-
            remote: bob.id,
-
        },
-
        Ok(vec![]),
-
    );
-
    let mut messages = alice.messages(bob.id);
-

-
    assert_matches!(
-
        messages.next(),
-
        Some(Message::Announcement(Announcement {
-
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { inventory, .. }),
-
            ..
-
        })) if inventory == inventory1
-
    );
-
    assert_matches!(
-
        messages.next(),
-
        Some(Message::Announcement(Announcement {
-
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { inventory, .. }),
-
            ..
-
        })) if inventory == inventory2
-
    );
-
    assert_matches!(messages.next(), Some(Message::Ping { .. }));
-
}
-

-
#[test]
fn test_inventory_relay() {
    // Topology is eve <-> alice <-> bob
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
@@ -1155,12 +1071,12 @@ fn test_fetch_missing_inventory() {

    alice.elapse(service::SYNC_INTERVAL);
    alice
-
        .messages(bob.id)
-
        .find(|m| matches!(m, Message::Fetch { .. }))
+
        .outbox()
+
        .find(|m| matches!(m, Io::Fetch { .. }))
        .unwrap();
    alice
-
        .messages(eve.id)
-
        .find(|m| matches!(m, Message::Fetch { .. }))
+
        .outbox()
+
        .find(|m| matches!(m, Io::Fetch { .. }))
        .unwrap();
}

@@ -1356,89 +1272,3 @@ fn prop_inventory_exchange_dense() {
        .tests(20)
        .quickcheck(property as fn(MockStorage, MockStorage, MockStorage));
}
-

-
#[test]
-
fn test_queued_fetch() {
-
    let storage = arbitrary::nonempty_storage(3);
-
    let mut repo_keys = storage.inventory.keys();
-
    let rid = *repo_keys.next().unwrap();
-
    let rid2 = *repo_keys.next().unwrap();
-
    let rid3 = *repo_keys.next().unwrap();
-
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
-
    let bob = Peer::new("bob", [8, 8, 8, 8]);
-
    let (send, _recv) = chan::bounded::<node::FetchResult>(1);
-

-
    logger::init(log::Level::Debug);
-

-
    // Send the first fetch.
-
    alice.connect_to(&bob);
-
    alice.command(Command::Fetch(rid, bob.id, send));
-

-
    assert_matches!(alice.messages(bob.id).next(), Some(Message::Fetch { .. }));
-

-
    // Send the 2nd fetch that will be queued.
-
    let (send2, _recv2) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid2, bob.id, send2));
-

-
    // Send the 3rd fetch that will be queued.
-
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
-
    alice.command(Command::Fetch(rid3, bob.id, send3));
-

-
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
-
    assert_matches!(alice.messages(bob.id).next(), None);
-

-
    alice.receive(bob.id(), Message::FetchOk { rid });
-
    assert_matches!(alice.messages(bob.id).next(), None);
-

-
    // Have enough time pass that Alice sends a "ping" to Bob.
-
    alice.elapse(KEEP_ALIVE_DELTA);
-

-
    // Finish the 1st fetch.
-
    alice.fetched(
-
        Fetch {
-
            rid,
-
            direction: FetchDirection::Initiator {
-
                namespaces: Namespaces::All,
-
            },
-
            remote: bob.id,
-
        },
-
        Ok(vec![]),
-
    );
-

-
    // Now the 1st fetch is done, the gossip messages are drained.
-
    let mut messages = alice.messages(bob.id);
-
    assert_matches!(messages.next(), Some(Message::Ping(_)));
-

-
    // The message after all queued gossip messages is Fetch.
-
    assert_eq!(messages.last(), Some(Message::Fetch { rid: rid2 }));
-

-
    // `FetchOk` for the 2nd fetch.
-
    alice.receive(bob.id(), Message::FetchOk { rid: rid2 });
-

-
    // The 2nd fetch should be in `Io` now. Not the 3rd fetch yet.
-
    let last_io = alice.outbox().last().unwrap();
-
    assert_matches!(last_io, Io::Fetch(fetch) if fetch.rid == rid2);
-

-
    // Finish the 2nd fetch.
-
    alice.fetched(
-
        Fetch {
-
            rid: rid2,
-
            direction: FetchDirection::Initiator {
-
                namespaces: Namespaces::All,
-
            },
-
            remote: bob.id,
-
        },
-
        Ok(vec![]),
-
    );
-

-
    // Now the 2nd fetch is done, the 3rd fetch is drained.
-
    let mut messages = alice.messages(bob.id);
-
    assert_eq!(messages.next(), Some(Message::Fetch { rid: rid3 }));
-

-
    // `FetchOk` for the 3rd fetch.
-
    alice.receive(bob.id(), Message::FetchOk { rid: rid3 });
-

-
    // The 3rd fetch should be in `Io` now.
-
    let last_io = alice.outbox().last().unwrap();
-
    assert_matches!(last_io, Io::Fetch(fetch) if fetch.rid == rid3);
-
}
modified radicle-node/src/wire.rs
@@ -1,6 +1,9 @@
+
mod frame;
mod message;
mod protocol;
+
mod varint;

+
pub use frame::StreamId;
pub use message::{AddressType, MessageType};
pub use protocol::{Control, Wire, WireReader, WireSession, WireWriter};

@@ -40,8 +43,14 @@ pub enum Error {
    InvalidSize { expected: usize, actual: usize },
    #[error("invalid filter size: {0}")]
    InvalidFilterSize(usize),
+
    #[error("invalid channel type {0:x}")]
+
    InvalidStreamKind(u8),
    #[error(transparent)]
    InvalidRefName(#[from] fmt::Error),
+
    #[error("invalid control message with type `{0}`")]
+
    InvalidControlMessage(u8),
+
    #[error("invalid protocol version header `{0:x?}`")]
+
    InvalidProtocolVersion([u8; 4]),
    #[error("unknown address type `{0}`")]
    UnknownAddressType(u8),
    #[error("unknown message type `{0}`")]
added radicle-node/src/wire/frame.rs
@@ -0,0 +1,359 @@
+
//! Framing protocol.
+
#![warn(clippy::missing_docs_in_private_items)]
+
use std::{fmt, io};
+

+
use crate::{wire, wire::varint, wire::varint::VarInt, wire::Message, Link};
+

+
/// Protocol version strings all start with the magic sequence `rad`, followed
+
/// by a version number.
+
pub const PROTOCOL_VERSION: Version = Version([b'r', b'a', b'd', 0x1]);
+

+
/// Control open byte.
+
const CONTROL_OPEN: u8 = 0;
+
/// Control close byte.
+
const CONTROL_CLOSE: u8 = 1;
+
/// Control EOF byte.
+
const CONTROL_EOF: u8 = 2;
+

+
/// Protocol version.
+
pub struct Version([u8; 4]);
+

+
impl wire::Encode for Version {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        writer.write_all(&PROTOCOL_VERSION.0)?;
+

+
        Ok(PROTOCOL_VERSION.0.len())
+
    }
+
}
+

+
impl wire::Decode for Version {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let mut version = [0u8; 4];
+
        reader.read_exact(&mut version[..])?;
+

+
        if version != PROTOCOL_VERSION.0 {
+
            return Err(wire::Error::InvalidProtocolVersion(version));
+
        }
+
        Ok(Self(version))
+
    }
+
}
+

+
/// Identifies a (multiplexed) stream.
+
///
+
/// Stream IDs are variable-length integers with the least significant 3 bits
+
/// denoting the stream type and initiator.
+
///
+
/// The first bit denotes the initiator (outbound or inbound), while the second
+
/// and third bit denote the stream type. See `StreamKind`.
+
///
+
/// In a situation where Alice connects to Bob, Alice will have the initiator
+
/// bit set to `1` for all streams she creates, while Bob will have it set to `0`.
+
///
+
/// This ensures that Stream IDs never collide.
+
/// Additionally, Stream IDs must never be re-used within a connection.
+
///
+
/// +=======+==================================+
+
/// | Bits  | Stream Type                      |
+
/// +=======+==================================+
+
/// | 0b000 | Outbound Control stream          |
+
/// +-------+----------------------------------+
+
/// | 0b001 | Inbound Control stream           |
+
/// +-------+----------------------------------+
+
/// | 0b010 | Outbound Gossip stream           |
+
/// +-------+----------------------------------+
+
/// | 0b011 | Inbound Gossip stream            |
+
/// +-------+----------------------------------+
+
/// | 0b100 | Outbound Git stream              |
+
/// +-------+----------------------------------+
+
/// | 0b101 | Inbound Git stream               |
+
/// +-------+----------------------------------+
+
///
+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+
pub struct StreamId(VarInt);
+

+
impl StreamId {
+
    /// Get the initiator of this stream.
+
    pub fn link(&self) -> Link {
+
        let n = *self.0;
+
        if 0b1 & n == 0 {
+
            Link::Outbound
+
        } else {
+
            Link::Inbound
+
        }
+
    }
+

+
    /// Get the kind of stream this is.
+
    pub fn kind(&self) -> Result<StreamKind, u8> {
+
        let id = *self.0;
+
        match (id >> 1) & 0b11 {
+
            0 => Ok(StreamKind::Control),
+
            1 => Ok(StreamKind::Gossip),
+
            2 => Ok(StreamKind::Git),
+
            n => Err(n as u8),
+
        }
+
    }
+

+
    /// Create a control identifier.
+
    pub fn control(link: Link) -> Self {
+
        match link {
+
            Link::Outbound => Self(VarInt::from(0b000u8)),
+
            Link::Inbound => Self(VarInt::from(0b001u8)),
+
        }
+
    }
+

+
    /// Create a gossip identifier.
+
    pub fn gossip(link: Link) -> Self {
+
        match link {
+
            Link::Outbound => Self(VarInt::from(0b010u8)),
+
            Link::Inbound => Self(VarInt::from(0b011u8)),
+
        }
+
    }
+

+
    /// Create a git identifier.
+
    pub fn git(link: Link) -> Self {
+
        match link {
+
            Link::Outbound => Self(VarInt::from(0b100u8)),
+
            Link::Inbound => Self(VarInt::from(0b101u8)),
+
        }
+
    }
+

+
    /// Get the nth identifier while preserving the stream type and initiator.
+
    pub fn nth(self, n: u64) -> Result<Self, varint::BoundsExceeded> {
+
        let id = *self.0 + (n << 3);
+
        VarInt::new(id).map(Self)
+
    }
+
}
+

+
impl From<StreamId> for u64 {
+
    fn from(value: StreamId) -> Self {
+
        *value.0
+
    }
+
}
+

+
impl From<StreamId> for VarInt {
+
    fn from(value: StreamId) -> Self {
+
        value.0
+
    }
+
}
+

+
impl fmt::Display for StreamId {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        write!(f, "{}", *self.0)
+
    }
+
}
+

+
impl wire::Decode for StreamId {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let id = VarInt::decode(reader)?;
+
        Ok(Self(id))
+
    }
+
}
+

+
impl wire::Encode for StreamId {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        self.0.encode(writer)
+
    }
+
}
+

+
/// Type of stream.
+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+
pub enum StreamKind {
+
    /// Control stream, used to open and close streams.
+
    Control,
+
    /// Gossip stream, used to exchange messages.
+
    Gossip,
+
    /// Git stream, used for replication.
+
    Git,
+
}
+

+
/// Protocol frame.
+
///
+
///  0                   1                   2                   3
+
///  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
/// |      'r'      |      'a'      |      'd'      |      0x1      | Version
+
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
/// |                     Stream ID                           |TTT|I| Stream ID with Stream [T]ype and [I]nitiator bits
+
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
/// |                     Data                                   ...| Data (variable size)
+
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
pub struct Frame {
+
    /// The protocol version.
+
    pub version: Version,
+
    /// The stream identifier.
+
    pub stream: StreamId,
+
    /// The frame payload.
+
    pub data: FrameData,
+
}
+

+
impl Frame {
+
    /// Create a 'git' protocol frame.
+
    pub fn git(stream: StreamId, data: Vec<u8>) -> Self {
+
        Self {
+
            version: PROTOCOL_VERSION,
+
            stream,
+
            data: FrameData::Git(data),
+
        }
+
    }
+

+
    /// Create a 'control' protocol frame.
+
    pub fn control(link: Link, ctrl: Control) -> Self {
+
        Self {
+
            version: PROTOCOL_VERSION,
+
            stream: StreamId::control(link),
+
            data: FrameData::Control(ctrl),
+
        }
+
    }
+

+
    /// Create a 'gossip' protocol frame.
+
    pub fn gossip(link: Link, msg: Message) -> Self {
+
        Self {
+
            version: PROTOCOL_VERSION,
+
            stream: StreamId::gossip(link),
+
            data: FrameData::Gossip(msg),
+
        }
+
    }
+

+
    /// Serialize frame to bytes.
+
    pub fn to_bytes(&self) -> Vec<u8> {
+
        wire::serialize(self)
+
    }
+
}
+

+
/// Frame payload.
+
pub enum FrameData {
+
    /// Control frame payload.
+
    Control(Control),
+
    /// Gossip frame payload.
+
    Gossip(Message),
+
    /// Git frame payload. May contain packet-lines as well as packfile data.
+
    Git(Vec<u8>),
+
}
+

+
/// A control message sent over a control stream.
+
pub enum Control {
+
    /// Open a new stream.
+
    Open {
+
        /// The stream to open.
+
        stream: StreamId,
+
    },
+
    /// Close an existing stream.
+
    Close {
+
        /// The stream to close.
+
        stream: StreamId,
+
    },
+
    /// Signal an end-of-file. This can be used to simulate connections terminating
+
    /// without having to close the connection. These control messages are turned into
+
    /// [`io::ErrorKind::UnexpectedEof`] errors on read.
+
    Eof {
+
        /// The stream to send an EOF on.
+
        stream: StreamId,
+
    },
+
}
+

+
impl wire::Decode for Control {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let command = u8::decode(reader)?;
+
        match command {
+
            CONTROL_OPEN => {
+
                let stream = StreamId::decode(reader)?;
+
                Ok(Control::Open { stream })
+
            }
+
            CONTROL_CLOSE => {
+
                let stream = StreamId::decode(reader)?;
+
                Ok(Control::Close { stream })
+
            }
+
            CONTROL_EOF => {
+
                let stream = StreamId::decode(reader)?;
+
                Ok(Control::Eof { stream })
+
            }
+
            other => Err(wire::Error::InvalidControlMessage(other)),
+
        }
+
    }
+
}
+

+
impl wire::Encode for Control {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+

+
        match self {
+
            Self::Open { stream: id } => {
+
                n += CONTROL_OPEN.encode(writer)?;
+
                n += id.encode(writer)?;
+
            }
+
            Self::Eof { stream: id } => {
+
                n += CONTROL_EOF.encode(writer)?;
+
                n += id.encode(writer)?;
+
            }
+
            Self::Close { stream: id } => {
+
                n += CONTROL_CLOSE.encode(writer)?;
+
                n += id.encode(writer)?;
+
            }
+
        }
+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for Frame {
+
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let version = Version::decode(reader)?;
+
        let stream = StreamId::decode(reader)?;
+

+
        match stream.kind() {
+
            Ok(StreamKind::Control) => {
+
                let ctrl = Control::decode(reader)?;
+
                let frame = Frame {
+
                    version,
+
                    stream,
+
                    data: FrameData::Control(ctrl),
+
                };
+
                Ok(frame)
+
            }
+
            Ok(StreamKind::Gossip) => {
+
                let msg = Message::decode(reader)?;
+
                let frame = Frame {
+
                    version,
+
                    stream,
+
                    data: FrameData::Gossip(msg),
+
                };
+
                Ok(frame)
+
            }
+
            Ok(StreamKind::Git { .. }) => {
+
                let size = VarInt::decode(reader)?;
+
                let mut data = vec![0; *size as usize];
+
                reader.read_exact(&mut data[..])?;
+

+
                Ok(Frame::git(stream, data))
+
            }
+
            Err(n) => Err(wire::Error::InvalidStreamKind(n)),
+
        }
+
    }
+
}
+

+
impl wire::Encode for Frame {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+

+
        n += self.version.encode(writer)?;
+
        n += self.stream.encode(writer)?;
+

+
        match &self.data {
+
            FrameData::Control(ctrl) => {
+
                n += ctrl.encode(writer)?;
+
            }
+
            FrameData::Gossip(msg) => {
+
                n += msg.encode(writer)?;
+
            }
+
            FrameData::Git(data) => {
+
                let len = data.len();
+
                let size = VarInt::new(len as u64)
+
                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
+
                n += size.encode(writer)?;
+

+
                writer.write_all(data.as_slice())?;
+
                n += len;
+
            }
+
        }
+
        Ok(n)
+
    }
+
}
modified radicle-node/src/wire/message.rs
@@ -19,8 +19,6 @@ pub enum MessageType {
    Subscribe = 8,
    Ping = 10,
    Pong = 12,
-
    Fetch = 14,
-
    FetchOk = 16,
}

impl From<MessageType> for u16 {
@@ -40,8 +38,6 @@ impl TryFrom<u16> for MessageType {
            8 => Ok(MessageType::Subscribe),
            10 => Ok(MessageType::Ping),
            12 => Ok(MessageType::Pong),
-
            14 => Ok(MessageType::Fetch),
-
            16 => Ok(MessageType::FetchOk),
            _ => Err(other),
        }
    }
@@ -62,8 +58,6 @@ impl Message {
            },
            Self::Ping { .. } => MessageType::Ping,
            Self::Pong { .. } => MessageType::Pong,
-
            Self::Fetch { .. } => MessageType::Fetch,
-
            Self::FetchOk { .. } => MessageType::FetchOk,
        }
        .into()
    }
@@ -216,12 +210,6 @@ impl wire::Encode for Message {
            Self::Pong { zeroes } => {
                n += zeroes.encode(writer)?;
            }
-
            Self::Fetch { rid } => {
-
                n += rid.encode(writer)?;
-
            }
-
            Self::FetchOk { rid } => {
-
                n += rid.encode(writer)?;
-
            }
        }

        if n > wire::Size::MAX as usize {
@@ -295,14 +283,6 @@ impl wire::Decode for Message {
                let zeroes = ZeroBytes::decode(reader)?;
                Ok(Self::Pong { zeroes })
            }
-
            Ok(MessageType::Fetch) => {
-
                let rid = Id::decode(reader)?;
-
                Ok(Self::Fetch { rid })
-
            }
-
            Ok(MessageType::FetchOk) => {
-
                let rid = Id::decode(reader)?;
-
                Ok(Self::FetchOk { rid })
-
            }
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
        }
    }
modified radicle-node/src/wire/protocol.rs
@@ -7,7 +7,7 @@ use std::collections::VecDeque;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::sync::Arc;
-
use std::{fmt, io, net, str};
+
use std::{io, net};

use amplify::Wrapper as _;
use crossbeam_channel as chan;
@@ -18,7 +18,7 @@ use cyphernet::{Digest, EcSk, Ecdh, Sha256};
use localtime::LocalTime;
use netservices::resource::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
use netservices::session::{ProtocolArtifact, Socks5Session};
-
use netservices::{NetConnection, NetProtocol, NetReader, NetSession, NetWriter};
+
use netservices::{NetConnection, NetProtocol, NetReader, NetWriter};
use reactor::Timestamp;

use radicle::collections::HashMap;
@@ -27,11 +27,12 @@ use radicle::storage::WriteStorage;

use crate::crypto::Signer;
use crate::prelude::Deserializer;
-
use crate::service::reactor::{Fetch, Io};
-
use crate::service::{session, DisconnectReason, Message, Service};
-
use crate::wire::{Encode, Error};
-
use crate::worker;
-
use crate::worker::{Task, TaskResult};
+
use crate::service::reactor::Io;
+
use crate::service::{session, DisconnectReason, Service};
+
use crate::wire::frame;
+
use crate::wire::frame::{Frame, FrameData, StreamId};
+
use crate::wire::Encode;
+
use crate::worker::{ChannelEvent, Fetch, Task, TaskResult};
use crate::Link;
use crate::{address, service};

@@ -41,22 +42,16 @@ pub const NOISE_XK: HandshakePattern = HandshakePattern {
    responder: cyphernet::encrypt::noise::OneWayPattern::Known,
};

-
#[allow(clippy::large_enum_variant)]
/// Control message used internally between workers, users, and the service.
-
pub enum Control<G: Signer + Ecdh> {
+
#[allow(clippy::large_enum_variant)]
+
#[derive(Debug)]
+
pub enum Control {
    /// Message from the user to the service.
    User(service::Command),
    /// Message from a worker to the service.
-
    Worker(TaskResult<G>),
-
}
-

-
impl<G: Signer + Ecdh> fmt::Debug for Control<G> {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-
        match self {
-
            Self::User(cmd) => cmd.fmt(f),
-
            Self::Worker(resp) => resp.result.fmt(f),
-
        }
-
    }
+
    Worker(TaskResult),
+
    /// Flush data in the given stream to the remote.
+
    Flush { remote: NodeId, stream: StreamId },
}

/// Peer session type.
@@ -69,6 +64,81 @@ pub type WireWriter<G> = NetWriter<NoiseState<G, Sha256>, Socks5Session<net::Tcp
/// Reactor action.
type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;

+
/// Worker channels used to send Git frames back and forth.
+
struct WorkerChannels {
+
    /// Send data to the git worker.
+
    sender: chan::Sender<ChannelEvent>,
+
    /// Receive data from the git worker.
+
    receiver: chan::Receiver<ChannelEvent>,
+
}
+

+
/// Streams associated with a connected peer.
+
struct Streams {
+
    /// Active streams and their associated worker channels.
+
    /// Note that the gossip and control streams are not included here as they are always
+
    /// implied to exist.
+
    streams: HashMap<StreamId, WorkerChannels>,
+
    /// Connection direction.
+
    link: Link,
+
    /// Sequence number used to compute the next stream id.
+
    seq: u64,
+
}
+

+
impl Streams {
+
    /// Create a new [`Streams`] object, passing the connection link.
+
    fn new(link: Link) -> Self {
+
        Self {
+
            streams: HashMap::default(),
+
            link,
+
            seq: 0,
+
        }
+
    }
+

+
    /// Get a known stream.
+
    fn get(&self, stream: &StreamId) -> Option<&WorkerChannels> {
+
        self.streams.get(stream)
+
    }
+

+
    /// Open a new stream.
+
    fn open(&mut self) -> (StreamId, WorkerChannels) {
+
        self.seq += 1;
+

+
        let id = StreamId::git(self.link)
+
            .nth(self.seq)
+
            .expect("Streams::open: too many streams");
+
        let channels = self
+
            .register(id)
+
            .expect("Streams::open: stream was already open");
+

+
        (id, channels)
+
    }
+

+
    /// Register an open stream.
+
    fn register(&mut self, stream: StreamId) -> Option<WorkerChannels> {
+
        let (wire_send, wire_recv) = chan::unbounded::<ChannelEvent>();
+
        let (work_send, work_recv) = chan::unbounded::<ChannelEvent>();
+

+
        match self.streams.entry(stream) {
+
            Entry::Vacant(e) => {
+
                e.insert(WorkerChannels {
+
                    sender: wire_send,
+
                    receiver: work_recv,
+
                });
+
                Some(WorkerChannels {
+
                    sender: work_send,
+
                    receiver: wire_recv,
+
                })
+
            }
+
            Entry::Occupied(_) => None,
+
        }
+
    }
+

+
    /// Unregister an open stream.
+
    fn unregister(&mut self, stream: &StreamId) -> Option<WorkerChannels> {
+
        self.streams.remove(stream)
+
    }
+
}
+

/// Peer connection state machine.
enum Peer {
    /// The initial state of an inbound peer before handshake is completed.
@@ -79,8 +149,9 @@ enum Peer {
    /// Peers in this state are handled by the underlying service.
    Connected {
        link: Link,
-
        id: NodeId,
-
        inbox: Deserializer<Message>,
+
        nid: NodeId,
+
        inbox: Deserializer<Frame>,
+
        streams: Streams,
    },
    /// The peer was scheduled for disconnection. Once the transport is handed over
    /// by the reactor, we can consider it disconnected.
@@ -88,16 +159,6 @@ enum Peer {
        id: Option<NodeId>,
        reason: DisconnectReason,
    },
-
    /// The state after we've started the process of upgraded the peer for a fetch.
-
    /// The request to handover the socket was made to the reactor.
-
    Upgrading {
-
        fetch: Fetch,
-
        link: Link,
-
        id: NodeId,
-
        inbox: Vec<u8>,
-
    },
-
    /// The peer is now upgraded and we are in control of the socket.
-
    Upgraded { link: Link, id: NodeId },
}

impl std::fmt::Debug for Peer {
@@ -105,16 +166,8 @@ impl std::fmt::Debug for Peer {
        match self {
            Self::Inbound {} => write!(f, "Inbound"),
            Self::Outbound { id } => write!(f, "Outbound({id})"),
-
            Self::Connected { link, id, .. } => write!(f, "Connected({link:?}, {id})"),
+
            Self::Connected { link, nid, .. } => write!(f, "Connected({link:?}, {nid})"),
            Self::Disconnecting { .. } => write!(f, "Disconnecting"),
-
            Self::Upgrading {
-
                fetch, link, id, ..
-
            } => write!(
-
                f,
-
                "Upgrading(initiated={}, {link:?}, {id})",
-
                fetch.is_initiator(),
-
            ),
-
            Self::Upgraded { link, id, .. } => write!(f, "Upgraded({link:?}, {id})"),
        }
    }
}
@@ -124,11 +177,8 @@ impl Peer {
    fn id(&self) -> Option<&NodeId> {
        match self {
            Peer::Outbound { id }
-
            | Peer::Connected { id, .. }
-
            | Peer::Disconnecting { id: Some(id), .. }
-
            | Peer::Upgrading { id, .. }
-
            | Peer::Upgraded { id, .. } => Some(id),
-

+
            | Peer::Connected { nid: id, .. }
+
            | Peer::Disconnecting { id: Some(id), .. } => Some(id),
            Peer::Inbound {} => None,
            Peer::Disconnecting { id: None, .. } => None,
        }
@@ -151,8 +201,9 @@ impl Peer {

            *self = Self::Connected {
                link,
-
                id,
+
                nid: id,
                inbox: Deserializer::default(),
+
                streams: Streams::new(link),
            };
            link
        } else if let Self::Outbound { id: expected } = self {
@@ -161,8 +212,9 @@ impl Peer {

            *self = Self::Connected {
                link,
-
                id,
+
                nid: id,
                inbox: Deserializer::default(),
+
                streams: Streams::new(link),
            };
            link
        } else {
@@ -172,7 +224,7 @@ impl Peer {

    /// Switch to disconnecting state.
    fn disconnecting(&mut self, reason: DisconnectReason) {
-
        if let Self::Connected { id, .. } = self {
+
        if let Self::Connected { nid: id, .. } = self {
            *self = Self::Disconnecting {
                id: Some(*id),
                reason,
@@ -188,57 +240,6 @@ impl Peer {
            panic!("Peer::disconnected: session is not connected ({self:?})");
        }
    }
-

-
    /// Switch to upgrading state.
-
    fn upgrading(&mut self, fetch: Fetch) {
-
        if let Self::Connected { id, link, inbox } = self {
-
            *self = Self::Upgrading {
-
                fetch,
-
                id: *id,
-
                link: *link,
-
                inbox: inbox.unparsed().collect(),
-
            };
-
        } else {
-
            panic!("Peer::upgrading: session is not fully connected");
-
        }
-
    }
-

-
    /// Switch to upgraded state. Returns the unread bytes from the peer.
-
    #[must_use]
-
    fn upgraded(&mut self) -> (Fetch, Vec<u8>) {
-
        if let Self::Upgrading {
-
            fetch,
-
            id,
-
            link,
-
            inbox,
-
        } = self
-
        {
-
            let fetch = fetch.clone();
-
            let inbox = inbox.drain(..).collect();
-
            log::debug!(target: "wire", "Peer {id} upgraded for fetch {}", fetch.rid);
-

-
            *self = Self::Upgraded {
-
                id: *id,
-
                link: *link,
-
            };
-
            (fetch, inbox)
-
        } else {
-
            panic!("Peer::upgraded: can't upgrade before handover");
-
        }
-
    }
-

-
    /// Switch back from upgraded to connected state.
-
    fn downgrade(&mut self) {
-
        if let Self::Upgraded { id, link, .. } = self {
-
            *self = Self::Connected {
-
                id: *id,
-
                link: *link,
-
                inbox: Deserializer::default(),
-
            };
-
        } else {
-
            panic!("Peer::downgrade: can't downgrade if not in upgraded state");
-
        }
-
    }
}

/// Wire protocol implementation for a set of peers.
@@ -246,7 +247,7 @@ pub struct Wire<R, S, W, G: Signer + Ecdh> {
    /// Backing service instance.
    service: Service<R, S, W, G>,
    /// Worker pool interface.
-
    worker: chan::Sender<Task<G>>,
+
    worker: chan::Sender<Task>,
    /// Used for authentication.
    signer: G,
    /// Internal queue of actions to send to the reactor.
@@ -266,7 +267,7 @@ where
{
    pub fn new(
        mut service: Service<R, S, W, G>,
-
        worker: chan::Sender<Task<G>>,
+
        worker: chan::Sender<Task>,
        signer: G,
        proxy: net::SocketAddr,
        clock: LocalTime,
@@ -289,13 +290,6 @@ where
        self.actions.push_back(Action::RegisterListener(socket));
    }

-
    fn peer_mut_by_fd(&mut self, fd: RawFd) -> &mut Peer {
-
        self.peers.get_mut(&fd).unwrap_or_else(|| {
-
            log::error!(target: "wire", "Peer with fd {fd} was not found");
-
            panic!("Peer with fd {fd} is not known");
-
        })
-
    }
-

    fn fd_by_id(&self, node_id: &NodeId) -> (RawFd, &Peer) {
        self.peers
            .iter()
@@ -304,6 +298,14 @@ where
            .unwrap_or_else(|| panic!("Peer {node_id} was expected to be known to the transport"))
    }

+
    fn fd_by_id_mut(&mut self, node_id: &NodeId) -> (RawFd, &mut Peer) {
+
        self.peers
+
            .iter_mut()
+
            .find(|(_, peer)| peer.id() == Some(node_id))
+
            .map(|(fd, peer)| (*fd, peer))
+
            .unwrap_or_else(|| panic!("Peer {node_id} was expected to be known to the transport"))
+
    }
+

    fn connected_fd_by_id(&self, node_id: &NodeId) -> RawFd {
        match self.fd_by_id(node_id) {
            (fd, Peer::Connected { .. }) => fd,
@@ -319,16 +321,14 @@ where
        self.peers.iter().filter_map(|(fd, peer)| match peer {
            Peer::Inbound {} => None,
            Peer::Outbound { id } => Some((*fd, id)),
-
            Peer::Connected { id, .. } => Some((*fd, id)),
-
            Peer::Upgrading { id, .. } => Some((*fd, id)),
-
            Peer::Upgraded { id, .. } => Some((*fd, id)),
+
            Peer::Connected { nid: id, .. } => Some((*fd, id)),
            Peer::Disconnecting { .. } => None,
        })
    }

    fn connected(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
        self.peers.iter().filter_map(|(fd, peer)| {
-
            if let Peer::Connected { id, .. } = peer {
+
            if let Peer::Connected { nid: id, .. } = peer {
                Some((*fd, id))
            } else {
                None
@@ -353,65 +353,74 @@ where
        }
    }

-
    fn upgrade(&mut self, fd: RawFd, fetch: Fetch) {
-
        let peer = self.peer_mut_by_fd(fd);
-
        if let Peer::Disconnecting { .. } = peer {
-
            log::error!(target: "wire", "Peer (fd={fd}) is disconnecting");
+
    fn worker_result(&mut self, task: TaskResult) {
+
        log::debug!(target: "wire", "Received fetch result from worker: {:?}", task.result);
+

+
        let nid = task.fetch.remote();
+
        let Some((fd, peer)) = self
+
            .peers
+
            .iter_mut()
+
            .find(|(_, peer)| peer.id() == Some(&nid))
+
            .map(|(fd, peer)| (*fd, peer)) else {
+
                log::warn!(target: "wire", "Peer {nid} not found; ignoring fetch result");
+
                return;
+
            };
+

+
        let Peer::Connected { nid, link, streams, .. } = peer else {
+
            log::warn!(target: "wire", "Peer {nid} is not connected; ignoring fetch result");
            return;
        };
-
        log::debug!(target: "wire", "Requesting transport handover from reactor for peer (fd={fd})");
-
        peer.upgrading(fetch);
-

-
        self.actions.push_back(Action::UnregisterTransport(fd));
-
    }
+
        let remote = *nid;

-
    fn upgraded(&mut self, transport: NetTransport<WireSession<G>>) {
-
        let fd = transport.as_raw_fd();
-
        let peer = self.peer_mut_by_fd(fd);
-
        let (fetch, drain) = peer.upgraded();
-
        let session = match transport.into_session() {
-
            Ok(session) => session,
-
            Err(_) => panic!("Wire::upgraded: peer write buffer not empty on upgrade"),
-
        };
+
        // Only call into the service if we initiated this fetch.
+
        if let Some((rid, namespaces)) = task.fetch.initiated() {
+
            self.service.fetched(rid, namespaces, remote, task.result);
+
        }

-
        if self
-
            .worker
-
            .send(Task {
-
                fetch,
-
                session,
-
                drain,
-
            })
-
            .is_err()
-
        {
-
            log::error!(target: "wire", "Worker pool is disconnected; cannot send fetch request");
+
        // Nb. It's possible that the stream would already be unregistered if we received an early
+
        // "close" from the remote. Otherwise, we unregister it here and send the "close" ourselves.
+
        if streams.unregister(&task.stream).is_some() {
+
            let frame = Frame::control(
+
                *link,
+
                frame::Control::Close {
+
                    stream: task.stream,
+
                },
+
            );
+
            self.actions.push_back(Action::Send(fd, frame.to_bytes()));
        }
    }

-
    fn worker_result(&mut self, task: TaskResult<G>) {
-
        log::debug!(target: "wire", "Fetch completed: {:?}", task.result);
-

-
        let session = task.session;
-
        let fd = session.as_connection().as_raw_fd();
-
        let peer = self.peer_mut_by_fd(fd);
+
    fn flush(&mut self, remote: NodeId, stream: StreamId) {
+
        let (fd, peer) = self
+
            .peers
+
            .iter()
+
            .find(|(_, peer)| peer.id() == Some(&remote))
+
            .map(|(fd, peer)| (*fd, peer))
+
            .unwrap_or_else(|| panic!("Peer {remote} was expected to be known to the transport"));

-
        let session = if let Peer::Disconnecting { .. } = peer {
-
            log::error!(target: "wire", "Peer with fd {fd} is disconnecting");
+
        let Peer::Connected { streams, link, .. } = peer else {
+
            log::warn!(target: "wire", "Peer {remote} is not connected; ignoring flush");
+
            return;
+
        };
+
        let Some(c) = streams.get(&stream) else {
+
            log::debug!(target: "wire", "Stream {stream} cannot be found; ignoring flush");
            return;
-
        } else if let Peer::Upgraded { link, .. } = peer {
-
            match NetTransport::with_session(session, *link) {
-
                Ok(session) => session,
-
                Err(err) => {
-
                    log::error!(target: "wire", "Session downgrade failed: {err}");
-
                    return;
-
                }
-
            }
-
        } else {
-
            todo!();
        };
-
        peer.downgrade();

-
        self.actions.push_back(Action::RegisterTransport(session));
-
        self.service.fetched(task.fetch, task.result);
+
        #[cfg(test)]
+
        if c.receiver.is_empty() {
+
            panic!("Wire:flush: redundant flush");
+
        }
+

+
        for data in c.receiver.try_iter() {
+
            let frame = match data {
+
                ChannelEvent::Data(data) => Frame::git(stream, data),
+
                ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
+
                ChannelEvent::Eof => Frame::control(*link, frame::Control::Eof { stream }),
+
            };
+
            self.actions
+
                .push_back(reactor::Action::Send(fd, frame.to_bytes()));
+
        }
    }
}

@@ -424,7 +433,7 @@ where
{
    type Listener = NetAccept<WireSession<G>>;
    type Transport = NetTransport<WireSession<G>>;
-
    type Command = Control<G>;
+
    type Command = Control;

    fn tick(&mut self, time: Timestamp) {
        self.service
@@ -508,57 +517,98 @@ where
                self.service.connected(id, link);
            }
            SessionEvent::Data(data) => {
-
                if let Some(Peer::Connected { id, inbox, .. }) = self.peers.get_mut(&fd) {
+
                if let Some(Peer::Connected {
+
                    nid,
+
                    inbox,
+
                    streams,
+
                    ..
+
                }) = self.peers.get_mut(&fd)
+
                {
                    inbox.input(&data);

                    loop {
                        match inbox.deserialize_next() {
-
                            Ok(Some(msg)) => self.service.received_message(*id, msg),
+
                            Ok(Some(Frame {
+
                                data: FrameData::Control(frame::Control::Open { stream }),
+
                                ..
+
                            })) => {
+
                                log::debug!(target: "wire", "Received stream open for id={stream}");
+

+
                                let Some(WorkerChannels {
+
                                    sender: work_send,
+
                                    receiver: wire_recv,
+
                                }) = streams.register(stream) else {
+
                                    log::warn!(target: "wire", "Peer attempted to open already-open stream id={stream}");
+
                                    continue;
+
                                };
+

+
                                let task = Task {
+
                                    fetch: Fetch::Responder { remote: *nid },
+
                                    stream,
+
                                    send: work_send,
+
                                    recv: wire_recv,
+
                                };
+
                                if self.worker.send(task).is_err() {
+
                                    log::error!(target: "wire", "Worker pool is disconnected; cannot send task");
+
                                }
+
                            }
+
                            Ok(Some(Frame {
+
                                data: FrameData::Control(frame::Control::Eof { stream }),
+
                                ..
+
                            })) => {
+
                                if let Some(channels) = streams.get(&stream) {
+
                                    if channels.sender.send(ChannelEvent::Eof).is_err() {
+
                                        log::error!(target: "wire", "Worker is disconnected; cannot send `EOF`");
+
                                    }
+
                                } else {
+
                                    log::debug!(target: "wire", "Ignoring frame on closed or unknown stream id={stream}");
+
                                }
+
                            }
+
                            Ok(Some(Frame {
+
                                data: FrameData::Control(frame::Control::Close { stream }),
+
                                ..
+
                            })) => {
+
                                log::debug!(target: "wire", "Received stream close command for id={stream}");
+

+
                                streams.unregister(&stream);
+
                            }
+
                            Ok(Some(Frame {
+
                                data: FrameData::Gossip(msg),
+
                                ..
+
                            })) => {
+
                                self.service.received_message(*nid, msg);
+
                            }
+
                            Ok(Some(Frame {
+
                                stream,
+
                                data: FrameData::Git(data),
+
                                ..
+
                            })) => {
+
                                if let Some(channels) = streams.get(&stream) {
+
                                    if channels.sender.send(ChannelEvent::Data(data)).is_err() {
+
                                        log::error!(target: "wire", "Worker is disconnected; cannot send data");
+
                                    }
+
                                } else {
+
                                    log::debug!(target: "wire", "Ignoring frame on closed or unknown stream id={stream}");
+
                                }
+
                            }
                            Ok(None) => {
                                // Buffer is empty, or message isn't complete.
                                break;
                            }
                            Err(e) => {
-
                                log::error!(target: "wire", "Invalid gossip message from {id}: {e}");
-

-
                                let mut reason =
-
                                    DisconnectReason::Session(session::Error::Misbehavior);
-
                                if let Error::UnknownMessageType(t) = e {
-
                                    let leftover =
-
                                        inbox.unparsed().chain(t.to_be_bytes()).collect::<Vec<_>>();
-

-
                                    if let Ok(header) =
-
                                        str::from_utf8(&leftover[..worker::pktline::HEADER_LEN])
-
                                    {
-
                                        if header.is_ascii() && !header.is_empty() {
-
                                            log::error!(
-
                                                target: "wire",
-
                                                "Received possible Git packet-line header `{}` from {id} (protocol mismatch)",
-
                                                header
-
                                            );
-
                                            // In case of protocol mismatch, don't penalize the peer.
-
                                            reason = DisconnectReason::Session(
-
                                                session::Error::ProtocolMismatch,
-
                                            );
-
                                        }
-
                                    }
-
                                }
+
                                log::error!(target: "wire", "Invalid gossip message from {nid}: {e}");

                                if !inbox.is_empty() {
-
                                    log::debug!(target: "wire", "Dropping read buffer for {id} with {} bytes", inbox.unparsed().count());
+
                                    log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.unparsed().count());
                                }
                                self.disconnect(
-
                                    fd, // TODO(cloudhead): Include error in reason.
-
                                    reason,
+
                                    fd,
+
                                    DisconnectReason::Session(session::Error::Misbehavior),
                                );
                                break;
                            }
                        }
                    }
-
                } else if let Some(Peer::Upgrading { inbox, .. }) = self.peers.get_mut(&fd) {
-
                    // If somehow the remote peer managed to send git data before the reactor
-
                    // unregistered our session, we'll hit this branch.
-
                    inbox.extend(data);
                } else {
                    log::warn!(target: "wire", "Dropping message from unconnected peer (fd={fd})");
                }
@@ -573,6 +623,7 @@ where
        match cmd {
            Control::User(cmd) => self.service.command(cmd),
            Control::Worker(result) => self.worker_result(result),
+
            Control::Flush { remote, stream } => self.flush(remote, stream),
        }
    }

@@ -604,7 +655,11 @@ where
            }
            reactor::Error::TransportPollError(fd, _) => {
                log::error!(target: "wire", "Received error: peer (fd={fd}) poll error");
-
                self.actions.push_back(Action::UnregisterTransport(*fd));
+

+
                self.disconnect(
+
                    *fd,
+
                    DisconnectReason::Connection(Arc::new(io::Error::from(io::ErrorKind::Other))),
+
                )
            }
            reactor::Error::TransportDisconnect(fd, _, _) => {
                log::error!(target: "wire", "Received error: peer (fd={fd}) disconnected");
@@ -661,9 +716,6 @@ where
                        }
                        e.remove();
                    }
-
                    Peer::Upgrading { .. } => {
-
                        self.upgraded(transport);
-
                    }
                    _ => {
                        panic!("Wire::handover_transport: Unexpected peer with fd {fd} handed over from the reactor");
                    }
@@ -689,8 +741,8 @@ where
        while let Some(ev) = self.service.next() {
            match ev {
                Io::Write(node_id, msgs) => {
-
                    let fd = match self.fd_by_id(&node_id) {
-
                        (fd, Peer::Connected { .. }) => fd,
+
                    let (fd, link) = match self.fd_by_id(&node_id) {
+
                        (fd, Peer::Connected { link, .. }) => (fd, *link),
                        (_, peer) => {
                            // If the peer is disconnected by the wire protocol, the service may
                            // not be aware of this yet, and may continue to write messages to it.
@@ -704,7 +756,9 @@ where

                    let mut data = Vec::new();
                    for msg in msgs {
-
                        msg.encode(&mut data).expect("in-memory writes never fail");
+
                        Frame::gossip(link, msg)
+
                            .encode(&mut data)
+
                            .expect("in-memory writes never fail");
                    }
                    self.actions.push_back(reactor::Action::Send(fd, data));
                }
@@ -753,10 +807,40 @@ where
                Io::Wakeup(d) => {
                    self.actions.push_back(reactor::Action::SetTimer(d.into()));
                }
-
                Io::Fetch(fetch) => {
-
                    // TODO: Check that the node_id is connected, queue request otherwise.
-
                    let fd = self.connected_fd_by_id(&fetch.remote);
-
                    self.upgrade(fd, fetch);
+
                Io::Fetch {
+
                    rid,
+
                    remote,
+
                    namespaces,
+
                } => {
+
                    log::debug!(target: "wire", "Processing fetch..");
+

+
                    let (fd, Peer::Connected { link, streams,  .. }) =
+
                        self.fd_by_id_mut(&remote) else {
+
                            panic!("Wire::next: peer {remote} is not connected");
+
                        };
+
                    let (stream, channels) = streams.open();
+

+
                    log::debug!(target: "wire", "Opened new stream with id={stream} for rid={rid}");
+

+
                    let link = *link;
+
                    let task = Task {
+
                        fetch: Fetch::Initiator {
+
                            rid,
+
                            namespaces,
+
                            remote,
+
                        },
+
                        stream,
+
                        send: channels.sender,
+
                        recv: channels.receiver,
+
                    };
+

+
                    if self.worker.send(task).is_err() {
+
                        log::error!(target: "wire", "Worker pool is disconnected; cannot send fetch request");
+
                    }
+
                    self.actions.push_back(Action::Send(
+
                        fd,
+
                        Frame::control(link, frame::Control::Open { stream }).to_bytes(),
+
                    ));
                }
            }
        }
added radicle-node/src/wire/varint.rs
@@ -0,0 +1,225 @@
+
//! Variable-length integer implementation based on QUIC.
+
#![warn(clippy::missing_docs_in_private_items)]
+

+
// This implementation is largely based on the `quinn` crate.
+
// Copyright (c) 2018 The quinn developers.
+
use std::{fmt, io, ops};
+

+
use byteorder::ReadBytesExt;
+
use thiserror::Error;
+

+
use crate::wire;
+
use crate::wire::{Decode, Encode};
+

+
/// An integer less than 2^62
+
///
+
/// Based on QUIC variable-length integers (RFC 9000).
+
///
+
/// > The QUIC variable-length integer encoding reserves the two most significant bits of the first
+
/// > byte to encode the base-2 logarithm of the integer encoding length in bytes. The integer value is
+
/// > encoded on the remaining bits, in network byte order. This means that integers are encoded on 1,
+
/// > 2, 4, or 8 bytes and can encode 6-, 14-, 30-, or 62-bit values, respectively. Table 4 summarizes
+
/// > the encoding properties.
+
///
+
/// ```text
+
/// MSB   Length   Usable Bits   Range
+
/// ----------------------------------------------------
+
/// 00    1        6             0 - 63
+
/// 01    2        14            0 - 16383
+
/// 10    4        30            0 - 1073741823
+
/// 11    8        62            0 - 4611686018427387903
+
/// ```
+
#[derive(Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+
pub struct VarInt(pub(crate) u64);
+

+
impl VarInt {
+
    /// The largest representable value.
+
    pub const MAX: VarInt = VarInt((1 << 62) - 1);
+

+
    /// Succeeds iff `x` < 2^62.
+
    pub fn new(x: u64) -> Result<Self, BoundsExceeded> {
+
        if x <= Self::MAX.0 {
+
            Ok(Self(x))
+
        } else {
+
            Err(BoundsExceeded)
+
        }
+
    }
+
}
+

+
impl ops::Deref for VarInt {
+
    type Target = u64;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl From<u8> for VarInt {
+
    fn from(x: u8) -> Self {
+
        VarInt(x.into())
+
    }
+
}
+

+
impl From<u16> for VarInt {
+
    fn from(x: u16) -> Self {
+
        VarInt(x.into())
+
    }
+
}
+

+
impl From<u32> for VarInt {
+
    fn from(x: u32) -> Self {
+
        VarInt(x.into())
+
    }
+
}
+

+
impl std::convert::TryFrom<u64> for VarInt {
+
    type Error = BoundsExceeded;
+
    /// Succeeds iff `x` < 2^62.
+
    fn try_from(x: u64) -> Result<Self, BoundsExceeded> {
+
        VarInt::new(x)
+
    }
+
}
+

+
impl fmt::Debug for VarInt {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        self.0.fmt(f)
+
    }
+
}
+

+
impl fmt::Display for VarInt {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        self.0.fmt(f)
+
    }
+
}
+

+
/// Error returned when constructing a `VarInt` from a value >= 2^62.
+
#[derive(Debug, Copy, Clone, Eq, PartialEq, Error)]
+
#[error("value too large for varint encoding")]
+
pub struct BoundsExceeded;
+

+
impl Decode for VarInt {
+
    fn decode<R: io::Read + ?Sized>(r: &mut R) -> Result<Self, wire::Error> {
+
        let mut buf = [0; 8];
+
        buf[0] = r.read_u8()?;
+

+
        // Integer length.
+
        let tag = buf[0] >> 6;
+
        buf[0] &= 0b0011_1111;
+

+
        let x = match tag {
+
            0b00 => u64::from(buf[0]),
+
            0b01 => {
+
                r.read_exact(&mut buf[1..2])?;
+
                u64::from(u16::from_be_bytes([buf[0], buf[1]]))
+
            }
+
            0b10 => {
+
                r.read_exact(&mut buf[1..4])?;
+
                u64::from(u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]))
+
            }
+
            0b11 => {
+
                r.read_exact(&mut buf[1..8])?;
+
                u64::from_be_bytes(buf)
+
            }
+
            // SAFETY: It should be obvious that we can't have any other bit pattern
+
            // than the above, since all other bits are zeroed.
+
            _ => unreachable! {},
+
        };
+
        Ok(Self(x))
+
    }
+
}
+

+
impl Encode for VarInt {
+
    fn encode<W: io::Write + ?Sized>(&self, w: &mut W) -> io::Result<usize> {
+
        let x: u64 = self.0;
+

+
        if x < 2u64.pow(6) {
+
            (x as u8).encode(w)
+
        } else if x < 2u64.pow(14) {
+
            (0b01 << 14 | x as u16).encode(w)
+
        } else if x < 2u64.pow(30) {
+
            (0b10 << 30 | x as u32).encode(w)
+
        } else if x < 2u64.pow(62) {
+
            (0b11 << 62 | x).encode(w)
+
        } else {
+
            panic!("VarInt::encode: integer overflow");
+
        }
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+
    use qcheck_macros::quickcheck;
+

+
    impl qcheck::Arbitrary for VarInt {
+
        fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
            let a = u16::arbitrary(g) as u64;
+
            let b = u32::arbitrary(g) as u64;
+
            let n = g
+
                .choose(&[
+
                    0,
+
                    1,
+
                    3,
+
                    7,
+
                    13,
+
                    37,
+
                    255,
+
                    4931,
+
                    54019,
+
                    69149,
+
                    151288809941952652,
+
                    u8::MAX as u64,
+
                    u16::MAX as u64,
+
                    u16::MAX as u64 - 1,
+
                    u32::MAX as u64,
+
                    u32::MAX as u64 - 1,
+
                    *Self::MAX,
+
                    a,
+
                    b,
+
                ])
+
                .copied()
+
                .unwrap();
+

+
            Self(n)
+
        }
+
    }
+

+
    #[quickcheck]
+
    fn prop_encode_decode(input: VarInt) {
+
        let encoded = wire::serialize(&input);
+
        let decoded: VarInt = wire::deserialize(&encoded).unwrap();
+

+
        assert_eq!(decoded, input);
+
    }
+

+
    #[test]
+
    #[should_panic]
+
    fn test_encode_overflow() {
+
        wire::serialize(&VarInt(u64::MAX));
+
    }
+

+
    #[test]
+
    fn test_encoding() {
+
        assert_eq!(wire::serialize(&VarInt(0)), vec![0x0]);
+
        assert_eq!(wire::serialize(&VarInt(1)), vec![0x01]);
+
        assert_eq!(wire::serialize(&VarInt(10)), vec![0x0a]);
+
        assert_eq!(wire::serialize(&VarInt(37)), vec![0x25]);
+
        assert_eq!(
+
            wire::deserialize::<VarInt>(&[0x40, 0x25]).unwrap(),
+
            VarInt(37)
+
        );
+
        assert_eq!(wire::serialize(&VarInt(15293)), vec![0x7b, 0xbd]);
+
        assert_eq!(
+
            wire::serialize(&VarInt(494878333)),
+
            vec![0x9d, 0x7f, 0x3e, 0x7d],
+
        );
+
        assert_eq!(
+
            wire::serialize(&VarInt(151288809941952652)),
+
            vec![0xc2, 0x19, 0x7c, 0x5e, 0xff, 0x14, 0xe8, 0x8c]
+
        );
+
        assert_eq!(
+
            wire::serialize(&VarInt(10000000000)),
+
            vec![0xc0, 0x00, 0x00, 0x02, 0x54, 0x0b, 0xe4, 0x00],
+
        );
+
    }
+
}
modified radicle-node/src/worker.rs
@@ -1,4 +1,6 @@
+
mod channels;
mod fetch;
+
mod tunnel;

use std::io::{prelude::*, BufReader};
use std::ops::ControlFlow;
@@ -6,20 +8,19 @@ use std::thread::JoinHandle;
use std::{env, io, net, process, thread, time};

use crossbeam_channel as chan;
-
use cyphernet::Ecdh;
-
use netservices::tunnel::Tunnel;
-
use netservices::{AsConnection, NetSession, SplitIo};

-
use radicle::crypto::Signer;
use radicle::identity::{Id, IdentityError};
+
use radicle::prelude::NodeId;
use radicle::storage::{Namespaces, ReadRepository, RefUpdate};
use radicle::{git, Storage};
-
use reactor::poller::popol;

use crate::runtime::Handle;
-
use crate::service::reactor::{Fetch, FetchDirection};
use crate::storage;
-
use crate::wire::{WireReader, WireSession, WireWriter};
+
use crate::wire::StreamId;
+
use channels::{ChannelReader, ChannelWriter, Channels};
+
use tunnel::Tunnel;
+

+
pub use channels::ChannelEvent;

/// Worker pool configuration.
pub struct Config {
@@ -54,8 +55,6 @@ pub enum FetchError {
    Identity(#[from] IdentityError),
    #[error("upload failed: {0}")]
    Upload(#[from] UploadError),
-
    #[error("remote aborted fetch")]
-
    RemoteAbortedFetch,
    #[error(transparent)]
    StagingInit(#[from] fetch::error::Init),
    #[error(transparent)]
@@ -76,8 +75,6 @@ impl FetchError {
pub enum UploadError {
    #[error("worker failed to connect to git daemon: {0}")]
    DaemonConnectionFailed(io::Error),
-
    #[error("git pkt-line command does not match fetch request")]
-
    CommandMismatch,
    #[error("error parsing git command packet-line: {0}")]
    InvalidPacketLine(io::Error),
    #[error(transparent)]
@@ -91,33 +88,73 @@ impl UploadError {
    }
}

+
/// Fetch job sent to worker thread.
+
#[derive(Debug, Clone)]
+
pub enum Fetch {
+
    /// Client is initiating a fetch in order to receive the specified
+
    /// `refspecs` determined by [`Namespaces`].
+
    Initiator {
+
        /// Repo to fetch.
+
        rid: Id,
+
        /// Namespaces to fetch.
+
        namespaces: Namespaces,
+
        /// Remote peer we are interacting with.
+
        remote: NodeId,
+
    },
+
    /// Server is responding to a fetch request by uploading the
+
    /// specified `refspecs` sent by the client.
+
    Responder {
+
        /// Remote peer we are interacting with.
+
        remote: NodeId,
+
    },
+
}
+

+
impl Fetch {
+
    pub fn remote(&self) -> NodeId {
+
        match self {
+
            Self::Initiator { remote, .. } | Self::Responder { remote } => *remote,
+
        }
+
    }
+

+
    pub fn initiated(self) -> Option<(Id, Namespaces)> {
+
        match self {
+
            Self::Initiator {
+
                rid, namespaces, ..
+
            } => Some((rid, namespaces)),
+
            Self::Responder { .. } => None,
+
        }
+
    }
+
}
+

/// Task to be accomplished on a worker thread.
/// This is either going to be an outgoing or incoming fetch.
-
pub struct Task<G: Signer + Ecdh> {
+
pub struct Task {
    pub fetch: Fetch,
-
    pub session: WireSession<G>,
-
    pub drain: Vec<u8>,
+
    pub stream: StreamId,
+
    pub send: chan::Sender<ChannelEvent>,
+
    pub recv: chan::Receiver<ChannelEvent>,
}

/// Worker response.
-
pub struct TaskResult<G: Signer + Ecdh> {
+
#[derive(Debug)]
+
pub struct TaskResult {
    pub fetch: Fetch,
+
    pub stream: StreamId,
    pub result: Result<Vec<RefUpdate>, FetchError>,
-
    pub session: WireSession<G>,
}

/// A worker that replicates git objects.
-
struct Worker<G: Signer + Ecdh> {
+
struct Worker {
    storage: Storage,
-
    tasks: chan::Receiver<Task<G>>,
+
    tasks: chan::Receiver<Task>,
    daemon: net::SocketAddr,
    timeout: time::Duration,
-
    handle: Handle<G>,
+
    handle: Handle,
    atomic: bool,
    name: String,
}

-
impl<G: Signer + Ecdh + 'static> Worker<G> {
+
impl Worker {
    /// Waits for tasks and runs them. Blocks indefinitely unless there is an error receiving
    /// the next task.
    fn run(mut self) -> Result<(), chan::RecvError> {
@@ -127,17 +164,15 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
        }
    }

-
    fn process(&mut self, task: Task<G>) {
+
    fn process(&mut self, task: Task) {
        let Task {
            fetch,
-
            session,
-
            drain,
+
            recv,
+
            send,
+
            stream,
        } = task;
-

-
        let timeout = session.as_connection().read_timeout().unwrap_or_default();
-
        let (session, result) = self._process(&fetch, drain, session);
-
        // In case the timeout is changed during the fetch, we reset it here.
-
        session.as_connection().set_read_timeout(timeout).ok();
+
        let channels = Channels::new(send, recv);
+
        let result = self._process(&fetch, stream, channels);

        log::debug!(target: "worker", "Sending response back to service..");

@@ -145,8 +180,8 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
            .handle
            .worker_result(TaskResult {
                fetch,
+
                stream,
                result,
-
                session,
            })
            .is_err()
        {
@@ -155,176 +190,135 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
    }

    fn _process(
-
        &self,
+
        &mut self,
        fetch: &Fetch,
-
        drain: Vec<u8>,
-
        mut session: WireSession<G>,
-
    ) -> (WireSession<G>, Result<Vec<RefUpdate>, FetchError>) {
-
        match &fetch.direction {
-
            FetchDirection::Initiator { namespaces } => {
-
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", fetch.rid);
-

-
                let (session, result) = self.fetch(fetch.rid, namespaces, session);
+
        stream: StreamId,
+
        mut channels: Channels,
+
    ) -> Result<Vec<RefUpdate>, FetchError> {
+
        match &fetch {
+
            Fetch::Initiator {
+
                rid,
+
                namespaces,
+
                remote,
+
            } => {
+
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", rid);
+

+
                let result = self.fetch(*rid, *remote, stream, namespaces, channels);
                if let Err(err) = &result {
                    log::error!(target: "worker", "Fetch error: {err}");
                }
-

-
                (session, result)
+
                result
            }
-
            FetchDirection::Responder => {
-
                log::debug!(target: "worker", "Worker processing incoming fetch for {}", fetch.rid);
-

-
                if let Err(err) = session.as_connection_mut().set_nonblocking(false) {
-
                    return (session, Err(err.into()));
-
                }
+
            Fetch::Responder { .. } => {
+
                log::debug!(target: "worker", "Worker processing incoming fetch..");

-
                let (mut stream_r, mut stream_w) = match session.split_io() {
-
                    Ok((r, w)) => (r, w),
-
                    Err(err) => {
-
                        return (err.original, Err(err.error.into()));
+
                let (stream_w, mut stream_r) = channels.split();
+
                let mut pktline_r = pktline::Reader::new(&mut stream_r);
+
                // Nb. two fetches are usually expected: one for the *special* refs,
+
                // followed by another for the signed refs.
+
                loop {
+
                    match self.upload_pack(fetch, stream, &mut pktline_r, stream_w) {
+
                        Ok(ControlFlow::Continue(())) => continue,
+
                        Ok(ControlFlow::Break(())) => break,
+
                        Err(e) => return Err(e.into()),
                    }
-
                };
-
                let mut pktline_r = pktline::Reader::new(drain, &mut stream_r);
-
                // Nb. two fetches are expected to happen, one for
-
                // the `rad` refs, followed by the refs listed in
-
                // signed refs.
-
                let result = self.upload_pack(fetch, &mut pktline_r, &mut stream_w);
-
                let result =
-
                    result.and_then(|_| self.upload_pack(fetch, &mut pktline_r, &mut stream_w));
-
                (WireSession::from_split_io(stream_r, stream_w), result)
+
                }
+
                Ok(vec![])
            }
        }
    }

    fn fetch(
-
        &self,
+
        &mut self,
        rid: Id,
+
        remote: NodeId,
+
        stream: StreamId,
        namespaces: &Namespaces,
-
        session: WireSession<G>,
-
    ) -> (WireSession<G>, Result<Vec<RefUpdate>, FetchError>) {
-
        let staging = match fetch::StagingPhaseInitial::new(&self.storage, rid, namespaces.clone())
-
        {
-
            Ok(staging) => staging,
-
            Err(err) => return (session, Err(err.into())),
-
        };
-

-
        let session = match self.tunnel_fetch(&staging.repo, staging.refspecs(), session) {
-
            (session, Ok(())) => session,
-
            (session, Err(err)) => return (session, Err(err)),
-
        };
+
        mut channels: Channels,
+
    ) -> Result<Vec<RefUpdate>, FetchError> {
+
        let staging = fetch::StagingPhaseInitial::new(&self.storage, rid, namespaces.clone())?;
+

+
        self._fetch(
+
            &staging.repo,
+
            remote,
+
            staging.refspecs(),
+
            stream,
+
            &mut channels,
+
        )?;
+
        if let Err(e) = self.handle.flush(remote, stream) {
+
            log::error!(target: "worker", "Error flushing worker stream: {e}");
+
        }

        let staging = match staging.into_final().map_err(FetchError::from) {
            Ok(staging) => staging,
-
            Err(e) => return (session, Err(e)),
+
            Err(e) => return Err(e),
        };
-

-
        let (session, res) = self.tunnel_fetch(&staging.repo, staging.refspecs(), session);
-

-
        if let Err(e) = res {
-
            return (session, Err(e));
+
        self._fetch(
+
            &staging.repo,
+
            remote,
+
            staging.refspecs(),
+
            stream,
+
            &mut channels,
+
        )?;
+
        if let Err(e) = self.handle.flush(remote, stream) {
+
            log::error!(target: "worker", "Error flushing worker stream: {e}");
        }
-

-
        (session, staging.transfer().map_err(FetchError::from))
+
        staging.transfer().map_err(FetchError::from)
    }

    fn upload_pack(
-
        &self,
+
        &mut self,
        fetch: &Fetch,
-
        pktline_r: &mut pktline::Reader<WireReader>,
-
        stream_w: &mut WireWriter<G>,
-
    ) -> Result<Vec<RefUpdate>, FetchError> {
-
        match self._upload_pack(fetch, pktline_r, stream_w) {
-
            Ok(()) => {
-
                log::debug!(target: "worker", "Upload of {} to {} exited successfully", fetch.rid, fetch.remote);
+
        stream: StreamId,
+
        pktline_r: &mut pktline::Reader<&mut ChannelReader>,
+
        stream_w: &mut ChannelWriter,
+
    ) -> Result<ControlFlow<()>, UploadError> {
+
        log::debug!(target: "worker", "Waiting for Git request pktline for..");

-
                Ok(vec![])
+
        // Read the request packet line to make sure the repository being requested matches what
+
        // we expect, and that the service requested is valid.
+
        let (rid, request) = match pktline_r.read_request_pktline() {
+
            Ok((req, pktline)) => (req.repo, pktline),
+
            Err(err) if err.kind() == io::ErrorKind::ConnectionReset => {
+
                return Ok(ControlFlow::Break(()));
            }
            Err(err) => {
-
                log::error!(target: "worker", "Upload error for {}: {err}", fetch.rid);
-

-
                // If we exited without receiving a `done` packet, wait for it here.
-
                // It's possible that the daemon exited first, or the remote crashed.
-
                log::debug!(target: "worker", "Waiting for `done` packet from remote..");
-
                let mut header = [0; pktline::HEADER_LEN];
-

-
                // Set the read timeout for the `done` packet to twice the configured
-
                // value that is used for the fetching (initiator) side.
-
                //
-
                // This is because the uploader always waits for the `done` packet;
-
                // so in case the fetch is aborted by the uploader, eg. if
-
                // it can't connect with the daemon, it will wait long enough for the
-
                // fetcher to timeout before timing out itself, and will thus receive
-
                // the `done` packet.
-
                pktline_r
-
                    .stream()
-
                    .as_connection()
-
                    .set_read_timeout(Some(self.timeout * 2))
-
                    .ok();
+
                return Err(UploadError::InvalidPacketLine(err));
+
            }
+
        };
+
        log::debug!(target: "worker", "Received Git request pktline for {rid}..");

-
                loop {
-
                    match pktline_r.read_done_pktline(&mut header) {
-
                        Ok(()) => {
-
                            log::debug!(target: "worker", "Received `done` packet from remote");
+
        match self._upload_pack(rid, fetch.remote(), request, stream, pktline_r, stream_w) {
+
            Ok(()) => {
+
                log::debug!(target: "worker", "Upload of {rid} to {} exited successfully", fetch.remote());

-
                            // If we get the `done` packet, we exit with the original
-
                            // error.
-
                            return Err(err.into());
-
                        }
-
                        Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
-
                            // If we get some other packet, because the fetch request
-
                            // is still sending stuff, we simply keep reading until we
-
                            // get a `done` packet.
-
                            continue;
-
                        }
-
                        Err(_) => {
-
                            // If we get any other error, eg. a timeout, we abort.
-
                            log::error!(
-
                                target: "worker",
-
                                "Upload of {} to {} aborted: missing `done` packet from remote",
-
                                fetch.rid,
-
                                fetch.remote
-
                            );
-
                            return Err(FetchError::RemoteAbortedFetch);
-
                        }
-
                    }
-
                }
+
                Ok(ControlFlow::Continue(()))
            }
+
            Err(e) => Err(e),
        }
    }

    fn _upload_pack(
-
        &self,
-
        fetch: &Fetch,
-
        stream_r: &mut pktline::Reader<WireReader>,
-
        stream_w: &mut WireWriter<G>,
+
        &mut self,
+
        rid: Id,
+
        remote: NodeId,
+
        request: Vec<u8>,
+
        stream: StreamId,
+
        stream_r: &mut pktline::Reader<&mut ChannelReader>,
+
        stream_w: &mut ChannelWriter,
    ) -> Result<(), UploadError> {
-
        // Read the request packet line to make sure the repository being requested matches what
-
        // we expect, and that the service requested is valid.
-
        let request = match stream_r.read_request_pktline() {
-
            Ok((req, pktline)) => {
-
                log::debug!(
-
                    target: "worker",
-
                    "Parsed git command packet-line for {}: {:?}", fetch.rid, req
-
                );
-
                if req.repo != fetch.rid {
-
                    return Err(UploadError::CommandMismatch);
-
                }
-
                pktline
-
            }
-
            Err(err) => {
-
                return Err(UploadError::InvalidPacketLine(err));
-
            }
-
        };
+
        log::debug!(target: "worker", "Connecting to daemon..");

        // Connect to our local git daemon, running as a child process.
        let daemon = net::TcpStream::connect_timeout(&self.daemon, self.timeout)
            .map_err(UploadError::DaemonConnectionFailed)?;
        let (mut daemon_r, mut daemon_w) = (daemon.try_clone()?, daemon);
-
        let mut daemon_r = pktline::Reader::new(vec![], &mut daemon_r);
+
        let mut daemon_r = pktline::Reader::new(&mut daemon_r);

-
        // Write the raw request to the daemon, once we've verified it.
+
        // Write the raw request to the daemon, once we've parsed it.
        daemon_w.write_all(&request)?;

+
        log::debug!(target: "worker", "Entering Git protocol loop for {rid}..");
        // We now loop, alternating between reading requests from the client, and writing responses
        // back from the daemon.. Requests are delimited with a flush packet (`flush-pkt`).
        let mut buffer = [0; u16::MAX as usize + 1];
@@ -333,23 +327,28 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
            if let Err(e) = daemon_r.pipe(stream_w, &mut buffer) {
                // This is the expected error when the daemon disconnects.
                if e.kind() == io::ErrorKind::UnexpectedEof {
-
                    log::debug!(target: "worker", "Daemon closed the git connection for {}", fetch.rid);
+
                    log::debug!(target: "worker", "Daemon closed the git connection for {rid}");
+
                    log::debug!(target: "worker", "Waiting for EOF from remote..");
+

+
                    stream_r.wait_for_eof()?;
+

+
                    return Ok(());
                }
                return Err(e.into());
            }
+

+
            if let Err(e) = self.handle.flush(remote, stream) {
+
                log::error!(target: "worker", "Worker channel disconnected; aborting");
+
                return Err(e.into());
+
            }
+

            // Read from the stream and write to the daemon.
            match stream_r.pipe(&mut daemon_w, &mut buffer) {
-
                // Triggered by a [`pktline::DONE_PKT`] packet.
-
                Ok(ControlFlow::Break(())) => {
-
                    log::debug!(target: "worker", "Received `done` packet from remote for {}", fetch.rid);
-
                    return Ok(());
-
                }
-
                Ok(ControlFlow::Continue(())) => {
-
                    continue;
-
                }
+
                Ok(()) => continue,
+
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()),
                Err(e) => {
-
                    if e.kind() == io::ErrorKind::UnexpectedEof {
-
                        log::debug!(target: "worker", "Remote closed the git connection for {}", fetch.rid);
+
                    if e.kind() == io::ErrorKind::ConnectionReset {
+
                        log::debug!(target: "worker", "Remote closed the git connection for {rid}");
                    }
                    return Err(e.into());
                }
@@ -357,41 +356,20 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
        }
    }

-
    fn tunnel_fetch<Specs>(
-
        &self,
-
        repo: &fetch::StagedRepository,
-
        specs: Specs,
-
        session: WireSession<G>,
-
    ) -> (WireSession<G>, Result<(), FetchError>)
-
    where
-
        Specs: fetch::AsRefspecs,
-
    {
-
        let mut tunnel = match Tunnel::with(session, net::SocketAddr::from(([0, 0, 0, 0], 0))) {
-
            Ok(tunnel) => tunnel,
-
            Err((session, err)) => return (session, Err(err.into())),
-
        };
-
        let res = self._fetch(repo, specs, &mut tunnel);
-

-
        let mut session = tunnel.into_session();
-

-
        log::debug!(target: "worker", "Sending `done` packet to remote..");
-
        if let Err(err) = pktline::done(&mut session) {
-
            log::error!(target: "worker", "Fetch error: error sending `done` packet: {err}");
-
        }
-
        (session, res)
-
    }
-

    fn _fetch<S>(
        &self,
        repo: &storage::git::Repository,
+
        remote: NodeId,
        specs: S,
-
        tunnel: &mut Tunnel<WireSession<G>>,
+
        stream: StreamId,
+
        channels: &mut Channels,
    ) -> Result<(), FetchError>
    where
        S: fetch::AsRefspecs,
    {
+
        let mut tunnel = Tunnel::with(channels, stream, remote, self.handle.clone())?;
        let rid = repo.id;
-
        let tunnel_addr = tunnel.local_addr()?;
+
        let tunnel_addr = tunnel.local_addr();
        let mut cmd = process::Command::new("git");
        cmd.current_dir(repo.path())
            .env_clear()
@@ -429,10 +407,10 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
            }
        })?;

-
        let _ = tunnel.tunnel_once(popol::Poller::new(), self.timeout)?;
+
        tunnel.run(self.timeout)?;

        let result = child.wait()?;
-
        if result.success() {
+
        let result = if result.success() {
            log::debug!(target: "worker", "Fetch for {} exited successfully", rid);
            Ok(())
        } else {
@@ -440,7 +418,14 @@ impl<G: Signer + Ecdh + 'static> Worker<G> {
            Err(FetchError::CommandFailed {
                code: result.code().unwrap_or(1),
            })
+
        };
+

+
        log::debug!(target: "worker", "Sending `EOF` to remote..");
+

+
        if let Err(e) = channels.sender.eof() {
+
            log::error!(target: "worker", "Fetch error: error sending `EOF` message: {e}");
        }
+
        result
    }
}

@@ -451,11 +436,7 @@ pub struct Pool {

impl Pool {
    /// Create a new worker pool with the given parameters.
-
    pub fn with<G: Signer + Ecdh + 'static>(
-
        tasks: chan::Receiver<Task<G>>,
-
        handle: Handle<G>,
-
        config: Config,
-
    ) -> Self {
+
    pub fn with(tasks: chan::Receiver<Task>, handle: Handle, config: Config) -> Self {
        let mut pool = Vec::with_capacity(config.capacity);
        for _ in 0..config.capacity {
            let worker = Worker {
@@ -495,7 +476,7 @@ impl Pool {
pub mod pktline {
    use std::io;
    use std::io::Read;
-
    use std::ops::ControlFlow;
+
    use std::net::TcpStream;
    use std::str;

    use super::Id;
@@ -504,84 +485,69 @@ pub mod pktline {
    pub const FLUSH_PKT: &[u8; HEADER_LEN] = b"0000";
    pub const DELIM_PKT: &[u8; HEADER_LEN] = b"0001";
    pub const RESPONSE_END_PKT: &[u8; HEADER_LEN] = b"0002";
-
    /// When the remote `fetch` exits, it sends a special `done` packet which triggers
-
    /// an EOF. This `done` packet is not part of the git protocol, and so is
-
    /// not sent to the deamon.
-
    pub const DONE_PKT: &[u8; HEADER_LEN] = b"done";
-

-
    /// Packetline read result.
-
    #[derive(Debug, PartialEq, Eq)]
-
    pub enum Packetline {
-
        /// Received a `done` control packet.
-
        Done,
-
        /// Received a git packet with the given length.
-
        Git(usize),
-
    }
-

-
    /// Send a special `done` packet. Since the git protocol is tunneled over an existing
-
    /// connection, we can't signal the end of the protocol via the usual means, which is
-
    /// to close the connection and trigger an EOF on the other side. Git also doesn't have
-
    /// any special message we can send to signal the end of the protocol. Hence, we there's
-
    /// no other way for the server to know that we're done sending commands than to send a
-
    /// message that is not part of the git protocol. This message can then be processed by
-
    /// the remote worker to end the protocol.
-
    pub fn done<W: io::Write>(w: &mut W) -> io::Result<()> {
-
        w.write_all(DONE_PKT)
-
    }

    pub struct Reader<'a, R> {
-
        drain: Vec<u8>,
        stream: &'a mut R,
    }

+
    impl<'a> Reader<'a, TcpStream> {
+
        /// Check whether the stream ended.
+
        pub fn is_eof(&self) -> io::Result<bool> {
+
            // Use non-blocking mode instead of timeouts, as we don't want to mess
+
            // with existing timeouts.
+
            self.stream.set_nonblocking(true)?;
+
            let eof = match self.stream.peek(&mut []) {
+
                Ok(0) => true,
+
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => true,
+
                _ => false,
+
            };
+
            self.stream.set_nonblocking(false)?;
+

+
            Ok(eof)
+
        }
+
    }
+

    impl<'a, R: io::Read> Reader<'a, R> {
        /// Create a new packet-line reader.
-
        pub fn new(drain: Vec<u8>, stream: &'a mut R) -> Self {
-
            Self { drain, stream }
+
        pub fn new(stream: &'a mut R) -> Self {
+
            Self { stream }
        }

-
        /// Return the underlying stream.
-
        pub fn stream(&self) -> &R {
+
        /// Get the underlying stream.
+
        pub fn stream(&mut self) -> &mut R {
            self.stream
        }

+
        /// Wait for EOF.
+
        pub fn wait_for_eof(&mut self) -> io::Result<()> {
+
            match self.stream.read_to_end(&mut Vec::new()) {
+
                Ok(_) => Ok(()),
+
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(()),
+
                Err(e) => Err(e),
+
            }
+
        }
+

        /// Parse a Git request packet-line.
        ///
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
        ///
        pub fn read_request_pktline(&mut self) -> io::Result<(GitRequest, Vec<u8>)> {
            let mut pktline = [0u8; 1024];
-
            let Packetline::Git(length) = self.read_pktline(&mut pktline)? else {
-
                return Err(io::ErrorKind::InvalidInput.into());
-
            };
+
            let length = self.read_pktline(&mut pktline)?;
            let Some(cmd) = GitRequest::parse(&pktline[4..length]) else {
                return Err(io::ErrorKind::InvalidInput.into());
            };
            Ok((cmd, Vec::from(&pktline[..length])))
        }

-
        /// Parse a `done` packet-line.
-
        pub fn read_done_pktline(&mut self, buf: &mut [u8]) -> io::Result<()> {
-
            self.read_exact(&mut buf[..HEADER_LEN])?;
-

-
            if &buf[..HEADER_LEN] == DONE_PKT {
-
                return Ok(());
-
            }
-
            Err(io::ErrorKind::InvalidInput.into())
-
        }
-

        /// Parse a Git packet-line.
-
        pub fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<Packetline> {
+
        pub fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.read_exact(&mut buf[..HEADER_LEN])?;
-

-
            if &buf[..HEADER_LEN] == DONE_PKT {
-
                return Ok(Packetline::Done);
-
            }
            if &buf[..HEADER_LEN] == FLUSH_PKT
                || &buf[..HEADER_LEN] == DELIM_PKT
                || &buf[..HEADER_LEN] == RESPONSE_END_PKT
            {
-
                return Ok(Packetline::Git(HEADER_LEN));
+
                return Ok(HEADER_LEN);
            }
            let length = str::from_utf8(&buf[..HEADER_LEN])
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
@@ -590,25 +556,14 @@ pub mod pktline {

            self.read_exact(&mut buf[HEADER_LEN..length])?;

-
            Ok(Packetline::Git(length))
+
            Ok(length)
        }

        /// Read packet-lines from the internal reader into `buf`,
-
        /// and write them to the given writer.
-
        ///
-
        /// Returns [`ControlFlow::Break`] if the fetch should be terminated.
-
        /// Otherwise, returns [`ControlFlow::Continue`] to mean that we're
-
        /// expecting a response from the remote.
-
        pub fn pipe<W: io::Write>(
-
            &mut self,
-
            w: &mut W,
-
            buf: &mut [u8],
-
        ) -> io::Result<ControlFlow<()>> {
+
        /// and write them to the given writer. Exits when a [`FLUSH_PKT`] packet is received.
+
        pub fn pipe<W: io::Write>(&mut self, w: &mut W, buf: &mut [u8]) -> io::Result<()> {
            loop {
-
                let n = match self.read_pktline(buf)? {
-
                    Packetline::Done => return Ok(ControlFlow::Break(())),
-
                    Packetline::Git(n) => n,
-
                };
+
                let n = self.read_pktline(buf)?;
                if n == 0 {
                    break;
                }
@@ -618,19 +573,12 @@ pub mod pktline {
                    break;
                }
            }
-
            Ok(ControlFlow::Continue(()))
+
            Ok(())
        }
    }

    impl<'a, R: io::Read> io::Read for Reader<'a, R> {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
            if !self.drain.is_empty() {
-
                let count = buf.len().min(self.drain.len());
-
                buf[..count].copy_from_slice(&self.drain[..count]);
-
                self.drain.drain(..count);
-

-
                return Ok(count);
-
            }
            self.stream.read(buf)
        }
    }
added radicle-node/src/worker/channels.rs
@@ -0,0 +1,136 @@
+
use std::io::{Read, Write};
+
use std::{fmt, io};
+

+
use crossbeam_channel as chan;
+

+
/// Data that can be sent and received on worker channels.
+
pub enum ChannelEvent<T = Vec<u8>> {
+
    /// Git protocol data.
+
    Data(T),
+
    /// A request to close the channel.
+
    Close,
+
    /// A signal that the git protocol has ended, eg. when the remote fetch closes the
+
    /// connection.
+
    Eof,
+
}
+

+
impl<T> fmt::Debug for ChannelEvent<T> {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match self {
+
            Self::Data(_) => write!(f, "ChannelEvent::Data(..)"),
+
            Self::Close => write!(f, "ChannelEvent::Close"),
+
            Self::Eof => write!(f, "ChannelEvent::Eof"),
+
        }
+
    }
+
}
+

+
/// Worker channels for communicating through the git stream with the remote.
+
pub struct Channels<T = Vec<u8>> {
+
    pub sender: ChannelWriter<T>,
+
    pub receiver: ChannelReader<T>,
+
}
+

+
impl Write for Channels {
+
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+
        self.sender.write(buf)
+
    }
+

+
    fn flush(&mut self) -> io::Result<()> {
+
        self.sender.flush()
+
    }
+
}
+

+
impl Read for Channels {
+
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
        self.receiver.read(buf)
+
    }
+
}
+

+
impl<T> Channels<T> {
+
    pub fn new(
+
        sender: chan::Sender<ChannelEvent<T>>,
+
        receiver: chan::Receiver<ChannelEvent<T>>,
+
    ) -> Self {
+
        Channels {
+
            sender: ChannelWriter(sender),
+
            receiver: ChannelReader {
+
                receiver,
+
                buffer: io::Cursor::new(Vec::new()),
+
            },
+
        }
+
    }
+

+
    pub fn split(&mut self) -> (&mut ChannelWriter<T>, &mut ChannelReader<T>) {
+
        (&mut self.sender, &mut self.receiver)
+
    }
+
}
+

+
/// Wraps a [`chan::Receiver`] and provides it with [`io::Read`].
+
#[derive(Clone)]
+
pub struct ChannelReader<T = Vec<u8>> {
+
    buffer: io::Cursor<Vec<u8>>,
+
    receiver: chan::Receiver<ChannelEvent<T>>,
+
}
+

+
impl Read for ChannelReader<Vec<u8>> {
+
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
        let read = self.buffer.read(buf)?;
+
        if read == 0 {
+
            let event = self.receiver.recv().map_err(|_| {
+
                io::Error::new(
+
                    io::ErrorKind::BrokenPipe,
+
                    "error reading from stream: channel is disconnected",
+
                )
+
            })?;
+

+
            match event {
+
                ChannelEvent::Data(data) => {
+
                    self.buffer = io::Cursor::new(data);
+
                    self.buffer.read(buf)
+
                }
+
                ChannelEvent::Eof => Err(io::ErrorKind::UnexpectedEof.into()),
+
                ChannelEvent::Close => Err(io::ErrorKind::ConnectionReset.into()),
+
            }
+
        } else {
+
            Ok(read)
+
        }
+
    }
+
}
+

+
/// Wraps a [`chan::Sender`] and provides it with [`io::Write`].
+
#[derive(Clone)]
+
pub struct ChannelWriter<T = Vec<u8>>(chan::Sender<ChannelEvent<T>>);
+

+
impl ChannelWriter {
+
    /// Since the git protocol is tunneled over an existing connection, we can't signal the end of
+
    /// the protocol via the usual means, which is to close the connection. Git also doesn't have
+
    /// any special message we can send to signal the end of the protocol.
+
    ///
+
    /// Hence, we there's no other way for the server to know that we're done sending requests
+
    /// than to send a special message outside the git protocol. This message can then be processed
+
    /// by the remote worker to end the protocol. We use the special "eof" control message for this.
+
    pub fn eof(&self) -> Result<(), chan::SendError<ChannelEvent>> {
+
        self.0.send(ChannelEvent::Eof)
+
    }
+
}
+

+
impl Write for ChannelWriter {
+
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+
        let data = buf.to_vec();
+
        self.0.send(ChannelEvent::Data(data)).map_err(|m| {
+
            io::Error::new(
+
                io::ErrorKind::BrokenPipe,
+
                format!(
+
                    "error writing to stream: channel is disconnected: dropped {:?}",
+
                    m.into_inner()
+
                ),
+
            )
+
        })?;
+

+
        Ok(buf.len())
+
    }
+

+
    fn flush(&mut self) -> io::Result<()> {
+
        Ok(())
+
    }
+
}
added radicle-node/src/worker/tunnel.rs
@@ -0,0 +1,80 @@
+
use std::{
+
    io::{self, Write},
+
    net, time,
+
};
+

+
use super::channels::Channels;
+
use super::{pktline, Handle, NodeId, StreamId};
+

+
/// Tunnels fetches to a remote peer.
+
pub struct Tunnel<'a> {
+
    stream: &'a mut Channels,
+
    listener: net::TcpListener,
+
    local_addr: net::SocketAddr,
+
    channel: StreamId,
+
    remote: NodeId,
+
    handle: Handle,
+
}
+

+
impl<'a> Tunnel<'a> {
+
    pub(super) fn with(
+
        stream: &'a mut Channels,
+
        channel: StreamId,
+
        remote: NodeId,
+
        handle: Handle,
+
    ) -> io::Result<Self> {
+
        let listener = net::TcpListener::bind(net::SocketAddr::from(([0, 0, 0, 0], 0)))?;
+
        let local_addr = listener.local_addr()?;
+

+
        Ok(Self {
+
            stream,
+
            listener,
+
            local_addr,
+
            channel,
+
            remote,
+
            handle,
+
        })
+
    }
+

+
    pub fn local_addr(&self) -> net::SocketAddr {
+
        self.local_addr
+
    }
+

+
    /// Run the tunnel until the connection is closed.
+
    pub fn run(&mut self, timeout: time::Duration) -> io::Result<()> {
+
        // We now loop, alternating between reading requests from the client, and writing responses
+
        // back from the daemon.. Requests are delimited with a flush packet (`flush-pkt`).
+
        let mut buffer = [0; u16::MAX as usize + 1];
+
        let (mut remote_w, mut remote_r) = self.stream.split();
+
        let (mut stream, _) = self.listener.accept()?;
+

+
        let mut local = pktline::Reader::new(&mut stream);
+
        let mut remote_r = pktline::Reader::new(&mut remote_r);
+

+
        local.stream().set_read_timeout(Some(timeout))?;
+
        local.stream().set_write_timeout(Some(timeout))?;
+

+
        let (_, buf) = local.read_request_pktline()?;
+
        remote_w.write_all(&buf)?;
+

+
        // Nb. Annoyingly, we have to always check if the fetch stream is closed on every
+
        // iteration, otherwise we may get stuck waiting for data from the remote while
+
        // we're actually done. After measurement, this checking for EOF only takes
+
        // between 1µs and 4µs, and is therefore an okay compromise.
+
        while !local.is_eof()? {
+
            if self.handle.flush(self.remote, self.channel).is_err() {
+
                return Err(io::ErrorKind::BrokenPipe.into());
+
            }
+
            remote_r.pipe(local.stream(), &mut buffer)?;
+

+
            if let Err(e) = local.pipe(&mut remote_w, &mut buffer) {
+
                // This is the expected error when the git fetch closes the connection.
+
                if e.kind() == io::ErrorKind::UnexpectedEof {
+
                    break;
+
                }
+
                return Err(e);
+
            }
+
        }
+
        Ok(())
+
    }
+
}