Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-node src wire.rs
//! Implementation of the transport protocol.
//!
//! We use the Noise XK handshake pattern to establish an encrypted stream with a remote peer.
use std::collections::VecDeque;
use std::collections::hash_map::Entry;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Instant;
use std::{io, net, time};

use crossbeam_channel as chan;
use cyphernet::addr::{HostName, InetHost, NetAddr};
use cyphernet::encrypt::noise::{HandshakePattern, Keyset, NoiseState};
use cyphernet::proxy::socks5;
use cyphernet::{Digest, EcSk, Ecdh, Sha256};
use localtime::LocalTime;
use mio::net::TcpStream;
use radicle::node::device::Device;

use radicle::collections::{RandomMap, RandomSet};
use radicle::crypto;
use radicle::node::Link;
use radicle::node::NodeId;
#[cfg(any(feature = "i2p", feature = "tor"))]
use radicle::node::config::AddressConfig;
use radicle::storage::WriteStorage;
use radicle_protocol::deserializer::Deserializer;
pub use radicle_protocol::wire::frame;
pub use radicle_protocol::wire::frame::{Frame, FrameData, StreamId};
pub use radicle_protocol::wire::*;
use radicle_protocol::worker::{FetchRequest, FetchResult};

use crate::reactor;
use crate::reactor::{Listener, Transport};
use crate::reactor::{NoiseSession, ProtocolArtifact, SessionEvent, Socks5Session};
use crate::reactor::{Token, Tokens};
use crate::service;
use crate::service::FETCH_TIMEOUT;
use crate::service::io::Io;
use crate::service::{DisconnectReason, Metrics, Service, session};
use crate::worker;
use crate::worker::{ChannelEvent, ChannelsConfig};
use crate::worker::{Task, TaskResult};

/// NoiseXK handshake pattern.
pub const NOISE_XK: HandshakePattern = HandshakePattern {
    initiator: cyphernet::encrypt::noise::InitiatorPattern::Xmitted,
    responder: cyphernet::encrypt::noise::OneWayPattern::Known,
};

/// Default time to wait until a network connection is considered inactive.
pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(6);

/// Maximum size of a peer inbox, in bytes.
pub const MAX_INBOX_SIZE: usize = 1024 * 1024 * 2;

/// Control message used internally between workers, users, and the service.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Control {
    /// Message from the user to the service.
    User(service::Command),
    /// Message from a worker to the service.
    Worker(TaskResult),
    /// Flush data in the given stream to the remote.
    Flush { remote: NodeId, stream: StreamId },
}

/// Peer session type.
type WireSession<G> = NoiseSession<G, Sha256, Socks5Session<TcpStream>>;

/// Reactor action.
type Action<G> = reactor::Action<Listener, Transport<WireSession<G>>>;

/// A worker stream.
struct Stream {
    /// Channels.
    channels: worker::Channels,
    /// Data sent.
    sent_bytes: usize,
    /// Data received.
    received_bytes: usize,
}

impl Stream {
    fn new(channels: worker::Channels) -> Self {
        Self {
            channels,
            sent_bytes: 0,
            received_bytes: 0,
        }
    }
}

/// Streams associated with a connected peer.
struct Streams {
    /// Active streams and their associated worker channels.
    /// Note that the gossip and control streams are not included here as they are always
    /// implied to exist.
    streams: RandomMap<StreamId, Stream>,
    /// Connection direction.
    link: Link,
    /// Sequence number used to compute the next stream id.
    seq: u64,
}

impl Streams {
    /// Create a new [`Streams`] object, passing the connection link.
    fn new(link: Link) -> Self {
        Self {
            streams: RandomMap::default(),
            link,
            seq: 0,
        }
    }

    /// Get a known stream.
    fn get(&self, stream: &StreamId) -> Option<&Stream> {
        self.streams.get(stream)
    }

    /// Get a known stream, mutably.
    fn get_mut(&mut self, stream: &StreamId) -> Option<&mut Stream> {
        self.streams.get_mut(stream)
    }

    /// Open a new stream.
    fn open(&mut self, config: ChannelsConfig) -> (StreamId, worker::Channels) {
        self.seq += 1;

        let id = StreamId::git(self.link)
            .nth(self.seq)
            .expect("Streams::open: too many streams");
        let channels = self
            .register(id, config)
            .expect("Streams::open: stream was already open");

        (id, channels)
    }

    /// Register an open stream.
    fn register(&mut self, stream: StreamId, config: ChannelsConfig) -> Option<worker::Channels> {
        let (wire, worker) = worker::Channels::pair(config)
            .expect("Streams::register: fatal: unable to create channels");

        match self.streams.entry(stream) {
            Entry::Vacant(e) => {
                e.insert(Stream::new(worker));
                Some(wire)
            }
            Entry::Occupied(_) => None,
        }
    }

    /// Unregister an open stream.
    fn unregister(&mut self, stream: &StreamId) -> Option<Stream> {
        self.streams.remove(stream)
    }

    /// Close all streams.
    fn shutdown(&mut self) {
        for (sid, stream) in self.streams.drain() {
            log::debug!(target: "wire", "Closing worker stream {sid}");
            stream.channels.close().ok();
        }
    }
}

/// The initial state of an outbound peer before handshake is completed.
#[derive(Debug)]
struct Outbound {
    /// Token for I/O event notification.
    token: Token,
    /// Remote address.
    addr: NetAddr<HostName>,
    /// Remote Node ID.
    nid: NodeId,
}

