Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Complete the "relay" seed test
Alexis Sellier committed 3 years ago
commit ee513ccf7ec9ac813af332d7bc5057fc3c5c1db0
parent 7919c7ea61baa42f8b353ac577fb9749975ce2ac
8 files changed +150 -90
modified radicle-cli/tests/commands.rs
@@ -368,7 +368,11 @@ fn test_replication_via_seed() {
        .remote(&bob.id)
        .unwrap();

-
    // TODO: Seed should send Bob's ref announcement to Alice, after the fetch.
-
    // Currently, it is relayed when received from Bob, before the fetch completes,
-
    // which means that Alice fetches too early from Seed, and nothing is fetched.
+
    // Seed should send Bob's ref announcement to Alice, after the fetch.
+
    alice
+
        .storage
+
        .repository(rid)
+
        .unwrap()
+
        .remote(&bob.id)
+
        .unwrap();
}
modified radicle-node/src/bounded.rs
@@ -15,7 +15,9 @@ pub struct BoundedVec<T, const N: usize> {
impl<T, const N: usize> BoundedVec<T, N> {
    /// Create a new empty `BoundedVec<T,N>`.
    pub fn new() -> Self {
-
        BoundedVec { v: Vec::new() }
+
        BoundedVec {
+
            v: Vec::with_capacity(N),
+
        }
    }

    /// Build a `BoundedVec` by consuming from the given iterator up to its limit.
modified radicle-node/src/service.rs
@@ -74,7 +74,7 @@ pub use message::ADDRESS_LIMIT;
/// Maximum inventory limit imposed by message size limits.
pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references imposed by message size limits.
-
pub use message::REF_LIMIT;
+
pub use message::REF_REMOTE_LIMIT;

/// A service event.
#[derive(Debug, Clone)]
@@ -484,7 +484,7 @@ where
                resp.send(untracked).ok();
            }
            Command::AnnounceRefs(id) => {
-
                if let Err(err) = self.announce_refs(id) {
+
                if let Err(err) = self.announce_refs(id, Namespaces::One(self.node_id())) {
                    error!("Error announcing refs: {}", err);
                }
            }
@@ -492,7 +492,7 @@ where
                let updated = self
                    .sync_and_announce_inventory()
                    .expect("Service::command: error syncing and announcing inventory");
-
                resp.send(updated).ok();
+
                resp.send(!updated.is_empty()).ok();
            }
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
@@ -578,15 +578,21 @@ where
                }
            } else {
                log::debug!(target: "service", "No fetch requests found for {rid}..");
+

+
                // We only announce refs here when the fetch wasn't user-requested. This is
+
                // because the user might want to announce his fork, once he has created one,
+
                // or may choose to not announce anything.
+
                if let Err(e) = self.announce_refs(rid, fetch.namespaces) {
+
                    error!(target: "service", "Failed to announce new refs: {e}");
+
                }
            }
+
            // TODO: Since this fetch could be either a full clone or simply a ref update, we need
+
            // to either announce new inventory, or new refs. Right now, we announce both in some
+
            // cases.

-
            // Announce the newly fetched project to the network, if necessary.
-
            // Since this fetch could be either a full clone or simply a ref update, we need to
-
            // either announce new inventory, or new refs.
-
            //
-
            // TODO: Announce new refs? Would require the ability to announce other peer refs.
+
            // Announce the newly fetched repository to the network, if necessary.
            if let Err(e) = self.sync_and_announce_inventory() {
-
                error!(target: "service", "Failed to announce new inventory: {e}");
+
                error!(target: "service", "Failed to sync announce new inventory: {e}");
            }
        }

