Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Wire protocol updates
Merged did:key:z6MksFqX...wzpT opened 2 years ago
  • Keep track and log the Git data sent/received.
  • Always notify the service of a fetch result
2 files changed +73 -34 e3ecf4d7 53ecc884
modified radicle-node/src/service.rs
@@ -867,7 +867,7 @@ where
        match self.refs_status_of(rid, refs, &scope) {
            Ok(status) => {
                if status.want.is_empty() {
-
                    debug!(target: "service", "Skipping  fetch for {rid}, all refs are already in storage");
+
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
                    self._fetch(rid, from, status.want, timeout, channel);
                    return true;
modified radicle-node/src/wire/protocol.rs
@@ -74,12 +74,32 @@ pub type WireWriter<G> = NetWriter<NoiseState<G, Sha256>, Socks5Session<net::Tcp
/// Reactor action.
type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;

+
/// A worker stream.
+
struct Stream {
+
    /// Channels.
+
    channels: worker::Channels,
+
    /// Data sent.
+
    sent_bytes: usize,
+
    /// Data received.
+
    received_bytes: usize,
+
}
+

+
impl Stream {
+
    fn new(channels: worker::Channels) -> Self {
+
        Self {
+
            channels,
+
            sent_bytes: 0,
+
            received_bytes: 0,
+
        }
+
    }
+
}
+

/// Streams associated with a connected peer.
struct Streams {
    /// Active streams and their associated worker channels.
    /// Note that the gossip and control streams are not included here as they are always
    /// implied to exist.
-
    streams: RandomMap<StreamId, worker::Channels>,
+
    streams: RandomMap<StreamId, Stream>,
    /// Connection direction.
    link: Link,
    /// Sequence number used to compute the next stream id.
@@ -97,10 +117,15 @@ impl Streams {
    }

    /// Get a known stream.
-
    fn get(&self, stream: &StreamId) -> Option<&worker::Channels> {
+
    fn get(&self, stream: &StreamId) -> Option<&Stream> {
        self.streams.get(stream)
    }

+
    /// Get a known stream, mutably.
+
    fn get_mut(&mut self, stream: &StreamId) -> Option<&mut Stream> {
+
        self.streams.get_mut(stream)
+
    }
+

    /// Open a new stream.
    fn open(&mut self) -> (StreamId, worker::Channels) {
        self.seq += 1;
@@ -122,7 +147,7 @@ impl Streams {

        match self.streams.entry(stream) {
            Entry::Vacant(e) => {
-
                e.insert(worker);
+
                e.insert(Stream::new(worker));
                Some(wire)
            }
            Entry::Occupied(_) => None,
@@ -130,15 +155,15 @@ impl Streams {
    }

    /// Unregister an open stream.
-
    fn unregister(&mut self, stream: &StreamId) -> Option<worker::Channels> {
+
    fn unregister(&mut self, stream: &StreamId) -> Option<Stream> {
        self.streams.remove(stream)
    }

    /// Close all streams.
    fn shutdown(&mut self) {
-
        for (sid, chans) in self.streams.drain() {
+
        for (sid, stream) in self.streams.drain() {
            log::debug!(target: "wire", "Closing worker stream {sid}");
-
            chans.close().ok();
+
            stream.channels.close().ok();
        }
    }
}
@@ -384,10 +409,26 @@ where
            return;
        };

-
        let Peer::Connected {
-
            nid, link, streams, ..
-
        } = peer
-
        else {
+
        if let Peer::Connected { link, streams, .. } = peer {
+
            // Nb. It's possible that the stream would already be unregistered if we received an
+
            // early "close" from the remote. Otherwise, we unregister it here and send the "close"
+
            // ourselves.
+
            if let Some(s) = streams.unregister(&task.stream) {
+
                log::debug!(
+
                    target: "wire", "Stream {} of {} closing with {} byte(s) sent and {} byte(s) received",
+
                    task.stream, task.remote, s.sent_bytes, s.received_bytes
+
                );
+
                let frame = Frame::control(
+
                    *link,
+
                    frame::Control::Close {
+
                        stream: task.stream,
+
                    },
+
                );
+
                self.actions.push_back(Action::Send(fd, frame.to_bytes()));
+
            }
+
        } else {
+
            // If the peer disconnected, we'll get here, but we still want to let the service know
+
            // about the fetch result, so we don't return here.
            log::warn!(target: "wire", "Peer {nid} is not connected; ignoring fetch result");
            return;
        };
@@ -395,7 +436,7 @@ where
        // Only call into the service if we initiated this fetch.
        match task.result {
            FetchResult::Initiator { rid, result } => {
-
                self.service.fetched(rid, *nid, result);
+
                self.service.fetched(rid, nid, result);
            }
            FetchResult::Responder { rid, result } => {
                if let Some(rid) = rid {
@@ -407,22 +448,10 @@ where
                }
            }
        }
-

-
        // Nb. It's possible that the stream would already be unregistered if we received an early
-
        // "close" from the remote. Otherwise, we unregister it here and send the "close" ourselves.
-
        if streams.unregister(&task.stream).is_some() {
-
            let frame = Frame::control(
-
                *link,
-
                frame::Control::Close {
-
                    stream: task.stream,
-
                },
-
            );
-
            self.actions.push_back(Action::Send(fd, frame.to_bytes()));
-
        }
    }

    fn flush(&mut self, remote: NodeId, stream: StreamId) {
-
        let Some((fd, peer)) = self.peers.lookup(&remote) else {
+
        let Some((fd, peer)) = self.peers.lookup_mut(&remote) else {
            log::warn!(target: "wire", "Peer {remote} is not known; ignoring flush");
            return;
        };
@@ -430,14 +459,17 @@ where
            log::warn!(target: "wire", "Peer {remote} is not connected; ignoring flush");
            return;
        };
-
        let Some(c) = streams.get(&stream) else {
+
        let Some(s) = streams.get_mut(&stream) else {
            log::debug!(target: "wire", "Stream {stream} cannot be found; ignoring flush");
            return;
        };

-
        for data in c.try_iter() {
+
        for data in s.channels.try_iter() {
            let frame = match data {
-
                ChannelEvent::Data(data) => Frame::git(stream, data),
+
                ChannelEvent::Data(data) => {
+
                    s.sent_bytes += data.len();
+
                    Frame::git(stream, data)
+
                }
                ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
                ChannelEvent::Eof => Frame::control(*link, frame::Control::Eof { stream }),
            };
@@ -702,10 +734,10 @@ where
                                data: FrameData::Control(frame::Control::Eof { stream }),
                                ..
                            })) => {
-
                                if let Some(channels) = streams.get(&stream) {
+
                                if let Some(s) = streams.get(&stream) {
                                    log::debug!(target: "wire", "Received `end-of-file` on stream {stream} from {nid}");

-
                                    if channels.send(ChannelEvent::Eof).is_err() {
+
                                    if s.channels.send(ChannelEvent::Eof).is_err() {
                                        log::error!(target: "wire", "Worker is disconnected; cannot send `EOF`");
                                    }
                                } else {
@@ -718,8 +750,13 @@ where
                            })) => {
                                log::debug!(target: "wire", "Received `close` command for stream {stream} from {nid}");

-
                                if let Some(chans) = streams.unregister(&stream) {
-
                                    chans.close().ok();
+
                                if let Some(s) = streams.unregister(&stream) {
+
                                    log::debug!(
+
                                        target: "wire",
+
                                        "Stream {stream} of {nid} closed with {} byte(s) sent and {} byte(s) received",
+
                                        s.sent_bytes, s.received_bytes
+
                                    );
+
                                    s.channels.close().ok();
                                }
                            }
                            Ok(Some(Frame {
@@ -733,8 +770,10 @@ where
                                data: FrameData::Git(data),
                                ..
                            })) => {
-
                                if let Some(channels) = streams.get(&stream) {
-
                                    if channels.send(ChannelEvent::Data(data)).is_err() {
+
                                if let Some(s) = streams.get_mut(&stream) {
+
                                    s.received_bytes += data.len();
+

+
                                    if s.channels.send(ChannelEvent::Data(data)).is_err() {
                                        log::error!(target: "wire", "Worker is disconnected; cannot send data");
                                    }
                                } else {