Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-node src test simulator.rs
//! A simple P2P network simulator. Acts as the _reactor_, but without doing any I/O.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut, Range};
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, io, net};

use localtime::{LocalDuration, LocalTime};
use log::*;
use radicle::node::NodeId;
use radicle::node::events::Event;
use radicle_protocol::worker::FetchError;

use crate::Link;
use crate::crypto;
use crate::prelude::{Address, RepoId};
use crate::service::io::Io;
use crate::service::{DisconnectReason, Message, Metrics};
use crate::storage::Namespaces;
use crate::storage::{ReadRepository, WriteStorage};
use crate::test::arbitrary;
use crate::test::peer::Service;
use crate::worker::fetch;

/// Minimum latency between peers.
///
/// Used as the floor of the value returned by `Simulation::latency`, as the delay
/// applied to connection attempts, and as the increment `Inbox::insert` uses to find
/// a free time slot.
pub const MIN_LATENCY: LocalDuration = LocalDuration::from_millis(1);
/// Maximum number of events buffered per peer.
///
/// NOTE(review): not referenced by any code visible in this file; presumably used by
/// other test helpers -- confirm before relying on it.
pub const MAX_EVENTS: usize = 2048;

/// A simulated peer. Service instances have to be wrapped in this type to be simulated.
///
/// Implementors must dereference (mutably) to the wrapped [`Service`]; the simulation
/// relies on this to drive the service through the peer.
pub trait Peer<S, G>:
    Deref<Target = Service<S, G>> + DerefMut<Target = Service<S, G>> + 'static
{
    /// Initialize the peer. This should at minimum initialize the service with the
    /// current time.
    ///
    /// Called once per peer by `Simulation::initialize`.
    fn init(&mut self);
    /// Get the peer address.
    fn addr(&self) -> Address;
    /// Get the peer id.
    ///
    /// The simulation uses this id as the key under which the peer is looked up when
    /// an input is delivered.
    fn id(&self) -> NodeId;
}

/// Simulated service input.
///
/// Each variant is delivered to a single node by the simulation, which translates it
/// into the corresponding call on that node's service.
#[derive(Debug, Clone)]
pub enum Input {
    /// Connection attempt underway.
    Connecting {
        /// Remote peer id.
        id: NodeId,
        /// Address used to connect.
        addr: Address,
    },
    /// New connection with a peer.
    Connected {
        /// Remote peer id.
        id: NodeId,
        /// Remote peer address.
        addr: Address,
        /// Link direction.
        link: Link,
    },
    /// Disconnected from peer.
    ///
    /// Carries the remote peer id and the reason for the disconnection.
    Disconnected(NodeId, Rc<DisconnectReason>),
    /// Received messages from a remote peer.
    ///
    /// Carries the sender's id and the batch of messages, which are delivered in order.
    Received(NodeId, Vec<Message>),
    /// Fetch completed for a node.
    ///
    /// Carries the repository that was fetched, the remote it was fetched from, and the
    /// outcome. On delivery the outcome is taken out of the `Rc` with `Rc::try_unwrap`,
    /// so it has to be uniquely owned by then.
    Fetched(RepoId, NodeId, Rc<Result<fetch::FetchResult, FetchError>>),
    /// Used to advance the state machine after some wall time has passed.
    Wake,
}

/// A scheduled service input.
///
/// Pairs an [`Input`] with the node that will receive it and the peer it relates to.
#[derive(Debug, Clone)]
pub struct Scheduled {
    /// The node for which this input is scheduled.
    pub node: NodeId,
    /// The remote peer from which this input originates.
    /// If the input originates from the local node (eg. a wake-up), this should be set
    /// to the all-zero node id.
    pub remote: NodeId,
    /// The input being scheduled.
    pub input: Input,
}

impl fmt::Display for Scheduled {
    /// Render the scheduled input as a one-line trace entry.
    ///
    /// The receiving node always comes first, followed by an arrow describing the kind
    /// of input and the peer involved.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { node, input, .. } = self;

        match input {
            Input::Wake => {
                write!(f, "{}: Tock", node)
            }
            Input::Connecting { id: peer, .. } => {
                write!(f, "{} => {}: Connecting", node, peer)
            }
            // The arrow points in the direction the connection was established.
            Input::Connected {
                id: peer, link, ..
            } => match link {
                Link::Inbound => write!(f, "{} <== {}: Connected", node, peer),
                Link::Outbound => write!(f, "{} ==> {}: Connected", node, peer),
            },
            Input::Disconnected(peer, why) => {
                write!(f, "{} =/= {}: Disconnected: {}", node, peer, why)
            }
            Input::Received(sender, batch) => {
                write!(f, "{} <- {} ({:?})", node, sender, batch)
            }
            Input::Fetched(repo, peer, _) => {
                write!(f, "{} <<~ {} ({}): Fetched", node, peer, repo)
            }
        }
    }
}

