Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Don't simulate transport
Alexis Sellier committed 3 years ago
commit 1017a2bc69d6bb1a42a3c35c52686cc9b1a2496f
parent 2b6e20b50c36db5cc15a185ad89a64d697d1e8b8
6 files changed +686 -57
modified node/src/protocol.rs
@@ -14,7 +14,7 @@ use git_url::Url;
use log::*;
use nakamoto::{LocalDuration, LocalTime};
use nakamoto_net as nakamoto;
-
use nakamoto_net::{Io, Link};
+
use nakamoto_net::Link;
use nonempty::NonEmpty;

use crate::address_book;
@@ -25,16 +25,16 @@ use crate::collections::{HashMap, HashSet};
use crate::crypto;
use crate::identity::{Id, Project};
use crate::protocol::config::ProjectTracking;
-
use crate::protocol::message::{Message, NodeAnnouncement, RefsAnnouncement};
+
use crate::protocol::message::{NodeAnnouncement, RefsAnnouncement};
use crate::protocol::peer::{Peer, PeerError, PeerState};
-
use crate::protocol::wire::Encode;
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, WriteStorage};

pub use crate::protocol::config::{Config, Network};
+
pub use crate::protocol::message::{Envelope, Message};

use self::filter::Filter;
-
use self::message::{Envelope, InventoryAnnouncement, NodeFeatures};
+
use self::message::{InventoryAnnouncement, NodeFeatures};

pub const DEFAULT_PORT: u16 = 8776;
pub const PROTOCOL_VERSION: u32 = 1;
@@ -53,6 +53,21 @@ pub type Routing = HashMap<Id, HashSet<NodeId>>;
/// Seconds since epoch.
pub type Timestamp = u64;

+
/// Output of a state transition.
+
#[derive(Debug)]
+
pub enum Io {
+
    /// There are some messages ready to be sent to a peer.
+
    Write(net::SocketAddr, Vec<Envelope>),
+
    /// Connect to a peer.
+
    Connect(net::SocketAddr),
+
    /// Disconnect from a peer.
+
    Disconnect(net::SocketAddr, DisconnectReason),
+
    /// Ask for a wakeup in a specified amount of time.
+
    Wakeup(LocalDuration),
+
    /// Emit an event.
+
    Event(Event),
+
}
+

/// A protocol event.
#[derive(Debug, Clone)]
pub enum Event {
@@ -265,7 +280,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Protoco
    }

    /// Get I/O outbox.
