Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Start implementing gossip subscriptions
Alexis Sellier committed 3 years ago
commit de8dcc50226d0b201eeb85cca789e59c6cc06ea4
parent 651513c757f34898abd5d0dbedc1ddd4a1ac66a8
6 files changed +92 -0
modified Cargo.lock
@@ -106,6 +106,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"

[[package]]
+
name = "bloomy"
+
version = "1.2.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "489d2af57852b78a86478273ac6a1ef912061b6af3a439694c49f309f6ea3bdd"
+
dependencies = [
+
 "siphasher",
+
]
+

+
[[package]]
name = "bs58"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -792,6 +801,7 @@ name = "radicle-node"
version = "0.2.0"
dependencies = [
 "anyhow",
+
 "bloomy",
 "bs58",
 "byteorder",
 "chrono",
modified node/Cargo.toml
@@ -10,6 +10,7 @@ anyhow = { version = "1" }
bs58 = { version = "0.4.0" }
ed25519-consensus = { version = "2.0.1" }
byteorder = { version = "1" }
+
bloomy = { version = "1.2" }
chrono = { version = "0.4.0" }
colored = { version = "1.9.0" }
crossbeam-channel = { version = "0.5.6" }
modified node/src/protocol.rs
@@ -1,5 +1,6 @@
#![allow(dead_code)]
pub mod config;
+
pub mod filter;
pub mod message;
pub mod peer;
pub mod wire;
added node/src/protocol/filter.rs
@@ -0,0 +1,58 @@
+
use std::io;
+
use std::ops::{Deref, DerefMut};
+

+
use bloomy::BloomFilter;
+

+
use crate::identity::Id;
+
use crate::protocol::wire;
+

+
/// Size in bytes of subscription bloom filter.
+
pub const FILTER_SIZE: usize = 1024 * 16;
+
/// Number of hashes used for bloom filter.
+
pub const FILTER_HASHES: usize = 2;
+

+
/// Subscription filter.
+
/// Nb. This filter doesn't currently support inserting public keys.
+
#[derive(Clone, PartialEq, Eq)]
+
pub struct Filter(BloomFilter<Id>);
+

+
impl Deref for Filter {
+
    type Target = BloomFilter<Id>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl DerefMut for Filter {
+
    fn deref_mut(&mut self) -> &mut Self::Target {
+
        &mut self.0
+
    }
+
}
+

+
impl Default for Filter {
+
    fn default() -> Self {
+
        Self(BloomFilter::with_size(FILTER_SIZE))
+
    }
+
}
+

+
impl wire::Encode for Filter {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+

+
        n += self.0.as_bytes().encode(writer)?;
+

+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for Filter {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let bytes: [u8; FILTER_SIZE] = wire::Decode::decode(reader)?;
+
        let bf = BloomFilter::from(Vec::from(bytes));
+

+
        debug_assert_eq!(bf.hashes(), FILTER_HASHES);
+

+
        Ok(Self(bf))
+
    }
+
}
modified node/src/protocol/message.rs
@@ -5,6 +5,7 @@ use byteorder::{NetworkEndian, ReadBytesExt};
use crate::crypto;
use crate::git;
use crate::identity::Id;
+
use crate::protocol::filter::Filter;
use crate::protocol::wire;
use crate::protocol::{NodeId, Timestamp, PROTOCOL_VERSION};
use crate::storage::refs::Refs;
@@ -33,6 +34,7 @@ pub enum MessageType {
    NodeAnnouncement = 2,
    InventoryAnnouncement = 4,
    RefsAnnouncement = 6,
+
    Subscribe = 8,
}

impl From<MessageType> for u16 {
@@ -50,6 +52,7 @@ impl TryFrom<u16> for MessageType {
            2 => Ok(MessageType::NodeAnnouncement),
            4 => Ok(MessageType::InventoryAnnouncement),
            6 => Ok(MessageType::RefsAnnouncement),
+
            8 => Ok(MessageType::Subscribe),
            _ => Err(other),
        }
    }
@@ -334,6 +337,10 @@ pub enum Message {
        git: git::Url,
    },

+
    /// Subscribe to gossip messages matching the filter, that are newer than the given
+
    /// timestamp.
+
    Subscribe { filter: Filter, since: Timestamp },
+

    /// Node announcing its inventory to the network.
    /// This should be the whole inventory every time.
    InventoryAnnouncement {
@@ -404,6 +411,7 @@ impl Message {
    pub fn type_id(&self) -> u16 {
        match self {
            Self::Hello { .. } => MessageType::Hello,
+
            Self::Subscribe { .. } => MessageType::Subscribe,
            Self::NodeAnnouncement { .. } => MessageType::NodeAnnouncement,
            Self::InventoryAnnouncement { .. } => MessageType::InventoryAnnouncement,
            Self::RefsAnnouncement { .. } => MessageType::RefsAnnouncement,
@@ -416,6 +424,7 @@ impl fmt::Debug for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Hello { id, .. } => write!(f, "Hello({})", id),
+
            Self::Subscribe { since, .. } => write!(f, "Subscribe({})", since),
            Self::NodeAnnouncement { node, .. } => write!(f, "NodeAnnouncement({})", node),
            Self::InventoryAnnouncement { node, message, .. } => {
                write!(
@@ -460,6 +469,10 @@ impl wire::Encode for Message {
                n += addrs.as_slice().encode(writer)?;
                n += git.encode(writer)?;
            }
+
            Self::Subscribe { filter, since } => {
+
                n += filter.encode(writer)?;
+
                n += since.encode(writer)?;
+
            }
            Self::RefsAnnouncement {
                node,
                message,
@@ -512,6 +525,12 @@ impl wire::Decode for Message {
                    git,
                })
            }
+
            Ok(MessageType::Subscribe) => {
+
                let filter = Filter::decode(reader)?;
+
                let since = Timestamp::decode(reader)?;
+

+
                Ok(Self::Subscribe { filter, since })
+
            }
            Ok(MessageType::NodeAnnouncement) => {
                let node = NodeId::decode(reader)?;
                let message = NodeAnnouncement::decode(reader)?;
modified node/src/protocol/peer.rs
@@ -230,6 +230,9 @@ impl Peer {
                }
                log::warn!("Node announcement handling is not implemented");
            }
+
            (PeerState::Negotiated { .. }, Message::Subscribe { .. }) => {
+
                log::warn!("Subscribe message handling is not implemented");
+
            }
            (PeerState::Negotiated { .. }, Message::Hello { .. }) => {
                debug!(
                    "Disconnecting peer {} for sending us a redundant handshake message",