Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: defer inventory scan to idle
Quaylyn Rimer committed 3 months ago
commit dd1af881731945840df147af94993168423545ee
parent b25bb21b1438ebf205f58f689fad05ac6984c2c4
1 file changed +87 -63
modified crates/radicle-protocol/src/service.rs
@@ -469,6 +469,8 @@ pub struct Service<D, S, G> {
    metrics: Metrics,
    /// Whether refs DB population is pending.
    refs_populate_pending: bool,
+
    /// Whether inventory scan is pending.
+
    inventory_scan_pending: bool,
}

impl<D, S, G> Service<D, S, G> {
@@ -538,6 +540,7 @@ where
            listening: vec![],
            metrics: Metrics::default(),
            refs_populate_pending: false,
+
            inventory_scan_pending: false,
        }
    }

@@ -681,69 +684,8 @@ where
            }
        }

-
        let announced = self
-
            .db
-
            .seeds()
-
            .seeded_by(&nid)?
-
            .collect::<Result<HashMap<_, _>, _>>()?;
-
        let mut inventory = BTreeSet::new();
-
        let mut private = BTreeSet::new();
-

-
        for repo in self.storage.repositories()? {
-
            let rid = repo.rid;
-

-
            // If we're not seeding this repo, just skip it.
-
            if !self.policies.is_seeding(&rid)? {
-
                debug!(target: "service", "Local repository {rid} is not seeded");
-
                continue;
-
            }
-
            // Add public repositories to inventory.
-
            if repo.doc.is_public() {
-
                inventory.insert(rid);
-
            } else {
-
                private.insert(rid);
-
            }
-
            // If we have no owned refs for this repo, then there's nothing to announce.
-
            let Some(updated_at) = repo.synced_at else {
-
                continue;
-
            };
-
            // Skip this repo if the sync status matches what we have in storage.
-
            if let Some(announced) = announced.get(&rid) {
-
                if updated_at.oid == announced.oid {
-
                    continue;
-
                }
-
            }
-
            // Make sure our local node's sync status is up to date with storage.
-
            if self.db.seeds_mut().synced(
-
                &rid,
-
                &nid,
-
                updated_at.oid,
-
                updated_at.timestamp.into(),
-
            )? {
-
                debug!(target: "service", "Saved local sync status for {rid}..");
-
            }
-
            // If we got here, it likely means a repo was updated while the node was stopped.
-
            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
-
            // the historical gossip messages when a node connects and subscribes to this repo.
-
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
-
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
-
                self.db.gossip_mut().announced(&nid, &ann)?;
-
            }
-
        }
-

-
        // Ensure that our inventory is recorded in our routing table, and we are seeding
-
        // all of it. It can happen that inventory is not properly seeded if for eg. the
-
        // user creates a new repository while the node is stopped.
-
        self.db
-
            .routing_mut()
-
            .add_inventory(inventory.iter(), nid, time.into())?;
-
        self.inventory = gossip::inventory(self.timestamp(), inventory);
-

-
        // Ensure that private repositories are not in our inventory. It's possible that
-
        // a repository was public and then it was made private.
-
        self.db
-
            .routing_mut()
-
            .remove_inventories(private.iter(), &nid)?;
+
        info!(target: "service", "Deferring local inventory scan until idle..");
+
        self.inventory_scan_pending = true;

        // Setup subscription filter for seeded repos.
        self.filter = Filter::allowed_by(self.policies.seed_policies()?);
@@ -803,6 +745,16 @@ where
                    self.refs_populate_pending = true;
                }
            }
+
            if self.inventory_scan_pending {
+
                info!(target: "service", "Scanning local inventory from storage..");
+
                match self.refresh_inventory_from_storage() {
+
                    Ok(()) => self.inventory_scan_pending = false,
+
                    Err(e) => {
+
                        warn!(target: "service", "Failed to scan local inventory: {e}");
+
                        self.inventory_scan_pending = true;
+
                    }
+
                }
+
            }

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
@@ -2096,6 +2048,78 @@ where
            .map_err(Error::from)
    }

+
    fn refresh_inventory_from_storage(&mut self) -> Result<(), Error> {
+
        let nid = self.node_id();
+
        let time = self.clock;
+
        let announced = self
+
            .db
+
            .seeds()
+
            .seeded_by(&nid)?
+
            .collect::<Result<HashMap<_, _>, _>>()?;
+
        let mut inventory = BTreeSet::new();
+
        let mut private = BTreeSet::new();
+

+
        for repo in self.storage.repositories()? {
+
            let rid = repo.rid;
+

+
            // If we're not seeding this repo, just skip it.
+
            if !self.policies.is_seeding(&rid)? {
+
                debug!(target: "service", "Local repository {rid} is not seeded");
+
                continue;
+
            }
+
            // Add public repositories to inventory.
+
            if repo.doc.is_public() {
+
                inventory.insert(rid);
+
            } else {
+
                private.insert(rid);
+
            }
+
            // If we have no owned refs for this repo, then there's nothing to announce.
+
            let Some(updated_at) = repo.synced_at else {
+
                continue;
+
            };
+
            // Skip this repo if the sync status matches what we have in storage.
+
            if let Some(announced) = announced.get(&rid) {
+
                if updated_at.oid == announced.oid {
+
                    continue;
+
                }
+
            }
+
            // Make sure our local node's sync status is up to date with storage.
+
            if self.db.seeds_mut().synced(
+
                &rid,
+
                &nid,
+
                updated_at.oid,
+
                updated_at.timestamp.into(),
+
            )? {
+
                debug!(target: "service", "Saved local sync status for {rid}..");
+
            }
+
            // If we got here, it likely means a repo was updated while the node was stopped.
+
            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
+
            // the historical gossip messages when a node connects and subscribes to this repo.
+
            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
+
                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
+
                self.db.gossip_mut().announced(&nid, &ann)?;
+
            }
+
        }
+

+
        // Ensure that our inventory is recorded in our routing table, and we are seeding
+
        // all of it. It can happen that inventory is not properly seeded if for eg. the
+
        // user creates a new repository while the node is stopped.
+
        self.db
+
            .routing_mut()
+
            .add_inventory(inventory.iter(), nid, time.into())?;
+
        self.inventory = gossip::inventory(self.timestamp(), inventory);
+

+
        // Ensure that private repositories are not in our inventory. It's possible that
+
        // a repository was public and then it was made private.
+
        self.db
+
            .routing_mut()
+
            .remove_inventories(private.iter(), &nid)?;
+

+
        self.announce_inventory();
+

+
        Ok(())
+
    }
+

    /// Process a peer inventory announcement by updating our routing table.
    /// This function expects the peer's full inventory, and prunes entries that are not in the
    /// given inventory.