rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
node: Various fixes
Merged · opened by did:key:z6MksFqX...wzpT 1 year ago

Stuff that came up while looking at the logs.

  • node: Keep track of last announce separately
  • node: Handle connection state discrepancies
  • node: Revise fetch dequeue behavior

See individual commits for details.
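
Of the three, the dequeue change is the most involved: each dequeue pass is now bounded by the queue length at entry, so items re-queued during the pass cannot cause an infinite loop. Below is a minimal sketch of that pattern using simplified, hypothetical stand-in types; the actual implementation is Service::dequeue_fetch in the diff that follows.

use std::collections::VecDeque;

// Hypothetical stand-in for the node's fetch queue; the real entries
// are `QueuedFetch` values carrying an RID, a peer and a timeout.
struct FetchQueue {
    queue: VecDeque<u64>,
}

impl FetchQueue {
    fn dequeue(&mut self) {
        // Bound the pass by the queue length at entry, so items that
        // get pushed back during the pass can't make us loop forever.
        let mut tries = self.queue.len();

        while let Some(item) = self.queue.pop_front() {
            if self.fetch(item) {
                // A fetch was initiated; stop dequeuing for this pass.
                break;
            }
            tries = tries.saturating_sub(1);
            if tries == 0 {
                eprintln!("giving up, {} item(s) still queued", self.queue.len());
                break;
            }
        }
    }

    // Stand-in for `Service::fetch`; returns true if a fetch was started.
    fn fetch(&mut self, _item: u64) -> bool {
        true
    }
}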

2 files changed, +86 -46 (484cf022 dd7a0b35)
modified radicle-node/src/service.rs
@@ -452,7 +452,9 @@ pub struct Service<D, S, G> {
    last_sync: LocalTime,
    /// Last time the service routing table was pruned.
    last_prune: LocalTime,
-    /// Last time the inventory was announced.
+    /// Last time the announcement task was run.
+    last_announce: LocalTime,
+    /// Timestamp of last local inventory announced.
    last_inventory: LocalTime,
    /// Last timestamp used for announcements.
    last_timestamp: Timestamp,
@@ -531,6 +533,7 @@ where
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
            last_timestamp,
+            last_announce: LocalTime::default(),
            last_inventory: LocalTime::default(),
            started_at: None,     // Updated on initialize.
            last_online_at: None, // Updated on initialize.
@@ -806,6 +809,7 @@ where
            self.disconnect_unresponsive_peers(&now);
            self.idle_connections();
            self.maintain_connections();
+            self.dequeue_fetch();
            self.outbox.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
@@ -827,11 +831,12 @@ where
            self.outbox.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
-        if now - self.last_inventory >= ANNOUNCE_INTERVAL {
+        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

            self.announce_inventory();
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
+            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            trace!(target: "service", "Running 'prune' task...");
@@ -985,8 +990,7 @@ where
                if status.want.is_empty() {
                    debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
                } else {
-                    self._fetch(rid, from, status.want, timeout, channel);
-                    return true;
+                    return self._fetch(rid, from, status.want, timeout, channel);
                }
            }
            Err(e) => {
@@ -1004,7 +1008,7 @@ where
        from: NodeId,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
-    ) {
+    ) -> bool {
        self._fetch(rid, from, vec![], timeout, channel)
    }

@@ -1015,12 +1019,13 @@ where
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
        channel: Option<chan::Sender<FetchResult>>,
-    ) {
+    ) -> bool {
        match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
            Ok(fetching) => {
                if let Some(c) = channel {
                    fetching.subscribe(c);
                }
+                return true;
            }
            Err(TryFetchError::AlreadyFetching(fetching)) => {
                // If we're already fetching the same refs from the requested peer, there's nothing
@@ -1043,7 +1048,7 @@ where
                    if self.queue.contains(&fetch) {
                        debug!(target: "service", "Fetch for {rid} with {from} is already queued..");
                    } else {
-                        debug!(target: "service", "Queueing fetch for {rid} with {from}..");
+                        debug!(target: "service", "Queueing fetch for {rid} with {from} (already fetching)..");
                        self.queue.push_back(fetch);
                    }
                }
@@ -1067,6 +1072,7 @@ where
                }
            }
        }
+        false
    }

    // TODO: Buffer/throttle fetches.
@@ -1216,6 +1222,8 @@ where
    /// 1. The RID was already being fetched.
    /// 2. The session was already at fetch capacity.
    pub fn dequeue_fetch(&mut self) {
+        let mut tries = self.queue.len();
+
        while let Some(QueuedFetch {
            rid,
            from,
@@ -1241,7 +1249,19 @@ where
                }
            } else {
                // If no refs are specified, always do a full fetch.
-                self.fetch(rid, from, timeout, channel);
+                if self.fetch(rid, from, timeout, channel) {
+                    break;
+                }
+            }
+            // Nb. Just a precaution, `tries` should always be >= 1 here.
+            tries = tries.saturating_sub(1);
+            // To avoid looping forever, only try to dequeue a fixed number of fetches at a time.
+            if tries == 0 {
+                debug!(
+                    target: "service",
+                    "Giving up on dequeuing, {} item(s) still left in the queue..",
+                    self.queue.len()
+                );
                break;
            }
        }
@@ -1310,11 +1330,17 @@ where
            }
        } else {
            match self.sessions.entry(remote) {
-                Entry::Occupied(e) => {
-                    warn!(
+                Entry::Occupied(mut e) => {
+                    // In this scenario, it's possible that our peer is persistent, and
+                    // disconnected. We get an inbound connection before we attempt a re-connection,
+                    // and therefore we treat it as a regular inbound connection.
+                    let peer = e.get_mut();
+                    debug!(
                        target: "service",
-                        "Connecting peer {remote} already has a session open ({})", e.get()
+                        "Connecting peer {remote} already has a session open ({peer})"
                    );
+                    peer.to_connected(self.clock);
+                    self.outbox.write_all(peer, msgs);
                }
                Entry::Vacant(e) => {
                    if let HostName::Ip(ip) = addr.host {
@@ -1768,15 +1794,39 @@ where
        }
        message.log(log::Level::Debug, remote, Link::Inbound);

-        trace!(target: "service", "Received message {:?} from {}", &message, peer.id);
+        let connected = match &mut peer.state {
+            session::State::Disconnected { .. } => {
+                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
+                return Ok(());
+            }
+            // In case of a discrepancy between the service state and the state of the underlying
+            // wire protocol, we may receive a message from a peer that we consider not fully connected
+            // at the service level. To remedy this, we simply transition the peer to a connected state.
+            //
+            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
+            // solution to converge towards the same state.
+            session::State::Attempted { .. } | session::State::Initial => {
+                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
+                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);

-        match (&mut peer.state, message) {
+                peer.to_connected(self.clock);
+
+                None
+            }
+            session::State::Connected {
+                ping, latencies, ..
+            } => Some((ping, latencies)),
+        };
+
+        trace!(target: "service", "Received message {message:?} from {remote}");
+
+        match message {
            // Process a peer announcement.
-            (session::State::Connected { .. }, Message::Announcement(ann)) => {
-                let relayer = peer.id;
+            Message::Announcement(ann) => {
+                let relayer = remote;
                let relayer_addr = peer.addr.clone();

-                if let Some(id) = self.handle_announcement(&relayer, &relayer_addr, &ann)? {
+                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
                        if let AnnouncementMessage::Inventory(_) = ann.message {
                            if let Err(e) = self
@@ -1793,7 +1843,7 @@ where
                    }
                }
            }
-            (session::State::Connected { .. }, Message::Subscribe(subscribe)) => {
+            Message::Subscribe(subscribe) => {
                // Filter announcements by interest.
                match self
                    .db
@@ -1825,11 +1875,10 @@ where
                }
                peer.subscribe = Some(subscribe);
            }
-            (session::State::Connected { .. }, Message::Info(info)) => {
-                let remote = peer.id;
-                self.handle_info(remote, &info)?;
+            Message::Info(info) => {
+                self.handle_info(*remote, &info)?;
            }
-            (session::State::Connected { .. }, Message::Ping(Ping { ponglen, .. })) => {
+            Message::Ping(Ping { ponglen, .. }) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
@@ -1841,33 +1890,24 @@ where
                    },
                );
            }
-            (
-                session::State::Connected {
-                    ping, latencies, ..
-                },
-                Message::Pong { zeroes },
-            ) => {
-                if let session::PingState::AwaitingResponse {
-                    len: ponglen,
-                    since,
-                } = *ping
-                {
-                    if (ponglen as usize) == zeroes.len() {
-                        *ping = session::PingState::Ok;
-                        // Keep track of peer latency.
-                        latencies.push_back(self.clock - since);
-                        if latencies.len() > MAX_LATENCIES {
-                            latencies.pop_front();
+            Message::Pong { zeroes } => {
+                if let Some((ping, latencies)) = connected {
+                    if let session::PingState::AwaitingResponse {
+                        len: ponglen,
+                        since,
+                    } = *ping
+                    {
+                        if (ponglen as usize) == zeroes.len() {
+                            *ping = session::PingState::Ok;
+                            // Keep track of peer latency.
+                            latencies.push_back(self.clock - since);
+                            if latencies.len() > MAX_LATENCIES {
+                                latencies.pop_front();
+                            }
                        }
                    }
                }
            }
-            (session::State::Attempted { .. } | session::State::Initial, msg) => {
-                debug!(target: "service", "Ignoring unexpected message {:?} from connecting peer {}", msg, peer.id);
-            }
-            (session::State::Disconnected { .. }, msg) => {
-                debug!(target: "service", "Ignoring {:?} from disconnected peer {}", msg, peer.id);
-            }
        }
        Ok(())
    }
modified radicle-node/src/service/session.rs
@@ -230,8 +230,8 @@ impl Session {
    pub fn to_connected(&mut self, since: LocalTime) {
        self.last_active = since;

-        let State::Attempted = &self.state else {
-            panic!("Session::to_connected: can only transition to 'connected' state from 'attempted' state");
+        if let State::Connected { .. } = &self.state {
+            panic!("Session::to_connected: session is already in 'connected' state");
        };
        self.state = State::Connected {
            since,
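
For context, the session.rs change relaxes the transition guard: previously to_connected was only legal from the 'attempted' state; now any non-connected state may transition, and only a repeated transition panics. A minimal sketch of the relaxed guard, with a simplified State enum (the real variants carry timestamps and ping state):

enum State {
    Initial,
    Attempted,
    Connected,
    Disconnected,
}

fn to_connected(state: &mut State) {
    // Only a double transition is treated as a programming error; moving
    // to 'connected' from any other state is now allowed, which covers the
    // service/wire-protocol state discrepancies handled in service.rs above.
    if let State::Connected = state {
        panic!("to_connected: session is already in 'connected' state");
    }
    *state = State::Connected;
}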