-
    pub fn outbox(&mut self) -> &mut VecDeque<Io<Event, DisconnectReason>> {
+
    pub fn outbox(&mut self) -> &mut VecDeque<Io> {
        &mut self.context.io
    }

@@ -619,7 +634,7 @@ impl fmt::Display for DisconnectReason {
}

impl<S, T, G> Iterator for Protocol<S, T, G> {
-
    type Item = Io<Event, DisconnectReason>;
+
    type Item = Io;

    fn next(&mut self) -> Option<Self::Item> {
        self.context.io.pop_front()
@@ -645,7 +660,7 @@ pub struct Context<S, T, G> {
    /// Tracks the location of projects.
    routing: Routing,
    /// Outgoing I/O queue.
-
    io: VecDeque<Io<Event, DisconnectReason>>,
+
    io: VecDeque<Io>,
    /// Clock. Tells the time.
    clock: RefClock,
    /// Project storage.
@@ -792,26 +807,18 @@ impl<S, T, G> Context<S, T, G> {
    }

    fn write_all(&mut self, remote: net::SocketAddr, msgs: impl IntoIterator<Item = Message>) {
-
        let mut buf = Vec::new();
-

-
        for msg in msgs {
-
            debug!("Write {:?} to {}", &msg, remote.ip());
-

-
            let envelope = self.config.network.envelope(msg);
-
            envelope
-
                .encode(&mut buf)
-
                .expect("writing to an in-memory buffer doesn't fail");
-
        }
-
        self.io.push_back(Io::Write(remote, buf));
+
        let envelopes = msgs
+
            .into_iter()
+
            .map(|msg| self.config.network.envelope(msg))
+
            .collect();
+
        self.io.push_back(Io::Write(remote, envelopes));
    }

    fn write(&mut self, remote: net::SocketAddr, msg: Message) {
        debug!("Write {:?} to {}", &msg, remote.ip());

        let envelope = self.config.network.envelope(msg);
-
        let bytes = wire::serialize(&envelope);
-

-
        self.io.push_back(Io::Write(remote, bytes));
+
        self.io.push_back(Io::Write(remote, vec![envelope]));
    }

    /// Broadcast a message to a list of peers.
modified node/src/protocol/wire.rs
@@ -475,7 +475,24 @@ impl<S, T, G> Iterator for Wire<S, T, G> {
    type Item = nakamoto::Io<protocol::Event, protocol::DisconnectReason>;

    fn next(&mut self) -> Option<Self::Item> {
-
        self.inner.next()
+
        match self.inner.next() {
+
            Some(protocol::Io::Write(addr, msgs)) => {
+
                let mut buf = Vec::new();
+
                for msg in msgs {
+
                    log::debug!("Write {:?} to {}", &msg, addr.ip());
+

+
                    msg.encode(&mut buf)
+
                        .expect("writing to an in-memory buffer doesn't fail");
+
                }
+
                Some(nakamoto::Io::Write(addr, buf))
+
            }
+
            Some(protocol::Io::Event(e)) => Some(nakamoto::Io::Event(e)),
+
            Some(protocol::Io::Connect(a)) => Some(nakamoto::Io::Connect(a)),
+
            Some(protocol::Io::Disconnect(a, r)) => Some(nakamoto::Io::Disconnect(a, r)),
+
            Some(protocol::Io::Wakeup(d)) => Some(nakamoto::Io::Wakeup(d)),
+

+
            None => None,
+
        }
    }
}

modified node/src/test.rs
@@ -5,5 +5,6 @@ pub(crate) mod fixtures;
pub(crate) mod handle;
pub(crate) mod logger;
pub(crate) mod peer;
+
pub(crate) mod simulator;
pub(crate) mod storage;
pub(crate) mod tests;
modified node/src/test/peer.rs
@@ -3,29 +3,26 @@ use std::ops::{Deref, DerefMut};

use git_url::Url;
use log::*;
-
use nakamoto_net::simulator;
-
use nakamoto_net::Protocol as _;

use crate::address_book::{KnownAddress, Source};
use crate::clock::RefClock;
use crate::collections::HashMap;
-
use crate::decoder::Decoder;
+
use crate::protocol;
use crate::protocol::config::*;
use crate::protocol::message::*;
-
use crate::protocol::wire::Wire;
use crate::protocol::*;
use crate::storage::WriteStorage;
use crate::test::crypto::MockSigner;
-
use crate::transport;
-
use crate::*;
+
use crate::test::simulator;
+
use crate::{Link, LocalTime};

-
/// Transport instantiation used for testing.
-
pub type Transport<S> = transport::Transport<HashMap<net::IpAddr, KnownAddress>, S, MockSigner>;
+
/// Protocol instantiation used for testing.
+
pub type Protocol<S> = protocol::Protocol<HashMap<net::IpAddr, KnownAddress>, S, MockSigner>;

#[derive(Debug)]
pub struct Peer<S> {
    pub name: &'static str,
-
    pub protocol: Transport<S>,
+
    pub protocol: Protocol<S>,
    pub ip: net::IpAddr,
    pub rng: fastrand::Rng,
    pub local_time: LocalTime,
@@ -34,7 +31,7 @@ pub struct Peer<S> {
    initialized: bool,
}

-
impl<'r, S> simulator::Peer<Transport<S>> for Peer<S>
+
impl<'r, S> simulator::Peer<S> for Peer<S>
where
    S: WriteStorage<'r> + 'static,
{
@@ -48,7 +45,7 @@ where
}

impl<S> Deref for Peer<S> {
-
    type Target = Transport<S>;
+
    type Target = Protocol<S>;

    fn deref(&self) -> &Self::Target {
        &self.protocol
@@ -94,14 +91,7 @@ where
        let local_time = LocalTime::now();
        let clock = RefClock::from(local_time);
        let signer = MockSigner::new(&mut rng);
-
        let protocol = Transport::new(Wire::new(Protocol::new(
-
            config,
-
            clock,
-
            storage,
-
            addrs,
-
            signer,
-
            rng.clone(),
-
        )));
+
        let protocol = Protocol::new(config, clock, storage, addrs, signer, rng.clone());
        let ip = ip.into();
        let local_addr = net::SocketAddr::new(ip, rng.u16(..));

@@ -138,13 +128,12 @@ where
    }

    pub fn receive(&mut self, peer: &net::SocketAddr, msg: Message) {
-
        let bytes = wire::serialize(&self.config().network.envelope(msg));
-

-
        self.protocol.received_bytes(peer, &bytes);
+
        self.protocol
+
            .received_message(peer, self.config().network.envelope(msg));
    }

    pub fn connect_from(&mut self, peer: &Self) {
-
        let remote = simulator::Peer::<Transport<S>>::addr(peer);
+
        let remote = simulator::Peer::<S>::addr(peer);
        let local = net::SocketAddr::new(self.ip, self.rng.u16(..));
        let git = format!("file:///{}.git", remote.ip());
        let git = Url::from_bytes(git.as_bytes()).unwrap();
@@ -169,7 +158,7 @@ where
    }

    pub fn connect_to(&mut self, peer: &Self) {
-
        let remote = simulator::Peer::<Transport<S>>::addr(peer);
+
        let remote = simulator::Peer::<S>::addr(peer);

        self.initialize();
        self.protocol.attempted(&remote);
@@ -196,20 +185,16 @@ where

    /// Drain outgoing messages sent from this peer to the remote address.
    pub fn messages(&mut self, remote: &net::SocketAddr) -> impl Iterator<Item = Message> {
-
        let mut stream = Decoder::<Envelope>::new(2048);
        let mut msgs = Vec::new();

        self.protocol.outbox().retain(|o| match o {
-
            Io::Write(a, bytes) if a == remote => {
-
                stream.input(bytes);
+
            protocol::Io::Write(a, envelopes) if a == remote => {
+
                msgs.extend(envelopes.iter().map(|e| e.msg.clone()));
                false
            }
            _ => true,
        });

-
        while let Some(envelope) = stream.decode_next().unwrap() {
-
            msgs.push(envelope.msg);
-
        }
        msgs.into_iter()
    }

@@ -220,7 +205,7 @@ where
    }

    /// Get a draining iterator over the peer's I/O outbox.
-
    pub fn outbox(&mut self) -> impl Iterator<Item = Io<Event, DisconnectReason>> + '_ {
+
    pub fn outbox(&mut self) -> impl Iterator<Item = Io> + '_ {
        self.protocol.outbox().drain(..)
    }
}
added node/src/test/simulator.rs
@@ -0,0 +1,619 @@
+
//! A simple P2P network simulator. Acts as the _reactor_, but without doing any I/O.
+
#![allow(clippy::collapsible_if)]
+

+
#[cfg(feature = "quickcheck")]
+
pub mod arbitrary;
+

+
use std::collections::{BTreeMap, BTreeSet, VecDeque};
+
use std::marker::PhantomData;
+
use std::ops::{Deref, DerefMut, Range};
+
use std::{fmt, io, net};
+

+
use log::*;
+
use nakamoto_net as nakamoto;
+
use nakamoto_net::{Link, LocalDuration, LocalTime};
+

+
use crate::protocol::{DisconnectReason, Envelope, Event, Io};
+
use crate::storage::WriteStorage;
+
use crate::test::peer::Protocol;
+

+
/// Minimum latency between peers.
+
pub const MIN_LATENCY: LocalDuration = LocalDuration::from_millis(1);
+
/// Maximum number of events buffered per peer.
+
pub const MAX_EVENTS: usize = 2048;
+

+
/// Identifier for a simulated node/peer.
+
/// The simulator requires each peer to have a distinct IP address.
+
type NodeId = std::net::IpAddr;
+

+
/// A simulated peer. Protocol instances have to be wrapped in this type to be simulated.
+
pub trait Peer<S>: Deref<Target = Protocol<S>> + DerefMut<Target = Protocol<S>> + 'static {
+
    /// Initialize the peer. This should at minimum initialize the protocol with the
+
    /// current time.
+
    fn init(&mut self);
+
    /// Get the peer address.
+
    fn addr(&self) -> net::SocketAddr;
+
}
+

+
/// Simulated protocol input.
+
#[derive(Debug, Clone)]
+
pub enum Input {
+
    /// Connection attempt underway.
+
    Connecting {
+
        /// Remote peer address.
+
        addr: net::SocketAddr,
+
    },
+
    /// New connection with a peer.
+
    Connected {
+
        /// Remote peer id.
+
        addr: net::SocketAddr,
+
        /// Local peer id.
+
        local_addr: net::SocketAddr,
+
        /// Link direction.
+
        link: Link,
+
    },
+
    /// Disconnected from peer.
+
    Disconnected(
+
        net::SocketAddr,
+
        nakamoto::DisconnectReason<DisconnectReason>,
+
    ),
+
    /// Received a message from a remote peer.
+
    Received(net::SocketAddr, Vec<Envelope>),
+
    /// Used to advance the state machine after some wall time has passed.
+
    Wake,
+
}
+

+
/// A scheduled protocol input.
+
#[derive(Debug, Clone)]
+
pub struct Scheduled {
+
    /// The node for which this input is scheduled.
+
    pub node: NodeId,
+
    /// The remote peer from which this input originates.
+
    /// If the input originates from the local node, this should be set to the zero address.
+
    pub remote: net::SocketAddr,
+
    /// The input being scheduled.
+
    pub input: Input,
+
}
+

+
impl fmt::Display for Scheduled {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match &self.input {
+
            Input::Received(from, msgs) => {
+
                for msg in msgs {
+
                    write!(f, "{} <- {} ({:?})", self.node, from, msg)?;
+
                }
+
                Ok(())
+
            }
+
            Input::Connected {
+
                addr,
+
                local_addr,
+
                link: Link::Inbound,
+
                ..
+
            } => write!(f, "{} <== {}: Connected", local_addr, addr),
+
            Input::Connected {
+
                local_addr,
+
                addr,
+
                link: Link::Outbound,
+
                ..
+
            } => write!(f, "{} ==> {}: Connected", local_addr, addr),
+
            Input::Connecting { addr } => {
+
                write!(f, "{} => {}: Connecting", self.node, addr)
+
            }
+
            Input::Disconnected(addr, reason) => {
+
                write!(f, "{} =/= {}: Disconnected: {}", self.node, addr, reason)
+
            }
+
            Input::Wake => {
+
                write!(f, "{}: Tock", self.node)
+
            }
+
        }
+
    }
+
}
+

+
/// Inbox of scheduled state machine inputs to be delivered to the simulated nodes.
+
#[derive(Debug)]
+
pub struct Inbox {
+
    /// The set of scheduled inputs. We use a `BTreeMap` to ensure inputs are always
+
    /// ordered by scheduled delivery time.
+
    messages: BTreeMap<LocalTime, Scheduled>,
+
}
+

+
impl Inbox {
+
    /// Add a scheduled input to the inbox.
+
    fn insert(&mut self, mut time: LocalTime, msg: Scheduled) {
+
        // Make sure we don't overwrite an existing message by using the same time slot.
+
        while self.messages.contains_key(&time) {
+
            time = time + MIN_LATENCY;
+
        }
+
        self.messages.insert(time, msg);
+
    }
+

+
    /// Get the next scheduled input to be delivered.
+
    fn next(&mut self) -> Option<(LocalTime, Scheduled)> {
+
        self.messages
+
            .iter()
+
            .next()
+
            .map(|(time, scheduled)| (*time, scheduled.clone()))
+
    }
+

+
    /// Get the last message sent between two peers. Only checks one direction.
+
    fn last(&self, node: &NodeId, remote: &net::SocketAddr) -> Option<(&LocalTime, &Scheduled)> {
+
        self.messages
+
            .iter()
+
            .rev()
+
            .find(|(_, v)| &v.node == node && &v.remote == remote)
+
    }
+
}
+

+
/// Simulation options.
+
#[derive(Debug, Clone)]
+
pub struct Options {
+
    /// Minimum and maximum latency between nodes, in seconds.
+
    pub latency: Range<u64>,
+
    /// Probability that network I/O fails.
+
    /// A rate of `1.0` means 100% of I/O fails.
+
    pub failure_rate: f64,
+
}
+

+
impl Default for Options {
+
    fn default() -> Self {
+
        Self {
+
            latency: Range::default(),
+
            failure_rate: 0.,
+
        }
+
    }
+
}
+

+
/// A peer-to-peer node simulation.
+
pub struct Simulation<S> {
+
    /// Inbox of inputs to be delivered by the simulation.
+
    inbox: Inbox,
+
    /// Events emitted during simulation.
+
    events: BTreeMap<NodeId, VecDeque<Event>>,
+
    /// Priority events that should happen immediately.
+
    priority: VecDeque<Scheduled>,
+
    /// Simulated latencies between nodes.
+
    latencies: BTreeMap<(NodeId, NodeId), LocalDuration>,
+
    /// Network partitions between two nodes.
+
    partitions: BTreeSet<(NodeId, NodeId)>,
+
    /// Set of existing connections between nodes.
+
    connections: BTreeMap<(NodeId, NodeId), u16>,
+
    /// Set of connection attempts.
+
    attempts: BTreeSet<(NodeId, NodeId)>,
+
    /// Simulation options.
+
    opts: Options,
+
    /// Start time of simulation.
+
    start_time: LocalTime,
+
    /// Current simulation time. Updated when a scheduled message is processed.
+
    time: LocalTime,
+
    /// RNG.
+
    rng: fastrand::Rng,
+
    /// Storage type.
+
    storage: PhantomData<S>,
+
}
+

+
impl<'r, S: WriteStorage<'r>> Simulation<S> {
+
    /// Create a new simulation.
+
    pub fn new(time: LocalTime, rng: fastrand::Rng, opts: Options) -> Self {
+
        Self {
+
            inbox: Inbox {
+
                messages: BTreeMap::new(),
+
            },
+
            events: BTreeMap::new(),
+
            priority: VecDeque::new(),
+
            partitions: BTreeSet::new(),
+
            latencies: BTreeMap::new(),
+
            connections: BTreeMap::new(),
+
            attempts: BTreeSet::new(),
+
            opts,
+
            start_time: time,
+
            time,
+
            rng,
+
            storage: PhantomData,
+
        }
+
    }
+

+
    /// Check whether the simulation is done, ie. there are no more messages to process.
+
    pub fn is_done(&self) -> bool {
+
        self.inbox.messages.is_empty()
+
    }
+

+
    /// Total amount of simulated time elapsed.
+
    #[allow(dead_code)]
+
    pub fn elapsed(&self) -> LocalDuration {
+
        self.time - self.start_time
+
    }
+

+
    /// Check whether the simulation has settled, ie. the only messages left to process
+
    /// are (periodic) timeouts.
+
    pub fn is_settled(&self) -> bool {
+
        self.inbox
+
            .messages
+
            .iter()
+
            .all(|(_, s)| matches!(s.input, Input::Wake))
+
    }
+

+
    /// Get a node's emitted events.
+
    pub fn events(&mut self, node: &NodeId) -> impl Iterator<Item = Event> + '_ {
+
        self.events.entry(*node).or_default().drain(..)
+
    }
+

+
    /// Get the latency between two nodes. The minimum latency between nodes is 1 millisecond.
+
    pub fn latency(&self, from: NodeId, to: NodeId) -> LocalDuration {
+
        self.latencies
+
            .get(&(from, to))
+
            .cloned()
+
            .map(|l| {
+
                if l <= MIN_LATENCY {
+
                    l
+
                } else {
+
                    // Create variance in the latency. The resulting latency
+
                    // will be between half, and two times the base latency.
+
                    let millis = l.as_millis();
+

+
                    if self.rng.bool() {
+
                        // More latency.
+
                        LocalDuration::from_millis(millis + self.rng.u128(0..millis))
+
                    } else {
+
                        // Less latency.
+
                        LocalDuration::from_millis(millis - self.rng.u128(0..millis / 2))
+
                    }
+
                }
+
            })
+
            .unwrap_or_else(|| MIN_LATENCY)
+
    }
+

+
    /// Initialize peers.
+
    pub fn initialize<'a, P>(self, peers: impl IntoIterator<Item = &'a mut P>) -> Self
+
    where
+
        P: Peer<S>,
+
    {
+
        for peer in peers.into_iter() {
+
            peer.init();
+
        }
+
        self
+
    }
+

+
    /// Run the simulation while the given predicate holds.
+
    pub fn run_while<'a, P>(
+
        &mut self,
+
        peers: impl IntoIterator<Item = &'a mut P>,
+
        pred: impl Fn(&Self) -> bool,
+
    ) where
+
        P: Peer<S>,
+
    {
+
        let mut nodes: BTreeMap<_, _> = peers.into_iter().map(|p| (p.addr().ip(), p)).collect();
+

+
        while self.step_(&mut nodes) {
+
            if !pred(self) {
+
                break;
+
            }
+
        }
+
    }
+

+
    /// Process one scheduled input from the inbox, using the provided peers.
+
    /// This function should be called until it returns `false`, or some desired state is reached.
+
    /// Returns `true` if there are more messages to process.
+
    pub fn step<'a, P: Peer<S>>(&mut self, peers: impl IntoIterator<Item = &'a mut P>) -> bool {
+
        let mut nodes: BTreeMap<_, _> = peers.into_iter().map(|p| (p.addr().ip(), p)).collect();
+
        self.step_(&mut nodes)
+
    }
