Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Use message timestamp for filtering
Alexis Sellier committed 3 years ago
commit af06ad645133f580a87895353508053c5de60716
parent e35ec2f715c8fa5bacff8249373682fc0e3a47ed
6 files changed +84 -9
modified radicle-node/src/service.rs
@@ -453,7 +453,12 @@ where
                let remote = repo.remote(&node).unwrap();
                let peers = self.sessions.negotiated().map(|(_, p)| p);
                let refs = remote.refs.into();
-
                let msg = AnnouncementMessage::from(RefsAnnouncement { id, refs });
+
                let timestamp = self.clock.timestamp();
+
                let msg = AnnouncementMessage::from(RefsAnnouncement {
+
                    id,
+
                    refs,
+
                    timestamp,
+
                });
                let ann = msg.signed(&self.signer);

                self.reactor.broadcast(ann, peers);
@@ -708,7 +713,7 @@ where

                // Returning true here means that the message should be relayed.
                if self.handle_announcement(&git, &ann)? {
-
                    self.gossip.received(ann.clone(), self.clock.timestamp());
+
                    self.gossip.received(ann.clone(), ann.message.timestamp());
                    return Ok(Some(ann));
                }
            }
modified radicle-node/src/service/message.rs
@@ -173,14 +173,17 @@ pub struct RefsAnnouncement {
    pub id: Id,
    /// Updated refs.
    pub refs: Refs,
-
    // TODO: Add timestamp
+
    /// Time of announcement.
+
    pub timestamp: Timestamp,
}

/// Node announcing its inventory to the network.
/// This should be the whole inventory every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InventoryAnnouncement {
+
    /// Node inventory.
    pub inventory: Vec<Id>,
+
    /// Time of announcement.
    pub timestamp: Timestamp,
}

@@ -207,6 +210,14 @@ impl AnnouncementMessage {
            signature,
        }
    }
+

+
    pub fn timestamp(&self) -> Timestamp {
+
        match self {
+
            Self::Inventory(InventoryAnnouncement { timestamp, .. }) => *timestamp,
+
            Self::Refs(RefsAnnouncement { timestamp, .. }) => *timestamp,
+
            Self::Node(NodeAnnouncement { timestamp, .. }) => *timestamp,
+
        }
+
    }
}

impl From<NodeAnnouncement> for AnnouncementMessage {
@@ -368,7 +379,12 @@ mod tests {
    #[quickcheck]
    fn prop_refs_announcement_signing(id: Id, refs: Refs) {
        let signer = MockSigner::new(&mut fastrand::Rng::new());
-
        let message = AnnouncementMessage::Refs(RefsAnnouncement { id, refs });
+
        let timestamp = 0;
+
        let message = AnnouncementMessage::Refs(RefsAnnouncement {
+
            id,
+
            refs,
+
            timestamp,
+
        });
        let ann = message.signed(&signer);

        assert!(ann.verify());
modified radicle-node/src/test/arbitrary.rs
@@ -61,6 +61,7 @@ impl Arbitrary for Message {
                message: RefsAnnouncement {
                    id: Id::arbitrary(g),
                    refs: Refs::arbitrary(g),
+
                    timestamp: Timestamp::arbitrary(g),
                }
                .into(),
                signature: crypto::Signature::from(ByteArray::<64>::arbitrary(g).into_inner()),
modified radicle-node/src/test/gossip.rs
@@ -12,8 +12,17 @@ pub fn messages(count: usize, now: LocalTime, delta: LocalDuration) -> Vec<Messa

    for _ in 0..count {
        let signer = MockSigner::new(&mut rng);
-
        let delta = LocalDuration::from_secs(rng.u64(0..delta.as_secs()));
-
        let time = if rng.bool() { now + delta } else { now - delta };
+
        let time = if delta == LocalDuration::from_secs(0) {
+
            now
+
        } else {
+
            let delta = LocalDuration::from_secs(rng.u64(0..delta.as_secs()));
+

+
            if rng.bool() {
+
                now + delta
+
            } else {
+
                now - delta
+
            }
+
        };

        msgs.push(Message::inventory(
            InventoryAnnouncement {
modified radicle-node/src/test/tests.rs
@@ -5,7 +5,7 @@ use crossbeam_channel as chan;
use nakamoto_net as nakamoto;

use crate::collections::{HashMap, HashSet};
-
use crate::prelude::Timestamp;
+
use crate::prelude::{LocalDuration, Timestamp};
use crate::service::config::*;
use crate::service::filter::Filter;
use crate::service::message::*;
@@ -223,7 +223,6 @@ fn test_gossip_rebroadcast() {

    let received = test::gossip::messages(6, alice.local_time(), MAX_TIME_DELTA);
    for msg in received.iter().cloned() {
-
        // TODO: Test with elapsed time
        alice.receive(&bob.addr(), msg);
    }

@@ -242,6 +241,45 @@ fn test_gossip_rebroadcast() {
}

#[test]
+
fn test_gossip_rebroadcast_timestamp_filtered() {
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
+
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
+
    let eve = Peer::new("eve", [9, 9, 9, 9], MockStorage::empty());
+

+
    alice.connect_to(&bob);
+

+
    let delta = LocalDuration::from_mins(10);
+
    let first = test::gossip::messages(3, alice.local_time() - delta, LocalDuration::from_secs(0));
+
    let second = test::gossip::messages(3, alice.local_time(), LocalDuration::from_secs(0));
+
    let third = test::gossip::messages(3, alice.local_time() + delta, LocalDuration::from_secs(0));
+

+
    // Alice receives three batches of messages.
+
    for msg in first
+
        .iter()
+
        .chain(second.iter())
+
        .chain(third.iter())
+
        .cloned()
+
    {
+
        alice.receive(&bob.addr(), msg);
+
    }
+

+
    // Eve subscribes to messages within the period of the second batch only.
+
    alice.connect_from(&eve);
+
    alice.receive(
+
        &eve.addr(),
+
        Message::Subscribe(Subscribe {
+
            filter: Filter::default(),
+
            since: alice.local_time().as_secs(),
+
            until: (alice.local_time() + delta).as_secs(),
+
        }),
+
    );
+

+
    let relayed = alice.messages(&eve.addr()).collect::<Vec<_>>();
+
    assert_eq!(relayed.len(), second.len());
+
    assert_eq!(relayed, second);
+
}
+

+
#[test]
fn test_inventory_relay() {
    // Topology is eve <-> alice <-> bob
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
modified radicle-node/src/wire/message.rs
@@ -100,6 +100,7 @@ impl wire::Encode for RefsAnnouncement {

        n += self.id.encode(writer)?;
        n += self.refs.encode(writer)?;
+
        n += self.timestamp.encode(writer)?;

        Ok(n)
    }
@@ -109,8 +110,13 @@ impl wire::Decode for RefsAnnouncement {
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
        let id = Id::decode(reader)?;
        let refs = Refs::decode(reader)?;
+
        let timestamp = Timestamp::decode(reader)?;

-
        Ok(Self { id, refs })
+
        Ok(Self {
+
            id,
+
            refs,
+
            timestamp,
+
        })
    }
}