/// Peer connection state machine.
enum Peer {
    /// The state after handshake is completed.
    /// Peers in this state are handled by the underlying service.
    Connected {
        #[allow(dead_code)]
        addr: NetAddr<HostName>,
        link: Link,
        nid: NodeId,
        inbox: Deserializer<MAX_INBOX_SIZE, Frame>,
        streams: Streams,
    },
    /// The peer was scheduled for disconnection. Once the transport is handed over
    /// by the reactor, we can consider it disconnected.
    Disconnecting {
        link: Link,
        nid: Option<NodeId>,
        reason: DisconnectReason,
    },
}

impl std::fmt::Debug for Peer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Connected { link, nid, .. } => write!(f, "Connected({link:?}, {nid})"),
            Self::Disconnecting { .. } => write!(f, "Disconnecting"),
        }
    }
}

impl Peer {
    /// Return the peer's id, if any.
    fn id(&self) -> Option<&NodeId> {
        match self {
            Peer::Connected { nid, .. } | Peer::Disconnecting { nid: Some(nid), .. } => Some(nid),
            Peer::Disconnecting { nid: None, .. } => None,
        }
    }

    fn link(&self) -> Link {
        match self {
            Peer::Connected { link, .. } => *link,
            Peer::Disconnecting { link, .. } => *link,
        }
    }

    /// Connected peer.
    fn connected(nid: NodeId, addr: NetAddr<HostName>, link: Link) -> Self {
        Self::Connected {
            link,
            addr,
            nid,
            inbox: Deserializer::default(),
            streams: Streams::new(link),
        }
    }
}

/// Holds connected peers.
struct Peers(RandomMap<Token, Peer>);

impl Peers {
    fn get_mut(&mut self, token: &Token) -> Option<&mut Peer> {
        self.0.get_mut(token)
    }

    fn entry(&mut self, token: Token) -> Entry<'_, Token, Peer> {
        self.0.entry(token)
    }

    fn insert(&mut self, token: Token, peer: Peer) {
        if self.0.insert(token, peer).is_some() {
            log::debug!(target: "wire", token=token.0; "Replacing existing peer");
        }
    }

    fn remove(&mut self, id: &Token) -> Option<Peer> {
        self.0.remove(id)
    }

    fn lookup(&self, id: &NodeId) -> Option<(Token, &Peer)> {
        self.0
            .iter()
            .find(|(_, peer)| peer.id() == Some(id))
            .map(|(token, peer)| (*token, peer))
    }

    fn lookup_mut(&mut self, id: &NodeId) -> Option<(Token, &mut Peer)> {
        self.0
            .iter_mut()
            .find(|(_, peer)| peer.id() == Some(id))
            .map(|(fd, peer)| (*fd, peer))
    }

    fn active(&self) -> impl Iterator<Item = (Token, &NodeId, Link)> {
        self.0.iter().filter_map(|(id, peer)| match peer {
            Peer::Connected { nid, link, .. } => Some((*id, nid, *link)),
            Peer::Disconnecting { .. } => None,
        })
    }

    fn connected(&self) -> impl Iterator<Item = (Token, &NodeId)> {
        self.0.iter().filter_map(|(id, peer)| {
            if let Peer::Connected { nid, .. } = peer {
                Some((*id, nid))
            } else {
                None
            }
        })
    }

    fn iter(&self) -> impl Iterator<Item = &Peer> {
        self.0.values()
    }
}

/// Wire protocol implementation for a set of peers.
pub(crate) struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
    /// Backing service instance.
    service: Service<D, S, G>,
    /// Worker pool interface.
    worker: chan::Sender<Task>,
    /// Used for authentication.
    signer: Device<G>,
    /// Node metrics.
    metrics: service::Metrics,
    /// Internal queue of actions to send to the reactor.
    actions: VecDeque<Action<G>>,
    /// Outbound attempted peers without a session.
    outbound: RandomMap<Token, Outbound>,
    /// Inbound peers without a session.
    inbound: RandomSet<Token>,
    /// Listening addresses that are not yet registered.
    listening: RandomMap<Token, net::SocketAddr>,
    /// Peer (established) sessions.
    peers: Peers,
    /// A (practically) infinite source of tokens to identify transports and listeners.
    tokens: Tokens,
}

