Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
heartwood crates radicle-protocol src service message.rs
use std::str::FromStr;
use std::{fmt, mem};

use bytes::{Buf, BufMut};
use nonempty::NonEmpty;

use radicle::crypto;
use radicle::git;
use radicle::identity::RepoId;
use radicle::node;
use radicle::node::device::Device;
use radicle::node::{Address, Alias, UserAgent};
use radicle::storage;
use radicle::storage::refs::RefsAt;

use crate::bounded::BoundedVec;
use crate::service::filter::Filter;
use crate::service::{Link, NodeId, Timestamp};
use crate::wire;
use crate::wire::Encode as _;

/// Maximum number of addresses which can be announced to other nodes.
pub const ADDRESS_LIMIT: usize = 16;
/// Maximum number of repository remotes that can be included in a [`RefsAnnouncement`] message.
pub const REF_REMOTE_LIMIT: usize = 1024;
/// Maximum number of inventory which can be announced to other nodes.
pub const INVENTORY_LIMIT: usize = 2973;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// Subscribe to events matching this filter.
    pub filter: Filter,
    /// Request messages since this time.
    pub since: Timestamp,
    /// Request messages until this time.
    pub until: Timestamp,
}

impl Subscribe {
    pub fn all() -> Self {
        Self {
            filter: Filter::default(),
            since: Timestamp::MIN,
            until: Timestamp::MAX,
        }
    }
}

/// Node announcing itself to the network.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeAnnouncement {
    /// Supported protocol version.
    pub version: u8,
    /// Advertised features.
    pub features: node::Features,
    /// Monotonic timestamp.
    pub timestamp: Timestamp,
    /// Non-unique alias.
    pub alias: Alias,
    /// Announced addresses.
    pub addresses: BoundedVec<Address, ADDRESS_LIMIT>,
    /// Nonce used for announcement proof-of-work.
    pub nonce: u64,
    /// User-agent string.
    pub agent: UserAgent,
}

impl NodeAnnouncement {
    /// Calculate the amount of work that went into creating this announcement.
    ///
    /// Proof-of-work uses the [`scrypt`] algorithm with the parameters in
    /// [`Announcement::POW_PARAMS`]. The "work" is calculated by counting the number of leading
    /// zero bits after running `scrypt` on a serialized [`NodeAnnouncement`] using
    /// [`Encode::encode_to_vec`].
    ///
    /// In other words, `work = leading-zeros(scrypt(serialize(announcement)))`.
    ///
    /// Higher numbers mean higher difficulty. For each increase in work, difficulty is doubled.
    /// For instance, an output of `7` is *four* times more work than an output of `5`.
    ///
    /// [`Encode::encode_to_vec`]: crate::wire::Encode::encode_to_vec
    pub fn work(&self) -> u32 {
        let (n, r, p) = Announcement::POW_PARAMS;
        let params = scrypt::Params::new(n, r, p, 32).expect("proof-of-work parameters are valid");
        let mut output = [0u8; 32];

        scrypt::scrypt(
            &self.encode_to_vec(),
            Announcement::POW_SALT,
            &params,
            &mut output,
        )
        .expect("proof-of-work output vector is a valid length");

        // Calculate the number of leading zero bits in the output vector.
        if let Some((zero_bytes, non_zero)) = output.iter().enumerate().find(|&(_, &x)| x != 0) {
            zero_bytes as u32 * 8 + non_zero.leading_zeros()
        } else {
            output.len() as u32 * 8
        }
    }

    /// Solve the proof-of-work of a node announcement for the given target, by iterating through
    /// different nonces.
    ///
    /// If the given difficulty target is too high, there may not be a result. In that case, `None`
    /// is returned.
    pub fn solve(mut self, target: u32) -> Option<Self> {
        loop {
            if let Some(nonce) = self.nonce.checked_add(1) {
                self.nonce = nonce;

                if self.work() >= target {
                    break;
                }
            } else {
                return None;
            }
        }
        Some(self)
    }
}

