Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Improve output of `Session::fetch` function
Alexis Sellier committed 3 years ago
commit 42b4d95f93f0cc9e0e798bc3354bd73fd29509ae
parent bd7c5a0daa7594cbde84c1ebdf7ccef06ba1aeee
2 files changed +53 -26
modified radicle-node/src/service.rs
@@ -478,17 +478,27 @@ where

        let seed = session.id;

-
        if let Some(fetch) = session.fetch(rid) {
-
            debug!(target: "service", "Fetch initiated for {rid} with {seed}..");
+
        match session.fetch(rid) {
+
            session::FetchResult::Ready(fetch) => {
+
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

-
            self.reactor.write(session.id, fetch);
-
        } else {
-
            // TODO: If we can't fetch, it's because we're already fetching from
-
            // this peer. So we need to queue the request, or find another peer.
-
            error!(
-
                target: "service",
-
                "Unable to fetch {rid} from peer {seed} that is already being fetched from"
-
            );
+
                self.reactor.write(session.id, fetch);
+
            }
+
            session::FetchResult::AlreadyFetching(other) => {
+
                if other == rid {
+
                    debug!(target: "service", "Ignoring redundant attempt to fetch {rid} from {from}");
+
                } else {
+
                    // TODO: If we can't fetch, it's because we're already fetching from
+
                    // this peer. So we need to queue the request, or find another peer.
+
                    error!(
+
                        target: "service",
+
                        "Dropping fetch for {rid} from {from}: another fetch is ongoing"
+
                    );
+
                }
+
            }
+
            session::FetchResult::NotConnected => {
+
                error!(target: "service", "Unable to fetch {rid} from peer {seed}: peer is not connected");
+
            }
        }
    }

@@ -542,7 +552,7 @@ where

        if let Some(session) = self.sessions.get_mut(&remote) {
            if let session::State::Connected { protocol, .. } = &mut session.state {
-
                if *protocol == session::Protocol::Fetch {
+
                if let session::Protocol::Fetch { .. } = protocol {
                    *protocol = session::Protocol::default();
                } else {
                    panic!(
@@ -736,7 +746,7 @@ where
                    .expect("Service::handle_announcement: error accessing tracking configuration")
                {
                    // Discard inventory messages we've already seen, otherwise update
-
                    // out last seen time.
+
                    // our last seen time.
                    if !peer.refs_announced(message.id, timestamp) {
                        debug!(target: "service", "Ignoring stale refs announcement from {announcer}");
                        return Ok(false);
@@ -833,7 +843,7 @@ where
        match (&mut peer.state, message) {
            (
                session::State::Connected {
-
                    protocol: session::Protocol::Fetch,
+
                    protocol: session::Protocol::Fetch { .. },
                    ..
                },
                _,
@@ -928,7 +938,7 @@ where

                // TODO: Check that we have the repo first?

-
                *protocol = Protocol::Fetch;
+
                *protocol = Protocol::Fetch { rid };
                // Accept the request and instruct the transport to handover the socket to the worker.
                self.reactor.write(*remote, Message::FetchOk { rid });
                self.reactor
@@ -950,7 +960,7 @@ where
                }
                debug!(target: "service", "Fetch accepted for {rid} from {remote}..");

-
                *protocol = Protocol::Fetch;
+
                *protocol = Protocol::Fetch { rid };
                // Instruct the transport to handover the socket to the worker.
                self.reactor
                    .fetch(*remote, rid, Namespaces::default(), true);
modified radicle-node/src/service/session.rs
@@ -25,7 +25,7 @@ pub enum Protocol {
    /// Git smart protocol. Used for fetching repository data.
    /// This protocol is used after a connection upgrade via the
    /// [`Message::Fetch`] message.
-
    Fetch,
+
    Fetch { rid: Id },
}

impl Default for Protocol {
@@ -64,7 +64,7 @@ impl fmt::Display for State {
                Protocol::Gossip { .. } => {
                    write!(f, "connected <gossip>")
                }
-
                Protocol::Fetch => {
+
                Protocol::Fetch { .. } => {
                    write!(f, "connected <fetch>")
                }
            },
@@ -75,6 +75,17 @@ impl fmt::Display for State {
    }
}

+
/// Return value of [`Session::fetch`].
+
#[derive(Debug)]
+
pub enum FetchResult {
+
    /// We are already fetching from this peer.
+
    AlreadyFetching(Id),
+
    /// Ok, ready to fetch.
+
    Ready(Message),
+
    /// This peer is not ready to fetch.
+
    NotConnected,
+
}
+

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("wrong protocol version in message: {0}")]
@@ -196,18 +207,24 @@ impl Session {
        self.attempts
    }

-
    pub fn fetch(&mut self, rid: Id) -> Option<Message> {
+
    pub fn fetch(&mut self, rid: Id) -> FetchResult {
        if let State::Connected { protocol, .. } = &mut self.state {
-
            if *protocol == (Protocol::Gossip { requested: None }) {
-
                *protocol = Protocol::Gossip {
-
                    requested: Some(rid),
-
                };
-
                return Some(Message::Fetch { rid });
-
            } else {
-
                log::error!("Attempted to fetch from peer {} which isn't ready", self.id);
+
            match protocol {
+
                Protocol::Gossip { requested } => {
+
                    if let Some(requested) = requested {
+
                        FetchResult::AlreadyFetching(*requested)
+
                    } else {
+
                        *protocol = Protocol::Gossip {
+
                            requested: Some(rid),
+
                        };
+
                        FetchResult::Ready(Message::Fetch { rid })
+
                    }
+
                }
+
                Protocol::Fetch { rid } => FetchResult::AlreadyFetching(*rid),
            }
+
        } else {
+
            FetchResult::NotConnected
        }
-
        None
    }

    pub fn to_connecting(&mut self) {