/// Inbox of scheduled state machine inputs to be delivered to the simulated nodes.
///
/// Holds at most one input per point in time; `Inbox::insert` moves an input to a
/// later slot rather than overwrite an existing one.
#[derive(Debug)]
pub struct Inbox {
    /// The set of scheduled inputs. We use a `BTreeMap` to ensure inputs are always
    /// ordered by scheduled delivery time.
    messages: BTreeMap<LocalTime, Scheduled>,
}

impl Inbox {
    /// Schedule `msg` for delivery at `time`.
    ///
    /// If that time slot is already taken, the input is pushed back in steps of
    /// [`MIN_LATENCY`] until a free slot is found, so that no scheduled input is ever
    /// overwritten.
    fn insert(&mut self, mut time: LocalTime, msg: Scheduled) {
        use std::collections::btree_map::Entry;

        loop {
            match self.messages.entry(time) {
                Entry::Vacant(slot) => {
                    slot.insert(msg);
                    return;
                }
                // Slot is taken: try the next one.
                Entry::Occupied(_) => time = time + MIN_LATENCY,
            }
        }
    }

    /// Peek at the earliest scheduled input, returning a copy of it along with its
    /// delivery time. The input stays in the inbox.
    fn next(&mut self) -> Option<(LocalTime, Scheduled)> {
        let (time, scheduled) = self.messages.first_key_value()?;

        Some((*time, scheduled.clone()))
    }

    /// Find the latest input scheduled for `node` that originates from `remote`.
    /// Inputs going in the opposite direction are not considered.
    fn last(&self, node: &NodeId, remote: &NodeId) -> Option<(&LocalTime, &Scheduled)> {
        self.messages
            .iter()
            .rfind(|(_, scheduled)| scheduled.node == *node && scheduled.remote == *remote)
    }
}

/// Simulation options.
///
/// The default options use an empty latency range and a failure rate of zero.
#[derive(Debug, Clone, Default)]
pub struct Options {
    /// Minimum and maximum latency between nodes, in seconds.
    ///
    /// The range is half-open: the start is inclusive, the end is exclusive. When the
    /// range is empty, no random latencies are assigned.
    pub latency: Range<u64>,
    /// Probability that network I/O fails.
    /// A rate of `1.0` means 100% of I/O fails.
    pub failure_rate: f64,
}

/// A peer-to-peer node simulation.
///
/// The simulation owns the clock and the queues of pending inputs; the peers
/// themselves are passed in on every call that needs them.
pub struct Simulation<S, G> {
    /// Inbox of inputs to be delivered by the simulation.
    inbox: Inbox,
    /// Events emitted during simulation.
    ///
    /// NOTE(review): no code visible in this file appends to these queues; they are
    /// only cleared by `run_while` and drained by `events`.
    events: BTreeMap<NodeId, VecDeque<Event>>,
    /// Messages received during simulation.
    ///
    /// Each entry is recorded when a batch of messages is delivered, as
    /// `(sender, receiver, message)`.
    messages: Vec<(NodeId, NodeId, Message)>,
    /// Priority events that should happen immediately.
    ///
    /// These are delivered before anything in the inbox, at the current time.
    priority: VecDeque<Scheduled>,
    /// Simulated latencies between nodes.
    ///
    /// Keyed by `(from, to)`. Both directions of a pair are given the same value when
    /// latencies are assigned.
    latencies: BTreeMap<(NodeId, NodeId), LocalDuration>,
    /// Network partitions between two nodes.
    ///
    /// A pair is stored in a single order; lookups check both orders.
    partitions: BTreeSet<(NodeId, NodeId)>,
    /// Set of existing connections between nodes.
    ///
    /// Keyed by `(local, remote)`, ie. each end of a connection has its own entry.
    connections: BTreeSet<(NodeId, NodeId)>,
    /// Set of connection attempts.
    ///
    /// Keyed by `(local, remote)`.
    attempts: BTreeSet<(NodeId, NodeId)>,
    /// Simulation options.
    opts: Options,
    /// Start time of simulation.
    ///
    /// Reset to the current time at the start of every `run_while` call.
    start_time: LocalTime,
    /// Current simulation time. Updated when a scheduled message is processed.
    time: LocalTime,
    /// RNG.
    ///
    /// Wrapped in a `RefCell` so that it can be used from methods taking `&self`.
    rng: RefCell<fastrand::Rng>,
    /// Storage type.
    storage: PhantomData<S>,
    /// Signer type.
    signer: PhantomData<G>,
}

