Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Queue pending fetches in Session
Han Xu committed 3 years ago
commit 241c41a63294d7f33f53fa2541ac7ba7abcd9549
parent fc724a2333040ced38f87aa4c70149ffab095bb7
4 files changed +122 -7
modified radicle-node/src/service.rs
@@ -568,12 +568,14 @@ where
                if other == rid {
                    debug!(target: "service", "Ignoring redundant attempt to fetch {rid} from {from}");
                } else {
-
                    // TODO: If we can't fetch, it's because we're already fetching from
-
                    // this peer. So we need to queue the request, or find another peer.
-
                    error!(
+
                    // If we can't fetch, it's because we're already fetching from
+
                    // this peer. So we need to queue the request.
+
                    // TODO: consider to find another peer.
+
                    debug!(
                        target: "service",
-
                        "Dropping fetch for {rid} from {from}: another fetch is ongoing"
+
                        "Queueing fetch for {rid} from {from}: another fetch is ongoing"
                    );
+
                    session.queue_fetch(rid);
                }
            }
            session::FetchResult::NotConnected => {
@@ -651,6 +653,7 @@ where
                // to the gossip protocol, otherwise the messages will
                // be queued.
                self.sync_and_announce();
+
                self.process_fetch_queue(&remote);
            }
            FetchDirection::Responder => self.switch_to_gossip(remote),
        }
@@ -1277,6 +1280,16 @@ where
        }
    }

+
    /// Execute the next pending fetch with `remote`, if any.
+
    fn process_fetch_queue(&mut self, remote: &NodeId) {
+
        if let Some(session) = self.sessions.get_mut(remote) {
+
            if let Some(rid) = session.dequeue_fetch() {
+
                debug!(target: "service", "Dequeued a pending fetch {rid} with {remote}");
+
                self.fetch(rid, remote);
+
            }
+
        }
+
    }
+

    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
        if let Some(sess) = self.sessions.get_mut(&nid) {
            sess.to_initial();
modified radicle-node/src/service/session.rs
@@ -1,3 +1,4 @@
+
use std::collections::VecDeque;
use std::fmt;

use radicle::storage::Namespaces;
@@ -154,6 +155,9 @@ pub struct Session {
    /// Last time a message was received from the peer.
    pub last_active: LocalTime,

+
    /// Fetches queued due to another ongoing fetch.
+
    pending_fetches: VecDeque<Id>,
+

    /// Connection attempts. For persistent peers, Tracks
    /// how many times we've attempted to connect. We reset this to zero
    /// upon successful connection.
@@ -192,6 +196,7 @@ impl Session {
            persistent,
            last_active: LocalTime::default(),
            attempts: 1,
+
            pending_fetches: VecDeque::new(),
            rng,
        }
    }
@@ -209,6 +214,7 @@ impl Session {
            persistent,
            last_active: LocalTime::default(),
            attempts: 0,
+
            pending_fetches: VecDeque::new(),
            rng,
        }
    }
@@ -363,4 +369,12 @@ impl Session {
        }
        Ok(())
    }
+

+
    pub(crate) fn queue_fetch(&mut self, rid: Id) {
+
        self.pending_fetches.push_back(rid);
+
    }
+

+
    pub(crate) fn dequeue_fetch(&mut self) -> Option<Id> {
+
        self.pending_fetches.pop_front()
+
    }
}
modified radicle-node/src/tests.rs
@@ -1262,3 +1262,89 @@ fn prop_inventory_exchange_dense() {
        .gen(qcheck::Gen::new(8))
        .quickcheck(property as fn(MockStorage, MockStorage, MockStorage));
}
+

