Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Handle announcement commands
Alexis Sellier committed 3 years ago
commit ff2bd185e7484d6bcbe82e40de17201099f3a5bd
parent 400ba8d9352e79a6ab9a557d3758b07caed7075d
3 files changed +42 -20
modified node/src/protocol.rs
@@ -208,7 +208,7 @@ impl<T: ReadStorage + WriteStorage, S: address_book::Store, G: crypto::Signer> P

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
-
        let inv = Message::inventory(&mut self.context)?;
+
        let inv = Message::inventory(&self.context)?;

        for addr in self.peers.negotiated().map(|(_, p)| p.addr) {
            self.context.write(addr, inv.clone());
@@ -335,8 +335,11 @@ where
                    })
                    .unwrap();
            }
-
            Command::AnnounceInventory(_proj) => {
-
                todo!()
+
            Command::AnnounceInventory(proj) => {
+
                let peers = self.peers.negotiated().map(|(_, p)| p.addr);
+

+
                self.context
+
                    .broadcast(Message::InventoryUpdate { inv: vec![proj] }, peers);
            }
        }
    }
@@ -470,10 +473,9 @@ where
                    let peers = negotiated
                        .iter()
                        .filter(|(ip, _)| *ip != peer.ip())
-
                        .map(|(_, addr)| *addr)
-
                        .collect::<Vec<_>>();
+
                        .map(|(_, addr)| *addr);

-
                    self.context.broadcast(msg, &peers);
+
                    self.context.broadcast(msg, peers);
                }
                Err(err) => {
                    self.context
@@ -603,21 +605,32 @@ where
                .entry(proj_id.clone())
                .or_insert_with(|| HashSet::with_hasher(self.rng.clone().into()));

-
            if self.config.is_tracking(proj_id) {
-
                // TODO: Verify refs before adding them to storage.
-
                let mut repo = self.storage.repository(proj_id).unwrap();
-
                repo.fetch(&Url {
-
                    path: format!("/{}", proj_id).into(),
-
                    ..remote.clone()
-
                })
-
                .unwrap();
+
            // TODO: Fire an event on routing update.
+
            if inventory.insert(from) && self.config.is_tracking(proj_id) {
+
                self.fetch(proj_id, remote);
            }
+
        }
+
    }

-
            // TODO: Fire an event on routing update.
-
            inventory.insert(from);
+
    /// Process a peer inventory update announcement by (maybe) fetching.
+
    fn process_inventory_update(&mut self, inventory: &Inventory, _from: NodeId, remote: &Url) {
+
        for proj_id in inventory {
+
            if self.config.is_tracking(proj_id) {
+
                self.fetch(proj_id, remote);
+
            }
        }
    }

+
    fn fetch(&mut self, proj_id: &ProjId, remote: &Url) {
+
        // TODO: Verify refs before adding them to storage.
+
        let mut repo = self.storage.repository(proj_id).unwrap();
+
        repo.fetch(&Url {
+
            path: format!("/{}", proj_id).into(),
+
            ..remote.clone()
+
        })
+
        .unwrap();
+
    }
+

    /// Disconnect a peer.
    fn disconnect(&mut self, addr: net::SocketAddr, reason: DisconnectReason) {
        self.io.push_back(Io::Disconnect(addr, reason));
@@ -658,9 +671,9 @@ impl<S, T, G> Context<S, T, G> {
    }

    /// Broadcast a message to a list of peers.
-
    fn broadcast(&mut self, msg: Message, peers: &[net::SocketAddr]) {
+
    fn broadcast(&mut self, msg: Message, peers: impl IntoIterator<Item = net::SocketAddr>) {
        for peer in peers {
-
            self.write(*peer, msg.clone());
+
            self.write(peer, msg.clone());
        }
    }
}
modified node/src/protocol/message.rs
@@ -86,7 +86,9 @@ pub enum Message {
        announcement: NodeAnnouncement,
    },
    /// Get a peer's inventory.
-
    GetInventory { ids: Vec<ProjId> },
+
    GetInventory {
+
        ids: Vec<ProjId>,
+
    },
    /// Send our inventory to a peer. Sent in response to [`Message::GetInventory`].
    /// Nb. This should be the whole inventory, not a partial update.
    Inventory {
@@ -96,6 +98,9 @@ pub enum Message {
        /// are the originator, only when relaying.
        origin: Option<NodeId>,
    },
+
    InventoryUpdate {
+
        inv: Vec<ProjId>,
+
    },
}

impl Message {
@@ -119,7 +124,7 @@ impl Message {
        }
    }

-
    pub fn inventory<S, T, G>(ctx: &mut Context<S, T, G>) -> Result<Self, storage::Error>
+
    pub fn inventory<S, T, G>(ctx: &Context<S, T, G>) -> Result<Self, storage::Error>
    where
        T: storage::ReadStorage,
    {
modified node/src/protocol/peer.rs
@@ -194,6 +194,10 @@ impl Peer {
                    }));
                }
            }
+
            (PeerState::Negotiated { id, git, .. }, Message::InventoryUpdate { inv }) => {
+
                // TODO: Buffer/throttle fetches.
+
                ctx.process_inventory_update(&inv, *id, git);
+
            }
            (
                PeerState::Negotiated { .. },
                Message::Node {