Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement subscriptions via bloom filter
Alexis Sellier committed 3 years ago
commit 79c89a239ad0b364fcfa2d072510f58e6e3fd0ce
parent d7a2520ad3ac89a593e92c7e17e06102905f5f8e
6 files changed +134 -36
modified node/src/protocol.rs
@@ -33,6 +33,7 @@ use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, Writ

pub use crate::protocol::config::{Config, Network};

+
use self::filter::Filter;
use self::message::{InventoryAnnouncement, NodeFeatures};

pub const DEFAULT_PORT: u16 = 8776;
@@ -450,7 +451,7 @@ where
                let node = self.node_id();
                let repo = self.storage.repository(&id).unwrap();
                let remote = repo.remote(&node).unwrap();
-
                let peers = self.peers.negotiated().map(|(_, p)| p.addr);
+
                let peers = self.peers.negotiated().map(|(_, p)| p);
                let refs = remote.refs.into();
                let message = RefsAnnouncement { id, refs };
                let signature = message.sign(&self.signer);
@@ -551,14 +552,8 @@ where
    }

    fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
-
        let peer = addr.ip();
-
        let negotiated = self
-
            .peers
-
            .negotiated()
-
            .map(|(id, p)| (*id, p.addr))
-
            .collect::<Vec<_>>();
-

-
        let (peer, msgs) = if let Some(peer) = self.peers.get_mut(&peer) {
+
        let peer_ip = addr.ip();
+
        let (peer, msgs) = if let Some(peer) = self.peers.get_mut(&peer_ip) {
            let decoder = peer.inbox();
            decoder.input(bytes);

@@ -581,23 +576,31 @@ where
            return;
        };

+
        let mut relay = Vec::new();
        for msg in msgs {
            match peer.received(msg, &mut self.context) {
                Ok(None) => {}
                Ok(Some(msg)) => {
-
                    let peers = negotiated
-
                        .iter()
-
                        .filter(|(ip, _)| *ip != peer.ip())
-
                        .map(|(_, addr)| *addr);
-

-
                    self.context.broadcast(msg, peers);
+
                    relay.push(msg);
                }
                Err(err) => {
                    self.context
                        .disconnect(peer.addr, DisconnectReason::Error(err));
+
                    // If there's an error, stop processing messages from this peer.
+
                    // However, we still relay messages returned up to this point.
+
                    break;
                }
            }
        }
+

+
        let negotiated = self
+
            .peers
+
            .negotiated()
+
            .filter(|(ip, _)| **ip != peer_ip)
+
            .map(|(_, p)| p);
+
        for msg in relay {
+
            self.context.relay(msg, negotiated.clone());
+
        }
    }
}

@@ -742,7 +745,14 @@ where
        })
    }

-
    fn handshake_messages(&self) -> [Message; 3] {
+
    fn filter(&self) -> Filter {
+
        match &self.config.project_tracking {
+
            ProjectTracking::All { .. } => Filter::default(),
+
            ProjectTracking::Allowed(ids) => Filter::new(ids.iter()),
+
        }
+
    }
+

+
    fn handshake_messages(&self) -> [Message; 4] {
        let git = self.config.git_url.clone();
        [
            Message::hello(
@@ -753,6 +763,7 @@ where
            ),
            Message::node(self.node_announcement(), &self.signer),
            Message::inventory(self.inventory_announcement().unwrap(), &self.signer),
+
            Message::subscribe(self.filter(), self.timestamp(), Timestamp::MAX),
        ]
    }

@@ -835,9 +846,28 @@ impl<S, T, G> Context<S, T, G> {
    }

    /// Broadcast a message to a list of peers.
-
    fn broadcast(&mut self, msg: Message, peers: impl IntoIterator<Item = net::SocketAddr>) {
+
    fn broadcast<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Peer>) {
        for peer in peers {
-
            self.write(peer, msg.clone());
+
            self.write(peer.addr, msg.clone());
+
        }
+
    }
+

+
    /// Relay a message to interested peers.
+
    fn relay<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Peer>) {
+
        if let Message::RefsAnnouncement { message, .. } = &msg {
+
            let id = message.id.clone();
+
            let peers = peers.into_iter().filter(|p| {
+
                if let Some(subscribe) = &p.subscribe {
+
                    subscribe.filter.contains(&id)
+
                } else {
+
                    // If the peer did not send us a `subscribe` message, we don'the
+
                    // relay any messages to them.
+
                    false
+
                }
+
            });
+
            self.broadcast(msg, peers);
+
        } else {
+
            self.broadcast(msg, peers);
        }
    }
}
modified node/src/protocol/filter.rs
@@ -9,13 +9,34 @@ use crate::protocol::wire;
/// Size in bytes of subscription bloom filter.
pub const FILTER_SIZE: usize = 1024 * 16;
/// Number of hashes used for bloom filter.
-
pub const FILTER_HASHES: usize = 2;
+
pub const FILTER_HASHES: usize = 7;

