Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Improve crate organization
Alexis Sellier committed 3 years ago
commit 1057fb952c732f12cbd5b6a71ce6420d5d0c8dd1
parent 77ced78b31d70f797892e57981a084d307891ef9
7 files changed +467 -421
modified node/src/decoder.rs
@@ -1,6 +1,6 @@
use std::marker::PhantomData;

-
use crate::protocol::Envelope;
+
use crate::protocol::message::Envelope;
use serde::Deserialize;

/// Message stream decoder.
modified node/src/protocol.rs
@@ -1,4 +1,8 @@
#![allow(dead_code)]
+
pub mod config;
+
pub mod message;
+
pub mod peer;
+

use std::ops::{Deref, DerefMut};
use std::{collections::VecDeque, fmt, io, net, net::IpAddr};

@@ -8,7 +12,6 @@ use log::*;
use nakamoto::{LocalDuration, LocalTime};
use nakamoto_net as nakamoto;
use nakamoto_net::{Io, Link};
-
use serde::{Deserialize, Serialize};

use crate::address_book;
use crate::address_book::AddressBook;
@@ -16,42 +19,13 @@ use crate::address_manager::AddressManager;
use crate::clock::RefClock;
use crate::collections::{HashMap, HashSet};
use crate::crypto;
-
use crate::decoder::Decoder;
-
use crate::identity::{ProjId, UserId};
+
use crate::identity::ProjId;
+
use crate::protocol::config::{Config, ProjectTracking};
+
use crate::protocol::message::{Envelope, Message};
+
use crate::protocol::peer::{Peer, PeerError, PeerState};
use crate::storage::{self, WriteRepository};
use crate::storage::{Inventory, ReadStorage, Remotes, Unverified, WriteStorage};

-
/// Network node identifier.
-
pub type NodeId = crypto::PublicKey;
-
/// Network routing table. Keeps track of where projects are hosted.
-
pub type Routing = HashMap<ProjId, HashSet<NodeId>>;
-
/// Seconds since epoch.
-
pub type Timestamp = u64;
-

-
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
-
// TODO: We should check the length and charset when deserializing.
-
pub struct Hostname(String);
-

-
/// Peer public protocol address.
-
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
-
pub enum Address {
-
    /// Tor V3 onion address.
-
    Onion {
-
        key: crypto::PublicKey,
-
        port: u16,
-
        checksum: u16,
-
        version: u8,
-
    },
-
    Ip {
-
        ip: net::IpAddr,
-
        port: u16,
-
    },
-
    Hostname {
-
        host: Hostname,
-
        port: u16,
-
    },
-
}
-

pub const NETWORK_MAGIC: u32 = 0x819b43d9;
pub const DEFAULT_PORT: u16 = 8776;
pub const PROTOCOL_VERSION: u32 = 1;
@@ -63,6 +37,13 @@ pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);

+
/// Network node identifier.
+
pub type NodeId = crypto::PublicKey;
+
/// Network routing table. Keeps track of where projects are hosted.
+
pub type Routing = HashMap<ProjId, HashSet<NodeId>>;
+
/// Seconds since epoch.
+
pub type Timestamp = u64;
+