+

+
    fn step_<P: Peer<S>>(&mut self, nodes: &mut BTreeMap<NodeId, &mut P>) -> bool {
+
        if !self.opts.latency.is_empty() {
+
            // Configure latencies.
+
            for (i, from) in nodes.keys().enumerate() {
+
                for to in nodes.keys().skip(i + 1) {
+
                    let range = self.opts.latency.clone();
+
                    let latency = LocalDuration::from_millis(
+
                        self.rng
+
                            .u128(range.start as u128 * 1_000..range.end as u128 * 1_000),
+
                    );
+

+
                    self.latencies.entry((*from, *to)).or_insert(latency);
+
                    self.latencies.entry((*to, *from)).or_insert(latency);
+
                }
+
            }
+
        }
+

+
        // Create and heal partitions.
+
        // TODO: These aren't really "network" partitions, as they are only
+
        // between individual nodes. We need to think about more realistic
+
        // scenarios. We should also think about creating various network
+
        // topologies.
+
        if self.time.as_secs() % 10 == 0 {
+
            for (i, x) in nodes.keys().enumerate() {
+
                for y in nodes.keys().skip(i + 1) {
+
                    if self.is_fallible() {
+
                        self.partitions.insert((*x, *y));
+
                    } else {
+
                        self.partitions.remove(&(*x, *y));
+
                    }
+
                }
+
            }
+
        }
+

+
        // Schedule any messages in the pipes.
+
        for peer in nodes.values_mut() {
+
            let ip = peer.addr().ip();
+

+
            for o in peer.by_ref() {
+
                self.schedule(&ip, o);
+
            }
+
        }
+
        // Next high-priority message.
+
        let priority = self.priority.pop_front().map(|s| (self.time, s));
+

+
        if let Some((time, next)) = priority.or_else(|| self.inbox.next()) {
+
            let elapsed = (time - self.start_time).as_millis();
+
            if matches!(next.input, Input::Wake) {
+
                trace!(target: "sim", "{:05} {}", elapsed, next);
+
            } else {
+
                // TODO: This can be confusing, since this event may not actually be passed to
+
                // the protocol. It would be best to only log the events that are being sent
+
                // to the protocol, or to log when an input is being dropped.
+
                info!(target: "sim", "{:05} {} ({})", elapsed, next, self.inbox.messages.len());
+
            }
+
            assert!(time >= self.time, "Time only moves forwards!");
+

+
            self.time = time;
+
            self.inbox.messages.remove(&time);
+

+
            let Scheduled { input, node, .. } = next;
+

+
            if let Some(ref mut p) = nodes.get_mut(&node) {
+
                p.tick(time);
+

+
                match input {
+
                    Input::Connecting { addr } => {
+
                        if self.attempts.insert((node, addr.ip())) {
+
                            p.attempted(&addr);
+
                        }
+
                    }
+
                    Input::Connected {
+
                        addr,
+
                        local_addr,
+
                        link,
+
                    } => {
+
                        let conn = (node, addr.ip());
+

+
                        let attempted = link.is_outbound() && self.attempts.remove(&conn);
+
                        if attempted || link.is_inbound() {
+
                            if self.connections.insert(conn, local_addr.port()).is_none() {
+
                                p.connected(addr, &local_addr, link);
+
                            }
+
                        }
+
                    }
+
                    Input::Disconnected(addr, reason) => {
+
                        let conn = (node, addr.ip());
+
                        let attempt = self.attempts.remove(&conn);
+
                        let connection = self.connections.remove(&conn).is_some();
+

+
                        // Can't be both attempting and connected.
+
                        assert!(!(attempt && connection));
+

+
                        if attempt || connection {
+
                            p.disconnected(&addr, reason);
+
                        }
+
                    }
+
                    Input::Wake => p.wake(),
+
                    Input::Received(addr, msgs) => {
+
                        for msg in msgs {
+
                            p.received_message(&addr, msg);
+
                        }
+
                    }
+
                }
+
                for o in p.by_ref() {
+
                    self.schedule(&node, o);
+
                }
+
            } else {
+
                panic!(
+
                    "Node {} not found when attempting to schedule {:?}",
+
                    node, input
+
                );
+
            }
+
        }
+
        !self.is_done()
+
    }