/// Subscription filter.
+
///
+
/// The [`Default`] instance has all bits set to `1`, ie. it will match
+
/// everything.
+
///
/// Nb. This filter doesn't currently support inserting public keys.
-
#[derive(Clone, PartialEq, Eq)]
+
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Filter(BloomFilter<Id>);

+
impl Default for Filter {
+
    fn default() -> Self {
+
        Self(BloomFilter::from(vec![0xff; FILTER_SIZE]))
+
    }
+
}
+

+
impl Filter {
+
    pub fn new<'a>(ids: impl IntoIterator<Item = &'a Id>) -> Self {
+
        let mut bloom = BloomFilter::with_size(FILTER_SIZE);
+

+
        for id in ids.into_iter() {
+
            bloom.insert(id);
+
        }
+
        Self(bloom)
+
    }
+
}
+

impl Deref for Filter {
    type Target = BloomFilter<Id>;

@@ -30,9 +51,10 @@ impl DerefMut for Filter {
    }
}

-
impl Default for Filter {
-
    fn default() -> Self {
-
        Self(BloomFilter::with_size(FILTER_SIZE))
+
#[cfg(test)]
+
impl From<BloomFilter<Id>> for Filter {
+
    fn from(bloom: BloomFilter<Id>) -> Self {
+
        Self(bloom)
    }
}

@@ -48,6 +70,11 @@ impl wire::Encode for Filter {

impl wire::Decode for Filter {
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let size: usize = wire::Decode::decode(reader)?;
+
        if size != FILTER_SIZE {
+
            return Err(wire::Error::InvalidFilterSize(size));
+
        }
+

        let bytes: [u8; FILTER_SIZE] = wire::Decode::decode(reader)?;
        let bf = BloomFilter::from(Vec::from(bytes));

modified node/src/protocol/message.rs
@@ -196,6 +196,16 @@ impl wire::Decode for Address {
}

#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct Subscribe {
+
    /// Subscribe to events matching this filter.
+
    pub filter: Filter,
+
    /// Request messages since this time.
+
    pub since: Timestamp,
+
    /// Request messages until this time.
+
    pub until: Timestamp,
+
}
+

+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeAnnouncement {
    /// Advertized features.
    pub features: NodeFeatures,
@@ -339,11 +349,7 @@ pub enum Message {

    /// Subscribe to gossip messages matching the filter and time range.
    /// timestamp.
-
    Subscribe {
-
        filter: Filter,
-
        since: Timestamp,
-
        until: Timestamp,
-
    },
+
    Subscribe(Subscribe),

    /// Node announcing its inventory to the network.
    /// This should be the whole inventory every time.
@@ -412,6 +418,14 @@ impl Message {
        }
    }

+
    pub fn subscribe(filter: Filter, since: Timestamp, until: Timestamp) -> Self {
+
        Self::Subscribe(Subscribe {
+
            filter,
+
            since,
+
            until,
+
        })
+
    }
+

    pub fn type_id(&self) -> u16 {
        match self {
            Self::Hello { .. } => MessageType::Hello,
@@ -428,7 +442,10 @@ impl fmt::Debug for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Hello { id, .. } => write!(f, "Hello({})", id),
-
            Self::Subscribe { since, .. } => write!(f, "Subscribe({})", since),
+
            Self::Subscribe(Subscribe { since, until, .. }) => {
+
                write!(f, "Subscribe({}..{})", since, until)
+
            }
+

            Self::NodeAnnouncement { node, .. } => write!(f, "NodeAnnouncement({})", node),
            Self::InventoryAnnouncement { node, message, .. } => {
                write!(
@@ -473,11 +490,11 @@ impl wire::Encode for Message {
                n += addrs.as_slice().encode(writer)?;
                n += git.encode(writer)?;
            }
-
            Self::Subscribe {
+
            Self::Subscribe(Subscribe {
                filter,
                since,
                until,
-
            } => {
+
            }) => {
                n += filter.encode(writer)?;
                n += since.encode(writer)?;
                n += until.encode(writer)?;
@@ -539,11 +556,11 @@ impl wire::Decode for Message {
                let since = Timestamp::decode(reader)?;
                let until = Timestamp::decode(reader)?;

-
                Ok(Self::Subscribe {
+
                Ok(Self::Subscribe(Subscribe {
                    filter,
                    since,
                    until,
-
                })
+
                }))
            }
            Ok(MessageType::NodeAnnouncement) => {
                let node = NodeId::decode(reader)?;
modified node/src/protocol/peer.rs
@@ -49,6 +49,8 @@ pub struct Peer {
    pub state: PeerState,
    /// Last known peer time.
    pub timestamp: Timestamp,
+
    /// Peer subscription.
+
    pub subscribe: Option<Subscribe>,

    /// Inbox for incoming messages from peer.
    inbox: Decoder,
@@ -66,6 +68,7 @@ impl Peer {
            state: PeerState::default(),
            link,
            timestamp: Timestamp::default(),
+
            subscribe: None,
            persistent,
            attempts: 0,
        }
@@ -230,8 +233,8 @@ impl Peer {
                }
                log::warn!("Node announcement handling is not implemented");
            }
-
            (PeerState::Negotiated { .. }, Message::Subscribe { .. }) => {
-
                log::warn!("Subscribe message handling is not implemented");
+
            (PeerState::Negotiated { .. }, Message::Subscribe(subscribe)) => {
+
                self.subscribe = Some(subscribe);
            }
            (PeerState::Negotiated { .. }, Message::Hello { .. }) => {
                debug!(
modified node/src/protocol/wire.rs
@@ -21,6 +21,8 @@ pub enum Error {
    FromUtf8(#[from] FromUtf8Error),
    #[error("invalid size: expected {expected}, got {actual}")]
    InvalidSize { expected: usize, actual: usize },
+
    #[error("invalid filter size: {0}")]
+
    InvalidFilterSize(usize),
    #[error(transparent)]
    InvalidRefName(#[from] fmt::Error),
    #[error("invalid git url `{url}`: {error}")]
modified node/src/test/arbitrary.rs
@@ -4,6 +4,7 @@ use std::net;
use std::ops::RangeBounds;
use std::path::PathBuf;

+
use bloomy::BloomFilter;
use nonempty::NonEmpty;
use quickcheck::Arbitrary;

@@ -13,9 +14,10 @@ use crate::crypto::{PublicKey, SecretKey};
use crate::git;
use crate::hash;
use crate::identity::{Delegate, Did, Doc, Id, Project};
+
use crate::protocol::filter::{Filter, FILTER_SIZE};
use crate::protocol::message::{
    Address, Envelope, InventoryAnnouncement, Message, MessageType, NodeAnnouncement,
-
    RefsAnnouncement,
+
    RefsAnnouncement, Subscribe,
};
use crate::protocol::{NodeId, Timestamp};
use crate::storage;
@@ -60,6 +62,17 @@ impl<const N: usize> Arbitrary for ByteArray<N> {
    }
}

+
impl Arbitrary for Filter {
+
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
+
        let mut bytes = vec![0; FILTER_SIZE];
+
        for _ in 0..64 {
+
            let index = usize::arbitrary(g) % bytes.len();
+
            bytes[index] = u8::arbitrary(g);
+
        }
+
        Self::from(BloomFilter::from(bytes))
+
    }
+
}
+

impl Arbitrary for Envelope {
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        Self {
@@ -76,6 +89,7 @@ impl Arbitrary for Message {
                MessageType::InventoryAnnouncement,
                MessageType::NodeAnnouncement,
                MessageType::RefsAnnouncement,
+
                MessageType::Subscribe,
            ])
            .unwrap();

@@ -112,6 +126,11 @@ impl Arbitrary for Message {
                    message,
                }
            }
+
            MessageType::Subscribe => Self::Subscribe(Subscribe {
+
                filter: Filter::arbitrary(g),
+
                since: Timestamp::arbitrary(g),
+
                until: Timestamp::arbitrary(g),
+
            }),
            _ => unreachable!(),
        }
    }