/// Commands sent to the protocol by the operator.
#[derive(Debug)]
pub enum Command {
@@ -70,192 +51,6 @@ pub enum Command {
    Fetch(ProjId, net::SocketAddr),
}

-
/// Message envelope. All messages sent over the network are wrapped in this type.
-
#[derive(Debug, Serialize, Deserialize)]
-
pub struct Envelope {
-
    /// Network magic constant. Used to differentiate networks.
-
    pub magic: u32,
-
    /// The message payload.
-
    pub msg: Message,
-
}
-

-
/// Advertized node feature. Signals what services the node supports.
-
pub type NodeFeatures = [u8; 32];
-

-
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
-
pub struct NodeAnnouncement {
-
    /// Node identifier.
-
    id: NodeId,
-
    /// Advertized features.
-
    features: NodeFeatures,
-
    /// Monotonic timestamp.
-
    timestamp: Timestamp,
-
    /// Non-unique alias. Must be valid UTF-8.
-
    alias: [u8; 32],
-
    /// Announced addresses.
-
    addresses: Vec<Address>,
-
}
-

-
/// Message payload.
-
/// These are the messages peers send to each other.
-
#[derive(Debug, Serialize, Deserialize, Clone)]
-
pub enum Message {
-
    /// Say hello to a peer. This is the first message sent to a peer after connection.
-
    Hello {
-
        // TODO: This is currently untrusted.
-
        id: NodeId,
-
        timestamp: Timestamp,
-
        version: u32,
-
        addrs: Vec<Address>,
-
        git: Url,
-
    },
-
    Node {
-
        /// Signature over the announcement, by the node being announced.
-
        signature: crypto::Signature,
-
        /// Unsigned node announcement.
-
        announcement: NodeAnnouncement,
-
    },
-
    /// Get a peer's inventory.
-
    GetInventory { ids: Vec<ProjId> },
-
    /// Send our inventory to a peer. Sent in response to [`Message::GetInventory`].
-
    /// Nb. This should be the whole inventory, not a partial update.
-
    Inventory {
-
        inv: Vec<ProjId>,
-
        timestamp: Timestamp,
-
        /// Original peer this inventory came from. We don't set this when we
-
        /// are the originator, only when relaying.
-
        origin: Option<NodeId>,
-
    },
-
}
-

-
impl From<Message> for Envelope {
-
    fn from(msg: Message) -> Self {
-
        Self {
-
            magic: NETWORK_MAGIC,
-
            msg,
-
        }
-
    }
-
}
-

-
impl Message {
-
    pub fn hello(id: NodeId, timestamp: Timestamp, addrs: Vec<Address>, git: Url) -> Self {
-
        Self::Hello {
-
            id,
-
            timestamp,
-
            version: PROTOCOL_VERSION,
-
            addrs,
-
            git,
-
        }
-
    }
-

-
    pub fn inventory<S, T, G>(ctx: &mut Context<S, T, G>) -> Result<Self, storage::Error>
-
    where
-
        T: storage::ReadStorage,
-
    {
-
        let timestamp = ctx.timestamp();
-
        let inv = ctx.storage.inventory()?;
-

-
        Ok(Self::Inventory {
-
            timestamp,
-
            inv,
-
            origin: None,
-
        })
-
    }
-

-
    pub fn get_inventory(ids: impl Into<Vec<ProjId>>) -> Self {
-
        Self::GetInventory { ids: ids.into() }
-
    }
-
}
-

-
/// Project tracking policy.
-
#[derive(Debug)]
-
pub enum ProjectTracking {
-
    /// Track all projects we come across.
-
    All { blocked: HashSet<ProjId> },
-
    /// Track a static list of projects.
-
    Allowed(HashSet<ProjId>),
-
}
-

-
impl Default for ProjectTracking {
-
    fn default() -> Self {
-
        Self::All {
-
            blocked: HashSet::default(),
-
        }
-
    }
-
}
-

-
/// Project remote tracking policy.
-
#[derive(Debug, Default)]
-
pub enum RemoteTracking {
-
    /// Only track remotes of project delegates.
-
    #[default]
-
    DelegatesOnly,
-
    /// Track all remotes.
-
    All { blocked: HashSet<UserId> },
-
    /// Track a specific list of users as well as the project delegates.
-
    Allowed(HashSet<UserId>),
-
}
-

-
/// Protocol configuration.
-
#[derive(Debug)]
-
pub struct Config {
-
    /// Peers to connect to on startup.
-
    /// Connections to these peers will be maintained.
-
    pub connect: Vec<net::SocketAddr>,
-
    /// Project tracking policy.
-
    pub project_tracking: ProjectTracking,
-
    /// Project remote tracking policy.
-
    pub remote_tracking: RemoteTracking,
-
    /// Whether or not our node should relay inventories.
-
    pub relay: bool,
-
    /// List of addresses to listen on for protocol connections.
-
    pub listen: Vec<Address>,
-
    /// Our Git URL for fetching projects.
-
    pub git_url: Url,
-
}
-

-
impl Default for Config {
-
    fn default() -> Self {
-
        Self {
-
            connect: Vec::default(),
-
            project_tracking: ProjectTracking::default(),
-
            remote_tracking: RemoteTracking::default(),
-
            relay: true,
-
            listen: vec![],
-
            git_url: Url::default(),
-
        }
-
    }
-
}
-

-
impl Config {
-
    pub fn is_persistent(&self, addr: &net::SocketAddr) -> bool {
-
        self.connect.contains(addr)
-
    }
-

-
    pub fn is_tracking(&self, proj: &ProjId) -> bool {
-
        match &self.project_tracking {
-
            ProjectTracking::All { blocked } => !blocked.contains(proj),
-
            ProjectTracking::Allowed(projs) => projs.contains(proj),
-
        }
-
    }
-

-
    /// Track a project. Returns whether the policy was updated.
-
    pub fn track(&mut self, proj: ProjId) -> bool {
-
        match &mut self.project_tracking {
-
            ProjectTracking::All { .. } => false,
-
            ProjectTracking::Allowed(projs) => projs.insert(proj),
-
        }
-
    }
-

-
    /// Untrack a project. Returns whether the policy was updated.
-
    pub fn untrack(&mut self, proj: ProjId) -> bool {
-
        match &mut self.project_tracking {
-
            ProjectTracking::All { blocked } => blocked.insert(proj),
-
            ProjectTracking::Allowed(projs) => projs.remove(&proj),
-
        }
-
    }
-
}
-

#[derive(Debug)]
pub struct Protocol<S, T, G> {
    /// Peers currently or recently connected.
@@ -536,12 +331,12 @@ where
    fn attempted(&mut self, addr: &std::net::SocketAddr) {
        let ip = addr.ip();
        let persistent = self.context.config.is_persistent(addr);
-
        let mut peer = self
+
        let peer = self
            .peers
            .entry(ip)
            .or_insert_with(|| Peer::new(*addr, Link::Outbound, persistent));

-
        peer.attempts += 1;
+
        peer.attempted();
    }