impl<D, S, G> Wire<D, S, G>
where
    D: service::Store,
    S: WriteStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId>,
{
    pub fn new(service: Service<D, S, G>, worker: chan::Sender<Task>, signer: Device<G>) -> Self {
        assert!(service.started().is_some(), "Service must be initialized");

        Self {
            service,
            worker,
            signer,
            metrics: Metrics::default(),
            actions: VecDeque::new(),
            inbound: RandomSet::default(),
            outbound: RandomMap::default(),
            listening: RandomMap::default(),
            peers: Peers(RandomMap::default()),
            tokens: Tokens::default(),
        }
    }

    pub fn listen(&mut self, socket: Listener) {
        let token = self.tokens.advance();
        self.listening.insert(token, socket.local_addr());
        self.actions
            .push_back(Action::RegisterListener(token, socket));
    }

    fn disconnect(&mut self, token: Token, reason: DisconnectReason) -> Option<(NodeId, Link)> {
        match self.peers.entry(token) {
            Entry::Vacant(_) => {
                // Connecting peer with no session.
                log::debug!(target: "wire", token=token.0; "Disconnecting pending peer: {reason}");
                self.actions.push_back(Action::UnregisterTransport(token));

                // Check for attempted outbound connections. Unestablished inbound connections don't
                // have an NID yet.
                self.outbound
                    .values()
                    .find(|o| o.token == token)
                    .map(|o| (o.nid, Link::Outbound))
            }
            Entry::Occupied(mut e) => match e.get_mut() {
                Peer::Disconnecting { nid, link, .. } => {
                    log::debug!(target: "wire", token=token.0; "Peer is already disconnecting");

                    nid.map(|n| (n, *link))
                }
                Peer::Connected {
                    nid, streams, link, ..
                } => {
                    log::debug!(target: "wire", token=token.0; "Disconnecting peer: {reason}");
                    let nid = *nid;
                    let link = *link;

                    streams.shutdown();
                    e.insert(Peer::Disconnecting {
                        nid: Some(nid),
                        link,
                        reason,
                    });
                    self.actions.push_back(Action::UnregisterTransport(token));

                    Some((nid, link))
                }
            },
        }
    }

    fn worker_result(&mut self, task: TaskResult) {
        let nid = task.remote;
        let Some((fd, peer)) = self.peers.lookup_mut(&nid) else {
            log::debug!(target: "wire", "Peer {nid} not found; ignoring fetch result");
            return;
        };

        if let Peer::Connected { link, streams, .. } = peer {
            // Nb. It's possible that the stream would already be unregistered if we received an
            // early "close" from the remote. Otherwise, we unregister it here and send the "close"
            // ourselves.
            if let Some(s) = streams.unregister(&task.stream) {
                log::debug!(
                    target: "wire", "Stream {} of {} closing with {} byte(s) sent and {} byte(s) received",
                    task.stream, task.remote, s.sent_bytes, s.received_bytes
                );
                let frame = Frame::<service::Message>::control(
                    *link,
                    frame::Control::Close {
                        stream: task.stream,
                    },
                );
                self.actions
                    .push_back(Action::Send(fd, frame.encode_to_vec()));
            }
        } else {
            // If the peer disconnected, we'll get here, but we still want to let the service know
            // about the fetch result, so we don't return here.
            log::debug!(target: "wire", "Peer {nid} is not connected; ignoring fetch result");
            return;
        };

        // Only call into the service if we initiated this fetch.
        match task.result {
            FetchResult::Initiator { rid, result } => {
                self.service.fetched(rid, nid, result);
            }
            FetchResult::Responder { rid, result } => {
                if let Some(rid) = rid {
                    if let Some(err) = result.err() {
                        log::info!(target: "wire", "Peer {nid} failed to fetch {rid} from us: {err}");
                    } else {
                        log::info!(target: "wire", "Peer {nid} fetched {rid} from us successfully");
                    }
                }
            }
        }
    }

    fn flush(&mut self, remote: NodeId, stream: StreamId) {
        let Some((fd, peer)) = self.peers.lookup_mut(&remote) else {
            log::debug!(target: "wire", "Peer {remote} is not known; ignoring flush");
            return;
        };
        let Peer::Connected { streams, link, .. } = peer else {
            log::debug!(target: "wire", "Peer {remote} is not connected; ignoring flush");
            return;
        };
        let Some(s) = streams.get_mut(&stream) else {
            log::debug!(target: "wire", "Stream {stream} cannot be found; ignoring flush");
            return;
        };
        let metrics = self.metrics.peer(remote);

        for data in s.channels.try_iter() {
            let frame = match data {
                ChannelEvent::Data(data) => {
                    metrics.sent_git_bytes += data.len();
                    metrics.sent_bytes += data.len();
                    Frame::<service::Message>::git(stream, data)
                }
                ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
                ChannelEvent::Eof => Frame::control(*link, frame::Control::Eof { stream }),
            };
            self.actions
                .push_back(reactor::Action::Send(fd, frame.encode_to_vec()));
        }
    }

    fn cleanup(&mut self, token: Token) {
        if self.inbound.remove(&token) {
            log::debug!(target: "wire", token=token.0; "Cleaning up inbound peer state");
        } else if let Some(outbound) = self.outbound.remove(&token) {
            log::debug!(target: "wire", token=token.0; "Cleaning up outbound peer state");
            self.service.disconnected(
                outbound.nid,
                Link::Outbound,
                &DisconnectReason::connection(),
            );
        } else {
            log::debug!(target: "wire", token=token.0; "Tried to cleanup unknown peer");
        }
    }
}

