Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fetch missing inventory
Alexis Sellier committed 3 years ago
commit 4f062ef2c66c1c3d7a91083247acbd610245591b
parent b33fb2a33d73c40e953629a21b6b1778fa42e08a
3 files changed +138 -45
modified radicle-node/src/service.rs
@@ -408,7 +408,9 @@ where
        if now - self.last_sync >= SYNC_INTERVAL {
            debug!(target: "service", "Running 'sync' task...");

-
            // TODO: What do we do here?
+
            if let Err(e) = self.fetch_missing_inventory() {
+
                error!(target: "service", "Error fetching missing inventory: {e}");
+
            }
            self.reactor.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
@@ -419,7 +421,7 @@ where
                    .inventory()
                    .and_then(|i| self.announce_inventory(i))
                {
-
                    error!("Error announcing inventory: {}", err);
+
                    error!(target: "service", "Error announcing inventory: {}", err);
                }
            }
            self.reactor.wakeup(ANNOUNCE_INTERVAL);
@@ -443,48 +445,14 @@ where
            Command::Connect(id, addr) => {
                self.connect(id, addr);
            }
-
            Command::Seeds(rid, resp) => {
-
                #[derive(Default)]
-
                pub struct Stats {
-
                    connected: usize,
-
                    disconnected: usize,
-
                    fetching: usize,
+
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
+
                Ok(seeds) => {
+
                    resp.send(seeds).ok();
                }
-

-
                let (stats, seeds) = match self.routing.get(&rid) {
-
                    Ok(seeds) => seeds.into_iter().fold(
-
                        (Stats::default(), Seeds::default()),
-
                        |(mut stats, mut seeds), node| {
-
                            if node != self.node_id() {
-
                                if self.sessions.is_fetching(&node) {
-
                                    seeds.insert(Seed::Fetching(node));
-
                                    stats.fetching += 1;
-
                                } else if self.sessions.is_connected(&node) {
-
                                    seeds.insert(Seed::Connected(node));
-
                                    stats.connected += 1;
-
                                } else if self.sessions.is_disconnected(&node) {
-
                                    seeds.insert(Seed::Disconnected(node));
-
                                    stats.connected += 1;
-
                                }
-
                            }
-

-
                            (stats, seeds)
-
                        },
-
                    ),
-
                    Err(err) => {
-
                        error!(target: "service", "Error reading routing table for {rid}: {err}");
-
                        drop(resp);
-

-
                        return;
-
                    }
-
                };
-
                debug!(
-
                    target: "service",
-
                    "Found {} connected seed(s), {} disconnected seed(s), and {} fetching seed(s) for {}",
-
                    stats.connected, stats.disconnected, stats.fetching, rid
-
                );
-
                resp.send(seeds).ok();
-
            }
+
                Err(e) => {
+
                    error!(target: "service", "Error reading routing table for {rid}: {e}");
+
                }
+
            },
            Command::Fetch(rid, seed, resp) => {
                // TODO: Establish connections to unconnected seeds, and retry.
                // TODO: Fetch requests should be queued and re-checked to see if they can
@@ -493,10 +461,10 @@ where
                self.fetch(rid, &seed);
            }
            Command::TrackRepo(rid, scope, resp) => {
+
                // Update our tracking policy.
                let tracked = self
                    .track_repo(&rid, scope)
                    .expect("Service::command: error tracking repository");
-
                // TODO: Try to fetch project if we weren't tracking it before.
                resp.send(tracked).ok();

                // Let all our peers know that we're interested in this repo from now on.
@@ -1287,6 +1255,47 @@ where
        false
    }

+
    fn seeds(&self, rid: &Id) -> Result<Seeds, Error> {
+
        #[derive(Default)]
+
        pub struct Stats {
+
            connected: usize,
+
            disconnected: usize,
+
            fetching: usize,
+
        }
+

+
        let (stats, seeds) = match self.routing.get(rid) {
+
            Ok(seeds) => seeds.into_iter().fold(
+
                (Stats::default(), Seeds::default()),
+
                |(mut stats, mut seeds), node| {
+
                    if node != self.node_id() {
+
                        if self.sessions.is_fetching(&node) {
+
                            seeds.insert(Seed::Fetching(node));
+
                            stats.fetching += 1;
+
                        } else if self.sessions.is_connected(&node) {
+
                            seeds.insert(Seed::Connected(node));
+
                            stats.connected += 1;
+
                        } else if self.sessions.is_disconnected(&node) {
+
                            seeds.insert(Seed::Disconnected(node));
+
                            stats.connected += 1;
+
                        }
+
                    }
+

+
                    (stats, seeds)
+
                },
+
            ),
+
            Err(err) => {
+
                return Err(Error::Routing(err));
+
            }
+
        };
+
        debug!(
+
            target: "service",
+
            "Found {} connected seed(s), {} disconnected seed(s), and {} fetching seed(s) for {}",
+
            stats.connected, stats.disconnected, stats.fetching, rid
+
        );
+

+
        Ok(seeds)
+
    }
