Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-protocol src service io.rs
use std::collections::VecDeque;

use localtime::LocalDuration;
use log::*;
use radicle::identity::RepoId;
use radicle::node::Address;
use radicle::node::NodeId;
use radicle::node::config::FetchPackSizeLimit;
use radicle::storage::refs::RefsAt;

use crate::fetcher;
use crate::service::DisconnectReason;
use crate::service::Link;
use crate::service::message::Message;
use crate::service::session::Session;

use super::gossip;
use super::message::{Announcement, AnnouncementMessage};

/// I/O operation to execute at the network/wire level.
#[derive(Debug)]
pub enum Io {
    /// There are some messages ready to be sent to a peer.
    Write(NodeId, Vec<Message>),
    /// Connect to a peer.
    Connect(NodeId, Address),
    /// Disconnect from a peer.
    Disconnect(NodeId, DisconnectReason),
    /// Fetch repository data from a peer.
    Fetch {
        /// Repo being fetched.
        rid: RepoId,
        /// Remote node being fetched from.
        remote: NodeId,
        /// If the node is fetching specific `rad/sigrefs`.
        refs_at: Option<Vec<RefsAt>>,
        /// Limit the number of bytes fetched.
        reader_limit: FetchPackSizeLimit,
        /// Options for configuring the fetch worker, such as timeout, and
        /// internal fetch protocol options.
        config: fetcher::FetchConfig,
    },
    /// Ask for a wakeup in a specified amount of time.
    Wakeup(LocalDuration),
}

/// Interface to the network.
#[derive(Debug, Default)]
pub struct Outbox {
    /// Outgoing I/O queue.
    io: VecDeque<Io>,
}

impl Outbox {
    /// Connect to a peer.
    pub fn connect(&mut self, id: NodeId, addr: Address) {
        self.io.push_back(Io::Connect(id, addr));
    }

    /// Disconnect a peer.
    pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
        self.io.push_back(Io::Disconnect(id, reason));
    }

    pub fn write(&mut self, remote: &Session, msg: Message) {
        let level = match &msg {
            Message::Ping(_) | Message::Pong { .. } => log::Level::Trace,
            _ => log::Level::Debug,
        };
        msg.log(level, &remote.id, Link::Outbound);
        trace!(target: "service", "Write {:?} to {}", &msg, remote);

        self.io.push_back(Io::Write(remote.id, vec![msg]));
    }

    /// Announce something to a peer. This is meant for our own announcement messages.
    pub fn announce<'a>(
        &mut self,
        ann: Announcement,
        peers: impl Iterator<Item = &'a Session>,
        gossip: &mut impl gossip::Store,
    ) {
        // Store our announcement so that it can be retrieved from us later, just like
        // announcements we receive from peers.
        if let Err(e) = gossip.announced(&ann.node, &ann) {
            warn!(target: "service", "Failed to update gossip store with announced message: {e}");
        }

        for peer in peers {
            if let AnnouncementMessage::Refs(refs) = &ann.message {
                if let Some(subscribe) = &peer.subscribe {
                    if subscribe.filter.contains(&refs.rid) {
                        self.write(peer, ann.clone().into());
                    } else {
                        debug!(
                            target: "service",
                            "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
                            refs.rid
                        );
                    }
                } else {
                    debug!(
                        target: "service",
                        "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
                    );
                }
            } else {
                self.write(peer, ann.clone().into());
            }
        }
    }

    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
        let msgs = msgs.into_iter().collect::<Vec<_>>();

        for (ix, msg) in msgs.iter().enumerate() {
            trace!(
                target: "service",
                "Write {:?} to {} ({}/{})",
                msg,
                remote,
                ix + 1,
                msgs.len()
            );
            msg.log(log::Level::Trace, &remote.id, Link::Outbound);
        }
        self.io.push_back(Io::Write(remote.id, msgs));
    }

    pub fn wakeup(&mut self, after: LocalDuration) {
        self.io.push_back(Io::Wakeup(after));
    }

    pub fn fetch(
        &mut self,
        peer: &mut Session,
        rid: RepoId,
        refs_at: Vec<RefsAt>,
        reader_limit: FetchPackSizeLimit,
        config: fetcher::FetchConfig,
    ) {
        let refs_at = (!refs_at.is_empty()).then_some(refs_at);

        if let Some(refs_at) = &refs_at {
            debug!(
                target: "service",
                "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
            );
        } else {
            debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
        }

        self.io.push_back(Io::Fetch {
            rid,
            refs_at,
            remote: peer.id,
            reader_limit,
            config,
        });
    }

    /// Broadcast a message to a list of peers.
    pub fn broadcast<'a>(
        &mut self,
        msg: impl Into<Message>,
        peers: impl IntoIterator<Item = &'a Session>,
    ) {
        let msg = msg.into();
        for peer in peers {
            self.write(peer, msg.clone());
        }
    }

    /// Relay a message to interested peers.
    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
        if let AnnouncementMessage::Refs(msg) = &ann.message {
            let id = msg.rid;
            let peers = peers.into_iter().filter(|p| {
                if let Some(subscribe) = &p.subscribe {
                    subscribe.filter.contains(&id)
                } else {
                    // If the peer did not send us a `subscribe` message, we don't
                    // relay any messages to them.
                    false
                }
            });
            self.broadcast(ann, peers);
        } else {
            self.broadcast(ann, peers);
        }
    }

    /// Number of items in outbox.
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.io.len()
    }

    #[cfg(any(test, feature = "test"))]
    pub fn queue(&mut self) -> &mut VecDeque<Io> {
        &mut self.io
    }
}

impl Iterator for Outbox {
    type Item = Io;

    fn next(&mut self) -> Option<Self::Item> {
        self.io.pop_front()
    }
}