impl wire::Encode for NodeAnnouncement {
    fn encode(&self, buf: &mut impl BufMut) {
        self.version.encode(buf);
        self.features.encode(buf);
        self.timestamp.encode(buf);
        self.alias.encode(buf);
        self.addresses.encode(buf);
        self.nonce.encode(buf);
        self.agent.encode(buf);
    }
}

impl wire::Decode for NodeAnnouncement {
    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
        let version = u8::decode(buf)?;
        let features = node::Features::decode(buf)?;
        let timestamp = Timestamp::decode(buf)?;
        let alias = wire::Decode::decode(buf)?;
        let addresses = BoundedVec::<Address, ADDRESS_LIMIT>::decode(buf)?;
        let nonce = u64::decode(buf)?;
        let agent = match UserAgent::decode(buf) {
            Ok(ua) => ua,
            Err(wire::Error::UnexpectedEnd { .. }) => {
                UserAgent::from_str("/radicle/message/truncated/").expect("valid user agent")
            }
            Err(e) => return Err(e),
        };

        Ok(Self {
            version,
            features,
            timestamp,
            alias,
            addresses,
            nonce,
            agent,
        })
    }
}

/// Node announcing project refs being created or updated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RefsAnnouncement {
    /// Repository identifier.
    pub rid: RepoId,
    /// Updated `rad/sigrefs`.
    pub refs: BoundedVec<RefsAt, REF_REMOTE_LIMIT>,
    /// Time of announcement.
    pub timestamp: Timestamp,
}

/// Track the status of `RefsAt` within a given repository.
#[derive(Default)]
pub struct RefsStatus {
    /// The `rad/sigrefs` was missing or it's ahead of the local
    /// `rad/sigrefs`. We want it.
    pub want: Vec<RefsAt>,
    /// The `rad/sigrefs` has been seen before. We already have it.
    pub have: Vec<RefsAt>,
}

impl RefsStatus {
    /// Get the set of `want` and `have` `RefsAt`'s for the given
    /// announcement.
    ///
    /// Nb. We use the refs database as a cache for quick lookups. This does *not* check
    /// for ancestry matches, since we don't cache the whole history (only the tips).
    /// This, however, is not a problem because the signed refs branch is fast-forward only,
    /// and old refs announcements will be discarded due to their lower timestamps.
    pub fn new<D: node::refs::Store>(
        rid: RepoId,
        refs: NonEmpty<RefsAt>,
        db: &D,
    ) -> Result<RefsStatus, storage::Error> {
        let mut status = RefsStatus::default();
        for theirs in refs.iter() {
            status.insert(&rid, *theirs, db)?;
        }
        Ok(status)
    }

    fn insert<D: node::refs::Store>(
        &mut self,
        repo: &RepoId,
        theirs: RefsAt,
        db: &D,
    ) -> Result<(), storage::Error> {
        match db.get(repo, &theirs.remote, &storage::refs::SIGREFS_BRANCH) {
            Ok(Some((ours, _))) => {
                if theirs.at != ours {
                    self.want.push(theirs);
                } else {
                    self.have.push(theirs);
                }
            }
            Ok(None) => {
                self.want.push(theirs);
            }
            Err(e) => {
                log::debug!(
                    target: "service",
                    "Failed to get cached 'rad/sigrefs' of {} in {repo} for refs status: {e}", theirs.remote,
                );
            }
        }
        Ok(())
    }
}

/// Node announcing its inventory to the network.
/// This should be the whole inventory every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InventoryAnnouncement {
    /// Node inventory.
    pub inventory: BoundedVec<RepoId, INVENTORY_LIMIT>,
    /// Time of announcement.
    pub timestamp: Timestamp,
}

/// Node announcing information to a connected peer.
///
/// This should not be relayed and should be used to send an
/// informational message a peer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Info {
    /// Tell a node that sent a refs announcement that it was already synced at the given `Oid`,
    /// for this particular `rid`.
    RefsAlreadySynced { rid: RepoId, at: git::Oid },
}