impl<D, S, G> reactor::ReactionHandler for Wire<D, S, G>
where
    D: service::Store + Send,
    S: WriteStorage + Send + 'static,
    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send + Debug,
{
    type Listener = Listener;
    type Transport = Transport<WireSession<G>>;

    fn tick(&mut self) {
        self.metrics.open_channels = self
            .peers
            .iter()
            .filter_map(|p| {
                if let Peer::Connected { streams, .. } = p {
                    Some(streams.streams.len())
                } else {
                    None
                }
            })
            .sum();
        self.metrics.worker_queue_size = self.worker.len();

        self.service.tick(LocalTime::now(), &self.metrics);
    }

    fn timer_reacted(&mut self) {
        self.service.wake();
    }

    fn listener_reacted(
        &mut self,
        _: Token, // Note that this is the token of the listener socket.
        event: io::Result<(TcpStream, std::net::SocketAddr)>,
        _: Instant,
    ) {
        match event {
            Ok((connection, peer)) => {
                let remote = NetAddr::from(peer);
                let InetHost::Ip(ip) = remote.host else {
                    log::debug!(target: "wire", "Unexpected host type for inbound connection {remote}; dropping..");
                    drop(connection);

                    return;
                };
                log::debug!(target: "wire", "Inbound connection from {remote}..");

                // If the service doesn't want to accept this connection,
                // we drop the connection here, which disconnects the socket.
                if !self.service.accepted(ip) {
                    log::debug!(target: "wire", "Rejecting inbound connection from {ip}..");
                    drop(connection);

                    return;
                }

                let session = accept::<G>(
                    remote.clone().into(),
                    connection,
                    self.signer.clone().into_inner(),
                );
                let transport = match Transport::with_session(session, Link::Inbound) {
                    Ok(transport) => transport,
                    Err(err) => {
                        log::warn!(target: "wire", "Failed to create transport for accepted connection: {err}");
                        return;
                    }
                };

                let token = self.tokens.advance();
                log::debug!(target: "wire", token=token.0; "Accepted inbound connection from {remote}..");

                self.inbound.insert(token);
                self.actions
                    .push_back(reactor::Action::RegisterTransport(token, transport))
            }
            Err(err) => {
                log::error!(target: "wire", "Error listening for inbound connections: {err}");
            }
        }
    }

    fn listener_registered(&mut self, token: Token, _listener: &Self::Listener) {
        if let Some(local_addr) = self.listening.remove(&token) {
            self.service.listening(local_addr);
        }
    }

    fn transport_registered(&mut self, token: Token, _transport: &Self::Transport) {
        if let Some(outbound) = self.outbound.get(&token) {
            log::debug!(target: "wire", token=token.0; "Outbound peer resource registered for {}", outbound.nid);
        } else if self.inbound.contains(&token) {
            log::debug!(target: "wire", token=token.0; "Inbound peer resource registered");
        } else {
            log::debug!(target: "wire", token=token.0; "Unknown peer registered");
        }
    }

    fn transport_reacted(&mut self, token: Token, event: SessionEvent<WireSession<G>>, _: Instant) {
        match event {
            SessionEvent::Established(ProtocolArtifact { state, session }) => {
                // SAFETY: With the NoiseXK protocol, there is always a remote static key.
                let nid: NodeId = state.remote_static_key.unwrap();
                // Make sure we don't try to connect to ourselves by mistake.
                if &nid == self.signer.public_key() {
                    log::warn!(target: "wire", "Self-connection detected, disconnecting..");
                    self.disconnect(token, DisconnectReason::SelfConnection);

                    return;
                }

                let established_addr: NetAddr<HostName> = session.state;
                let (addr, link) = if self.inbound.remove(&token) {
                    self.metrics.peer(nid).inbound_connection_attempts += 1;
                    (established_addr, Link::Inbound)
                } else if let Some(peer) = self.outbound.remove(&token) {
                    assert_eq!(nid, peer.nid);
                    (peer.addr, Link::Outbound)
                } else {
                    log::debug!(target: "wire", token=token.0; "Session for {nid} not found");
                    return;
                };
                log::debug!(
                    target: "wire", token=token.0, direction:display=link; "Session established with {nid}"
                );

                // Connections to close.
                let mut disconnect = Vec::new();

                // Handle conflicting connections.
                // This is typical when users have mutually configured their nodes to connect to
                // each other on startup. We handle this by deterministically choosing one node
                // whose outbound connection is the one that is kept. The other connections are
                // dropped.
                {
                    // Having precedence means that our outbound connection will win over
                    // the other node's outbound connection.
                    enum Precedence {
                        Ours,
                        Theirs,
                    }

                    use Link::*;
                    use Precedence::*;

                    // Whether we have precedence in case of conflicting connections.
                    let precedence = if *self.signer.public_key() > nid {
                        Ours
                    } else {
                        Theirs
                    };

                    // Active sessions with the same NID but a different token are conflicting.
                    let peers = self.peers.active().filter_map(|(c_id, d, link)| {
                        (*d == nid && c_id != token).then_some((c_id, link))
                    });

                    // Outbound connection attempts with the same remote key but a different file
                    // descriptor are conflicting.
                    let outbound = self.outbound.iter().filter_map(|(c_id, other)| {
                        (other.nid == nid && *c_id != token).then_some((*c_id, Outbound))
                    });

                    for (c_token, c_link) in peers.chain(outbound) {
                        // If we have precedence, the inbound connection is closed.
                        // In the case where both connections are inbound or outbound,
                        // we close the newer connection, ie. the one with the higher
                        // token.
                        let close = match (link, c_link, &precedence) {
                            (Inbound, Outbound, Ours) => token,
                            (Inbound, Outbound, Theirs) => c_token,
                            (Outbound, Inbound, Ours) => c_token,
                            (Outbound, Inbound, Theirs) => token,
                            (Inbound, Inbound, _) => token.max(c_token),
                            (Outbound, Outbound, _) => token.max(c_token),
                        };

                        log::trace!(
                            target: "wire", "Established session with token {} conflicts with existing session with token {} for {nid}. Disconnecting session with token {}.", token.0, c_token.0, close.0
                        );
                        disconnect.push(close);
                    }
                }
                for id in &disconnect {
                    log::info!(
                        target: "wire", token=token.0; "Closing conflicting session with {nid}.."
                    );
                    // Disconnect and return the associated NID of the peer, if available.
                    if let Some((nid, link)) = self.disconnect(*id, DisconnectReason::Conflict) {
                        // We disconnect the session eagerly because otherwise we will get the new
                        // `connected` event before the `disconnect`, resulting in a duplicate
                        // connection.
                        self.service
                            .disconnected(nid, link, &DisconnectReason::Conflict);
                    }
                }
                if !disconnect.contains(&token) {
                    self.peers
                        .insert(token, Peer::connected(nid, addr.clone(), link));
                    self.service.connected(nid, addr.into(), link);
                }
            }
            SessionEvent::Data(data) => {
                if let Some(Peer::Connected {
                    nid,
                    inbox,
                    streams,
                    ..
                }) = self.peers.get_mut(&token)
                {
                    let metrics = self.metrics.peer(*nid);
                    metrics.received_bytes += data.len();

                    if inbox.input(&data).is_err() {
                        log::warn!(target: "wire", "Maximum inbox size ({MAX_INBOX_SIZE}) reached for peer {nid}");
                        log::warn!(target: "wire", "Unable to process messages fast enough for peer {nid}; disconnecting..");
                        self.disconnect(
                            token,
                            DisconnectReason::Session(session::Error::Misbehavior),
                        );

                        return;
                    }

                    loop {
                        match inbox.deserialize_next() {
                            Ok(Some(Frame {
                                data: FrameData::Control(frame::Control::Open { stream }),
                                ..
                            })) => {
                                log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");
                                metrics.streams_opened += 1;
                                metrics.received_fetch_requests += 1;
                                let reader_limit = self.service.config().limits.fetch_pack_receive;
                                let Some(channels) = streams.register(
                                    stream,
                                    ChannelsConfig::new(FETCH_TIMEOUT)
                                        .with_reader_limit(reader_limit),
                                ) else {
                                    log::debug!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
                                    continue;
                                };

                                let task = Task {
                                    fetch: FetchRequest::Responder {
                                        remote: *nid,
                                        emitter: self.service.emitter(),
                                    },
                                    stream,
                                    channels,
                                };
                                if let Err(e) = self.worker.try_send(task) {
                                    log::warn!(
                                        target: "wire",
                                        "Worker pool failed to accept incoming fetch request: {e}"
                                    );
                                }
                            }
                            Ok(Some(Frame {
                                data: FrameData::Control(frame::Control::Eof { stream }),
                                ..
                            })) => {
                                if let Some(s) = streams.get(&stream) {
                                    log::debug!(target: "wire", "Received `end-of-file` on stream {stream} from {nid}");

                                    if s.channels.send(ChannelEvent::Eof).is_err() {
                                        log::debug!(target: "wire", "Worker is disconnected; cannot send `EOF`");
                                    }
                                } else {
                                    log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
                                }
                            }
                            Ok(Some(Frame {
                                data: FrameData::Control(frame::Control::Close { stream }),
                                ..
                            })) => {
                                log::debug!(target: "wire", "Received `close` command for stream {stream} from {nid}");

                                if let Some(s) = streams.unregister(&stream) {
                                    log::debug!(
                                        target: "wire",
                                        "Stream {stream} of {nid} closed with {} byte(s) sent and {} byte(s) received",
                                        s.sent_bytes, s.received_bytes
                                    );
                                    s.channels.close().ok();
                                }
                            }
                            Ok(Some(Frame {
                                data: FrameData::Gossip(msg),
                                ..
                            })) => {
                                metrics.received_gossip_messages += 1;
                                self.service.received_message(*nid, msg);
                            }
                            Ok(Some(Frame {
                                stream,
                                data: FrameData::Git(data),
                                ..
                            })) => {
                                if let Some(s) = streams.get_mut(&stream) {
                                    metrics.received_git_bytes += data.len();

                                    if s.channels.send(ChannelEvent::Data(data)).is_err() {
                                        log::warn!(target: "wire", "Worker is disconnected; cannot send data");
                                    }
                                } else {
                                    log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
                                }
                            }
                            Ok(None) => {
                                // Buffer is empty, or message isn't complete.
                                break;
                            }
                            Err(e) => {
                                log::warn!(target: "wire", "Invalid gossip message from {nid}: {e}");

                                if !inbox.is_empty() {
                                    log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.len());
                                }
                                self.disconnect(
                                    token,
                                    DisconnectReason::Session(session::Error::Misbehavior),
                                );
                                break;
                            }
                        }
                    }
                } else {
                    log::debug!(target: "wire", token=token.0; "Dropping message from unconnected peer");
                }
            }
            SessionEvent::Terminated(err) => {
                self.disconnect(token, DisconnectReason::Connection(Arc::new(err)));
            }
        }
    }

    fn handle_command(&mut self, cmd: Control) {
        match cmd {
            Control::User(cmd) => self.service.command(cmd),
            Control::Worker(result) => self.worker_result(result),
            Control::Flush { remote, stream } => self.flush(remote, stream),
        }
    }

    fn handle_error(&mut self, err: reactor::Error<Listener, Transport<WireSession<G>>>) {
        match err {
            reactor::Error::Poll(err) | reactor::Error::Registration(err) => {
                // TODO: This should be a fatal error, there's nothing we can do here.
                log::error!(target: "wire", "Can't poll connections: {err}");
            }
            reactor::Error::ListenerDisconnect(token, _) => {
                // TODO: This should be a fatal error, there's nothing we can do here.
                log::error!(target: "wire", token=token.0; "Listener disconnected");
            }
            reactor::Error::TransportDisconnect(token, transport) => {
                log::trace!(target: "wire", token=token.0; "Peer disconnected");

                // We're dropping the TCP connection here.
                drop(transport);

                // The peer transport is already disconnected and removed from the reactor;
                // therefore there is no need to initiate a disconnection. We simply remove
                // the peer from the map.
                match self.peers.remove(&token) {
                    Some(mut peer) => {
                        if let Peer::Connected { streams, .. } = &mut peer {
                            streams.shutdown();
                        }

                        if let Some(id) = peer.id() {
                            self.service.disconnected(
                                *id,
                                peer.link(),
                                &DisconnectReason::connection(),
                            );
                        } else {
                            log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
                        }
                    }
                    None => self.cleanup(token),
                }
            }
        }
    }

    fn handover_listener(&mut self, token: Token, _listener: Self::Listener) {
        log::warn!(target: "wire", token=token.0; "Listener handover is not supported");
    }

    fn handover_transport(&mut self, token: Token, transport: Self::Transport) {
        match self.peers.entry(token) {
            Entry::Occupied(e) => {
                match e.get() {
                    Peer::Disconnecting {
                        nid, reason, link, ..
                    } => {
                        log::debug!(target: "wire", token=token.0; "Transport handover for disconnecting peer");

                        // Disconnect TCP stream.
                        drop(transport);

                        // If there is no NID, the service is not aware of the peer.
                        if let Some(nid) = nid {
                            // In the case of a conflicting connection, there will be two resources
                            // for the peer. However, at the service level, there is only one, and
                            // it is identified by NID.
                            //
                            // Therefore, we specify which of the connections we're closing by
                            // passing the `link`.
                            self.service.disconnected(*nid, *link, reason);
                        }
                        e.remove();
                    }
                    Peer::Connected { nid, .. } => {
                        panic!(
                            "Wire::handover_transport: Unexpected handover of connected peer {nid} with token {}",
                            token.0
                        );
                    }
                }
            }
            Entry::Vacant(_) => self.cleanup(token),
        }
    }
}