+

+
    /// Process a protocol output event from a node.
+
    pub fn schedule(&mut self, node: &NodeId, out: Io) {
+
        let node = *node;
+

+
        match out {
+
            Io::Write(receiver, msgs) => {
+
                if msgs.is_empty() {
+
                    return;
+
                }
+
                // If the other end has disconnected the sender with some latency, there may not be
+
                // a connection remaining to use.
+
                let port = if let Some(port) = self.connections.get(&(node, receiver.ip())) {
+
                    *port
+
                } else {
+
                    return;
+
                };
+

+
                let sender: net::SocketAddr = (node, port).into();
+
                if self.is_partitioned(sender.ip(), receiver.ip()) {
+
                    // Drop message if nodes are partitioned.
+
                    info!(
+
                        target: "sim",
+
                        "{} -> {} (DROPPED)",
+
                         sender, receiver,
+
                    );
+
                    return;
+
                }
+

+
                // Schedule message in the future, ensuring messages don't arrive out-of-order
+
                // between two peers.
+
                let latency = self.latency(node, receiver.ip());
+
                let time = self
+
                    .inbox
+
                    .last(&receiver.ip(), &sender)
+
                    .map(|(k, _)| *k)
+
                    .unwrap_or_else(|| self.time);
+
                let time = time + latency;
+
                let elapsed = (time - self.start_time).as_millis();
+

+
                for msg in &msgs {
+
                    info!(
+
                        target: "sim",
+
                        "{:05} {} -> {} ({:?}) (+{})",
+
                        elapsed, sender, receiver, msg, latency
+
                    );
+
                }
+

+
                self.inbox.insert(
+
                    time,
+
                    Scheduled {
+
                        remote: sender,
+
                        node: receiver.ip(),
+
                        input: Input::Received(sender, msgs),
+
                    },
+
                );
+
            }
+
            Io::Connect(remote) => {
+
                assert!(remote.ip() != node, "self-connections are not allowed");
+

+
                // Create an ephemeral sockaddr for the connecting (local) node.
+
                let local_addr: net::SocketAddr = net::SocketAddr::new(node, self.rng.u16(8192..));
+
                let latency = self.latency(node, remote.ip());
+

+
                self.inbox.insert(
+
                    self.time + MIN_LATENCY,
+
                    Scheduled {
+
                        node,
+
                        remote,
+
                        input: Input::Connecting { addr: remote },
+
                    },
+
                );
+

+
                // Fail to connect if the nodes are partitioned.
+
                if self.is_partitioned(node, remote.ip()) {
+
                    log::info!(target: "sim", "{} -/-> {} (partitioned)", node, remote.ip());
+

+
                    // Sometimes, the protocol gets a failure input, other times it just hangs.
+
                    if self.rng.bool() {
+
                        self.inbox.insert(
+
                            self.time + MIN_LATENCY,
+
                            Scheduled {
+
                                node,
+
                                remote,
+
                                input: Input::Disconnected(
+
                                    remote,
+
                                    nakamoto::DisconnectReason::ConnectionError(
+
                                        io::Error::from(io::ErrorKind::UnexpectedEof).into(),
+
                                    ),
+
                                ),
+
                            },
+
                        );
+
                    }
+
                    return;
+
                }
+

+
                self.inbox.insert(
+
                    // The remote will get the connection attempt with some latency.
+
                    self.time + latency,
+
                    Scheduled {
+
                        node: remote.ip(),
+
                        remote: local_addr,
+
                        input: Input::Connected {
+
                            addr: local_addr,
+
                            local_addr: remote,
+
                            link: Link::Inbound,
+
                        },
+
                    },
+
                );
+
                self.inbox.insert(
+
                    // The local node will have established the connection after some latency.
+
                    self.time + latency,
+
                    Scheduled {
+
                        remote,
+
                        node,
+
                        input: Input::Connected {
+
                            addr: remote,
+
                            local_addr,
+
                            link: Link::Outbound,
+
                        },
+
                    },
+
                );
+
            }
+
            Io::Disconnect(remote, reason) => {
+
                // The local node is immediately disconnected.
+
                self.priority.push_back(Scheduled {
+
                    remote,
+
                    node,
+
                    input: Input::Disconnected(remote, reason.into()),
+
                });
+

+
                // Nb. It's possible for disconnects to happen simultaneously from both ends, hence
+
                // it can be that a node will try to disconnect a remote that is already
+
                // disconnected from the other side.
+
                //
+
                // It's also possible that the connection was only attempted and never succeeded,
+
                // in which case we would return here.
+
                let port = if let Some(port) = self.connections.get(&(node, remote.ip())) {
+
                    *port
+
                } else {
+
                    debug!(target: "sim", "Ignoring disconnect of {remote} from {node}");
+
                    return;
+
                };
+
                let local_addr: net::SocketAddr = (node, port).into();
+
                let latency = self.latency(node, remote.ip());
+

+
                // The remote node receives the disconnection with some delay.
+
                self.inbox.insert(
+
                    self.time + latency,
+
                    Scheduled {
+
                        node: remote.ip(),
+
                        remote: local_addr,
+
                        input: Input::Disconnected(
+
                            local_addr,
+
                            nakamoto::DisconnectReason::ConnectionError(
+
                                io::Error::from(io::ErrorKind::ConnectionReset).into(),
+
                            ),
+
                        ),
+
                    },
+
                );
+
            }
+
            Io::Wakeup(duration) => {
+
                let time = self.time + duration;
+

+
                if !matches!(
+
                    self.inbox.messages.get(&time),
+
                    Some(Scheduled {
+
                        input: Input::Wake,
+
                        ..
+
                    })
+
                ) {
+
                    self.inbox.insert(
+
                        time,
+
                        Scheduled {
+
                            node,
+
                            // The remote is not applicable for this type of output.
+
                            remote: ([0, 0, 0, 0], 0).into(),
+
                            input: Input::Wake,
+
                        },
+
                    );
+
                }
+
            }
+
            Io::Event(event) => {
+
                let events = self.events.entry(node).or_insert_with(VecDeque::new);
+
                if events.len() >= MAX_EVENTS {
+
                    warn!(target: "sim", "Dropping event: buffer is full");
+
                } else {
+
                    events.push_back(event);
+
                }
+
            }
+
        }
+
    }
