Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Use timestamps instead of sequence numbers
Alexis Sellier committed 3 years ago
commit 9ca982ee28ea7e08545d8a2dbe44fb7656919ac9
parent 32591e6aa45a717bb5e44fe4406b9210d5426884
2 files changed +48 -37
modified node/src/protocol.rs
@@ -25,6 +25,8 @@ use crate::storage::{Inventory, ReadStorage, Remotes, Unverified, WriteStorage};
pub type PeerId = IpAddr;
/// Network routing table. Keeps track of where projects are hosted.
pub type Routing = HashMap<ProjId, HashSet<PeerId>>;
+
/// Seconds since epoch.
+
pub type Timestamp = u64;

pub const NETWORK_MAGIC: u32 = 0x819b43d9;
pub const DEFAULT_PORT: u16 = 8776;
@@ -67,8 +69,8 @@ pub enum Message {
    /// Send our inventory to a peer. Sent in response to [`Message::GetInventory`].
    /// Nb. This should be the whole inventory, not a partial update.
    Inventory {
-
        seq: u64,
        inv: Inventory,
+
        timestamp: Timestamp,
        /// Original peer this inventory came from. We don't set this when we
        /// are the originator, only when relaying.
        origin: Option<PeerId>,
@@ -96,13 +98,11 @@ impl Message {
    where
        T: storage::ReadStorage,
    {
-
        ctx.seq += 1;
-

-
        let seq = ctx.seq;
+
        let timestamp = ctx.timestamp();
        let inv = ctx.storage.inventory()?;

        Ok(Self::Inventory {
-
            seq,
+
            timestamp,
            inv,
            origin: None,
        })
@@ -656,10 +656,8 @@ pub struct Context<S, T> {
    io: VecDeque<Io<(), DisconnectReason>>,
    /// Clock. Tells the time.
    clock: RefClock,
-
    /// Sequence number of known peers.
-
    seqs: HashMap<PeerId, u64>,
-
    /// Our own sequence number.
-
    seq: u64,
+
    /// Timestamps of known peers.
+
    timestamps: HashMap<PeerId, u64>,
    /// Project storage.
    storage: T,
    /// Peer address manager.
@@ -683,9 +681,8 @@ where
            config,
            clock,
            routing: HashMap::with_hasher(rng.clone().into()),
-
            seqs: HashMap::with_hasher(rng.clone().into()),
+
            timestamps: HashMap::with_hasher(rng.clone().into()),
            io: VecDeque::new(),
-
            seq: 0,
            storage,
            addrmgr,
            rng,
@@ -707,6 +704,11 @@ where
        }
    }

+
    /// Get current local timestamp.
+
    fn timestamp(&self) -> Timestamp {
+
        self.clock.local_time().as_secs()
+
    }
+

    /// Disconnect a peer.
    fn disconnect(&mut self, addr: net::SocketAddr, reason: DisconnectReason) {
        self.io.push_back(Io::Disconnect(addr, reason));
@@ -795,8 +797,8 @@ pub enum PeerError {
    WrongMagic(u32),
    #[error("wrong protocol version in message: {0}")]
    WrongVersion(u32),
-
    #[error("invalid inventory sequence number: {0}")]
-
    InvalidSequenceNumber(u64),
+
    #[error("invalid inventory timestamp: {0}")]
+
    InvalidTimestamp(u64),
}

#[derive(Debug)]
@@ -876,15 +878,22 @@ impl Peer {
                let inventory = Message::inventory(ctx).unwrap();
                ctx.write(self.addr, inventory);
            }
-
            Message::Inventory { seq: 0, .. } => {
-
                return Err(PeerError::InvalidSequenceNumber(0));
+
            Message::Inventory { timestamp: 0, .. } => {
+
                return Err(PeerError::InvalidTimestamp(0));
            }
-
            Message::Inventory { seq, inv, origin } => {
-
                let sequence = ctx.seqs.entry(self.id()).or_insert(0);
-

-
                // Discard inventory messages from sequence numbers we've already seen.
-
                if seq > *sequence {
-
                    *sequence = seq;
+
            Message::Inventory {
+
                timestamp,
+
                inv,
+
                origin,
+
            } => {
+
                let last = ctx
+
                    .timestamps
+
                    .entry(self.id())
+
                    .or_insert_with(Timestamp::default);
+

+
                // Discard inventory messages from timestamps in the past.
+
                if timestamp > *last {
+
                    *last = timestamp;
                } else {
                    return Ok(None);
                }
@@ -892,7 +901,7 @@ impl Peer {

                if ctx.config.relay {
                    return Ok(Some(Message::Inventory {
-
                        seq,
+
                        timestamp,
                        inv,
                        origin: origin.or_else(|| Some(self.id())),
                    }));
modified node/src/test/tests.rs
@@ -105,12 +105,13 @@ fn test_wrong_peer_magic() {
fn test_inventory_fetch() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
+
    let now = LocalTime::now().as_secs();

    alice.connect_to(&bob.addr());
    alice.receive(
        &bob.addr(),
        Message::Inventory {
-
            seq: 1,
+
            timestamp: now,
            inv: vec![],
            origin: None,
        },
@@ -118,7 +119,7 @@ fn test_inventory_fetch() {
}

#[test]
-
fn test_inventory_relay_bad_seq() {
+
fn test_inventory_relay_bad_timestamp() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());

@@ -126,15 +127,15 @@ fn test_inventory_relay_bad_seq() {
    alice.receive(
        &bob.addr(),
        Message::Inventory {
-
            seq: 0,
+
            timestamp: 0,
            inv: vec![],
            origin: None,
        },
    );
    assert_matches!(
        alice.outbox().next(),
-
        Some(Io::Disconnect(addr, DisconnectReason::Error(PeerError::InvalidSequenceNumber(seq))))
-
        if addr == bob.addr() && seq == 0
+
        Some(Io::Disconnect(addr, DisconnectReason::Error(PeerError::InvalidTimestamp(t))))
+
        if addr == bob.addr() && t == 0
    );
}

@@ -145,6 +146,7 @@ fn test_inventory_relay() {
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
    let eve = Peer::new("eve", [9, 9, 9, 9], MockStorage::empty());
    let inv = vec![];
+
    let now = LocalTime::now().as_secs();

    // Inventory from Bob relayed to Eve.
    alice.connect_to(&bob.addr());
@@ -152,15 +154,15 @@ fn test_inventory_relay() {
    alice.receive(
        &bob.addr(),
        Message::Inventory {
-
            seq: 1,
+
            timestamp: now,
            inv: inv.clone(),
            origin: None,
        },
    );
    assert_matches!(
        alice.messages(&eve.addr()).next(),
-
        Some(Message::Inventory { seq, origin, .. })
-
        if origin == Some(bob.ip) && seq == 1
+
        Some(Message::Inventory { timestamp, origin, .. })
+
        if origin == Some(bob.ip) && timestamp == now
    );
    assert_matches!(
        alice.messages(&bob.addr()).next(),
@@ -171,7 +173,7 @@ fn test_inventory_relay() {
    alice.receive(
        &bob.addr(),
        Message::Inventory {
-
            seq: 1,
+
            timestamp: now,
            inv: inv.clone(),
            origin: None,
        },
@@ -185,15 +187,15 @@ fn test_inventory_relay() {
    alice.receive(
        &bob.addr(),
        Message::Inventory {
-
            seq: 2,
+
            timestamp: now + 1,
            inv: inv.clone(),
            origin: None,
        },
    );
    assert_matches!(
        alice.messages(&eve.addr()).next(),
-
        Some(Message::Inventory { seq, origin, .. })
-
        if origin == Some(bob.ip) && seq == 2,
+
        Some(Message::Inventory { timestamp, origin, .. })
+
        if origin == Some(bob.ip) && timestamp == now + 1,
        "Sending a new inventory does trigger the relay"
    );

@@ -201,15 +203,15 @@ fn test_inventory_relay() {
    alice.receive(
        &eve.addr(),
        Message::Inventory {
-
            seq: 4,
+
            timestamp: now,
            inv,
            origin: None,
        },
    );
    assert_matches!(
        alice.messages(&bob.addr()).next(),
-
        Some(Message::Inventory { seq, origin, .. })
-
        if origin == Some(eve.ip) && seq == 4
+
        Some(Message::Inventory { timestamp, origin, .. })
+
        if origin == Some(eve.ip) && timestamp == now
    );
}