impl<D, S, G> Iterator for Wire<D, S, G>
where
    D: service::Store,
    S: WriteStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone,
{
    type Item = Action<G>;

    fn next(&mut self) -> Option<Self::Item> {
        while let Some(ev) = self.service.next() {
            match ev {
                Io::Write(node_id, msgs) => {
                    let (fd, link) = match self.peers.lookup(&node_id) {
                        Some((fd, Peer::Connected { link, .. })) => (fd, *link),
                        Some((_, peer)) => {
                            // If the peer is disconnected by the wire protocol, the service may
                            // not be aware of this yet, and may continue to write messages to it.
                            log::debug!(target: "wire", "Dropping {} message(s) to {node_id} ({peer:?})", msgs.len());
                            continue;
                        }
                        None => {
                            log::debug!(target: "wire", "Dropping {} message(s) to {node_id}: unknown peer", msgs.len());
                            continue;
                        }
                    };
                    log::trace!(
                        target: "wire", "Writing {} message(s) to {}", msgs.len(), node_id
                    );
                    let mut data = Vec::new();
                    let metrics = self.metrics.peer(node_id);
                    metrics.sent_gossip_messages += msgs.len();

                    for msg in msgs {
                        Frame::gossip(link, msg).encode(&mut data);
                    }
                    metrics.sent_bytes += data.len();

                    self.actions.push_back(reactor::Action::Send(fd, data));
                }
                Io::Connect(node_id, addr) => {
                    if self.peers.connected().any(|(_, id)| id == &node_id) {
                        log::debug!(
                            target: "wire",
                            "Attempt to connect to already connected peer {node_id}"
                        );
                        // FIXME: The problem here is the session will stay in "initial" state,
                        // because it can't transition to attempted.
                        continue;
                    }
                    self.service.attempted(node_id, addr.clone());
                    self.metrics.peer(node_id).outbound_connection_attempts += 1;

                    match dial::<G>(
                        (*addr).clone(),
                        node_id,
                        self.signer.clone().into_inner(),
                        self.service.config(),
                    )
                    .and_then(|session| {
                        Transport::<WireSession<G>>::with_session(session, Link::Outbound)
                    }) {
                        Ok(transport) => {
                            let token = self.tokens.advance();
                            self.outbound.insert(
                                token,
                                Outbound {
                                    token,
                                    nid: node_id,
                                    addr: (*addr).clone(),
                                },
                            );
                            log::debug!(
                                target: "wire",
                                "Registering outbound transport for {node_id}.."
                            );
                            self.actions
                                .push_back(reactor::Action::RegisterTransport(token, transport));
                        }
                        Err(err) => {
                            logger::establish_connection(&addr, &err);

                            self.service.disconnected(
                                node_id,
                                Link::Outbound,
                                &DisconnectReason::Dial(Arc::new(err)),
                            );
                        }
                    }
                }
                Io::Disconnect(nid, reason) => {
                    if let Some((id, Peer::Connected { .. })) = self.peers.lookup(&nid) {
                        if let Some((nid, _)) = self.disconnect(id, reason) {
                            self.metrics.peer(nid).disconnects += 1;
                        }
                    } else {
                        log::debug!(target: "wire", "Peer {nid} is not connected: ignoring disconnect");
                    }
                }
                Io::Wakeup(d) => {
                    self.actions.push_back(reactor::Action::SetTimer(d.into()));
                }
                Io::Fetch {
                    rid,
                    remote,
                    reader_limit,
                    refs_at,
                    config,
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");

                    let Some((fd, Peer::Connected { link, streams, .. })) =
                        self.peers.lookup_mut(&remote)
                    else {
                        // Nb. It's possible that a peer is disconnected while an `Io::Fetch`
                        // is in the service's i/o buffer. Since the service may not purge the
                        // buffer on disconnect, we should just ignore i/o actions that don't
                        // have a connected peer.
                        log::debug!(target: "wire", "Peer {remote} is not connected: dropping fetch");
                        continue;
                    };
                    let (stream, channels) = streams.open(
                        ChannelsConfig::new(config.timeout()).with_reader_limit(reader_limit),
                    );

                    log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");

                    let link = *link;
                    let task = Task {
                        fetch: FetchRequest::Initiator {
                            rid,
                            remote,
                            refs_at,
                            config: config.fetch_config(),
                        },
                        stream,
                        channels,
                    };

                    if !self.worker.is_empty() {
                        log::warn!(
                            target: "wire",
                            "Worker pool is busy: {} tasks pending, fetch requests may be delayed", self.worker.len()
                        );
                    }
                    if let Err(e) = self.worker.try_send(task) {
                        log::warn!(
                            target: "wire",
                            "Worker pool failed to accept outgoing fetch request: {e}"
                        );
                    }
                    let metrics = self.metrics.peer(remote);
                    metrics.streams_opened += 1;
                    metrics.sent_fetch_requests += 1;

                    self.actions.push_back(Action::Send(
                        fd,
                        Frame::<service::Message>::control(link, frame::Control::Open { stream })
                            .encode_to_vec(),
                    ));
                }
            }
        }
        self.actions.pop_front()
    }
}

