Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Inventory relay
Alexis Sellier committed 3 years ago
commit 9b3f5b9f3c9fd45a212fc53c041ccab371381290
parent e078706cd1382d8e01c505e77d06a18e9fd3ecba
3 files changed +207 -27
modified node/src/protocol.rs
@@ -65,7 +65,13 @@ pub enum Message {
    GetInventory { ids: Vec<ProjId> },
    /// Send our inventory to a peer. Sent in response to [`Message::GetInventory`].
    /// Nb. This should be the whole inventory, not a partial update.
-
    Inventory { seq: u64, inv: Inventory },
+
    Inventory {
+
        seq: u64,
+
        inv: Inventory,
+
        /// Original peer this inventory came from. We don't set this when we
+
        /// are the originator, only when relaying.
+
        origin: Option<PeerId>,
+
    },
}

impl From<Message> for Envelope {
@@ -93,7 +99,11 @@ impl Message {
        let seq = ctx.seq;
        let inv = ctx.storage.inventory()?;

-
        Ok(Self::Inventory { seq, inv })
+
        Ok(Self::Inventory {
+
            seq,
+
            inv,
+
            origin: None,
+
        })
    }

    pub fn get_inventory(ids: impl Into<Vec<ProjId>>) -> Self {
@@ -140,6 +150,8 @@ pub struct Config {
    pub project_tracking: ProjectTracking,
    /// Project remote tracking policy.
    pub remote_tracking: RemoteTracking,
+
    /// Whether or not our node should relay inventories.
+
    pub relay: bool,
}

impl Config {
@@ -509,6 +521,11 @@ where

    fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
        let peer = addr.ip();
+
        let negotiated = self
+
            .peers
+
            .negotiated()
+
            .map(|(id, p)| (*id, p.addr))
+
            .collect::<Vec<_>>();

        let (peer, msgs) = if let Some(peer) = self.peers.get_mut(&peer) {
            let decoder = &mut peer.inbox;
@@ -532,9 +549,21 @@ where
        };

        for msg in msgs {
-
            if let Err(err) = peer.received(msg, &mut self.context) {
-
                self.context
-
                    .disconnect(peer.addr, DisconnectReason::Error(err));
+
            match peer.received(msg, &mut self.context) {
+
                Ok(None) => {}
+
                Ok(Some(msg)) => {
+
                    let peers = negotiated
+
                        .iter()
+
                        .filter(|(id, _)| *id != peer.id())
+
                        .map(|(_, addr)| *addr)
+
                        .collect::<Vec<_>>();
+

+
                    self.context.broadcast(msg, &peers);
+
                }
+
                Err(err) => {
+
                    self.context
+
                        .disconnect(peer.addr, DisconnectReason::Error(err));
+
                }
            }
        }
    }
@@ -612,7 +641,9 @@ pub struct Context<S, T> {
    io: VecDeque<Io<(), DisconnectReason>>,
    /// Clock. Tells the time.
    clock: RefClock,
-
    /// Sequence number used for inventory messages.
+
    /// Sequence number of known peers.
+
    seqs: HashMap<PeerId, u64>,
+
    /// Our own sequence number.
    seq: u64,
    /// Project storage.
    storage: T,
@@ -637,6 +668,7 @@ where
            config,
            clock,
            routing: HashMap::with_hasher(rng.clone().into()),
+
            seqs: HashMap::with_hasher(rng.clone().into()),
            io: VecDeque::new(),
            seq: 0,
            storage,
@@ -646,11 +678,11 @@ where
    }

    /// Process a peer inventory announcement by updating our routing table.
-
    fn process_inventory(&mut self, from: PeerId, inventory: Inventory) {
+
    fn process_inventory(&mut self, inventory: &Inventory, from: PeerId) {
        for (proj_id, _refs) in inventory {
            let inventory = self
                .routing
-
                .entry(proj_id)
+
                .entry(proj_id.clone())
                .or_insert_with(|| HashSet::with_hasher(self.rng.clone().into()));

            // TODO: If we're tracking this project, check the refs to see if we need to
@@ -691,6 +723,13 @@ impl<S, T> Context<S, T> {

        self.io.push_back(Io::Write(remote, bytes));
    }
+

+
    /// Broadcast a message to a list of peers.
+
    fn broadcast(&mut self, msg: Message, peers: &[net::SocketAddr]) {
+
        for peer in peers {
+
            self.write(*peer, msg.clone());
+
        }
+
    }
}

#[derive(Debug)]
@@ -741,6 +780,8 @@ pub enum PeerError {
    WrongMagic(u32),
    #[error("wrong protocol version in message: {0}")]
    WrongVersion(u32),
+
    #[error("invalid inventory sequence number: {0}")]
+
    InvalidSequenceNumber(u64),
}

#[derive(Debug)]
@@ -786,7 +827,7 @@ impl Peer {
        &mut self,
        envelope: Envelope,
        ctx: &mut Context<S, T>,
-
    ) -> Result<(), PeerError>
+
    ) -> Result<Option<Message>, PeerError>
    where
        T: storage::ReadStorage,
    {
@@ -818,8 +859,27 @@ impl Peer {
                let inventory = Message::inventory(ctx).unwrap();
                ctx.write(self.addr, inventory);
            }
-
            Message::Inventory { inv, .. } => {
-
                ctx.process_inventory(self.id(), inv);
+
            Message::Inventory { seq: 0, .. } => {
+
                return Err(PeerError::InvalidSequenceNumber(0));
+
            }
+
            Message::Inventory { seq, inv, origin } => {
+
                let sequence = ctx.seqs.entry(self.id()).or_insert(0);
+

+
                // Discard inventory messages from sequence numbers we've already seen.
+
                if seq > *sequence {
+
                    *sequence = seq;
+
                } else {
+
                    return Ok(None);
+
                }
+
                ctx.process_inventory(&inv, origin.unwrap_or_else(|| self.id()));
+

+
                if ctx.config.relay {
+
                    return Ok(Some(Message::Inventory {
+
                        seq,
+
                        inv,
+
                        origin: origin.or_else(|| Some(self.id())),
+
                    }));
+
                }
            }
            Message::GetAddrs => {
                // TODO: Send peer addresses.
@@ -830,6 +890,6 @@ impl Peer {
                todo!();
            }
        }
-
        Ok(())
+
        Ok(None)
    }
}
modified node/src/test/peer.rs
@@ -124,13 +124,6 @@ where
        self.initialize();
        self.protocol.connected(*remote, &local, Link::Inbound);
        self.receive(remote, Message::hello());
-
        self.receive(
-
            remote,
-
            Message::Inventory {
-
                seq: 0,
-
                inv: vec![],
-
            },
-
        );

        let mut msgs = self.messages(remote);
        msgs.find(|m| matches!(m, Message::Hello { .. }))
@@ -154,19 +147,18 @@ where
        self.receive(remote, Message::hello());
    }

-
    /// Get outgoing messages sent from this peer to the remote address.
+
    /// Drain outgoing messages sent from this peer to the remote address.
    pub fn messages(&mut self, remote: &net::SocketAddr) -> impl Iterator<Item = Message> {
        let mut stream = Decoder::<Envelope>::new(2048);
        let mut msgs = Vec::new();

-
        for o in self.protocol.outbox().iter() {
-
            match o {
-
                Io::Write(a, bytes) if a == remote => {
-
                    stream.input(bytes);
-
                }
-
                _ => {}
+
        self.protocol.outbox().retain(|o| match o {
+
            Io::Write(a, bytes) if a == remote => {
+
                stream.input(bytes);
+
                false
            }
-
        }
+
            _ => true,
+
        });

        while let Some(envelope) = stream.decode_next().unwrap() {
            msgs.push(envelope.msg);
modified node/src/test/tests.rs
@@ -90,6 +90,134 @@ fn test_persistent_peer_connect() {
}

#[test]
+
#[ignore]
+
fn test_wrong_peer_version() {
+
    // TODO
+
}
+

+
#[test]
+
#[ignore]
+
fn test_wrong_peer_magic() {
+
    // TODO
+
}
+

+
#[test]
+
fn test_inventory_relay_bad_seq() {
+
    let mut alice = Peer::config(
+
        "alice",
+
        Config {
+
            relay: true,
+
            ..Config::default()
+
        },
+
        [7, 7, 7, 7],
+
        vec![],
+
        MockStorage::empty(),
+
        fastrand::Rng::new(),
+
    );
+
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
+

+
    alice.connect_to(&bob.addr());
+
    alice.receive(
+
        &bob.addr(),
+
        Message::Inventory {
+
            seq: 0,
+
            inv: vec![],
+
            origin: None,
+
        },
+
    );
+
    assert_matches!(
+
        alice.outbox().next(),
+
        Some(Io::Disconnect(addr, DisconnectReason::Error(PeerError::InvalidSequenceNumber(seq))))
+
        if addr == bob.addr() && seq == 0
+
    );
+
}
+

+
#[test]
+
fn test_inventory_relay() {
+
    // Topology is eve <-> alice <-> bob
+
    let mut alice = Peer::config(
+
        "alice",
+
        Config {
+
            relay: true,
+
            ..Config::default()
+
        },
+
        [7, 7, 7, 7],
+
        vec![],
+
        MockStorage::empty(),
+
        fastrand::Rng::new(),
+
    );
+
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
+
    let eve = Peer::new("eve", [9, 9, 9, 9], MockStorage::empty());
+
    let inv = vec![];
+

+
    // Inventory from Bob relayed to Eve.
+
    alice.connect_to(&bob.addr());
+
    alice.connect_from(&eve.addr());
+
    alice.receive(
+
        &bob.addr(),
+
        Message::Inventory {
+
            seq: 1,
+
            inv: inv.clone(),
+
            origin: None,
+
        },
+
    );
+
    assert_matches!(
+
        alice.messages(&eve.addr()).next(),
+
        Some(Message::Inventory { seq, origin, .. })
+
        if origin == Some(bob.ip) && seq == 1
+
    );
+
    assert_matches!(
+
        alice.messages(&bob.addr()).next(),
+
        None,
+
        "The inventory is not sent back to Bob"
+
    );
+

+
    alice.receive(
+
        &bob.addr(),
+
        Message::Inventory {
+
            seq: 1,
+
            inv: inv.clone(),
+
            origin: None,
+
        },
+
    );
+
    assert_matches!(
+
        alice.messages(&eve.addr()).next(),
+
        None,
+
        "Sending the same inventory again doesn't trigger a relay"
+
    );
+

+
    alice.receive(
+
        &bob.addr(),
+
        Message::Inventory {
+
            seq: 2,
+
            inv: inv.clone(),
+
            origin: None,
+
        },
+
    );
+
    assert_matches!(
+
        alice.messages(&eve.addr()).next(),
+
        Some(Message::Inventory { seq, origin, .. })
+
        if origin == Some(bob.ip) && seq == 2,
+
        "Sending a new inventory does trigger the relay"
+
    );
+

+
    // Inventory from Eve relayed to Bob.
+
    alice.receive(
+
        &eve.addr(),
+
        Message::Inventory {
+
            seq: 4,
+
            inv,
+
            origin: None,
+
        },
+
    );
+
    assert_matches!(
+
        alice.messages(&bob.addr()).next(),
+
        Some(Message::Inventory { seq, origin, .. })
+
        if origin == Some(eve.ip) && seq == 4
+
    );
+
}
+

+
#[test]
fn test_persistent_peer_reconnect() {
    let mut bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
    let mut eve = Peer::new("eve", [9, 9, 9, 9], MockStorage::empty());