@@ -760,7 +766,7 @@ where

                match self.sync_routing(&message.inventory, *announcer, message.timestamp) {
                    Ok(updated) => {
-
                        if !updated {
+
                        if updated.is_empty() {
                            return Ok(false);
                        }
                    }
@@ -771,6 +777,7 @@ where
                }

                for id in message.inventory.as_slice() {
+
                    // TODO: Move this out (good luck with the borrow checker).
                    if let Some(sess) = self.sessions.get_mut(announcer) {
                        // If we are connected to the announcer of this inventory, update the peer's
                        // subscription filter to include all inventory items. This way, we'll
@@ -802,38 +809,58 @@ where
            }
            // Process a peer inventory update announcement by (maybe) fetching.
            AnnouncementMessage::Refs(message) => {
+
                for (remote_id, theirs) in message.refs.iter() {
+
                    if theirs.verify(remote_id).is_err() {
+
                        warn!(target: "service", "Peer {relayer} relayed refs announcement with invalid signature for {remote_id}");
+
                        return Err(session::Error::Misbehavior);
+
                    }
+
                }
+

                // We update inventories when receiving ref announcements, as these could come
                // from a new repository being initialized.
-
                if let Ok(updated) = self.routing.insert(message.id, *relayer, message.timestamp) {
+
                if let Ok(updated) = self
+
                    .routing
+
                    .insert(message.rid, *relayer, message.timestamp)
+
                {
                    if updated {
-
                        info!(target: "service", "Routing table updated for {} with seed {relayer}", message.id);
+
                        info!(target: "service", "Routing table updated for {} with seed {relayer}", message.rid);
                    }
                }
                // TODO: Buffer/throttle fetches.
-
                // TODO: Check that we're tracking this user as well.
                if self
                    .tracking
-
                    .is_repo_tracked(&message.id)
+
                    .is_repo_tracked(&message.rid)
                    .expect("Service::handle_announcement: error accessing tracking configuration")
                {
-
                    // Discard inventory messages we've already seen, otherwise update
+
                    // Discard announcement messages we've already seen, otherwise update
                    // our last seen time.
-
                    if !peer.refs_announced(message.id, timestamp) {
+
                    if !peer.refs_announced(message.rid, timestamp) {
                        debug!(target: "service", "Ignoring stale refs announcement from {announcer}");
                        return Ok(false);
                    }
-
                    // TODO: Check refs to see if we should try to fetch or not.
-
                    // Refs are only supposed to be relayed by peers who are tracking
-
                    // the resource. Therefore, it's safe to fetch from the remote
-
                    // peer, even though it isn't the announcer.
-
                    self.fetch(message.id, relayer);

-
                    return Ok(true);
+
                    // Refs can be relayed by peers who don't have the data in storage,
+
                    // therefore we only check whether we are connected to the *announcer*,
+
                    // which is required by the protocol to only announce refs it has.
+
                    if self.sessions.is_negotiated(announcer) {
+
                        match message.is_fresh(&self.storage) {
+
                            Ok(is_fresh) => {
+
                                if is_fresh {
+
                                    // TODO: Only fetch if the refs announced are for peers we're tracking.
+
                                    self.fetch(message.rid, announcer);
+
                                }
+
                            }
+
                            Err(e) => {
+
                                error!(target: "service", "Failed to check ref announcement freshness: {e}");
+
                            }
+
                        }
+
                    }
+
                    return Ok(relay);
                } else {
                    debug!(
                        target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't tracked",
-
                        message.id
+
                        message.rid
                    );
                }
            }
@@ -1048,11 +1075,11 @@ where
    }

    /// Sync, and if needed, announce our local inventory.
-
    fn sync_and_announce_inventory(&mut self) -> Result<bool, Error> {
+
    fn sync_and_announce_inventory(&mut self) -> Result<Vec<Id>, Error> {
        let inventory = self.storage.inventory()?;
        let updated = self.sync_routing(&inventory, self.node_id(), self.clock.as_millis())?;

-
        if updated {
+
        if !updated.is_empty() {
            self.announce_inventory(inventory)?;
        }
        Ok(updated)
@@ -1066,8 +1093,8 @@ where
        inventory: &[Id],
        from: NodeId,
        timestamp: Timestamp,
-
    ) -> Result<bool, Error> {
-
        let mut updated = false;
+
    ) -> Result<Vec<Id>, Error> {
+
        let mut updated = Vec::new();
        let mut included = HashSet::new();

        for proj_id in inventory {
@@ -1083,13 +1110,13 @@ where
                    // TODO: We should fetch here if we're already connected, case this seed has
                    // refs we don't have.
                }
-
                updated = true;
+
                updated.push(*proj_id);
            }
        }
        for id in self.routing.get_resources(&from)?.into_iter() {
            if !included.contains(&id) {
                if self.routing.remove(&id, &from)? {
-
                    updated = true;
+
                    updated.push(id);
                }
            }
        }
@@ -1097,25 +1124,34 @@ where
    }

    /// Announce local refs for given id.
-
    fn announce_refs(&mut self, id: Id) -> Result<(), storage::Error> {
-
        type Refs = BoundedVec<Id, REF_LIMIT>;
-

-
        let node = self.node_id();
-
        let repo = self.storage.repository(id)?;
-
        let remote = repo.remote(&node)?;
+
    fn announce_refs(&mut self, rid: Id, namespaces: Namespaces) -> Result<(), storage::Error> {
+
        let repo = self.storage.repository(rid)?;
        let peers = self.sessions.negotiated().map(|(_, p)| p);
        let timestamp = self.clock.as_millis();
-

-
        if remote.refs.len() > Refs::max() {
-
            error!(
-
                target: "service",
-
                "refs announcement limit ({}) exceeded, other nodes will see only some of your project references",
-
                Refs::max(),
-
            );
+
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
+

+
        match namespaces {
+
            Namespaces::All => {
+
                for (remote_id, remote) in repo.remotes()?.into_iter() {
+
                    if refs.push((remote_id, remote.refs.unverified())).is_err() {
+
                        warn!(
+
                            target: "service",
+
                            "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
+
                            REF_REMOTE_LIMIT,
+
                        );
+
                        break;
+
                    }
+
                }
+
            }
+
            Namespaces::One(pk) => refs
+
                .push((pk, repo.remote(&pk)?.refs.unverified()))
+
                // SAFETY: `REF_REMOTE_LIMIT` is greater than 1, thus the bounded vec can hold at least
+
                // one remote.
+
                .unwrap(),
        }
-
        let refs = BoundedVec::collect_from(&mut remote.refs.iter().map(|(a, b)| (a.clone(), *b)));
+

        let msg = AnnouncementMessage::from(RefsAnnouncement {
-
            id,
+
            rid,
            refs,
            timestamp,
        });
modified radicle-node/src/service/message.rs
@@ -1,19 +1,22 @@
use std::{fmt, io, mem};

use crate::crypto;
-
use crate::git;
+
use crate::crypto::Unverified;
use crate::identity::Id;
use crate::node;
use crate::node::Address;
use crate::prelude::BoundedVec;
use crate::service::filter::Filter;
use crate::service::{NodeId, Timestamp};
+
use crate::storage;
+
use crate::storage::refs::SignedRefs;
+
use crate::storage::{ReadRepository, WriteStorage};
use crate::wire;

/// Maximum number of addresses which can be announced to other nodes.
pub const ADDRESS_LIMIT: usize = 16;
-
/// Maximum number of project git references.
-
pub const REF_LIMIT: usize = 235;
+
/// Maximum number of repository remotes that can be included in a [`RefsAnnouncement`] message.
+
pub const REF_REMOTE_LIMIT: usize = 512;
/// Maximum number of inventory which can be announced to other nodes.
pub const INVENTORY_LIMIT: usize = 2973;

@@ -142,13 +145,31 @@ impl wire::Decode for NodeAnnouncement {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RefsAnnouncement {
    /// Repository identifier.
-
    pub id: Id,
+
    pub rid: Id,
    /// Updated refs.
-
    pub refs: BoundedVec<(git::RefString, git::Oid), REF_LIMIT>,
+
    pub refs: BoundedVec<(NodeId, SignedRefs<Unverified>), REF_REMOTE_LIMIT>,
    /// Time of announcement.
    pub timestamp: Timestamp,
}

+
impl RefsAnnouncement {
+
    /// Check if this announcement is "fresh", meaning if it contains refs we do not have.
+
    pub fn is_fresh<S: WriteStorage>(&self, storage: S) -> Result<bool, storage::Error> {
+
        let repo = storage.repository(self.rid)?;
+

+
        for (remote_id, theirs) in self.refs.iter() {
+
            if let Ok(ours) = repo.remote(remote_id) {
+
                if *ours.refs != theirs.refs {
+
                    return Ok(true);
+
                }
+
            } else {
+
                return Ok(true);
+
            }
+
        }
+
        Ok(false)
+
    }
+
}
+

/// Node announcing its inventory to the network.
/// This should be the whole inventory every time.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -228,7 +249,7 @@ impl fmt::Debug for AnnouncementMessage {
                )
            }
            Self::Refs(message) => {
-
                write!(f, "Refs({}, {:?})", message.id, message.refs)
+
                write!(f, "Refs({}, {:?})", message.rid, message.refs)
            }
        }
    }
@@ -273,7 +294,7 @@ impl Announcement {
        match &self.message {
            AnnouncementMessage::Inventory(_) => true,
            AnnouncementMessage::Node(_) => true,
-
            AnnouncementMessage::Refs(RefsAnnouncement { id, .. }) => filter.contains(id),
+
            AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) => filter.contains(rid),
        }
    }
}
@@ -430,37 +451,33 @@ mod tests {
    use qcheck_macros::quickcheck;

    #[test]
-
    fn test_ref_limit() {
-
        let mut refs = Refs::default();
-
        while refs.len() < REF_LIMIT {
-
            refs.insert(arbitrary::refstring(u8::MAX as usize), arbitrary::oid());
+
    fn test_ref_remote_limit() {
+
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
+
        let rs = Refs::default();
+
        let signer = MockSigner::default();
+
        let signed_refs = rs.signed(&signer).unwrap().unverified();
+

+
        assert_eq!(refs.capacity(), REF_REMOTE_LIMIT);
+

+
        for _ in 0..refs.capacity() {
+
            refs.push((*signer.public_key(), signed_refs.clone()))
+
                .unwrap();
        }

-
        let bounded_refs = BoundedVec::collect_from(&mut refs.iter().map(|(a, b)| (a.clone(), *b)));
        let msg: Message = AnnouncementMessage::from(RefsAnnouncement {
-
            id: arbitrary::gen(1),
-
            refs: bounded_refs,
+
            rid: arbitrary::gen(1),
+
            refs,
            timestamp: LocalTime::now().as_millis(),
        })
        .signed(&MockSigner::default())
        .into();

        let mut buf: Vec<u8> = Vec::new();
-
        assert!(
-
            msg.encode(&mut buf).is_ok(),
-
            "REF_LIMIT is too big to support message encoding",
-
        );
+
        assert!(msg.encode(&mut buf).is_ok());

        let decoded = wire::deserialize(buf.as_slice());
-
        assert!(
-
            decoded.is_ok(),
-
            "REF_LIMIT is too big to support message decoding"
-
        );
-
        assert_eq!(
-
            msg,
-
            decoded.unwrap(),
-
            "encoding and decoding should be safe for message at REF_LIMIT",
-
        );
+
        assert!(decoded.is_ok());
+
        assert_eq!(msg, decoded.unwrap());
    }

    #[test]
@@ -493,13 +510,16 @@ mod tests {
    }

    #[quickcheck]
-
    fn prop_refs_announcement_signing(id: Id, refs: Refs) {
+
    fn prop_refs_announcement_signing(rid: Id, refs: Refs) {
        let signer = MockSigner::new(&mut fastrand::Rng::new());
        let timestamp = 0;
-

+
        let signed_refs = refs.signed(&signer).unwrap();
+
        let refs = BoundedVec::collect_from(
+
            &mut [(*signer.public_key(), signed_refs.unverified())].into_iter(),
+
        );
        let message = AnnouncementMessage::Refs(RefsAnnouncement {
-
            id,
-
            refs: BoundedVec::collect_from(&mut refs.iter().map(|(k, v)| (k.clone(), *v))),
+
            rid,
+
            refs,
            timestamp,
        });
        let ann = message.signed(&signer);
modified radicle-node/src/service/reactor.rs
@@ -109,7 +109,7 @@ impl Reactor {
    /// Relay a message to interested peers.
    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
        if let AnnouncementMessage::Refs(msg) = &ann.message {
-
            let id = msg.id;
+
            let id = msg.rid;
            let peers = peers.into_iter().filter(|p| {
                if let Some(subscribe) = &p.subscribe {
                    subscribe.filter.contains(&id)
modified radicle-node/src/test/arbitrary.rs
@@ -2,7 +2,7 @@ use bloomy::BloomFilter;
use qcheck::Arbitrary;

use crate::crypto;
-
use crate::prelude::{BoundedVec, Id, NodeId, Refs, Timestamp};
+
use crate::prelude::{BoundedVec, Id, NodeId, Timestamp};
use crate::service::filter::{Filter, FILTER_SIZE_L, FILTER_SIZE_M, FILTER_SIZE_S};
use crate::service::message::{
    Announcement, InventoryAnnouncement, Message, NodeAnnouncement, Ping, RefsAnnouncement,
@@ -53,10 +53,8 @@ impl Arbitrary for Message {
            MessageType::RefsAnnouncement => Announcement {
                node: NodeId::arbitrary(g),
                message: RefsAnnouncement {
-
                    id: Id::arbitrary(g),
-
                    refs: BoundedVec::collect_from(
-
                        &mut Refs::arbitrary(g).iter().map(|(k, v)| (k.clone(), *v)),
-
                    ),
+
                    rid: Id::arbitrary(g),
+
                    refs: BoundedVec::arbitrary(g),
                    timestamp: Timestamp::arbitrary(g),
                }
                .into(),
modified radicle-node/src/test/peer.rs
@@ -226,10 +226,10 @@ where
        )
    }

-
    pub fn refs_announcement(&self, id: Id) -> Message {
+
    pub fn refs_announcement(&self, rid: Id) -> Message {
        let refs = BoundedVec::new();
        let ann = AnnouncementMessage::from(RefsAnnouncement {
-
            id,
+
            rid,
            refs,
            timestamp: self.timestamp(),
        });
modified radicle-node/src/wire/message.rs
@@ -144,7 +144,7 @@ impl wire::Encode for RefsAnnouncement {
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
        let mut n = 0;

-
        n += self.id.encode(writer)?;
+
        n += self.rid.encode(writer)?;
        n += self.refs.encode(writer)?;
        n += self.timestamp.encode(writer)?;

@@ -154,12 +154,12 @@ impl wire::Encode for RefsAnnouncement {

impl wire::Decode for RefsAnnouncement {
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let id = Id::decode(reader)?;
+
        let rid = Id::decode(reader)?;
        let refs = BoundedVec::decode(reader)?;
        let timestamp = Timestamp::decode(reader)?;

        Ok(Self {
-
            id,
+
            rid,
            refs,
            timestamp,
        })