/// Establish a new outgoing connection.
pub fn dial<G: Ecdh<Pk = NodeId>>(
    remote_addr: NetAddr<HostName>,
    remote_id: <G as EcSk>::Pk,
    signer: G,
    config: &radicle::node::Config,
) -> io::Result<WireSession<G>> {
    #[cfg(any(feature = "i2p", feature = "tor"))]
    fn proxy_or_forward<H: std::fmt::Display>(
        config: &AddressConfig,
        global_proxy: Option<net::SocketAddr>,
        host: H,
        port: u16,
    ) -> io::Result<NetAddr<InetHost>> {
        match config {
            // In proxy mode, simply use the configured proxy address.
            // This takes precedence over any global proxy.
            AddressConfig::Proxy { address } => Ok((*address).into()),
            // In "forward" mode, if a global proxy is set, we use that, otherwise
            // we treat the address as a regular DNS name.
            AddressConfig::Forward => Ok(global_proxy
                .map(Into::into)
                .unwrap_or_else(|| NetAddr::new(InetHost::Dns(host.to_string()), port))),
            // If address type support isn't configured, refuse to connect.
            AddressConfig::Drop => Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "no configuration found for address type",
            )),
        }
    }

    // Determine what address to establish a TCP connection with, given the remote peer
    // address and our node configuration.
    let inet_addr: NetAddr<InetHost> = match (&remote_addr.host, config.proxy) {
        // For IP and DNS addresses, use the global proxy if set, otherwise use the address as-is.
        (HostName::Ip(_), Some(proxy)) => proxy.into(),
        (HostName::Ip(ip), None) => NetAddr::new(InetHost::Ip(*ip), remote_addr.port),
        (HostName::Dns(_), Some(proxy)) => proxy.into(),
        (HostName::Dns(dns), None) => NetAddr::new(InetHost::Dns(dns.clone()), remote_addr.port),
        // For onion addresses, handle with care.
        #[cfg(feature = "tor")]
        (HostName::Tor(onion), proxy) => {
            proxy_or_forward(&config.onion, proxy, onion, remote_addr.port)?
        }
        #[cfg(feature = "i2p")]
        (HostName::I2p(i2p), proxy) => proxy_or_forward(&config.i2p, proxy, i2p, remote_addr.port)?,
        _ => {
            return Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "unsupported remote address type",
            ));
        }
    };

    let addr = {
        use std::net::ToSocketAddrs as _;

        inet_addr
            .to_socket_addrs()?
            .next()
            .ok_or(io::ErrorKind::AddrNotAvailable)?
    };

    // NOTE: Previously, here was a note about setting the timeout for connecting
    // to DEFAULT_DIAL_TIMEOUT, for which we have not figured out a way yet.
    // Generally, we should understand what happens if the following call to
    // `connect` fails. How do we learn about it? Where's the leak?

    let connection = TcpStream::connect(addr)?;

    // Whether to tunnel regular connections through the proxy.
    let force_proxy = config.proxy.is_some();

    Ok(session::<G>(
        remote_addr,
        Some(remote_id),
        connection,
        force_proxy,
        signer,
    ))
}