    fn connected(
@@ -573,8 +368,7 @@ where
                        Message::get_inventory([]),
                    ],
                );
-

-
                peer.attempts = 0;
+
                peer.connected();
            }
        } else {
            self.peers.insert(
@@ -602,7 +396,8 @@ where
            peer.state = PeerState::Disconnected { since };

            // Attempt to re-connect to persistent peers.
-
            if self.context.config.is_persistent(addr) && peer.attempts < MAX_CONNECTION_ATTEMPTS {
+
            if self.context.config.is_persistent(addr) && peer.attempts() < MAX_CONNECTION_ATTEMPTS
+
            {
                if reason.is_dial_err() {
                    return;
                }
@@ -613,7 +408,7 @@ where
                }
                // TODO: Eventually we want a delay before attempting a reconnection,
                // with exponential back-off.
-
                debug!("Reconnecting to {} (attempts={})...", ip, peer.attempts);
+
                debug!("Reconnecting to {} (attempts={})...", ip, peer.attempts());

                // TODO: Try to reconnect only if the peer was attempted. A disconnect without
                // even a successful attempt means that we're unlikely to be able to reconnect.
@@ -635,7 +430,7 @@ where
            .collect::<Vec<_>>();

        let (peer, msgs) = if let Some(peer) = self.peers.get_mut(&peer) {
-
            let decoder = &mut peer.inbox;
+
            let decoder = &mut peer.inbox();
            decoder.input(bytes);

            let mut msgs = Vec::with_capacity(1);
@@ -893,196 +688,3 @@ impl DerefMut for Peers {
        &mut self.0
    }
}
-

-
#[derive(Debug, Default)]
-
#[allow(clippy::large_enum_variant)]
-
enum PeerState {
-
    /// Initial peer state. For outgoing peers this
-
    /// means we've attempted a connection. For incoming
-
    /// peers, this means they've successfully connected
-
    /// to us.
-
    #[default]
-
    Initial,
-
    /// State after successful handshake.
-
    Negotiated {
-
        /// The peer's unique identifier.
-
        id: NodeId,
-
        since: LocalTime,
-
        /// Addresses this peer is reachable on.
-
        addrs: Vec<Address>,
-
        git: Url,
-
    },
-
    /// When a peer is disconnected.
-
    Disconnected { since: LocalTime },
-
}
-

-
#[derive(thiserror::Error, Debug, Clone)]
-
pub enum PeerError {
-
    #[error("wrong network constant in message: {0}")]
-
    WrongMagic(u32),
-
    #[error("wrong protocol version in message: {0}")]
-
    WrongVersion(u32),
-
    #[error("invalid inventory timestamp: {0}")]
-
    InvalidTimestamp(u64),
-
    #[error("peer misbehaved")]
-
    Misbehavior,
-
}
-

-
#[derive(Debug)]
-
pub struct Peer {
-
    /// Peer address.
-
    addr: net::SocketAddr,
-
    /// Inbox for incoming messages from peer.
-
    inbox: Decoder,
-
    /// Peer connection state.
-
    state: PeerState,
-
    /// Connection direction.
-
    link: Link,
-
    /// Last known peer time.
-
    timestamp: Timestamp,
-
    /// Whether we should attempt to re-connect
-
    /// to this peer upon disconnection.
-
    persistent: bool,
-
    /// Connection attempts. For persistent peers, Tracks
-
    /// how many times we've attempted to connect. We reset this to zero
-
    /// upon successful connection.
-
    attempts: usize,
-
}
-

-
impl Peer {
-
    fn new(addr: net::SocketAddr, link: Link, persistent: bool) -> Self {
-
        Self {
-
            addr,
-
            inbox: Decoder::new(256),
-
            state: PeerState::default(),
-
            link,
-
            timestamp: Timestamp::default(),
-
            persistent,
-
            attempts: 0,
-
        }
-
    }
-

-
    fn ip(&self) -> IpAddr {
-
        self.addr.ip()
-
    }
-

-
    fn is_negotiated(&self) -> bool {
-
        matches!(self.state, PeerState::Negotiated { .. })
-
    }
-

-
    fn received<S, T, G>(
-
        &mut self,
-
        envelope: Envelope,
-
        ctx: &mut Context<S, T, G>,
-
    ) -> Result<Option<Message>, PeerError>
-
    where
-
        T: storage::ReadStorage + storage::WriteStorage,
-
        G: crypto::Signer,
-
    {
-
        if envelope.magic != NETWORK_MAGIC {
-
            return Err(PeerError::WrongMagic(envelope.magic));
-
        }
-
        debug!("Received {:?} from {}", &envelope.msg, self.ip());
-

-
        match (&self.state, envelope.msg) {
-
            (
-
                PeerState::Initial,
-
                Message::Hello {
-
                    id,
-
                    timestamp,
-
                    version,
-
                    addrs,
-
                    git,
-
                },
-
            ) => {
-
                let now = ctx.timestamp();
-

-
                if timestamp.abs_diff(now) > MAX_TIME_DELTA.as_secs() {
-
                    return Err(PeerError::InvalidTimestamp(timestamp));
-
                }
-
                if version != PROTOCOL_VERSION {
-
                    return Err(PeerError::WrongVersion(version));
-
                }
-
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
-
                // extra "acknowledgment" message sent when the `Hello` is well received.
-
                if self.link.is_inbound() {
-
                    let git = ctx.config.git_url.clone();
-
                    ctx.write_all(
-
                        self.addr,
-
                        [
-
                            Message::hello(ctx.id(), now, ctx.config.listen.clone(), git),
-
                            Message::get_inventory([]),
-
                        ],
-
                    );
-
                }
-
                // Nb. we don't set the peer timestamp here, since it is going to be
-
                // set after the first message is received only. Setting it here would
-
                // mean that messages received right after the handshake could be ignored.
-
                self.state = PeerState::Negotiated {
-
                    id,
-
                    since: ctx.clock.local_time(),
-
                    addrs,
-
                    git,
-
                };
-
            }
-
            (PeerState::Initial, _) => {
-
                debug!(
-
                    "Disconnecting peer {} for sending us a message before handshake",
-
                    self.ip()
-
                );
-
                return Err(PeerError::Misbehavior);
-
            }
-
            (PeerState::Negotiated { .. }, Message::GetInventory { .. }) => {
-
                // TODO: Handle partial inventory requests.
-
                let inventory = Message::inventory(ctx).unwrap();
-
                ctx.write(self.addr, inventory);
-
            }
-
            (
-
                PeerState::Negotiated { id, git, .. },
-
                Message::Inventory {
-
                    timestamp,
-
                    inv,
-
                    origin,
-
                },
-
            ) => {
-
                let now = ctx.clock.local_time();
-
                let last = self.timestamp;
-

-
                // Don't allow messages from too far in the past or future.
-
                if timestamp.abs_diff(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
-
                    return Err(PeerError::InvalidTimestamp(timestamp));
-
                }
-
                // Discard inventory messages we've already seen, otherwise update
-
                // out last seen time.
-
                if timestamp > last {
-
                    self.timestamp = timestamp;
-
                } else {
-
                    return Ok(None);
-
                }
-
                ctx.process_inventory(&inv, origin.unwrap_or(*id), git);
-

-
                if ctx.config.relay {
-
                    return Ok(Some(Message::Inventory {
-
                        timestamp,
-
                        inv,
-
                        origin: origin.or(Some(*id)),
-
                    }));
-
                }
-
            }
-
            (PeerState::Negotiated { .. }, Message::Node { .. }) => {
-
                todo!();
-
            }
-
            (PeerState::Negotiated { .. }, Message::Hello { .. }) => {
-
                debug!(
-
                    "Disconnecting peer {} for sending us a redundant handshake message",
-
                    self.ip()
-
                );
-
                return Err(PeerError::Misbehavior);
-
            }
-
            (PeerState::Disconnected { .. }, msg) => {
-
                debug!("Ignoring {:?} from disconnected peer {}", msg, self.ip());
-
            }
-
        }
-
        Ok(None)
-
    }
-
}
added node/src/protocol/config.rs
@@ -0,0 +1,96 @@
+
use std::net;
+

+
use git_url::Url;
+

+
use crate::collections::HashSet;
+
use crate::identity::{ProjId, UserId};
+
use crate::protocol::message::Address;
+

+
/// Project tracking policy.
+
#[derive(Debug)]
+
pub enum ProjectTracking {
+
    /// Track all projects we come across.
+
    All { blocked: HashSet<ProjId> },
+
    /// Track a static list of projects.
+
    Allowed(HashSet<ProjId>),
+
}
+

+
impl Default for ProjectTracking {
+
    fn default() -> Self {
+
        Self::All {
+
            blocked: HashSet::default(),
+
        }
+
    }
+
}
+

+
/// Project remote tracking policy.
+
#[derive(Debug, Default)]
+
pub enum RemoteTracking {
+
    /// Only track remotes of project delegates.
+
    #[default]
+
    DelegatesOnly,
+
    /// Track all remotes.
+
    All { blocked: HashSet<UserId> },
+
    /// Track a specific list of users as well as the project delegates.
+
    Allowed(HashSet<UserId>),
+
}
+

+
/// Protocol configuration.
+
#[derive(Debug)]
+
pub struct Config {
+
    /// Peers to connect to on startup.
+
    /// Connections to these peers will be maintained.
+
    pub connect: Vec<net::SocketAddr>,
+
    /// Project tracking policy.
+
    pub project_tracking: ProjectTracking,
+
    /// Project remote tracking policy.
+
    pub remote_tracking: RemoteTracking,
+
    /// Whether or not our node should relay inventories.
+
    pub relay: bool,
+
    /// List of addresses to listen on for protocol connections.
+
    pub listen: Vec<Address>,
+
    /// Our Git URL for fetching projects.
+
    pub git_url: Url,
+
}
+

+
impl Default for Config {
+
    fn default() -> Self {
+
        Self {
+
            connect: Vec::default(),
+
            project_tracking: ProjectTracking::default(),
+
            remote_tracking: RemoteTracking::default(),
+
            relay: true,
+
            listen: vec![],
+
            git_url: Url::default(),
+
        }
+
    }
+
}
+

+
impl Config {
+
    pub fn is_persistent(&self, addr: &net::SocketAddr) -> bool {
+
        self.connect.contains(addr)
+
    }
+

+
    pub fn is_tracking(&self, proj: &ProjId) -> bool {
+
        match &self.project_tracking {
+
            ProjectTracking::All { blocked } => !blocked.contains(proj),
+
            ProjectTracking::Allowed(projs) => projs.contains(proj),
+
        }
+
    }
+

+
    /// Track a project. Returns whether the policy was updated.
+
    pub fn track(&mut self, proj: ProjId) -> bool {
+
        match &mut self.project_tracking {
+
            ProjectTracking::All { .. } => false,
+
            ProjectTracking::Allowed(projs) => projs.insert(proj),
+
        }
+
    }
+

+
    /// Untrack a project. Returns whether the policy was updated.
+
    pub fn untrack(&mut self, proj: ProjId) -> bool {
+
        match &mut self.project_tracking {
+
            ProjectTracking::All { blocked } => blocked.insert(proj),
+
            ProjectTracking::Allowed(projs) => projs.remove(&proj),
+
        }
+
    }
+
}
added node/src/protocol/message.rs
@@ -0,0 +1,130 @@
+
use std::net;
+

+
use git_url::Url;
+
use serde::{Deserialize, Serialize};
+

+
use crate::crypto;
+
use crate::identity::ProjId;
+
use crate::protocol::{Context, NodeId, Timestamp, NETWORK_MAGIC, PROTOCOL_VERSION};
+
use crate::storage;
+

+
/// Message envelope. All messages sent over the network are wrapped in this type.
+
#[derive(Debug, Serialize, Deserialize)]
+
pub struct Envelope {
+
    /// Network magic constant. Used to differentiate networks.
+
    pub magic: u32,
+
    /// The message payload.
+
    pub msg: Message,
+
}
+

+
/// Advertized node feature. Signals what services the node supports.
+
pub type NodeFeatures = [u8; 32];
+

+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+
// TODO: We should check the length and charset when deserializing.
+
pub struct Hostname(String);
+

+
/// Peer public protocol address.
+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+
pub enum Address {
+
    /// Tor V3 onion address.
+
    Onion {
+
        key: crypto::PublicKey,
+
        port: u16,
+
        checksum: u16,
+
        version: u8,
+
    },
+
    Ip {
+
        ip: net::IpAddr,
+
        port: u16,
+
    },
+
    Hostname {
+
        host: Hostname,
+
        port: u16,
+
    },
+
}
+

+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+
pub struct NodeAnnouncement {
+
    /// Node identifier.
+
    id: NodeId,
+
    /// Advertized features.
+
    features: NodeFeatures,
+
    /// Monotonic timestamp.
+
    timestamp: Timestamp,
+
    /// Non-unique alias. Must be valid UTF-8.
+
    alias: [u8; 32],
+
    /// Announced addresses.
+
    addresses: Vec<Address>,
+
}
+

+
/// Message payload.
+
/// These are the messages peers send to each other.
+
#[derive(Debug, Serialize, Deserialize, Clone)]
+
pub enum Message {
+
    /// Say hello to a peer. This is the first message sent to a peer after connection.
+
    Hello {
+
        // TODO: This is currently untrusted.
+
        id: NodeId,
+
        timestamp: Timestamp,
+
        version: u32,
+
        addrs: Vec<Address>,
+
        git: Url,
+
    },
+
    Node {
+
        /// Signature over the announcement, by the node being announced.
+
        signature: crypto::Signature,
+
        /// Unsigned node announcement.
+
        announcement: NodeAnnouncement,
+
    },
+
    /// Get a peer's inventory.
+
    GetInventory { ids: Vec<ProjId> },
+
    /// Send our inventory to a peer. Sent in response to [`Message::GetInventory`].
+
    /// Nb. This should be the whole inventory, not a partial update.
+
    Inventory {
+
        inv: Vec<ProjId>,
+
        timestamp: Timestamp,
+
        /// Original peer this inventory came from. We don't set this when we
+
        /// are the originator, only when relaying.
+
        origin: Option<NodeId>,
+
    },
+
}
+

+
impl From<Message> for Envelope {
+
    fn from(msg: Message) -> Self {
+
        Self {
+
            magic: NETWORK_MAGIC,
+
            msg,
+
        }
+
    }
+
}
+

+
impl Message {
+
    pub fn hello(id: NodeId, timestamp: Timestamp, addrs: Vec<Address>, git: Url) -> Self {
+
        Self::Hello {
+
            id,
+
            timestamp,
+
            version: PROTOCOL_VERSION,
+
            addrs,
+
            git,
+
        }
+
    }
+

+
    pub fn inventory<S, T, G>(ctx: &mut Context<S, T, G>) -> Result<Self, storage::Error>
+
    where
+
        T: storage::ReadStorage,
+
    {
+
        let timestamp = ctx.timestamp();
+
        let inv = ctx.storage.inventory()?;
+

+
        Ok(Self::Inventory {
+
            timestamp,
+
            inv,
+
            origin: None,
+
        })
+
    }
+

+
    pub fn get_inventory(ids: impl Into<Vec<ProjId>>) -> Self {
+
        Self::GetInventory { ids: ids.into() }
+
    }
+
}
added node/src/protocol/peer.rs
@@ -0,0 +1,213 @@
+
use crate::decoder::Decoder;
+
use crate::protocol::message::*;
+
use crate::protocol::*;
+

+
#[derive(Debug, Default)]
+
#[allow(clippy::large_enum_variant)]
+
pub enum PeerState {
+
    /// Initial peer state. For outgoing peers this
+
    /// means we've attempted a connection. For incoming
+
    /// peers, this means they've successfully connected
+
    /// to us.
+
    #[default]
+
    Initial,
+
    /// State after successful handshake.
+
    Negotiated {
+
        /// The peer's unique identifier.
+
        id: NodeId,
+
        since: LocalTime,
+
        /// Addresses this peer is reachable on.
+
        addrs: Vec<Address>,
+
        git: Url,
+
    },
+
    /// When a peer is disconnected.
+
    Disconnected { since: LocalTime },
+
}
+

+
#[derive(thiserror::Error, Debug, Clone)]
+
pub enum PeerError {
+
    #[error("wrong network constant in message: {0}")]
+
    WrongMagic(u32),
+
    #[error("wrong protocol version in message: {0}")]
+
    WrongVersion(u32),
+
    #[error("invalid inventory timestamp: {0}")]
+
    InvalidTimestamp(u64),
+
    #[error("peer misbehaved")]
+
    Misbehavior,
+
}
+

+
#[derive(Debug)]
+
pub struct Peer {
+
    /// Peer address.
+
    pub addr: net::SocketAddr,
+
    /// Connection direction.
+
    pub link: Link,
+
    /// Whether we should attempt to re-connect
+
    /// to this peer upon disconnection.
+
    pub persistent: bool,
+
    /// Peer connection state.
+
    pub state: PeerState,
+
    /// Last known peer time.
+
    pub timestamp: Timestamp,
+

+
    /// Inbox for incoming messages from peer.
+
    inbox: Decoder,
+
    /// Connection attempts. For persistent peers, Tracks
+
    /// how many times we've attempted to connect. We reset this to zero
+
    /// upon successful connection.
+
    attempts: usize,
+
}
+

+
impl Peer {
+
    pub fn new(addr: net::SocketAddr, link: Link, persistent: bool) -> Self {
+
        Self {
+
            addr,
+
            inbox: Decoder::new(256),
+
            state: PeerState::default(),
+
            link,
+
            timestamp: Timestamp::default(),
+
            persistent,
+
            attempts: 0,
+
        }
+
    }
+

+
    pub fn ip(&self) -> IpAddr {
+
        self.addr.ip()
+
    }
+

+
    pub fn is_negotiated(&self) -> bool {
+
        matches!(self.state, PeerState::Negotiated { .. })
+
    }
+

+
    pub fn inbox(&mut self) -> &mut Decoder {
+
        &mut self.inbox
+
    }
+

+
    pub fn attempts(&self) -> usize {
+
        self.attempts
+
    }
+

+
    pub fn attempted(&mut self) {
+
        self.attempts += 1;
+
    }
+

+
    pub fn connected(&mut self) {
+
        self.attempts = 0;
+
    }
+

+
    pub fn received<S, T, G>(
+
        &mut self,
+
        envelope: Envelope,
+
        ctx: &mut Context<S, T, G>,
+
    ) -> Result<Option<Message>, PeerError>
+
    where
+
        T: storage::ReadStorage + storage::WriteStorage,
+
        G: crypto::Signer,
+
    {
+
        if envelope.magic != NETWORK_MAGIC {
+
            return Err(PeerError::WrongMagic(envelope.magic));
+
        }
+
        debug!("Received {:?} from {}", &envelope.msg, self.ip());
+

+
        match (&self.state, envelope.msg) {
+
            (
+
                PeerState::Initial,
+
                Message::Hello {
+
                    id,
+
                    timestamp,
+
                    version,
+
                    addrs,
+
                    git,
+
                },
+
            ) => {
+
                let now = ctx.timestamp();
+

+
                if timestamp.abs_diff(now) > MAX_TIME_DELTA.as_secs() {
+
                    return Err(PeerError::InvalidTimestamp(timestamp));
+
                }
+
                if version != PROTOCOL_VERSION {
+
                    return Err(PeerError::WrongVersion(version));
+
                }
+
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
+
                // extra "acknowledgment" message sent when the `Hello` is well received.
+
                if self.link.is_inbound() {
+
                    let git = ctx.config.git_url.clone();
+
                    ctx.write_all(
+
                        self.addr,
+
                        [
+
                            Message::hello(ctx.id(), now, ctx.config.listen.clone(), git),
+
                            Message::get_inventory([]),
+
                        ],
+
                    );
+
                }
+
                // Nb. we don't set the peer timestamp here, since it is going to be
+
                // set after the first message is received only. Setting it here would
+
                // mean that messages received right after the handshake could be ignored.
+
                self.state = PeerState::Negotiated {
+
                    id,
+
                    since: ctx.clock.local_time(),
+
                    addrs,
+
                    git,
+
                };
+
            }
+
            (PeerState::Initial, _) => {
+
                debug!(
+
                    "Disconnecting peer {} for sending us a message before handshake",
+
                    self.ip()
+
                );
+
                return Err(PeerError::Misbehavior);
+
            }
+
            (PeerState::Negotiated { .. }, Message::GetInventory { .. }) => {
+
                // TODO: Handle partial inventory requests.
+
                let inventory = Message::inventory(ctx).unwrap();
+
                ctx.write(self.addr, inventory);
+
            }
+
            (
+
                PeerState::Negotiated { id, git, .. },
+
                Message::Inventory {
+
                    timestamp,
+
                    inv,
+
                    origin,
+
                },
+
            ) => {
+
                let now = ctx.clock.local_time();
+
                let last = self.timestamp;
+

+
                // Don't allow messages from too far in the past or future.
+
                if timestamp.abs_diff(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
+
                    return Err(PeerError::InvalidTimestamp(timestamp));
+
                }
+
                // Discard inventory messages we've already seen, otherwise update
+
                // out last seen time.
+
                if timestamp > last {
+
                    self.timestamp = timestamp;
+
                } else {
+
                    return Ok(None);
+
                }
+
                ctx.process_inventory(&inv, origin.unwrap_or(*id), git);
+

+
                if ctx.config.relay {
+
                    return Ok(Some(Message::Inventory {
+
                        timestamp,
+
                        inv,
+
                        origin: origin.or(Some(*id)),
+
                    }));
+
                }
+
            }
+
            (PeerState::Negotiated { .. }, Message::Node { .. }) => {
+
                todo!();
+
            }
+
            (PeerState::Negotiated { .. }, Message::Hello { .. }) => {
+
                debug!(
+
                    "Disconnecting peer {} for sending us a redundant handshake message",
+
                    self.ip()
+
                );
+
                return Err(PeerError::Misbehavior);
+
            }
+
            (PeerState::Disconnected { .. }, msg) => {
+
                debug!("Ignoring {:?} from disconnected peer {}", msg, self.ip());
+
            }
+
        }
+
        Ok(None)
+
    }
+
}
modified node/src/test/peer.rs
@@ -10,6 +10,8 @@ use crate::address_book::{KnownAddress, Source};
use crate::clock::RefClock;
use crate::collections::HashMap;
use crate::decoder::Decoder;
+
use crate::protocol::config::*;
+
use crate::protocol::message::*;
use crate::protocol::*;
use crate::storage::{ReadStorage, WriteStorage};
use crate::test::crypto::MockSigner;
modified node/src/test/tests.rs
@@ -7,6 +7,9 @@ use nakamoto_net::simulator::{Peer as _, Simulation};
use nakamoto_net::Protocol as _;

use crate::collections::{HashMap, HashSet};
+
use crate::protocol::config::*;
+
use crate::protocol::message::*;
+
use crate::protocol::peer::*;
use crate::protocol::*;
use crate::storage::git::Storage;
use crate::storage::ReadStorage;