impl<S, G> Simulation<S, G>
where
    S: WriteStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature>,
{
    /// Create a new simulation.
    ///
    /// The simulation clock starts at `time`, all queues and connection tables start
    /// out empty, and `rng` drives every random decision made by the simulation.
    pub fn new(time: LocalTime, rng: fastrand::Rng, opts: Options) -> Self {
        let inbox = Inbox {
            messages: BTreeMap::new(),
        };
        let rng = RefCell::new(rng);

        Self {
            inbox,
            events: BTreeMap::new(),
            messages: Vec::new(),
            priority: VecDeque::new(),
            latencies: BTreeMap::new(),
            partitions: BTreeSet::new(),
            connections: BTreeSet::new(),
            attempts: BTreeSet::new(),
            opts,
            // Start and current time coincide until the first input is processed.
            start_time: time,
            time,
            rng,
            storage: PhantomData,
            signer: PhantomData,
        }
    }

    /// Check whether the simulation is done, ie. there are no more messages to process.
    pub fn is_done(&self) -> bool {
        self.inbox.messages.is_empty()
    }

    /// Total amount of simulated time elapsed.
    ///
    /// Measured from the simulation's start time, which `run_while` resets on every
    /// call, up to the current simulation time.
    pub fn elapsed(&self) -> LocalDuration {
        self.time - self.start_time
    }

    /// Check whether the simulation has settled, ie. every input left in the inbox is
    /// a (periodic) wake-up. An empty inbox counts as settled.
    pub fn is_settled(&self) -> bool {
        self.inbox
            .messages
            .values()
            .all(|scheduled| matches!(scheduled.input, Input::Wake))
    }

    /// Take the events emitted by a node so far.
    ///
    /// The node's queue is created if it doesn't exist yet, and the returned iterator
    /// drains it.
    pub fn events(&mut self, node: &NodeId) -> impl Iterator<Item = Event> + '_ {
        let queue = self.events.entry(*node).or_default();

        queue.drain(..)
    }

    /// Get all messages received by nodes during the simulation, in the order in which
    /// they were delivered.
    pub fn messages(&mut self) -> &[(NodeId, NodeId, Message)] {
        self.messages.as_slice()
    }

    /// Get the latency from one node to another.
    ///
    /// Pairs without a configured latency, as well as latencies below the minimum, yield
    /// [`MIN_LATENCY`], ie. 1 millisecond.
    pub fn latency(&self, from: NodeId, to: NodeId) -> LocalDuration {
        match self.latencies.get(&(from, to)) {
            Some(&configured) if !(configured < MIN_LATENCY) => configured,
            _ => MIN_LATENCY,
        }
    }

    /// Initialize peers.
    ///
    /// Calls [`Peer::init`] on each of the given peers, in iteration order, and hands
    /// the simulation back so that the call can be chained onto the constructor.
    pub fn initialize<'a, P>(self, peers: impl IntoIterator<Item = &'a mut P>) -> Self
    where
        P: Peer<S, G>,
    {
        peers.into_iter().for_each(|peer| peer.init());

        self
    }

    /// Run the simulation while the given predicate holds.
    ///
    /// Recorded messages and events from previous runs are discarded and the start
    /// time is reset to the current time. The predicate is evaluated after each step,
    /// so at least one step is always attempted. The run also ends when a step reports
    /// that there is nothing left to process.
    pub fn run_while<'a, P>(
        &mut self,
        peers: impl IntoIterator<Item = &'a mut P>,
        pred: impl Fn(&Self) -> bool,
    ) where
        P: Peer<S, G>,
    {
        // Index the peers by node id.
        let mut nodes = BTreeMap::new();
        for peer in peers {
            nodes.insert(peer.id(), peer);
        }

        self.messages.clear();
        self.events.clear();
        self.start_time = self.time;

        loop {
            let more = self.step_(&mut nodes);

            if !more || !pred(self) {
                break;
            }
        }
    }

    /// Process one scheduled input, using the provided peers.
    ///
    /// Call this repeatedly until it returns `false`, or until some desired state is
    /// reached. Returns `true` if there are more inputs to process.
    pub fn step<'a, P: Peer<S, G>>(&mut self, peers: impl IntoIterator<Item = &'a mut P>) -> bool {
        // Index the peers by node id.
        let mut nodes = BTreeMap::new();
        for peer in peers {
            nodes.insert(peer.id(), peer);
        }

        self.step_(&mut nodes)
    }

    /// Deliver at most one input to its target node and schedule the outputs produced.
    ///
    /// Each call does the following, in order:
    ///
    /// 1. If a latency range is configured, assigns a latency to every pair of nodes
    ///    that doesn't have one yet.
    /// 2. While the current time, in whole seconds, is a multiple of ten, creates or
    ///    heals the partition between each pair of nodes at random.
    /// 3. Drains the pending outputs of every node through [`Self::schedule`].
    /// 4. Delivers the next input: priority inputs come first and are delivered at the
    ///    current time; otherwise the earliest inbox entry is delivered and the clock
    ///    advances to its time.
    /// 5. Drains the outputs the receiving node produced in response.
    ///
    /// Returns `true` if the inbox still holds inputs afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the input targets a node that is not part of `nodes`, if the next input
    /// is scheduled in the past, if a fetch outcome is an error or is still shared, or
    /// if the repository operations performed to complete a fetch fail.
    fn step_<P: Peer<S, G>>(&mut self, nodes: &mut BTreeMap<NodeId, &mut P>) -> bool {
        if !self.opts.latency.is_empty() {
            // Configure latencies. Note that a value is drawn from the RNG for every pair
            // on every step, but only the first value drawn for a pair is kept.
            for (i, from) in nodes.keys().enumerate() {
                for to in nodes.keys().skip(i + 1) {
                    let range = self.opts.latency.clone();
                    // The range is expressed in seconds, the latency in milliseconds.
                    let latency = LocalDuration::from_millis(
                        self.rng
                            .borrow_mut()
                            .u128(range.start as u128 * 1_000..range.end as u128 * 1_000),
                    );

                    self.latencies.entry((*from, *to)).or_insert(latency);
                    self.latencies.entry((*to, *from)).or_insert(latency);
                }
            }
        }

        // Create and heal partitions.
        // TODO: These aren't really "network" partitions, as they are only
        // between individual nodes. We need to think about more realistic
        // scenarios. We should also think about creating various network
        // topologies.
        if self.time.as_secs() % 10 == 0 {
            for (i, x) in nodes.keys().enumerate() {
                for y in nodes.keys().skip(i + 1) {
                    if self.is_fallible() {
                        self.partitions.insert((*x, *y));
                    } else {
                        self.partitions.remove(&(*x, *y));
                    }
                }
            }
        }

        // Schedule any messages in the pipes.
        for peer in nodes.values_mut() {
            let id = peer.id();

            while let Some(o) = peer.next() {
                self.schedule(&id, o);
            }
        }

        // Next high-priority message, or else the earliest input in the inbox. We keep
        // track of where the input came from, since only inbox inputs occupy a time slot.
        let (time, next, from_inbox) = match self.priority.pop_front() {
            Some(scheduled) => (self.time, scheduled, false),
            None => match self.inbox.next() {
                Some((time, scheduled)) => (time, scheduled, true),
                None => return !self.is_done(),
            },
        };

        let elapsed = (time - self.start_time).as_millis();
        if matches!(next.input, Input::Wake) {
            trace!(target: "sim", "{elapsed:05} {next}");
        } else {
            // TODO: This can be confusing, since this event may not actually be passed to
            // the service. It would be best to only log the events that are being sent
            // to the service, or to log when an input is being dropped.
            info!(target: "sim", "{:05} {} ({})", elapsed, next, self.inbox.messages.len());
        }
        assert!(time >= self.time, "Time only moves forwards!");

        self.time = time;
        // Priority inputs are stamped with the current time but don't live in the inbox:
        // removing that slot for them would drop an unrelated input scheduled for the
        // current time.
        if from_inbox {
            self.inbox.messages.remove(&time);
        }

        let Scheduled { input, node, .. } = next;

        let Some(p) = nodes.get_mut(&node) else {
            panic!("Node {node} not found when attempting to schedule {input:?}",);
        };
        p.tick(time, &Metrics::default());

        match input {
            Input::Connecting { id, addr } => {
                // Only the first attempt for a given pair is passed on to the service.
                if self.attempts.insert((node, id)) {
                    // TODO: Also call `inbound` for inbound attempts.
                    p.attempted(id, addr);
                }
            }
            Input::Connected { id, addr, link } => {
                let conn = (node, id);

                // Outbound connections must have been attempted first; inbound ones are
                // always accepted, unless the connection already exists.
                let attempted = link.is_outbound() && self.attempts.remove(&conn);
                if (attempted || link.is_inbound()) && self.connections.insert(conn) {
                    p.connected(id, addr, link);
                }
            }
            Input::Disconnected(id, reason) => {
                let conn = (node, id);
                let attempt = self.attempts.remove(&conn);
                let connection = self.connections.remove(&conn);

                // FIXME: This shouldn't happen, but it does when latency is introduced.
                if attempt && connection {
                    log::error!(target: "sim", "Connection is attempted and connected at the same time");
                }
                // The simulation doesn't track link direction, so both are reported.
                if attempt || connection {
                    p.disconnected(id, Link::Inbound, &reason);
                    p.disconnected(id, Link::Outbound, &reason);
                }
            }
            Input::Wake => p.wake(),
            Input::Received(from, msgs) => {
                for msg in msgs.clone() {
                    p.received_message(from, msg);
                }
                // Record what was delivered, for later inspection.
                self.messages
                    .extend(msgs.into_iter().map(|m| (from, p.node_id(), m)));
            }
            Input::Fetched(rid, nid, result) => {
                // Fetch outcomes are only ever delivered from the inbox, and the inbox's
                // copy was removed above, so this is the only reference left.
                let mut result = Rc::try_unwrap(result).unwrap();
                let repo = match p.storage().repository_mut(rid) {
                    Ok(repo) => repo,
                    Err(e) if e.is_not_found() => p.storage().create(rid).unwrap(),
                    Err(e) => panic!("Failed to open repository: {e}"),
                };

                match &mut result {
                    Ok(fetch::FetchResult {
                        namespaces,
                        updated,
                        doc,
                        ..
                    }) => {
                        // Replace the placeholder values that were scheduled with the
                        // outcome of the test fetch.
                        *updated = radicle::test::fetch(&repo, &nid, Namespaces::All).unwrap();
                        *namespaces = updated
                            .iter()
                            .map(|r| {
                                NodeId::from_namespaced(&r.name().to_namespaced().unwrap())
                                    .unwrap()
                            })
                            .collect();
                        *doc = repo.identity_doc().unwrap();
                    }
                    Err(err) => panic!("Error fetching: {err}"),
                }
                p.fetched(rid, nid, result);
            }
        }
        // Schedule whatever the node produced in response to the input.
        while let Some(o) = p.next() {
            self.schedule(&node, o);
        }

        !self.is_done()
    }

    /// Process a service output event from a node.
    pub fn schedule(&mut self, node: &NodeId, out: Io) {
        let node = *node;

        match out {
            Io::Write(receiver, msgs) => {
                if msgs.is_empty() {
                    return;
                }
                // If the other end has disconnected the sender with some latency, there may not be
                // a connection remaining to use.
                if !self.connections.contains(&(node, receiver)) {
                    return;
                }
                let sender = node;

                if self.is_partitioned(sender, receiver) {
                    // Drop message if nodes are partitioned.
                    info!(
                        target: "sim",
                        "{sender} -> {receiver} (DROPPED)",
                    );
                    return;
                }

                // Schedule message in the future, ensuring messages don't arrive out-of-order
                // between two peers.
                let latency = self.latency(node, receiver);
                let time = self
                    .inbox
                    .last(&receiver, &sender)
                    .map(|(k, _)| *k)
                    .unwrap_or_else(|| self.time);
                let time = time + latency;
                let elapsed = (time - self.start_time).as_millis();

                for msg in &msgs {
                    info!(
                        target: "sim",
                        "{elapsed:05} {sender} -> {receiver} ({msg:?}) (+{latency})"
                    );
                }

                self.inbox.insert(
                    time,
                    Scheduled {
                        remote: sender,
                        node: receiver,
                        input: Input::Received(sender, msgs),
                    },
                );
            }
            Io::Connect(remote, addr) => {
                assert!(remote != node, "self-connections are not allowed");

                self.inbox.insert(
                    self.time + MIN_LATENCY,
                    Scheduled {
                        node,
                        remote,
                        input: Input::Connecting {
                            id: remote,
                            addr: addr.clone(),
                        },
                    },
                );

                // Fail to connect if the nodes are partitioned.
                if self.is_partitioned(node, remote) {
                    log::info!(target: "sim", "{node} -/-> {remote} (partitioned)");

                    // Sometimes, the service gets a failure input, other times it just hangs.
                    if self.rng.borrow_mut().bool() {
                        self.inbox.insert(
                            self.time + MIN_LATENCY,
                            Scheduled {
                                node,
                                remote,
                                input: Input::Disconnected(
                                    remote,
                                    Rc::new(DisconnectReason::Connection(Arc::new(
                                        io::Error::from(io::ErrorKind::UnexpectedEof),
                                    ))),
                                ),
                            },
                        );
                    }
                    return;
                }

                let latency = MIN_LATENCY + self.latency(node, remote);

                self.inbox.insert(
                    // The remote will get the connection attempt with some latency.
                    self.time + latency,
                    Scheduled {
                        node: remote,
                        remote: node,
                        input: Input::Connected {
                            id: node,
                            addr: Address::from(net::SocketAddr::from(([0, 0, 0, 0], 0))),
                            link: Link::Inbound,
                        },
                    },
                );
                self.inbox.insert(
                    // The local node will have established the connection after some latency.
                    self.time + latency,
                    Scheduled {
                        remote,
                        node,
                        input: Input::Connected {
                            id: remote,
                            addr,
                            link: Link::Outbound,
                        },
                    },
                );
            }
            Io::Disconnect(remote, reason) => {
                // The local node is immediately disconnected.
                self.priority.push_back(Scheduled {
                    remote,
                    node,
                    input: Input::Disconnected(remote, Rc::new(reason)),
                });

                // Nb. It's possible for disconnects to happen simultaneously from both ends, hence
                // it can be that a node will try to disconnect a remote that is already
                // disconnected from the other side.
                //
                // It's also possible that the connection was only attempted and never succeeded,
                // in which case we would return here.
                if !self.connections.contains(&(node, remote)) {
                    debug!(target: "sim", "Ignoring disconnect of {remote} from {node}");
                    return;
                };
                let latency = self.latency(node, remote);

                // The remote node receives the disconnection with some delay.
                self.inbox.insert(
                    self.time + latency,
                    Scheduled {
                        node: remote,
                        remote: node,
                        input: Input::Disconnected(
                            node,
                            Rc::new(DisconnectReason::Connection(Arc::new(io::Error::from(
                                io::ErrorKind::ConnectionReset,
                            )))),
                        ),
                    },
                );
            }
            Io::Wakeup(duration) => {
                let time = self.time + duration;

                if !matches!(
                    self.inbox.messages.get(&time),
                    Some(Scheduled {
                        input: Input::Wake,
                        ..
                    })
                ) {
                    self.inbox.insert(
                        time,
                        Scheduled {
                            node,
                            // The remote is not applicable for this type of output.
                            remote: [0; 32].into(),
                            input: Input::Wake,
                        },
                    );
                }
            }
            Io::Fetch { rid, remote, .. } => {
                log::info!(
                    target: "sim",
                    "{:05} {} ~> {} ({}): Fetch outgoing",
                    self.elapsed().as_millis(), node, remote, rid
                );

                if self.is_fallible() {
                    self.inbox.insert(
                        self.time + LocalDuration::from_secs(3),
                        Scheduled {
                            node,
                            remote,
                            input: Input::Fetched(
                                rid,
                                remote,
                                Rc::new(Err(FetchError::Io(io::ErrorKind::Other.into()))),
                            ),
                        },
                    );
                } else {
                    self.inbox.insert(
                        self.time + LocalDuration::from_secs(3),
                        Scheduled {
                            node,
                            remote,
                            input: Input::Fetched(
                                rid,
                                remote,
                                Rc::new(Ok(fetch::FetchResult {
                                    updated: vec![],
                                    canonical: fetch::UpdatedCanonicalRefs::default(),
                                    namespaces: HashSet::new(),
                                    clone: true,
                                    doc: arbitrary::r#gen(1),
                                })),
                            ),
                        },
                    );
                }
            }
        }
    }

    /// Decide whether the next operation should fail.
    ///
    /// Draws a fresh value from the RNG on every call and compares it against the
    /// configured failure rate.
    fn is_fallible(&self) -> bool {
        let roll = self.rng.borrow_mut().f64() % 1.0;

        roll < self.opts.failure_rate
    }

    /// Check whether two nodes are partitioned. The order in which the nodes are
    /// given doesn't matter.
    fn is_partitioned(&self, a: NodeId, b: NodeId) -> bool {
        [(a, b), (b, a)]
            .iter()
            .any(|pair| self.partitions.contains(pair))
    }
}