/// Accept a new connection.
pub fn accept<G: Ecdh<Pk = NodeId>>(
    remote_addr: NetAddr<HostName>,
    connection: TcpStream,
    signer: G,
) -> WireSession<G> {
    session::<G>(remote_addr, None, connection, false, signer)
}

/// Create a new [`WireSession`].
fn session<G: Ecdh<Pk = NodeId>>(
    remote_addr: NetAddr<HostName>,
    remote_id: Option<NodeId>,
    connection: TcpStream,
    force_proxy: bool,
    signer: G,
) -> WireSession<G> {
    if let Err(e) = connection.set_nodelay(true) {
        log::warn!(target: "wire", "Unable to set TCP_NODELAY on socket {connection:?}: {e}");
    }

    let connection = std::net::TcpStream::from(connection);

    if let Err(e) = connection.set_read_timeout(Some(DEFAULT_CONNECTION_TIMEOUT)) {
        log::warn!(target: "wire", "Unable to set TCP read timeout on socket {connection:?}: {e}");
    }

    if let Err(e) = connection.set_write_timeout(Some(DEFAULT_CONNECTION_TIMEOUT)) {
        log::warn!(target: "wire", "Unable to set TCP write timeout on socket {connection:?}: {e}");
    }

    #[cfg(feature = "socket2")]
    {
        let connection = socket2::SockRef::from(&connection);

        let ka = socket2::TcpKeepalive::new().with_time(time::Duration::from_secs(30));

        #[cfg(any(
            target_os = "android",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "fuchsia",
            target_os = "illumos",
            target_os = "ios",
            target_os = "visionos",
            target_os = "linux",
            target_os = "macos",
            target_os = "netbsd",
            target_os = "tvos",
            target_os = "watchos",
            target_os = "windows",
            target_os = "cygwin",
        ))]
        let ka = ka.with_interval(time::Duration::from_secs(10));

        #[cfg(any(
            target_os = "android",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "fuchsia",
            target_os = "illumos",
            target_os = "ios",
            target_os = "visionos",
            target_os = "linux",
            target_os = "macos",
            target_os = "netbsd",
            target_os = "tvos",
            target_os = "watchos",
            target_os = "cygwin",
        ))]
        let ka = ka.with_retries(3);

        if let Err(e) = connection.set_tcp_keepalive(&ka) {
            log::warn!(target: "wire", "Failed to set TCP_KEEPALIVE on socket {connection:?}: {e}");
        }
    }

    #[cfg(not(feature = "socket2"))]
    log::debug!(target: "wire", "Not attempting to set TCP_KEEPALIVE on socket {connection:?}");

    let connection = TcpStream::from_std(connection);

    let proxy = {
        let socks5 = socks5::Socks5::with(remote_addr, force_proxy);
        Socks5Session::new(connection, socks5)
    };

    let noise = {
        let pair = G::generate_keypair();

        let keyset = Keyset {
            e: pair.0,
            s: Some(signer),
            re: None,
            rs: remote_id,
        };

        NoiseState::initialize::<{ Sha256::OUTPUT_LEN }>(NOISE_XK, remote_id.is_some(), &[], keyset)
    };

    WireSession::new(proxy, noise)
}

