Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
protocol: batch inventory removals and events in `sync_routing`
Merged Defelo opened 3 months ago
2 files changed +32 -23 d2ab7b1b 589925e3
modified crates/radicle-protocol/src/service.rs
@@ -1198,16 +1198,14 @@ where
                    rid,
                    updated: updated.clone(),
                });
-                self.emitter.emit_all(
-                    canonical
-                        .into_iter()
-                        .map(|(refname, target)| Event::CanonicalRefUpdated {
+                self.emitter
+                    .emit_all(canonical.into_iter().map(|(refname, target)| {
+                        Event::CanonicalRefUpdated {
                            rid,
                            refname,
                            target,
-                        })
-                        .collect(),
-                );
+                        }
+                    }));

                // Announce our new inventory if this fetch was a full clone.
                // Only update and announce inventory for public repositories.
@@ -2097,6 +2095,7 @@ where
    ) -> Result<SyncedRouting, Error> {
        let mut synced = SyncedRouting::default();
        let included = inventory.into_iter().collect::<BTreeSet<_>>();
+        let mut events = Vec::new();

        for (rid, result) in
            self.db
@@ -2106,7 +2105,7 @@ where
            match result {
                InsertResult::SeedAdded => {
                    debug!(target: "service", "Routing table updated for {rid} with seed {from}");
-                    self.emitter.emit(Event::SeedDiscovered { rid, nid: from });
+                    events.push(Event::SeedDiscovered { rid, nid: from });

                    if self
                        .policies
@@ -2124,14 +2123,26 @@ where
                InsertResult::NotUpdated => {}
            }
        }
-        for rid in self.db.routing().get_inventory(&from)?.into_iter() {
-            if !included.contains(&rid) {
-                if self.db.routing_mut().remove_inventory(&rid, &from)? {
-                    synced.removed.push(rid);
-                    self.emitter.emit(Event::SeedDropped { rid, nid: from });
-                }
-            }
-        }
+
+        synced.removed.extend(
+            self.db
+                .routing()
+                .get_inventory(&from)?
+                .into_iter()
+                .filter(|rid| !included.contains(rid)),
+        );
+        self.db
+            .routing_mut()
+            .remove_inventories(&synced.removed, &from)?;
+        events.extend(
+            synced
+                .removed
+                .iter()
+                .map(|&rid| Event::SeedDropped { rid, nid: from }),
+        );
+
+        self.emitter.emit_all(events);
+
        Ok(synced)
    }

modified crates/radicle/src/node/events.rs
@@ -228,15 +228,13 @@ impl<T: Clone> Emitter<T> {
    /// Emit a batch of events to subscribers and drop those who can't receive
    /// them.
    /// N.b. subscribers are also dropped if their channel is full.
-    pub fn emit_all(&self, events: Vec<T>) {
+    pub fn emit_all(&self, events: impl IntoIterator<Item = T>) {
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
-        self.subscribers.lock().unwrap().retain(|s| {
-            events
-                .clone()
-                .into_iter()
-                .all(|event| s.try_send(event).is_ok())
-        });
+        let mut subscribers = self.subscribers.lock().unwrap();
+        for event in events {
+            subscribers.retain(|s| s.try_send(event.clone()).is_ok());
+        }
    }

    /// Subscribe to events stream.