/// Announcement messages are messages that are relayed between peers.
#[derive(Clone, PartialEq, Eq)]
pub enum AnnouncementMessage {
    /// Inventory announcement.
    Inventory(InventoryAnnouncement),
    /// Node announcement.
    Node(NodeAnnouncement),
    /// Refs announcement.
    Refs(RefsAnnouncement),
}

impl AnnouncementMessage {
    /// Sign this announcement message.
    pub fn signed<G>(self, signer: &Device<G>) -> Announcement
    where
        G: crypto::signature::Signer<crypto::Signature>,
    {
        use crypto::signature::Signer as _;

        let msg = self.encode_to_vec();

        let signature = signer.sign(&msg);

        Announcement {
            node: *signer.public_key(),
            message: self,
            signature,
        }
    }

    pub fn timestamp(&self) -> Timestamp {
        match self {
            Self::Inventory(InventoryAnnouncement { timestamp, .. }) => *timestamp,
            Self::Refs(RefsAnnouncement { timestamp, .. }) => *timestamp,
            Self::Node(NodeAnnouncement { timestamp, .. }) => *timestamp,
        }
    }

    pub fn is_node_announcement(&self) -> bool {
        matches!(self, Self::Node(_))
    }
}

impl From<NodeAnnouncement> for AnnouncementMessage {
    fn from(ann: NodeAnnouncement) -> Self {
        Self::Node(ann)
    }
}

impl From<InventoryAnnouncement> for AnnouncementMessage {
    fn from(ann: InventoryAnnouncement) -> Self {
        Self::Inventory(ann)
    }
}

impl From<RefsAnnouncement> for AnnouncementMessage {
    fn from(ann: RefsAnnouncement) -> Self {
        Self::Refs(ann)
    }
}

impl fmt::Debug for AnnouncementMessage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Node(message) => write!(f, "Node({})", message.timestamp),
            Self::Inventory(message) => {
                write!(
                    f,
                    "Inventory([{}], {})",
                    message
                        .inventory
                        .iter()
                        .map(|i| i.to_string())
                        .collect::<Vec<String>>()
                        .join(", "),
                    message.timestamp
                )
            }
            Self::Refs(message) => {
                write!(
                    f,
                    "Refs({}, {}, {:?})",
                    message.rid, message.timestamp, message.refs
                )
            }
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Announcement {
    /// Node identifier.
    pub node: NodeId,
    /// Signature over the announcement.
    pub signature: crypto::Signature,
    /// Unsigned node announcement.
    pub message: AnnouncementMessage,
}

impl Announcement {
    /// Proof-of-work parameters for announcements.
    ///
    /// These parameters are fed into `scrypt`.
    /// They represent the `log2(N)`, `r`, `p` parameters, respectively.
    ///
    /// * log2(N) – iterations count (affects memory and CPU usage), e.g. 15
    /// * r – block size (affects memory and CPU usage), e.g. 8
    /// * p – parallelism factor (threads to run in parallel - affects the memory, CPU usage), usually 1
    ///
    /// `15, 8, 1` are usually the recommended parameters.
    ///
    #[cfg(debug_assertions)]
    pub const POW_PARAMS: (u8, u32, u32) = (1, 1, 1);
    #[cfg(not(debug_assertions))]
    pub const POW_PARAMS: (u8, u32, u32) = (15, 8, 1);
    /// Salt used for generating PoW.
    pub const POW_SALT: &'static [u8] = b"rad";

    /// Verify this announcement's signature.
    pub fn verify(&self) -> bool {
        let msg = self.message.encode_to_vec();
        self.node.verify(msg, &self.signature).is_ok()
    }

