Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Rebroadcast logic for stored gossip messages
Alexis Sellier committed 3 years ago
commit e35ec2f715c8fa5bacff8249373682fc0e3a47ed
parent f9ad3ac155e4428986b1cd4689807fd8c3288ae3
9 files changed +389 -253
modified radicle-node/src/service.rs
@@ -28,7 +28,7 @@ use crate::git;
use crate::git::Url;
use crate::identity::{Doc, Id};
use crate::service::config::ProjectTracking;
-
use crate::service::message::Address;
+
use crate::service::message::{Address, Announcement, AnnouncementMessage};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::peer::{Session, SessionError, SessionState};
use crate::storage;
@@ -37,6 +37,7 @@ use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, Writ
pub use crate::service::config::{Config, Network};
pub use crate::service::message::{Envelope, Message};

+
use self::gossip::Gossip;
use self::message::{InventoryAnnouncement, NodeFeatures};
use self::reactor::Reactor;

@@ -132,6 +133,8 @@ pub struct Service<A, S, G> {
    storage: S,
    /// Tracks the location of projects.
    routing: Routing,
+
    /// State relating to gossip.
+
    gossip: Gossip,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
    /// Keeps track of peer states.
@@ -185,6 +188,7 @@ where
            rng,
            clock,
            routing,
+
            gossip: Gossip::default(),
            peers: BTreeMap::new(),
            reactor: Reactor::new(network),
            sessions,
@@ -449,17 +453,10 @@ where
                let remote = repo.remote(&node).unwrap();
                let peers = self.sessions.negotiated().map(|(_, p)| p);
                let refs = remote.refs.into();
-
                let message = RefsAnnouncement { id, refs };
-
                let signature = message.sign(&self.signer);
-

-
                self.reactor.broadcast(
-
                    Message::RefsAnnouncement {
-
                        node,
-
                        message,
-
                        signature,
-
                    },
-
                    peers,
-
                );
+
                let msg = AnnouncementMessage::from(RefsAnnouncement { id, refs });
+
                let ann = msg.signed(&self.signer);
+

+
                self.reactor.broadcast(ann, peers);
            }
        }
    }