+

+
    /// Check whether we should fail the next operation.
+
    fn is_fallible(&self) -> bool {
+
        self.rng.f64() % 1.0 < self.opts.failure_rate
+
    }
+

+
    /// Check whether two nodes are partitioned.
+
    fn is_partitioned(&self, a: NodeId, b: NodeId) -> bool {
+
        self.partitions.contains(&(a, b)) || self.partitions.contains(&(b, a))
+
    }
+
}
modified node/src/test/tests.rs
@@ -3,9 +3,6 @@ use std::sync::Arc;

use crossbeam_channel as chan;
use nakamoto_net as nakamoto;
-
use nakamoto_net::simulator;
-
use nakamoto_net::simulator::{Peer as _, Simulation};
-
use nakamoto_net::Protocol as _;

use crate::collections::{HashMap, HashSet};
use crate::protocol::config::*;
@@ -18,8 +15,11 @@ use crate::test::fixtures;
#[allow(unused)]
use crate::test::logger;
use crate::test::peer::Peer;
+
use crate::test::simulator;
+
use crate::test::simulator::{Peer as _, Simulation};
use crate::test::storage::MockStorage;
-
use crate::*;
+
use crate::{assert_matches, Link, LocalTime};
+
use crate::{client, identity, protocol, rad, storage, test};

// NOTE
//