    pub fn matches(&self, filter: &Filter) -> bool {
        match &self.message {
            AnnouncementMessage::Inventory(_) => true,
            AnnouncementMessage::Node(_) => true,
            AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) => filter.contains(rid),
        }
    }

    /// Check whether this announcement is of the same variant as another.
    pub fn variant_eq(&self, other: &Self) -> bool {
        std::mem::discriminant(&self.message) == std::mem::discriminant(&other.message)
    }

    /// Get the announcement timestamp.
    pub fn timestamp(&self) -> Timestamp {
        self.message.timestamp()
    }
}

/// Message payload.
/// These are the messages peers send to each other.
#[derive(Clone, PartialEq, Eq)]
pub enum Message {
    /// Subscribe to gossip messages matching the filter and time range.
    Subscribe(Subscribe),

    /// Gossip announcement. These messages are relayed to peers, and filtered
    /// using [`Message::Subscribe`].
    Announcement(Announcement),

    /// Informational message. These messages are sent between peers for information
    /// and do not need to be acted upon. They can be safely ignored, though handling
    /// them can be useful for the user.
    Info(Info),

    /// Ask a connected peer for a Pong.
    ///
    /// Used to check if the remote peer is responsive, or a side-effect free way to keep a
    /// connection alive.
    Ping(Ping),

    /// Response to `Ping` message.
    Pong {
        /// The pong payload.
        zeroes: ZeroBytes,
    },
}

impl PartialOrd for Message {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Message {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let this = self.encode_to_vec();
        let other = other.encode_to_vec();

        this.cmp(&other)
    }
}

impl Message {
    pub fn announcement(
        node: NodeId,
        message: impl Into<AnnouncementMessage>,
        signature: crypto::Signature,
    ) -> Self {
        Announcement {
            node,
            signature,
            message: message.into(),
        }
        .into()
    }

    pub fn node<G: crypto::signature::Signer<crypto::Signature>>(
        message: NodeAnnouncement,
        signer: &Device<G>,
    ) -> Self {
        AnnouncementMessage::from(message).signed(signer).into()
    }

    pub fn inventory<G: crypto::signature::Signer<crypto::Signature>>(
        message: InventoryAnnouncement,
        signer: &Device<G>,
    ) -> Self {
        AnnouncementMessage::from(message).signed(signer).into()
    }

    pub fn subscribe(filter: Filter, since: Timestamp, until: Timestamp) -> Self {
        Self::Subscribe(Subscribe {
            filter,
            since,
            until,
        })
    }

    pub fn log(&self, level: log::Level, remote: &NodeId, link: Link) {
        if !log::log_enabled!(level) {
            return;
        }
        let (verb, prep) = if link.is_inbound() {
            ("Received", "from")
        } else {
            ("Sending", "to")
        };
        let msg = match self {
            Self::Announcement(Announcement { node, message, .. }) => match message {
                AnnouncementMessage::Node(NodeAnnouncement {
                    addresses,
                    timestamp,
                    ..
                }) => format!(
                    "{verb} node announcement of {node} with {} address(es) {prep} {remote} (t={timestamp})",
                    addresses.len()
                ),
                AnnouncementMessage::Refs(RefsAnnouncement {
                    rid,
                    refs,
                    timestamp,
                }) => format!(
                    "{verb} refs announcement of {node} for {rid} with {} remote(s) {prep} {remote} (t={timestamp})",
                    refs.len()
                ),
                AnnouncementMessage::Inventory(InventoryAnnouncement {
                    inventory,
                    timestamp,
                }) => {
                    format!(
                        "{verb} inventory announcement of {node} with {} item(s) {prep} {remote} (t={timestamp})",
                        inventory.len()
                    )
                }
            },
            Self::Info(Info::RefsAlreadySynced { rid, .. }) => {
                format!("{verb} `refs-already-synced` info {prep} {remote} for {rid}")
            }
            Self::Ping { .. } => format!("{verb} ping {prep} {remote}"),
            Self::Pong { .. } => format!("{verb} pong {prep} {remote}"),
            Self::Subscribe(Subscribe { .. }) => {
                format!("{verb} subscription filter {prep} {remote}")
            }
        };
        log::log!(target: "service", level, "{msg}");
    }
}