+

    /// Return a new filter object, based on our tracking policy.
    fn filter(&self) -> Filter {
        if self.config.policy == tracking::Policy::Track {
@@ -1378,6 +1387,43 @@ where
            .collect()
    }

+
    /// Fetch all repositories that are tracked but missing from our inventory.
+
    fn fetch_missing_inventory(&mut self) -> Result<(), Error> {
+
        let inventory = self.storage().inventory()?;
+
        let missing = self
+
            .tracking
+
            .repo_entries()?
+
            .filter_map(|t| (t.policy == tracking::Policy::Track).then_some(t.id))
+
            .filter(|rid| !inventory.contains(rid));
+

+
        for rid in missing {
+
            match self.seeds(&rid) {
+
                Ok(mut seeds) => {
+
                    if seeds.has_connections() {
+
                        for seed in seeds.connected() {
+
                            self.fetch(rid, seed);
+
                        }
+
                    } else {
+
                        // TODO: We should make sure that this fetch is retried later, either
+
                        // when we connect to a seed, or when we discover a new seed.
+
                        // Since new connections and routing table updates are both conditions for
+
                        // fetching, we should trigger fetches when those conditions appear.
+
                        // Another way to handle this would be to update our database, saying
+
                        // that we're trying to fetch a certain repo. We would then just
+
                        // iterate over those entries in the above circumstances. This is
+
                        // merely an optimization though, we can also iterate over all tracked
+
                        // repos and check which ones are not in our inventory.
+
                        warn!(target: "service", "No connected seeds found for {rid}..");
+
                    }
+
                }
+
                Err(e) => {
+
                    error!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
+
                }
+
            }
+
        }
+
        Ok(())
+
    }
+

    fn maintain_connections(&mut self) {
        let addrs = self.choose_addresses();
        if addrs.is_empty() {
modified radicle-node/src/tests.rs
@@ -945,6 +945,53 @@ fn test_track_repo_subscribe() {
}

#[test]
+
fn test_fetch_missing_inventory() {
+
    let rid = arbitrary::gen::<Id>(1);
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
+
    let bob = Peer::new("bob", [8, 8, 8, 8]);
+
    let eve = Peer::new("eve", [9, 9, 9, 9]);
+
    let (send, recv) = chan::bounded::<bool>(1);
+
    let now = LocalTime::now();
+

+
    alice.connect_to(&bob);
+
    alice.connect_to(&eve);
+
    alice.receive(
+
        bob.id(),
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: vec![rid].try_into().unwrap(),
+
                timestamp: now.as_millis(),
+
            },
+
            bob.signer(),
+
        ),
+
    );
+
    alice.receive(
+
        eve.id(),
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: vec![rid].try_into().unwrap(),
+
                timestamp: now.as_millis(),
+
            },
+
            eve.signer(),
+
        ),
+
    );
+
    alice.command(Command::TrackRepo(rid, node::tracking::Scope::All, send));
+
    alice.outbox().for_each(drop);
+

+
    assert!(recv.recv().unwrap());
+

+
    alice.elapse(service::SYNC_INTERVAL);
+
    alice
+
        .messages(bob.id)
+
        .find(|m| matches!(m, Message::Fetch { .. }))
+
        .unwrap();
+
    alice
+
        .messages(eve.id)
+
        .find(|m| matches!(m, Message::Fetch { .. }))
+
        .unwrap();
+
}
+

+
#[test]
fn test_push_and_pull() {
    let tempdir = tempfile::tempdir().unwrap();

modified radicle/src/storage/git.rs
@@ -145,7 +145,7 @@ impl Storage {

            // For performance reasons, we don't do a full repository check here.
            if let Err(e) = repo.head() {
-
                log::error!(target: "storage", "Repository {rid} is corrupted: looking up head: {e}");
+
                log::warn!(target: "storage", "Repository {rid} is invalid: looking up head: {e}");
                continue;
            }
            repos.push(rid);