+
#[test]
+
fn test_queued_fetch() {
+
    let storage = arbitrary::nonempty_storage(3);
+
    let mut repo_keys = storage.inventory.keys();
+
    let rid = *repo_keys.next().unwrap();
+
    let rid2 = *repo_keys.next().unwrap();
+
    let rid3 = *repo_keys.next().unwrap();
+
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
+
    let bob = Peer::new("bob", [8, 8, 8, 8]);
+
    let (send, _recv) = chan::bounded::<node::FetchResult>(1);
+

+
    logger::init(log::Level::Debug);
+

+
    // Send the first fetch.
+
    alice.connect_to(&bob);
+
    alice.command(Command::Fetch(rid, bob.id, send));
+

+
    assert_matches!(alice.messages(bob.id).next(), Some(Message::Fetch { .. }));
+

+
    // Send the 2nd fetch that will be queued.
+
    let (send2, _recv2) = chan::bounded::<node::FetchResult>(1);
+
    alice.command(Command::Fetch(rid2, bob.id, send2));
+

+
    // Send the 3rd fetch that will be queued.
+
    let (send3, _recv3) = chan::bounded::<node::FetchResult>(1);
+
    alice.command(Command::Fetch(rid3, bob.id, send3));
+

+
    // We shouldn't send out the 2nd, 3rd fetch while we're doing the 1st fetch.
+
    assert_matches!(alice.messages(bob.id).next(), None);
+

+
    alice.receive(bob.id(), Message::FetchOk { rid });
+
    assert_matches!(alice.messages(bob.id).next(), None);
+

+
    // Have enough time pass that Alice sends a "ping" to Bob.
+
    alice.elapse(KEEP_ALIVE_DELTA);
+

+
    // Finish the 1st fetch.
+
    alice.fetched(
+
        Fetch {
+
            rid,
+
            direction: FetchDirection::Initiator {
+
                namespaces: Namespaces::All,
+
            },
+
            remote: bob.id,
+
        },
+
        Ok(vec![]),
+
    );
+

+
    // Now the 1st fetch is done, the gossip messages are drained.
+
    let mut messages = alice.messages(bob.id);
+
    assert_matches!(messages.next(), Some(Message::Ping(_)));
+

+
    // The message after all queued gossip messages is Fetch.
+
    assert_eq!(messages.last(), Some(Message::Fetch { rid: rid2 }));
+

+
    // `FetchOk` for the 2nd fetch.
+
    alice.receive(bob.id(), Message::FetchOk { rid: rid2 });
+

+
    // The 2nd fetch should be in `Io` now. Not the 3rd fetch yet.
+
    let last_io = alice.outbox().last().unwrap();
+
    assert_matches!(last_io, Io::Fetch(fetch) if fetch.rid == rid2);
+

+
    // Finish the 2nd fetch.
+
    alice.fetched(
+
        Fetch {
+
            rid: rid2,
+
            direction: FetchDirection::Initiator {
+
                namespaces: Namespaces::All,
+
            },
+
            remote: bob.id,
+
        },
+
        Ok(vec![]),
+
    );
+

+
    // Now the 2nd fetch is done, the 3rd fetch is drained.
+
    let mut messages = alice.messages(bob.id);
+
    assert_eq!(messages.next(), Some(Message::Fetch { rid: rid3 }));
+

+
    // `FetchOk` for the 3rd fetch.
+
    alice.receive(bob.id(), Message::FetchOk { rid: rid3 });
+

+
    // The 3rd fetch should be in `Io` now.
+
    let last_io = alice.outbox().last().unwrap();
+
    assert_matches!(last_io, Io::Fetch(fetch) if fetch.rid == rid3);
+
}
modified radicle/src/test/arbitrary.rs
@@ -60,9 +60,11 @@ pub fn vec<T: Eq + Arbitrary>(size: usize) -> Vec<T> {

pub fn nonempty_storage(size: usize) -> MockStorage {
    let mut storage = gen::<MockStorage>(size);
-
    storage
-
        .inventory
-
        .insert(gen::<Id>(size), gen::<Doc<Verified>>(size));
+
    for _ in 0..size {
+
        storage
+
            .inventory
+
            .insert(gen::<Id>(1), gen::<Doc<Verified>>(1));
+
    }
    storage
}