/// A ping message.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Ping {
    /// The requested length of the pong message.
    pub ponglen: wire::Size,
    /// Zero bytes (ignored).
    pub zeroes: ZeroBytes,
}

impl Ping {
    /// Maximum number of zero bytes in a ping message.
    pub const MAX_PING_ZEROES: wire::Size = Message::MAX_SIZE // Message size without the type.
        - mem::size_of::<wire::Size>() as wire::Size // Account for pong length.
        - mem::size_of::<wire::Size>() as wire::Size; // Account for zeroes length prefix.

    /// Maximum number of zero bytes in a pong message.
    pub const MAX_PONG_ZEROES: wire::Size =
        Message::MAX_SIZE - mem::size_of::<wire::Size>() as wire::Size; // Account for zeroes length
    // prefix.

    pub fn new(rng: &mut fastrand::Rng) -> Self {
        let ponglen = rng.u16(0..Self::MAX_PONG_ZEROES);

        Ping {
            ponglen,
            zeroes: ZeroBytes::new(rng.u16(0..Self::MAX_PING_ZEROES)),
        }
    }
}

impl From<Announcement> for Message {
    fn from(ann: Announcement) -> Self {
        Self::Announcement(ann)
    }
}

impl From<Info> for Message {
    fn from(info: Info) -> Self {
        Self::Info(info)
    }
}

impl fmt::Debug for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Subscribe(Subscribe { since, until, .. }) => {
                write!(f, "Subscribe({since}..{until})")
            }
            Self::Announcement(Announcement { node, message, .. }) => {
                write!(f, "Announcement({node}, {message:?})")
            }
            Self::Info(info) => {
                write!(f, "Info({info:?})")
            }
            Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {zeroes:?})"),
            Self::Pong { zeroes } => write!(f, "Pong({zeroes:?})"),
        }
    }
}

/// Represents a vector of zeroes of a certain length.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ZeroBytes(wire::Size);

impl ZeroBytes {
    pub fn new(size: wire::Size) -> Self {
        ZeroBytes(size)
    }

    pub fn is_empty(&self) -> bool {
        self.0 == 0
    }

    pub fn len(&self) -> usize {
        self.0.into()
    }
}

#[cfg(any(test, feature = "test"))]
#[allow(clippy::unwrap_used)]
impl qcheck::Arbitrary for Message {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        use qcheck::Arbitrary;

        match g.choose(&[1, 2, 3, 4, 5, 6, 7]).unwrap() {
            1 => Announcement {
                node: NodeId::arbitrary(g),
                message: InventoryAnnouncement {
                    inventory: BoundedVec::arbitrary(g),
                    timestamp: Timestamp::arbitrary(g),
                }
                .into(),
                signature: crypto::Signature::from(<[u8; 64]>::arbitrary(g)),
            }
            .into(),
            2 => Announcement {
                node: NodeId::arbitrary(g),
                message: RefsAnnouncement {
                    rid: RepoId::arbitrary(g),
                    refs: BoundedVec::arbitrary(g),
                    timestamp: Timestamp::arbitrary(g),
                }
                .into(),
                signature: crypto::Signature::from(<[u8; 64]>::arbitrary(g)),
            }
            .into(),
            3 => {
                let message = NodeAnnouncement {
                    version: u8::arbitrary(g),
                    features: u64::arbitrary(g).into(),
                    timestamp: Timestamp::arbitrary(g),
                    alias: Alias::arbitrary(g),
                    addresses: Arbitrary::arbitrary(g),
                    nonce: u64::arbitrary(g),
                    agent: UserAgent::arbitrary(g),
                }
                .into();
                let bytes: [u8; 64] = Arbitrary::arbitrary(g);
                let signature = crypto::Signature::from(bytes);

                Announcement {
                    node: NodeId::arbitrary(g),
                    signature,
                    message,
                }
                .into()
            }
            4 => {
                let message = Info::RefsAlreadySynced {
                    rid: RepoId::arbitrary(g),
                    at: radicle::test::arbitrary::oid(),
                };
                Self::Info(message)
            }
            5 => Self::Subscribe(Subscribe {
                filter: Filter::arbitrary(g),
                since: Timestamp::arbitrary(g),
                until: Timestamp::arbitrary(g),
            }),
            6 => {
                let mut rng = fastrand::Rng::with_seed(u64::arbitrary(g));

                Self::Ping(Ping::new(&mut rng))
            }
            7 => Self::Pong {
                zeroes: ZeroBytes::new(u16::arbitrary(g).min(Ping::MAX_PONG_ZEROES)),
            },
            _ => panic!("Invalid choice for Message::arbitrary"),
        }
    }
}

