Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Only try to fetch from connected nodes
Alexis Sellier committed 3 years ago
commit be77eebc3d623bca34e1548e091f966e002504cf
parent ac23f45d9b880421270b9b5bde5ac11352218c23
2 files changed +44 -21
modified radicle-node/src/service.rs
@@ -394,36 +394,44 @@ where

        match cmd {
            Command::Connect(id, addr) => self.reactor.connect(id, addr),
-
            Command::Fetch(id, resp) => {
+
            Command::Fetch(rid, resp) => {
                if !self
                    .tracking
-
                    .is_repo_tracked(&id)
+
                    .is_repo_tracked(&rid)
                    .expect("Service::command: error accessing tracking configuration")
                {
                    resp.send(FetchLookup::NotTracking).ok();
                    return;
                }

-
                let seeds = match self.routing.get(&id) {
+
                let (connected, unconnected) = match self.routing.get(&rid) {
                    Ok(seeds) => seeds
                        .into_iter()
                        .filter(|node| *node != self.node_id())
-
                        .collect(),
+
                        .partition::<Vec<_>, _>(|node| self.sessions.is_negotiated(node)),
                    Err(err) => {
-
                        error!(target: "service", "Error reading routing table for {id}: {err}");
+
                        error!(target: "service", "Error reading routing table for {rid}: {err}");
                        resp.send(FetchLookup::NotFound).ok();

                        return;
                    }
                };

-
                let Some(seeds) = NonEmpty::from_vec(seeds) else {
-
                    warn!(target: "service", "No seeds found to fetch from, for {}", id);
+
                debug!(
+
                    target: "service",
+
                    "Found {} connected seed(s) and {} unconnected seed(s) for {}",
+
                    connected.len(), unconnected.len(), rid
+
                );
+

+
                let Some(seeds) = NonEmpty::from_vec(connected) else {
+
                    warn!(target: "service", "No connected seeds found for {}", rid);
                    resp.send(FetchLookup::NotFound).ok();

+
                    // TODO: Establish connections to unconnected seeds, and retry.
+
                    // TODO: Fetch requests should be queued and re-checked to see if they can
+
                    //       be fulfilled everytime a new node connects.
                    return;
                };
-
                debug!(target: "service", "Found {} seed(s) to fetch from, for {}", seeds.len(), id);

                let (results_send, results) = chan::bounded(seeds.len());
                resp.send(FetchLookup::Found {
@@ -432,15 +440,11 @@ where
                })
                .ok();

-
                self.fetch_reqs.insert(id, results_send);
+
                self.fetch_reqs.insert(rid, results_send);

                // TODO: Limit the number of seeds we fetch from? Randomize?
                for seed in seeds {
-
                    if let Some(session) = self.sessions.get_mut(&seed) {
-
                        Self::fetch(id, session, &mut self.reactor);
-
                    } else {
-
                        // TODO: Establish connection?
-
                    }
+
                    self.fetch(rid, &seed);
                }
            }
            Command::TrackRepo(id, resp) => {
@@ -480,17 +484,24 @@ where
        }
    }

-
    pub fn fetch(rid: Id, session: &mut Session, reactor: &mut Reactor) {
+
    pub fn fetch(&mut self, rid: Id, from: &NodeId) {
+
        let Some(session) = self.sessions.get_mut(from) else {
+
            error!(target: "service", "Session {from} does not exist; cannot initiate fetch");
+
            return;
+
        };
+
        debug_assert!(session.is_negotiated());
+

        let seed = session.id;

        if let Some(fetch) = session.fetch(rid) {
            debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

-
            reactor.write(session.id, fetch);
+
            self.reactor.write(session.id, fetch);
        } else {
            // TODO: If we can't fetch, it's because we're already fetching from
            // this peer. So we need to queue the request, or find another peer.
-
            error!(target: "service",
+
            error!(
+
                target: "service",
                "Unable to fetch {rid} from peer {seed} that is already being fetched from"
            );
        }
@@ -747,10 +758,7 @@ where
                    // Refs are only supposed to be relayed by peers who are tracking
                    // the resource. Therefore, it's safe to fetch from the remote
                    // peer, even though it isn't the announcer.
-
                    let Some(session) = self.sessions.get_mut(relayer) else {
-
                        panic!(); // TODO
-
                    };
-
                    Self::fetch(message.id, session, &mut self.reactor);
+
                    self.fetch(message.id, relayer);

                    return Ok(true);
                } else {
@@ -1322,6 +1330,11 @@ impl Sessions {
    pub fn negotiated_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, p)| p.is_connected())
    }
+

+
    /// Return whether this node has a fully established session.
+
    pub fn is_negotiated(&self, id: &NodeId) -> bool {
+
        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
+
    }
}

impl Deref for Sessions {
modified radicle-node/src/service/session.rs
@@ -136,6 +136,16 @@ impl Session {
        matches!(self.state, State::Connected { .. })
    }

+
    pub fn is_negotiated(&self) -> bool {
+
        matches!(
+
            self.state,
+
            State::Connected {
+
                initialized: true,
+
                ..
+
            }
+
        )
+
    }
+

    pub fn attempts(&self) -> usize {
        self.attempts
    }