mod logger {
    use radicle::node::Address;

    pub fn establish_connection(addr: &Address, err: &std::io::Error) {
        use std::io::ErrorKind::*;
        match err.kind() {
            ConnectionRefused | ConnectionReset | HostUnreachable | ConnectionAborted
            | NotConnected => {
                log::info!(target: "wire", "Could not establish connection to {addr}: {err}")
            }
            _ => log::warn!(target: "wire", "Failed to establish connection to {addr}: {err}"),
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::service::{Message, ZeroBytes};
    use crate::wire;
    use crate::wire::varint;

    #[test]
    fn test_pong_message_with_extension() {
        use radicle_protocol::deserializer;

        let mut stream = Vec::new();
        let pong = Message::Pong {
            zeroes: ZeroBytes::new(42),
        };
        frame::PROTOCOL_VERSION_STRING.encode(&mut stream);
        frame::StreamId::gossip(Link::Outbound).encode(&mut stream);

        // Serialize gossip message with some extension fields.
        let mut gossip = pong.encode_to_vec();
        String::from("extra").encode(&mut gossip);
        48u8.encode(&mut gossip);

        // Encode gossip message using the varint-prefix format into the stream.
        varint::payload::encode(&gossip, &mut stream);

        let mut de = deserializer::Deserializer::<1024, Frame>::new(1024);
        de.input(&stream).unwrap();

        // The "pong" message decodes successfully, even though there is trailing data.
        assert_eq!(
            de.deserialize_next().unwrap().unwrap(),
            Frame::gossip(Link::Outbound, pong)
        );
        assert!(de.deserialize_next().unwrap().is_none());
        assert!(de.is_empty());
    }

    #[test]
    fn test_inventory_ann_with_extension() {
        use radicle_protocol::deserializer;

        #[derive(Debug)]
        struct MessageWithExt {
            msg: Message,
            ext: String,
        }

        impl wire::Encode for MessageWithExt {
            fn encode(&self, writer: &mut impl bytes::BufMut) {
                self.msg.encode(writer);
                self.ext.encode(writer);
            }
        }

        impl wire::Decode for MessageWithExt {
            fn decode(reader: &mut impl bytes::Buf) -> Result<Self, wire::Error> {
                let msg = Message::decode(reader)?;
                let ext = String::decode(reader).unwrap_or_default();

                Ok(MessageWithExt { msg, ext })
            }
        }

        let rid = radicle::test::arbitrary::r#gen(1);
        let pk = radicle::test::arbitrary::r#gen(1);
        let sig: [u8; 64] = radicle::test::arbitrary::r#gen(1);

        // Message with extension.
        let mut stream = Vec::new();
        let ann = Message::announcement(
            pk,
            service::gossip::inventory(radicle::node::Timestamp::MAX, [rid]),
            radicle::crypto::Signature::from(sig),
        );
        let pong = Message::Pong {
            zeroes: ZeroBytes::new(42),
        };
        // Framed message with extension.
        frame::Frame::gossip(
            Link::Outbound,
            MessageWithExt {
                msg: ann.clone(),
                ext: String::from("extra"),
            },
        )
        .encode(&mut stream);
        // Pong message that comes after, without extension.
        frame::Frame::gossip(Link::Outbound, pong.clone()).encode(&mut stream);

        // First test deserializing using the message with extension type.
        {
            let mut de = deserializer::Deserializer::<1024, Frame<MessageWithExt>>::new(1024);
            de.input(&stream).unwrap();

            radicle::assert_matches!(
                de.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(MessageWithExt {
                    msg,
                    ext,
                }) if msg == ann && ext == *"extra"
            );
            radicle::assert_matches!(
                de.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(MessageWithExt {
                    msg,
                    ext,
                }) if msg == pong && ext.is_empty()
            );
            assert!(de.deserialize_next().unwrap().is_none());
            assert!(de.is_empty());
        }

        // Then test deserializing using the current message type without the extension.
        {
            let mut de = deserializer::Deserializer::<1024, Frame<Message>>::new(1024);
            de.input(&stream).unwrap();

            radicle::assert_matches!(
                de.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(msg)
                if msg == ann
            );
            radicle::assert_matches!(
                de.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(msg)
                if msg == pong
            );
            assert!(de.deserialize_next().unwrap().is_none());
            assert!(de.is_empty());
        }
    }
}