Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Remove peer timestamp
Alexis Sellier committed 3 years ago
commit 71698818496edab9bef8854553b5e97ca7a7e72a
parent 1fa85f0da3d3b545f830b5f48b2e79ec7b34fec4
7 files changed +47 -70
modified radicle-node/src/service.rs
@@ -4,8 +4,9 @@ pub mod filter;
pub mod message;
pub mod peer;

+
use std::collections::{BTreeMap, VecDeque};
use std::ops::{Deref, DerefMut};
-
use std::{collections::VecDeque, fmt, net, net::IpAddr};
+
use std::{fmt, net, net::IpAddr};

use crossbeam_channel as chan;
use fastrand::Rng;
@@ -236,7 +237,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
    }

    /// Get the connected peers.
-
    pub fn peers(&self) -> &Sessions {
+
    pub fn sessions(&self) -> &Sessions {
        &self.sessions
    }

@@ -651,6 +652,13 @@ pub struct Lookup {
    pub remote: Vec<NodeId>,
}

+
/// Information on a peer, that we may or may not be connected to.
+
#[derive(Default, Debug)]
+
pub struct Peer {
+
    /// Timestamp of the last message received from peer.
+
    pub last_message: Timestamp,
+
}
+

/// Global service state used across peers.
#[derive(Debug)]
pub struct Context<S, T, G> {
@@ -660,6 +668,8 @@ pub struct Context<S, T, G> {
    signer: G,
    /// Tracks the location of projects.
    routing: Routing,
+
    /// Keeps track of peer states.
+
    peers: BTreeMap<NodeId, Peer>,
    /// Outgoing I/O queue.
    io: VecDeque<Io>,
    /// Clock. Tells the time.
@@ -700,6 +710,7 @@ where
            signer,
            clock,
            routing: HashMap::with_hasher(rng.clone().into()),
+
            peers: BTreeMap::new(),
            io: VecDeque::new(),
            storage,
            addrmgr,
@@ -741,12 +752,7 @@ where
    fn handshake_messages(&self) -> [Message; 4] {
        let git = self.config.git_url.clone();
        [
-
            Message::init(
-
                self.node_id(),
-
                self.timestamp(),
-
                self.config.listen.clone(),
-
                git,
-
            ),
+
            Message::init(self.node_id(), self.config.listen.clone(), git),
            Message::node(self.node_announcement(), &self.signer),
            Message::inventory(self.inventory_announcement().unwrap(), &self.signer),
            Message::subscribe(self.filter(), self.timestamp(), Timestamp::MAX),
modified radicle-node/src/service/message.rs
@@ -156,13 +156,14 @@ impl InventoryAnnouncement {

/// Message payload.
/// These are the messages peers send to each other.
+
///
+
/// "Announcement" messages are messages that are relayed between peers.
#[derive(Clone, PartialEq, Eq)]
pub enum Message {
    /// The first message sent to a peer after connection.
    Initialize {
        // TODO: This is currently untrusted.
        id: NodeId,
-
        timestamp: Timestamp,
        version: u32,
        addrs: Vec<Address>,
        git: git::Url,
@@ -205,13 +206,12 @@ pub enum Message {
}

impl Message {
-
    pub fn init(id: NodeId, timestamp: Timestamp, addrs: Vec<Address>, git: git::Url) -> Self {
+
    pub fn init(id: NodeId, addrs: Vec<Address>, git: git::Url) -> Self {
        Self::Initialize {
            id,
-
            timestamp,
            version: PROTOCOL_VERSION,
-
            addrs,
            git,
+
            addrs,
        }
    }

modified radicle-node/src/service/peer.rs
@@ -29,7 +29,7 @@ pub enum SessionError {
    WrongMagic(u32),
    #[error("wrong protocol version in message: {0}")]
    WrongVersion(u32),
-
    #[error("invalid inventory timestamp: {0}")]
+
    #[error("invalid announcement timestamp: {0}")]
    InvalidTimestamp(u64),
    #[error("peer misbehaved")]
    Misbehavior,
@@ -47,8 +47,6 @@ pub struct Session {
    pub persistent: bool,
    /// Peer connection state.
    pub state: SessionState,
-
    /// Last known peer time.
-
    pub timestamp: Timestamp,
    /// Peer subscription.
    pub subscribe: Option<Subscribe>,

@@ -64,7 +62,6 @@ impl Session {
            addr,
            state: SessionState::default(),
            link,
-
            timestamp: Timestamp::default(),
            subscribe: None,
            persistent,
            attempts: 0,
@@ -110,17 +107,11 @@ impl Session {
                SessionState::Initial,
                Message::Initialize {
                    id,
-
                    timestamp,
                    version,
                    addrs,
                    git,
                },
            ) => {
-
                let now = ctx.timestamp();
-

-
                if timestamp.abs_diff(now) > MAX_TIME_DELTA.as_secs() {
-
                    return Err(SessionError::InvalidTimestamp(timestamp));
-
                }
                if version != PROTOCOL_VERSION {
                    return Err(SessionError::WrongVersion(version));
                }
@@ -154,18 +145,17 @@ impl Session {
                    signature,
                },
            ) => {
-
                // FIXME: This is wrong, we are comparing timestamps of different peers.
                let now = ctx.clock.local_time();
-
                let last = self.timestamp;
+
                let peer = ctx.peers.entry(node).or_insert_with(Peer::default);

-
                // Don't allow messages from too far in the past or future.
-
                if message.timestamp.abs_diff(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
+
                // Don't allow messages from too far in the future.
+
                if message.timestamp.saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
                    return Err(SessionError::InvalidTimestamp(message.timestamp));
                }
                // Discard inventory messages we've already seen, otherwise update
                // out last seen time.
-
                if message.timestamp > last {
-
                    self.timestamp = message.timestamp;
+
                if message.timestamp > peer.last_message {
+
                    peer.last_message = message.timestamp;
                } else {
                    return Ok(None);
                }
@@ -188,6 +178,8 @@ impl Session {
                    signature,
                },
            ) => {
+
                // FIXME: Check message timestamp.
+

                if message.verify(&node, &signature) {
                    // TODO: Buffer/throttle fetches.
                    // TODO: Check that we're tracking this user as well.
@@ -222,6 +214,8 @@ impl Session {
                    signature,
                },
            ) => {
+
                // FIXME: Check message timestamp.
+

                if !message.verify(&node, &signature) {
                    return Err(SessionError::Misbehavior);
                }
modified radicle-node/src/test/peer.rs
@@ -150,12 +150,7 @@ where
        self.service.connected(remote, &local, Link::Inbound);
        self.receive(
            &remote,
-
            Message::init(
-
                peer.node_id(),
-
                self.local_time().as_secs(),
-
                vec![Address::from(remote)],
-
                git,
-
            ),
+
            Message::init(peer.node_id(), vec![Address::from(remote)], git),
        );

        let mut msgs = self.messages(&remote);
@@ -182,12 +177,7 @@ where
        let git = peer.config().git_url.clone();
        self.receive(
            &remote,
-
            Message::init(
-
                peer.node_id(),
-
                self.local_time().as_secs(),
-
                peer.config().listen.clone(),
-
                git,
-
            ),
+
            Message::init(peer.node_id(), peer.config().listen.clone(), git),
        );
    }

modified radicle-node/src/test/tests.rs
@@ -20,8 +20,8 @@ use crate::test::signer::MockSigner;
use crate::test::simulator;
use crate::test::simulator::{Peer as _, Simulation};
use crate::test::storage::MockStorage;
+
use crate::LocalTime;
use crate::{client, identity, rad, service, storage, test};
-
use crate::{Link, LocalTime};

// NOTE
//
@@ -42,7 +42,7 @@ fn test_outbound_connection() {

    let peers = alice
        .service
-
        .peers()
+
        .sessions()
        .negotiated()
        .map(|(ip, _)| *ip)
        .collect::<Vec<_>>();
@@ -62,7 +62,7 @@ fn test_inbound_connection() {

    let peers = alice
        .service
-
        .peers()
+
        .sessions()
        .negotiated()
        .map(|(ip, _)| *ip)
        .collect::<Vec<_>>();
@@ -104,27 +104,6 @@ fn test_wrong_peer_version() {
}

#[test]
-
fn test_handshake_invalid_timestamp() {
-
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
-
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
-
    let time_delta = MAX_TIME_DELTA.as_secs() + 1;
-
    let local = std::net::SocketAddr::new(bob.ip, bob.rng.u16(..));
-

-
    alice.initialize();
-
    alice.connected(bob.addr(), &local, Link::Inbound);
-
    alice.receive(
-
        &bob.addr(),
-
        Message::init(
-
            bob.node_id(),
-
            alice.timestamp() - time_delta,
-
            vec![],
-
            bob.git_url(),
-
        ),
-
    );
-
    assert_matches!(alice.outbox().next(), Some(Io::Disconnect(addr, _)) if addr == bob.addr());
-
}
-

-
#[test]
#[ignore]
fn test_wrong_peer_magic() {
    // TODO
@@ -211,7 +190,7 @@ fn test_inventory_relay_bad_timestamp() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
    let two_hours = 3600 * 2;
-
    let timestamp = alice.local_time.as_secs() - two_hours;
+
    let timestamp = alice.local_time.as_secs() + two_hours;

    alice.connect_to(&bob);
    alice.receive(
@@ -341,7 +320,7 @@ fn test_persistent_peer_reconnect() {
    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

    let ips = alice
-
        .peers()
+
        .sessions()
        .negotiated()
        .map(|(ip, _)| *ip)
        .collect::<Vec<_>>();
modified radicle-node/src/wire/message.rs
@@ -132,13 +132,11 @@ impl wire::Encode for Message {
        match self {
            Self::Initialize {
                id,
-
                timestamp,
                version,
                addrs,
                git,
            } => {
                n += id.encode(writer)?;
-
                n += timestamp.encode(writer)?;
                n += version.encode(writer)?;
                n += addrs.as_slice().encode(writer)?;
                n += git.encode(writer)?;
@@ -191,14 +189,12 @@ impl wire::Decode for Message {
        match MessageType::try_from(type_id) {
            Ok(MessageType::Initialize) => {
                let id = NodeId::decode(reader)?;
-
                let timestamp = Timestamp::decode(reader)?;
                let version = u32::decode(reader)?;
                let addrs = Vec::<Address>::decode(reader)?;
                let git = git::Url::decode(reader)?;

                Ok(Self::Initialize {
                    id,
-
                    timestamp,
                    version,
                    addrs,
                    git,
modified radicle/src/crypto.rs
@@ -135,6 +135,18 @@ impl std::hash::Hash for PublicKey {
    }
}

+
impl PartialOrd for PublicKey {
+
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+
        self.0.as_ref().partial_cmp(other.as_ref())
+
    }
+
}
+

+
impl Ord for PublicKey {
+
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+
        self.0.as_ref().cmp(other.as_ref())
+
    }
+
}
+

impl fmt::Display for PublicKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.to_human())