@@ -555,14 +552,14 @@ where
    pub fn received_message(&mut self, addr: &net::SocketAddr, envelope: Envelope) {
        match self.handle_message(addr, envelope) {
            Ok(relay) => {
-
                if let Some(msg) = relay {
+
                if let Some(ann) = relay {
                    let negotiated = self
                        .sessions
                        .negotiated()
                        .filter(|(ip, _)| **ip != addr.ip())
                        .map(|(_, p)| p);

-
                    self.reactor.relay(msg, negotiated.clone());
+
                    self.reactor.relay(ann, negotiated.clone());
                }
            }
            Err(SessionError::NotFound(ip)) => {
@@ -579,11 +576,77 @@ where
        }
    }

+
    /// Handle an announcement message.
+
    ///
+
    /// Returns `true` if this announcement should be stored and relayed to connected peers,
+
    /// and `false` if it should not.
+
    pub fn handle_announcement(
+
        &mut self,
+
        git: &git::Url,
+
        announcement: &Announcement,
+
    ) -> Result<bool, peer::SessionError> {
+
        if !announcement.verify() {
+
            return Err(SessionError::Misbehavior);
+
        }
+
        let Announcement { node, message, .. } = announcement;
+

+
        match message {
+
            AnnouncementMessage::Inventory(message) => {
+
                let now = self.clock.local_time();
+
                let peer = self.peers.entry(*node).or_insert_with(Peer::default);
+
                let relay = self.config.relay;
+

+
                // Don't allow messages from too far in the future.
+
                if message.timestamp.saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
+
                    return Err(SessionError::InvalidTimestamp(message.timestamp));
+
                }
+
                // Discard inventory messages we've already seen, otherwise update
+
                // out last seen time.
+
                if message.timestamp > peer.last_message {
+
                    peer.last_message = message.timestamp;
+
                } else {
+
                    return Ok(false);
+
                }
+
                self.process_inventory(&message.inventory, *node, git);
+

+
                if relay {
+
                    return Ok(true);
+
                }
+
            }
+
            // Process a peer inventory update announcement by (maybe) fetching.
+
            AnnouncementMessage::Refs(message) => {
+
                // FIXME: Check message timestamp.
+
                // TODO: Buffer/throttle fetches.
+
                // TODO: Check that we're tracking this user as well.
+
                if self.config.is_tracking(&message.id) {
+
                    // TODO: Check refs to see if we should try to fetch or not.
+
                    let updated = self.storage.fetch(message.id, git).unwrap();
+
                    let is_updated = !updated.is_empty();
+

+
                    self.reactor.event(Event::RefsFetched {
+
                        from: git.clone(),
+
                        project: message.id,
+
                        updated,
+
                    });
+

+
                    if is_updated {
+
                        return Ok(true);
+
                    }
+
                }
+
            }
+
            AnnouncementMessage::Node(_message) => {
+
                // FIXME: Check message timestamp.
+
                log::warn!("Node announcement handling is not implemented");
+
            }
+
        }
+
        Ok(false)
+
    }
+

    pub fn handle_message(
        &mut self,
        remote: &net::SocketAddr,
        envelope: Envelope,
-
    ) -> Result<Option<Message>, peer::SessionError> {
+
    ) -> Result<Option<Announcement>, peer::SessionError> {
        let peer_ip = remote.ip();
        let peer = if let Some(peer) = self.sessions.get_mut(&peer_ip) {
            peer
@@ -639,93 +702,23 @@ where
                );
                return Err(SessionError::Misbehavior);
            }
-
            (
-
                SessionState::Negotiated { git, .. },
-
                Message::InventoryAnnouncement {
-
                    node,
-
                    message,
-
                    signature,
-
                },
-
            ) => {
-
                let now = self.clock.local_time();
-
                let peer = self.peers.entry(node).or_insert_with(Peer::default);
-
                let relay = self.config.relay;
+
            // Process a peer announcement.
+
            (SessionState::Negotiated { git, .. }, Message::Announcement(ann)) => {
                let git = git.clone();

-
                // Don't allow messages from too far in the future.
-
                if message.timestamp.saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
-
                    return Err(SessionError::InvalidTimestamp(message.timestamp));
-
                }
-
                // Discard inventory messages we've already seen, otherwise update
-
                // out last seen time.
-
                if message.timestamp > peer.last_message {
-
                    peer.last_message = message.timestamp;
-
                } else {
-
                    return Ok(None);
-
                }
-
                self.process_inventory(&message.inventory, node, &git);
-

-
                if relay {
-
                    return Ok(Some(Message::InventoryAnnouncement {
-
                        node,
-
                        message,
-
                        signature,
-
                    }));
-
                }
-
            }
-
            // Process a peer inventory update announcement by (maybe) fetching.
-
            (
-
                SessionState::Negotiated { git, .. },
-
                Message::RefsAnnouncement {
-
                    node,
-
                    message,
-
                    signature,
-
                },
-
            ) => {
-
                // FIXME: Check message timestamp.
-

-
                if message.verify(&node, &signature) {
-
                    // TODO: Buffer/throttle fetches.
-
                    // TODO: Check that we're tracking this user as well.
-
                    if self.config.is_tracking(&message.id) {
-
                        // TODO: Check refs to see if we should try to fetch or not.
-
                        let updated = self.storage.fetch(message.id, git).unwrap();
-
                        let is_updated = !updated.is_empty();
-

-
                        self.reactor.event(Event::RefsFetched {
-
                            from: git.clone(),
-
                            project: message.id,
-
                            updated,
-
                        });
-

-
                        if is_updated {
-
                            return Ok(Some(Message::RefsAnnouncement {
-
                                node,
-
                                message,
-
                                signature,
-
                            }));
-
                        }
-
                    }
-
                } else {
-
                    return Err(SessionError::Misbehavior);
+
                // Returning true here means that the message should be relayed.
+
                if self.handle_announcement(&git, &ann)? {
+
                    self.gossip.received(ann.clone(), self.clock.timestamp());
+
                    return Ok(Some(ann));
                }
            }
-
            (
-
                SessionState::Negotiated { .. },
-
                Message::NodeAnnouncement {
-
                    node,
-
                    message,
-
                    signature,
-
                },
-
            ) => {
-
                // FIXME: Check message timestamp.
-

-
                if !message.verify(&node, &signature) {
-
                    return Err(SessionError::Misbehavior);
-
                }
-
                log::warn!("Node announcement handling is not implemented");
-
            }
            (SessionState::Negotiated { .. }, Message::Subscribe(subscribe)) => {
+
                for msg in self
+
                    .gossip
+
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
+
                {
+
                    self.reactor.write(peer.addr, msg);
+
                }
                peer.subscribe = Some(subscribe);
            }
            (SessionState::Negotiated { .. }, Message::Initialize { .. }) => {
@@ -886,6 +879,34 @@ impl DerefMut for Sessions {

mod gossip {
    use super::*;
+
    use crate::service::filter::Filter;
+

+
    #[derive(Default, Debug)]
+
    pub struct Gossip {
+
        received: Vec<(Timestamp, Announcement)>,
+
    }
+

+
    impl Gossip {
+
        // TODO: Overwrite old messages from the same node or project.
+
        // TODO: Should "time" be this node's time, or the time inside the message?
+
        pub fn received(&mut self, ann: Announcement, time: Timestamp) {
+
            self.received.push((time, ann));
+
        }
+

+
        pub fn filtered<'a>(
+
            &'a self,
+
            filter: &'a Filter,
+
            start: Timestamp,
+
            end: Timestamp,
+
        ) -> impl Iterator<Item = Message> + '_ {
+
            self.received
+
                .iter()
+
                .filter(move |(t, _)| *t >= start && *t < end)
+
                .filter(move |(_, a)| a.matches(filter))
+
                .cloned()
+
                .map(|(_, a)| a.into())
+
        }
+
    }

    pub fn handshake<G: Signer, S: ReadStorage>(
        timestamp: Timestamp,
modified radicle-node/src/service/message.rs
@@ -124,6 +124,7 @@ pub struct Subscribe {
    pub until: Timestamp,
}

+
/// Node announcing itself to the network.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeAnnouncement {
    /// Advertized features.
@@ -136,14 +137,6 @@ pub struct NodeAnnouncement {
    pub addresses: Vec<Address>,
}

-
impl NodeAnnouncement {
-
    /// Verify a signature on this message.
-
    pub fn verify(&self, signer: &NodeId, signature: &crypto::Signature) -> bool {
-
        let msg = wire::serialize(self);
-
        signer.verify(&msg, signature).is_ok()
-
    }
-
}
-

impl wire::Encode for NodeAnnouncement {
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
        let mut n = 0;
@@ -173,46 +166,119 @@ impl wire::Decode for NodeAnnouncement {
    }
}

+
/// Node announcing project refs being created or updated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RefsAnnouncement {
    /// Repository identifier.
    pub id: Id,
    /// Updated refs.
    pub refs: Refs,
+
    // TODO: Add timestamp
+
}
+

+
/// Node announcing its inventory to the network.
+
/// This should be the whole inventory every time.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct InventoryAnnouncement {
+
    pub inventory: Vec<Id>,
+
    pub timestamp: Timestamp,
+
}
+

+
/// Announcement messages are messages that are relayed between peers.
+
#[derive(Clone, PartialEq, Eq)]
+
pub enum AnnouncementMessage {
+
    /// Inventory announcement.
+
    Inventory(InventoryAnnouncement),
+
    /// Node announcement.
+
    Node(NodeAnnouncement),
+
    /// Refs announcement.
+
    Refs(RefsAnnouncement),
}

-
impl RefsAnnouncement {
-
    /// Verify a signature on this message.
-
    pub fn verify(&self, signer: &NodeId, signature: &crypto::Signature) -> bool {
-
        let msg = wire::serialize(self);
-
        signer.verify(&msg, signature).is_ok()
+
impl AnnouncementMessage {
+
    /// Sign this announcement message.
+
    pub fn signed<S: crypto::Signer>(self, signer: S) -> Announcement {
+
        let msg = wire::serialize(&self);
+
        let signature = signer.sign(&msg);
+

+
        Announcement {
+
            node: *signer.public_key(),
+
            message: self,
+
            signature,
+
        }
    }
+
}
+

+
impl From<NodeAnnouncement> for AnnouncementMessage {
+
    fn from(ann: NodeAnnouncement) -> Self {
+
        Self::Node(ann)
+
    }
+
}
+

+
impl From<InventoryAnnouncement> for AnnouncementMessage {
+
    fn from(ann: InventoryAnnouncement) -> Self {
+
        Self::Inventory(ann)
+
    }
+
}

-
    /// Sign this announcement.
-
    pub fn sign<S: crypto::Signer>(&self, signer: S) -> crypto::Signature {
-
        let msg = wire::serialize(self);
-
        signer.sign(&msg)
+
impl From<RefsAnnouncement> for AnnouncementMessage {
+
    fn from(ann: RefsAnnouncement) -> Self {
+
        Self::Refs(ann)
+
    }
+
}
+

+
impl fmt::Debug for AnnouncementMessage {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        match self {
+
            Self::Node { .. } => write!(f, "Node(..)"),
+
            Self::Inventory(message) => {
+
                write!(
+
                    f,
+
                    "Inventory([{}], {})",
+
                    message
+
                        .inventory
+
                        .iter()
+
                        .map(|i| i.to_string())
+
                        .collect::<Vec<String>>()
+
                        .join(", "),
+
                    message.timestamp
+
                )
+
            }
+
            Self::Refs(message) => {
+
                write!(f, "Refs({}, {:?})", message.id, message.refs)
+
            }
+
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
-
pub struct InventoryAnnouncement {
-
    pub inventory: Vec<Id>,
-
    pub timestamp: Timestamp,
+
pub struct Announcement {
+
    /// Node identifier.
+
    pub node: NodeId,
+
    /// Unsigned node announcement.
+
    pub message: AnnouncementMessage,
+
    /// Signature over the announcement.
+
    pub signature: crypto::Signature,
}

-
impl InventoryAnnouncement {
-
    /// Verify a signature on this message.
-
    pub fn verify(&self, signer: NodeId, signature: &crypto::Signature) -> bool {
-
        let msg = wire::serialize(self);
-
        signer.verify(&msg, signature).is_ok()
+
impl Announcement {
+
    /// Verify this announcement's signature.
+
    pub fn verify(&self) -> bool {
+
        let msg = wire::serialize(&self.message);
+
        self.node.verify(&msg, &self.signature).is_ok()
+
    }
+

+
    pub fn matches(&self, filter: &Filter) -> bool {
+
        match &self.message {
+
            AnnouncementMessage::Inventory(_) => true,
+
            AnnouncementMessage::Node(_) => true,
+
            AnnouncementMessage::Refs(RefsAnnouncement { id, .. }) => filter.contains(id),
+
        }
    }
}

/// Message payload.
/// These are the messages peers send to each other.
-
///
-
/// "Announcement" messages are messages that are relayed between peers.
#[derive(Clone, PartialEq, Eq)]
pub enum Message {
    /// The first message sent to a peer after connection.
@@ -225,39 +291,11 @@ pub enum Message {
    },

    /// Subscribe to gossip messages matching the filter and time range.
-
    /// timestamp.
    Subscribe(Subscribe),

-
    /// Node announcing its inventory to the network.
-
    /// This should be the whole inventory every time.
-
    InventoryAnnouncement {
-
        /// Node identifier.
-
        node: NodeId,
-
        /// Unsigned node inventory.
-
        message: InventoryAnnouncement,
-
        /// Signature over the announcement.
-
        signature: crypto::Signature,
-
    },
-

-
    /// Node announcing itself to the network.
-
    NodeAnnouncement {
-
        /// Node identifier.
-
        node: NodeId,
-
        /// Unsigned node announcement.
-
        message: NodeAnnouncement,
-
        /// Signature over the announcement, by the node being announced.
-
        signature: crypto::Signature,
-
    },
-

-
    /// Node announcing project refs being created or updated.
-
    RefsAnnouncement {
-
        /// Node identifier.
-
        node: NodeId,
-
        /// Unsigned refs announcement.
-
        message: RefsAnnouncement,
-
        /// Signature over the announcement, by the node that updated the refs.
-
        signature: crypto::Signature,
-
    },
+
    /// Gossip announcement. These messages are relayed to peers, and filtered
+
    /// using [`Message::Subscribe`].
+
    Announcement(Announcement),
}

impl Message {
@@ -270,28 +308,25 @@ impl Message {
        }
    }

-
    pub fn node<S: crypto::Signer>(message: NodeAnnouncement, signer: S) -> Self {
-
        let msg = wire::serialize(&message);
-
        let signature = signer.sign(&msg);
-
        let node = *signer.public_key();
-

-
        Self::NodeAnnouncement {
+
    pub fn announcement(
+
        node: NodeId,
+
        message: impl Into<AnnouncementMessage>,
+
        signature: crypto::Signature,
+
    ) -> Self {
+
        Announcement {
            node,
            signature,
-
            message,
+
            message: message.into(),
        }
+
        .into()
    }

-
    pub fn inventory<S: crypto::Signer>(message: InventoryAnnouncement, signer: S) -> Self {
-
        let msg = wire::serialize(&message);
-
        let signature = signer.sign(&msg);
-
        let node = *signer.public_key();
+
    pub fn node<S: crypto::Signer>(message: NodeAnnouncement, signer: S) -> Self {
+
        AnnouncementMessage::from(message).signed(signer).into()
+
    }

-
        Self::InventoryAnnouncement {
-
            node,
-
            signature,
-
            message,
-
        }
+
    pub fn inventory<S: crypto::Signer>(message: InventoryAnnouncement, signer: S) -> Self {
+
        AnnouncementMessage::from(message).signed(signer).into()
    }

    pub fn subscribe(filter: Filter, since: Timestamp, until: Timestamp) -> Self {
@@ -303,6 +338,12 @@ impl Message {
    }
}

+
impl From<Announcement> for Message {
+
    fn from(ann: Announcement) -> Self {
+
        Self::Announcement(ann)
+
    }
+
}
+

impl fmt::Debug for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
@@ -310,28 +351,8 @@ impl fmt::Debug for Message {
            Self::Subscribe(Subscribe { since, until, .. }) => {
                write!(f, "Subscribe({}..{})", since, until)
            }
-

-
            Self::NodeAnnouncement { node, .. } => write!(f, "NodeAnnouncement({})", node),
-
            Self::InventoryAnnouncement { node, message, .. } => {
-
                write!(
-
                    f,
-
                    "InventoryAnnouncement({}, [{}], {})",
-
                    node,
-
                    message
-
                        .inventory
-
                        .iter()
-
                        .map(|i| i.to_string())
-
                        .collect::<Vec<String>>()
-
                        .join(", "),
-
                    message.timestamp
-
                )
-
            }
-
            Self::RefsAnnouncement { node, message, .. } => {
-
                write!(
-
                    f,
-
                    "RefsAnnouncement({}, {}, {:?})",
-
                    node, message.id, message.refs
-
                )
+
            Self::Announcement(Announcement { node, message, .. }) => {
+
                write!(f, "Announcement({}, {:?})", node, message)
            }
        }
    }
@@ -342,15 +363,14 @@ mod tests {
    use super::*;
    use quickcheck_macros::quickcheck;

-
    use crate::crypto::Signer;
    use crate::test::signer::MockSigner;

    #[quickcheck]
    fn prop_refs_announcement_signing(id: Id, refs: Refs) {
        let signer = MockSigner::new(&mut fastrand::Rng::new());
-
        let message = RefsAnnouncement { id, refs };
-
        let signature = message.sign(&signer);
+
        let message = AnnouncementMessage::Refs(RefsAnnouncement { id, refs });
+
        let ann = message.signed(&signer);

-
        assert!(message.verify(signer.public_key(), &signature));
+
        assert!(ann.verify());
    }
}
modified radicle-node/src/service/reactor.rs
@@ -6,6 +6,8 @@ use log::*;
use crate::prelude::*;
use crate::service::peer::Session;

+
use super::message::{Announcement, AnnouncementMessage};
+

/// Output of a state transition.
#[derive(Debug)]
pub enum Io {
@@ -86,16 +88,20 @@ impl Reactor {
    }

    /// Broadcast a message to a list of peers.
-
    pub fn broadcast<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Session>) {
+
    pub fn broadcast<'a>(
+
        &mut self,
+
        msg: Announcement,
+
        peers: impl IntoIterator<Item = &'a Session>,
+
    ) {
        for peer in peers {
-
            self.write(peer.addr, msg.clone());
+
            self.write(peer.addr, msg.clone().into());
        }
    }

    /// Relay a message to interested peers.
-
    pub fn relay<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Session>) {
-
        if let Message::RefsAnnouncement { message, .. } = &msg {
-
            let id = message.id;
+
    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
+
        if let AnnouncementMessage::Refs(msg) = &ann.message {
+
            let id = msg.id;
            let peers = peers.into_iter().filter(|p| {
                if let Some(subscribe) = &p.subscribe {
                    subscribe.filter.contains(&id)
@@ -105,9 +111,9 @@ impl Reactor {
                    false
                }
            });
-
            self.broadcast(msg, peers);
+
            self.broadcast(ann, peers);
        } else {
-
            self.broadcast(msg, peers);
+
            self.broadcast(ann, peers);
        }
    }

modified radicle-node/src/test.rs
@@ -1,4 +1,5 @@
pub(crate) mod arbitrary;
+
pub(crate) mod gossip;
pub(crate) mod handle;
pub(crate) mod logger;
pub(crate) mod peer;
modified radicle-node/src/test/arbitrary.rs
@@ -7,8 +7,8 @@ use crate::crypto;
use crate::prelude::{Id, NodeId, Refs, Timestamp};
use crate::service::filter::{Filter, FILTER_SIZE};
use crate::service::message::{
-
    Address, Envelope, InventoryAnnouncement, Message, NodeAnnouncement, RefsAnnouncement,
-
    Subscribe,
+
    Address, Announcement, Envelope, InventoryAnnouncement, Message, NodeAnnouncement,
+
    RefsAnnouncement, Subscribe,
};
use crate::wire::message::MessageType;

@@ -46,37 +46,43 @@ impl Arbitrary for Message {
            .unwrap();

        match type_id {
-
            MessageType::InventoryAnnouncement => Self::InventoryAnnouncement {
+
            MessageType::InventoryAnnouncement => Announcement {
                node: NodeId::arbitrary(g),
                message: InventoryAnnouncement {
                    inventory: Vec::<Id>::arbitrary(g),
                    timestamp: Timestamp::arbitrary(g),
-
                },
+
                }
+
                .into(),
                signature: crypto::Signature::from(ByteArray::<64>::arbitrary(g).into_inner()),
-
            },
-
            MessageType::RefsAnnouncement => Self::RefsAnnouncement {
+
            }
+
            .into(),
+
            MessageType::RefsAnnouncement => Announcement {
                node: NodeId::arbitrary(g),
                message: RefsAnnouncement {
                    id: Id::arbitrary(g),
                    refs: Refs::arbitrary(g),
-
                },
+
                }
+
                .into(),
                signature: crypto::Signature::from(ByteArray::<64>::arbitrary(g).into_inner()),
-
            },
+
            }
+
            .into(),
            MessageType::NodeAnnouncement => {
                let message = NodeAnnouncement {
                    features: ByteArray::<32>::arbitrary(g).into_inner(),
                    timestamp: Timestamp::arbitrary(g),
                    alias: ByteArray::<32>::arbitrary(g).into_inner(),
                    addresses: Arbitrary::arbitrary(g),
-
                };
+
                }
+
                .into();
                let bytes: ByteArray<64> = Arbitrary::arbitrary(g);
                let signature = crypto::Signature::from(bytes.into_inner());

-
                Self::NodeAnnouncement {
+
                Announcement {
                    node: NodeId::arbitrary(g),
                    signature,
                    message,
                }
+
                .into()
            }
            MessageType::Subscribe => Self::Subscribe(Subscribe {
                filter: Filter::arbitrary(g),
added radicle-node/src/test/gossip.rs
@@ -0,0 +1,27 @@
+
use radicle::test::signer::MockSigner;
+

+
use crate::test::arbitrary;
+
use crate::{
+
    prelude::{LocalDuration, LocalTime, Message},
+
    service::message::InventoryAnnouncement,
+
};
+

+
pub fn messages(count: usize, now: LocalTime, delta: LocalDuration) -> Vec<Message> {
+
    let mut rng = fastrand::Rng::new();
+
    let mut msgs = Vec::new();
+

+
    for _ in 0..count {
+
        let signer = MockSigner::new(&mut rng);
+
        let delta = LocalDuration::from_secs(rng.u64(0..delta.as_secs()));
+
        let time = if rng.bool() { now + delta } else { now - delta };
+

+
        msgs.push(Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: arbitrary::gen(3),
+
                timestamp: time.as_secs(),
+
            },
+
            signer,
+
        ))
+
    }
+
    msgs
+
}
modified radicle-node/src/test/peer.rs
@@ -162,8 +162,16 @@ where
        let mut msgs = self.messages(&remote);
        msgs.find(|m| matches!(m, Message::Initialize { .. }))
            .expect("`initialize` is sent");
-
        msgs.find(|m| matches!(m, Message::InventoryAnnouncement { .. }))
-
            .expect("`inventory-announcement` is sent");
+
        msgs.find(|m| {
+
            matches!(
+
                m,
+
                Message::Announcement(Announcement {
+
                    message: AnnouncementMessage::Inventory(_),
+
                    ..
+
                })
+
            )
+
        })
+
        .expect("`inventory-announcement` is sent");
    }

    pub fn connect_to(&mut self, peer: &Self) {
@@ -177,8 +185,16 @@ where
        let mut msgs = self.messages(&remote);
        msgs.find(|m| matches!(m, Message::Initialize { .. }))
            .expect("`initialize` is sent");
-
        msgs.find(|m| matches!(m, Message::InventoryAnnouncement { .. }))
-
            .expect("`inventory-announcement` is sent");
+
        msgs.find(|m| {
+
            matches!(
+
                m,
+
                Message::Announcement(Announcement {
+
                    message: AnnouncementMessage::Inventory(_),
+
                    ..
+
                })
+
            )
+
        })
+
        .expect("`inventory-announcement` is sent");

        let git = peer.config().git_url.clone();
        self.receive(
modified radicle-node/src/test/tests.rs
@@ -5,7 +5,9 @@ use crossbeam_channel as chan;
use nakamoto_net as nakamoto;

use crate::collections::{HashMap, HashSet};
+
use crate::prelude::Timestamp;
use crate::service::config::*;
+
use crate::service::filter::Filter;
use crate::service::message::*;
use crate::service::peer::*;
use crate::service::reactor::Io;
@@ -212,6 +214,34 @@ fn test_inventory_relay_bad_timestamp() {
}

#[test]
+
fn test_gossip_rebroadcast() {
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
+
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
+
    let eve = Peer::new("eve", [9, 9, 9, 9], MockStorage::empty());
+

+
    alice.connect_to(&bob);
+

+
    let received = test::gossip::messages(6, alice.local_time(), MAX_TIME_DELTA);
+
    for msg in received.iter().cloned() {
+
        // TODO: Test with elapsed time
+
        alice.receive(&bob.addr(), msg);
+
    }
+

+
    alice.connect_from(&eve);
+
    alice.receive(
+
        &eve.addr(),
+
        Message::Subscribe(Subscribe {
+
            filter: Filter::default(),
+
            since: Timestamp::MIN,
+
            until: Timestamp::MAX,
+
        }),
+
    );
+

+
    let relayed = alice.messages(&eve.addr()).collect::<Vec<_>>();
+
    assert_eq!(relayed, received);
+
}
+

+
#[test]
fn test_inventory_relay() {
    // Topology is eve <-> alice <-> bob
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
@@ -235,7 +265,11 @@ fn test_inventory_relay() {
    );
    assert_matches!(
        alice.messages(&eve.addr()).next(),
-
        Some(Message::InventoryAnnouncement { node, message: InventoryAnnouncement { timestamp, .. }, .. })
+
        Some(Message::Announcement(Announcement {
+
            node,
+
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
+
            ..
+
        }))
        if node == bob.node_id() && timestamp == now
    );
    assert_matches!(
@@ -272,7 +306,11 @@ fn test_inventory_relay() {
    );
    assert_matches!(
        alice.messages(&eve.addr()).next(),
-
        Some(Message::InventoryAnnouncement { node, message: InventoryAnnouncement{ timestamp, .. }, .. })
+
        Some(Message::Announcement(Announcement {
+
            node,
+
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
+
            ..
+
        }))
        if node == bob.node_id() && timestamp == now + 1,
        "Sending a new inventory does trigger the relay"
    );
@@ -290,7 +328,11 @@ fn test_inventory_relay() {
    );
    assert_matches!(
        alice.messages(&bob.addr()).next(),
-
        Some(Message::InventoryAnnouncement { node, message: InventoryAnnouncement { timestamp, .. }, .. })
+
        Some(Message::Announcement(Announcement {
+
            node,
+
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
+
            ..
+
        }))
        if node == eve.node_id() && timestamp == now
    );
}
modified radicle-node/src/wire/message.rs
@@ -44,9 +44,11 @@ impl Message {
        match self {
            Self::Initialize { .. } => MessageType::Initialize,
            Self::Subscribe { .. } => MessageType::Subscribe,
-
            Self::NodeAnnouncement { .. } => MessageType::NodeAnnouncement,
-
            Self::InventoryAnnouncement { .. } => MessageType::InventoryAnnouncement,
-
            Self::RefsAnnouncement { .. } => MessageType::RefsAnnouncement,
+
            Self::Announcement(Announcement { message, .. }) => match message {
+
                AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
+
                AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
+
                AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
+
            },
        }
        .into()
    }
@@ -82,6 +84,16 @@ impl TryFrom<u8> for AddressType {
    }
}

+
impl wire::Encode for AnnouncementMessage {
+
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
+
        match self {
+
            Self::Node(ann) => ann.encode(writer),
+
            Self::Inventory(ann) => ann.encode(writer),
+
            Self::Refs(ann) => ann.encode(writer),
+
        }
+
    }
+
}
+

impl wire::Encode for RefsAnnouncement {
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
        let mut n = 0;
@@ -150,29 +162,11 @@ impl wire::Encode for Message {
                n += since.encode(writer)?;
                n += until.encode(writer)?;
            }
-
            Self::RefsAnnouncement {
-
                node,
-
                message,
-
                signature,
-
            } => {
-
                n += node.encode(writer)?;
-
                n += message.encode(writer)?;
-
                n += signature.encode(writer)?;
-
            }
-
            Self::InventoryAnnouncement {
+
            Self::Announcement(Announcement {
                node,
                message,
                signature,
-
            } => {
-
                n += node.encode(writer)?;
-
                n += message.encode(writer)?;
-
                n += signature.encode(writer)?;
-
            }
-
            Self::NodeAnnouncement {
-
                node,
-
                message,
-
                signature,
-
            } => {
+
            }) => {
                n += node.encode(writer)?;
                n += message.encode(writer)?;
                n += signature.encode(writer)?;
@@ -213,36 +207,39 @@ impl wire::Decode for Message {
            }
            Ok(MessageType::NodeAnnouncement) => {
                let node = NodeId::decode(reader)?;
-
                let message = NodeAnnouncement::decode(reader)?;
+
                let message = NodeAnnouncement::decode(reader)?.into();
                let signature = Signature::decode(reader)?;

-
                Ok(Self::NodeAnnouncement {
+
                Ok(Announcement {
                    node,
                    message,
                    signature,
-
                })
+
                }
+
                .into())
            }
            Ok(MessageType::InventoryAnnouncement) => {
                let node = NodeId::decode(reader)?;
-
                let message = InventoryAnnouncement::decode(reader)?;
+
                let message = InventoryAnnouncement::decode(reader)?.into();
                let signature = Signature::decode(reader)?;

-
                Ok(Self::InventoryAnnouncement {
+
                Ok(Announcement {
                    node,
                    message,
                    signature,
-
                })
+
                }
+
                .into())
            }
            Ok(MessageType::RefsAnnouncement) => {
                let node = NodeId::decode(reader)?;
-
                let message = RefsAnnouncement::decode(reader)?;
+
                let message = RefsAnnouncement::decode(reader)?.into();
                let signature = Signature::decode(reader)?;

-
                Ok(Self::RefsAnnouncement {
+
                Ok(Announcement {
                    node,
                    message,
                    signature,
-
                })
+
                }
+
                .into())
            }
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
        }