#[cfg(any(test, feature = "test"))]
impl qcheck::Arbitrary for ZeroBytes {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        ZeroBytes::new(u16::arbitrary(g))
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {

    use fastrand;
    use localtime::LocalTime;
    use qcheck_macros::quickcheck;
    use radicle::test::arbitrary;

    use crate::wire::Decode as _;

    use super::*;

    #[test]
    fn test_ref_remote_limit() {
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
        let signer = Device::mock();
        let at = git::Oid::ZERO_SHA1;

        assert_eq!(refs.capacity(), REF_REMOTE_LIMIT);

        for _ in 0..refs.capacity() {
            refs.push(RefsAt {
                remote: *signer.public_key(),
                at,
            })
            .unwrap();
        }

        let msg: Message = AnnouncementMessage::from(RefsAnnouncement {
            rid: arbitrary::r#gen(1),
            refs,
            timestamp: LocalTime::now().into(),
        })
        .signed(&Device::mock())
        .into();

        let mut buf = Vec::new();
        msg.encode(&mut buf);

        let decoded = Message::decode_exact(buf.as_slice());
        assert!(decoded.is_ok());
        assert_eq!(msg, decoded.unwrap());
    }

    #[test]
    fn test_inventory_limit() {
        let msg = Message::inventory(
            InventoryAnnouncement {
                inventory: arbitrary::vec(INVENTORY_LIMIT)
                    .try_into()
                    .expect("size within bounds limit"),
                timestamp: LocalTime::now().into(),
            },
            &Device::mock(),
        );
        let mut buf: Vec<u8> = Vec::new();
        msg.encode(&mut buf);

        let decoded = Message::decode_exact(buf.as_slice());
        assert!(
            decoded.is_ok(),
            "INVENTORY_LIMIT is a valid limit for decoding"
        );
        assert_eq!(
            msg,
            decoded.unwrap(),
            "encoding and decoding should be safe for message at INVENTORY_LIMIT",
        );
    }

    #[quickcheck]
    fn prop_refs_announcement_signing(rid: RepoId) {
        let signer = Device::mock_rng(&mut fastrand::Rng::new());
        let timestamp = Timestamp::EPOCH;
        let at = git::Oid::ZERO_SHA1;
        let refs = BoundedVec::collect_from(
            &mut [RefsAt {
                remote: *signer.public_key(),
                at,
            }]
            .into_iter(),
        );
        let message = AnnouncementMessage::Refs(RefsAnnouncement {
            rid,
            refs,
            timestamp,
        });
        let ann = message.signed(&signer);

        assert!(ann.verify());
    }

    #[test]
    fn test_node_announcement_validate() {
        let ann = NodeAnnouncement {
            version: 1,
            features: node::Features::SEED,
            timestamp: Timestamp::try_from(42491841u64).unwrap(),
            alias: Alias::new("alice"),
            addresses: BoundedVec::new(),
            nonce: 0,
            agent: UserAgent::test(),
        };

        assert_eq!(ann.work(), 2);
        assert_eq!(ann.clone().solve(1).unwrap().work(), 1);
        assert_eq!(ann.clone().solve(8).unwrap().work(), 8);
        assert_eq!(ann.solve(14).unwrap().work(), 14);
    }
}