Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: minimise refs announcement to key and Oid
Fintan Halpenny committed 2 years ago
commit d27cd0db0c4d5ae4c2f21ab9c4354e532268ed64
parent 61b06553b461b982fb59808d1dad9d6a1548732a
11 files changed +209 -75
modified radicle-node/src/service.rs
@@ -39,8 +39,9 @@ use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::tracking::{store::Write, Scope};
use crate::storage;
+
use crate::storage::refs::RefsAt;
+
use crate::storage::ReadRepository;
use crate::storage::{Namespaces, ReadStorage};
-
use crate::storage::{ReadRepository, RemoteRepository as _};
use crate::worker::fetch;
use crate::worker::FetchError;
use crate::Link;
@@ -113,6 +114,8 @@ impl SyncedRouting {
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error(transparent)]
+
    Git(#[from] radicle::git::raw::Error),
+
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Refs(#[from] storage::refs::Error),
@@ -609,8 +612,24 @@ where
        }
    }

+
    /// Initiate an outgoing fetch for some repository, based on
+
    /// another node's announcement.
+
    fn fetch_refs_at(
+
        &mut self,
+
        rid: Id,
+
        from: &NodeId,
+
        refs: Vec<RefsAt>,
+
        timeout: time::Duration,
+
    ) {
+
        self._fetch(rid, from, refs, timeout)
+
    }
+

    /// Initiate an outgoing fetch for some repository.
    fn fetch(&mut self, rid: Id, from: &NodeId, timeout: time::Duration) {
+
        self._fetch(rid, from, vec![], timeout)
+
    }
+

+
    fn _fetch(&mut self, rid: Id, from: &NodeId, refs_at: Vec<RefsAt>, timeout: time::Duration) {
        let Some(session) = self.sessions.get_mut(from) else {
            error!(target: "service", "Session {from} does not exist; cannot initiate fetch");
            return;
@@ -629,10 +648,10 @@ where
            }
            session::FetchResult::Ready => {
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");
-

                match self.tracking.namespaces_for(&self.storage, &rid) {
                    Ok(namespaces) => {
-
                        self.outbox.fetch(session, rid, namespaces, timeout);
+
                        self.outbox
+
                            .fetch(session, rid, namespaces, refs_at, timeout);
                    }
                    Err(err) => {
                        error!(target: "service", "Error getting namespaces for {rid}: {err}");
@@ -901,15 +920,7 @@ where
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
            return Err(session::Error::InvalidTimestamp(timestamp));
        }
-
        // Check ref signatures validity.
-
        if let AnnouncementMessage::Refs(message) = message {
-
            for theirs in message.refs.iter() {
-
                if theirs.verify(&theirs.id).is_err() {
-
                    warn!(target: "service", "Peer {relayer} relayed refs announcement with invalid signature for {}", theirs.id);
-
                    return Err(session::Error::Misbehavior);
-
                }
-
            }
-
        }
+

        // Discard announcement messages we've already seen, otherwise update out last seen time.
        match self.gossip.announced(announcer, announcement) {
            Ok(fresh) => {
@@ -962,7 +973,6 @@ where
                                }
                                Ok(false) => {
                                    debug!(target: "service", "Missing tracked inventory {id}; initiating fetch..");
-

                                    self.fetch(*id, announcer, FETCH_TIMEOUT);
                                }
                                Err(e) => {
@@ -1019,8 +1029,10 @@ where
                    // which is required by the protocol to only announce refs it has.
                    if self.sessions.is_connected(announcer) {
                        match self.should_fetch_refs_announcement(message, &repo_entry.scope) {
-
                            Ok(true) => self.fetch(message.rid, announcer, FETCH_TIMEOUT),
-
                            Ok(false) => {}
+
                            Ok(Some(refs)) => {
+
                                self.fetch_refs_at(message.rid, announcer, refs, FETCH_TIMEOUT)
+
                            }
+
                            Ok(None) => {}
                            Err(e) => {
                                error!(target: "service", "Failed to check refs announcement: {e}");
                                return Err(session::Error::Misbehavior);
@@ -1092,29 +1104,36 @@ where
        &self,
        message: &RefsAnnouncement,
        scope: &tracking::Scope,
-
    ) -> Result<bool, Error> {
+
    ) -> Result<Option<Vec<RefsAt>>, Error> {
        // First, check the freshness.
        if !message.is_fresh(&self.storage)? {
            debug!(target: "service", "All refs of {} are already in local storage", &message.rid);
-
            return Ok(false);
+
            return Ok(None);
        }

        // Second, check the scope.
        match scope {
-
            tracking::Scope::All => Ok(true),
+
            tracking::Scope::All => Ok(Some(message.refs.clone().into())),
            tracking::Scope::Trusted => {
                match self.tracking.namespaces_for(&self.storage, &message.rid) {
-
                    Ok(Namespaces::All) => Ok(true),
+
                    Ok(Namespaces::All) => Ok(Some(message.refs.clone().into())),
                    Ok(Namespaces::Trusted(mut trusted)) => {
                        // Get the set of trusted nodes except self.
                        trusted.remove(&self.node_id());

                        // Check if there is at least one trusted ref.
-
                        Ok(message.refs.iter().any(|refs| trusted.contains(&refs.id)))
+
                        Ok(Some(
+
                            message
+
                                .refs
+
                                .iter()
+
                                .filter(|refs_at| trusted.contains(&refs_at.remote))
+
                                .cloned()
+
                                .collect(),
+
                        ))
                    }
                    Err(NamespacesError::NoTrusted { rid }) => {
                        debug!(target: "service", "No trusted nodes to fetch {}", &rid);
-
                        Ok(false)
+
                        Ok(None)
                    }
                    Err(e) => {
                        error!(target: "service", "Failed to obtain namespaces: {e}");
@@ -1333,10 +1352,7 @@ where
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        for remote_id in remotes.into_iter() {
-
            if refs
-
                .push(repo.remote(&remote_id)?.refs.unverified())
-
                .is_err()
-
            {
+
            if refs.push(RefsAt::new(&repo, remote_id)?).is_err() {
                warn!(
                    target: "service",
                    "refs announcement limit ({}) exceeded, peers will see only some of your repository references",
modified radicle-node/src/service/io.rs
@@ -1,7 +1,9 @@
-
use std::collections::VecDeque;
+
use std::collections::{HashMap, VecDeque};
use std::time;

use log::*;
+
use radicle::git;
+
use radicle::storage::refs::RefsAt;

use crate::prelude::*;
use crate::service::session::Session;
@@ -27,6 +29,8 @@ pub enum Io {
        remote: NodeId,
        /// Namespaces being fetched.
        namespaces: Namespaces,
+
        /// If a refs announcements was made.
+
        refs_at: Option<HashMap<NodeId, git::Oid>>,
        /// Fetch timeout.
        timeout: time::Duration,
    },
@@ -85,11 +89,20 @@ impl Outbox {
        remote: &mut Session,
        rid: Id,
        namespaces: Namespaces,
+
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
    ) {
+
        let refs_at = {
+
            let refs = refs_at
+
                .into_iter()
+
                .map(|RefsAt { remote, at }| (remote, at))
+
                .collect::<HashMap<_, _>>();
+
            (!refs.is_empty()).then_some(refs)
+
        };
        self.io.push_back(Io::Fetch {
            rid,
            namespaces,
+
            refs_at,
            remote: remote.id,
            timeout,
        });
modified radicle-node/src/service/message.rs
@@ -1,7 +1,8 @@
use std::{fmt, io, mem};

+
use radicle::storage::refs::RefsAt;
+

use crate::crypto;
-
use crate::crypto::Unverified;
use crate::identity::Id;
use crate::node;
use crate::node::{Address, Alias};
@@ -9,14 +10,13 @@ use crate::prelude::BoundedVec;
use crate::service::filter::Filter;
use crate::service::{Link, NodeId, Timestamp};
use crate::storage;
-
use crate::storage::refs::SignedRefs;
-
use crate::storage::{ReadStorage, RemoteRepository as _};
+
use crate::storage::ReadStorage;
use crate::wire;

/// Maximum number of addresses which can be announced to other nodes.
pub const ADDRESS_LIMIT: usize = 16;
/// Maximum number of repository remotes that can be included in a [`RefsAnnouncement`] message.
-
pub const REF_REMOTE_LIMIT: usize = 512;
+
pub const REF_REMOTE_LIMIT: usize = 1024;
/// Maximum number of inventory which can be announced to other nodes.
pub const INVENTORY_LIMIT: usize = 2973;

@@ -157,8 +157,8 @@ impl wire::Decode for NodeAnnouncement {
pub struct RefsAnnouncement {
    /// Repository identifier.
    pub rid: Id,
-
    /// Updated refs.
-
    pub refs: BoundedVec<SignedRefs<Unverified>, REF_REMOTE_LIMIT>,
+
    /// Updated `rad/sigrefs`.
+
    pub refs: BoundedVec<RefsAt, REF_REMOTE_LIMIT>,
    /// Time of announcement.
    pub timestamp: Timestamp,
}
@@ -175,8 +175,8 @@ impl RefsAnnouncement {
        };

        for theirs in self.refs.iter() {
-
            if let Ok(ours) = repo.remote(&theirs.id) {
-
                if *ours.refs != theirs.refs {
+
            if let Ok(ours) = RefsAt::new(&repo, theirs.remote) {
+
                if ours.at != theirs.at {
                    return Ok(true);
                }
            } else {
@@ -199,9 +199,9 @@ impl RefsAnnouncement {
            Ok(r) => r,
        };

-
        if let Some(refs) = self.refs.iter().find(|refs| &refs.id == remote) {
-
            let local_refs = repo.remote(remote)?.refs.unverified();
-
            return Ok(&local_refs == refs);
+
        if let Some(theirs) = self.refs.iter().find(|refs_at| refs_at.remote == *remote) {
+
            let ours = RefsAt::new(&repo, *remote)?;
+
            return Ok(ours.at == theirs.at);
        }
        Ok(false)
    }
@@ -535,18 +535,22 @@ mod tests {
    use crate::test::arbitrary;
    use fastrand;
    use qcheck_macros::quickcheck;
+
    use radicle::git::raw;

    #[test]
    fn test_ref_remote_limit() {
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
-
        let rs = Refs::default();
        let signer = MockSigner::default();
-
        let signed_refs = rs.signed(&signer).unwrap().unverified();
+
        let at = raw::Oid::zero().into();

        assert_eq!(refs.capacity(), REF_REMOTE_LIMIT);

        for _ in 0..refs.capacity() {
-
            refs.push(signed_refs.clone()).unwrap();
+
            refs.push(RefsAt {
+
                remote: *signer.public_key(),
+
                at,
+
            })
+
            .unwrap();
        }

        let msg: Message = AnnouncementMessage::from(RefsAnnouncement {
@@ -595,11 +599,17 @@ mod tests {
    }

    #[quickcheck]
-
    fn prop_refs_announcement_signing(rid: Id, refs: Refs) {
+
    fn prop_refs_announcement_signing(rid: Id) {
        let signer = MockSigner::new(&mut fastrand::Rng::new());
        let timestamp = 0;
-
        let signed_refs = refs.signed(&signer).unwrap();
-
        let refs = BoundedVec::collect_from(&mut [signed_refs.unverified()].into_iter());
+
        let at = raw::Oid::zero().into();
+
        let refs = BoundedVec::collect_from(
+
            &mut [RefsAt {
+
                remote: *signer.public_key(),
+
                at,
+
            }]
+
            .into_iter(),
+
        );
        let message = AnnouncementMessage::Refs(RefsAnnouncement {
            rid,
            refs,
modified radicle-node/src/test/peer.rs
@@ -10,6 +10,7 @@ use radicle::identity::Visibility;
use radicle::node::address::Store;
use radicle::node::{address, Alias, ConnectOptions};
use radicle::rad;
+
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadRepository, RemoteRepository};
use radicle::Storage;

@@ -290,10 +291,17 @@ where
        if let Ok(repo) = self.storage().repository(rid) {
            if let Ok(false) = repo.is_empty() {
                if let Ok(remotes) = repo.remotes() {
-
                    for (remote_id, remote) in remotes.into_iter() {
-
                        if let Err(e) = refs.push(remote.refs.unverified()) {
-
                            debug!(target: "test", "Failed to push {remote_id} to refs: {e}");
-
                            break;
+
                    for (remote_id, _) in remotes.into_iter() {
+
                        match RefsAt::new(&repo, remote_id) {
+
                            Ok(refs_at) => {
+
                                if let Err(e) = refs.push(refs_at) {
+
                                    debug!(target: "test", "Failed to push {remote_id} to refs: {e}");
+
                                    break;
+
                                }
+
                            }
+
                            Err(e) => {
+
                                debug!(target: "test", "Failed to get `rad/sigrefs` for {remote_id}: {e}")
+
                            }
                        }
                    }
                }
modified radicle-node/src/tests.rs
@@ -11,7 +11,7 @@ use netservices::Direction as Link;
use radicle::identity::Visibility;
use radicle::node::routing::Store as _;
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
-
use radicle::storage::RemoteRepository as _;
+
use radicle::storage::refs::RefsAt;

use crate::collections::{RandomMap, RandomSet};
use crate::crypto::test::signer::MockSigner;
@@ -37,6 +37,7 @@ use crate::test::peer;
use crate::test::peer::Peer;
use crate::test::simulator;
use crate::test::simulator::{Peer as _, Simulation};
+
use crate::test::storage as mock_storage;
use crate::test::storage::MockStorage;
use crate::wire::Decode;
use crate::wire::Encode;
@@ -752,11 +753,32 @@ fn test_refs_announcement_trusted() {
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage_alice);
    let mut bob = Peer::with_storage("bob", [8, 8, 8, 8], storage_bob);

+
    let refs = arbitrary::gen::<Refs>(8);
+
    let sigref_at = arbitrary::oid();
+
    let signed_refs = refs.signed(bob.signer()).unwrap();
+
    let node_id = alice.id;
+
    alice.storage_mut().insert_remote(
+
        rid,
+
        node_id,
+
        mock_storage::refs::SignedRefsAt {
+
            at: sigref_at,
+
            sigrefs: signed_refs,
+
        },
+
    );
+

    // Generate some refs for Bob under their own node_id.
    let refs = arbitrary::gen::<Refs>(8);
+
    let sigref_at = arbitrary::oid();
    let signed_refs = refs.signed(bob.signer()).unwrap();
    let node_id = bob.id;
-
    bob.storage_mut().insert_remote(rid, node_id, signed_refs);
+
    bob.storage_mut().insert_remote(
+
        rid,
+
        node_id,
+
        mock_storage::refs::SignedRefsAt {
+
            at: sigref_at,
+
            sigrefs: signed_refs,
+
        },
+
    );

    // Alice uses Scope::Trusted, and did not track Bob yet.
    alice.connect_to(&bob);
@@ -1207,17 +1229,11 @@ fn test_refs_synced_event() {
    let bob = Peer::new("eve", [9, 9, 9, 9]);
    let acme = alice.project("acme", "");
    let events = alice.events();
-
    let refs = alice
-
        .storage()
-
        .repository(acme)
-
        .unwrap()
-
        .remote(&alice.id)
-
        .unwrap()
-
        .refs
-
        .unverified();
    let ann = AnnouncementMessage::from(RefsAnnouncement {
        rid: acme,
-
        refs: vec![refs].try_into().unwrap(),
+
        refs: vec![RefsAt::new(&alice.storage().repository(acme).unwrap(), alice.id).unwrap()]
+
            .try_into()
+
            .unwrap(),
        timestamp: bob.timestamp(),
    });
    let msg = ann.signed(bob.signer());
modified radicle-node/src/wire.rs
@@ -25,6 +25,7 @@ use crate::node::Alias;
use crate::prelude::*;
use crate::service::filter;
use crate::storage::refs::Refs;
+
use crate::storage::refs::RefsAt;
use crate::storage::refs::SignedRefs;

/// The default type we use to represent sizes on the wire.
@@ -469,6 +470,25 @@ impl Decode for SignedRefs<Unverified> {
    }
}

+
impl Encode for RefsAt {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+

+
        n += self.remote.encode(writer)?;
+
        n += self.at.encode(writer)?;
+

+
        Ok(n)
+
    }
+
}
+

+
impl Decode for RefsAt {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
+
        let remote = NodeId::decode(reader)?;
+
        let at = git::Oid::decode(reader)?;
+
        Ok(Self { remote, at })
+
    }
+
}
+

impl Encode for node::Features {
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
        self.deref().encode(writer)
modified radicle-node/src/wire/message.rs
@@ -146,7 +146,7 @@ impl wire::Encode for RefsAnnouncement {
impl wire::Decode for RefsAnnouncement {
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
        let rid = Id::decode(reader)?;
-
        let refs = BoundedVec::decode(reader)?;
+
        let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(reader)?;
        let timestamp = Timestamp::decode(reader)?;

        Ok(Self {
modified radicle-node/src/worker.rs
@@ -220,7 +220,6 @@ impl Worker {
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", rid);
                let result = self.fetch(rid, remote, channels);
-

                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote } => {
modified radicle/src/storage/refs.rs
@@ -367,6 +367,32 @@ impl<V> Deref for SignedRefs<V> {
    }
}

+
/// The content-addressable information required to load a remote's
+
/// `rad/sigrefs`.
+
///
+
/// This can be used to [`RefsAt::load`] a [`SignedRefsAt`].
+
///
+
/// It can also be used for communicating announcements of updates
+
/// references to other nodes.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct RefsAt {
+
    /// The remote namespace of the `rad/sigrefs`.
+
    pub remote: RemoteId,
+
    /// The commit SHA that `rad/sigrefs` points to.
+
    pub at: Oid,
+
}
+

+
impl RefsAt {
+
    pub fn new<S: ReadRepository>(repo: &S, remote: RemoteId) -> Result<Self, git::raw::Error> {
+
        let at = repo.reference_oid(&remote, &storage::refs::SIGREFS_BRANCH)?;
+
        Ok(RefsAt { remote, at })
+
    }
+

+
    pub fn load<S: ReadRepository>(&self, repo: &S) -> Result<SignedRefsAt, Error> {
+
        SignedRefsAt::load_at(self.at, self.remote, repo)
+
    }
+
}
+

/// Verified [`SignedRefs`] that keeps track of their content address
/// [`Oid`].
#[derive(Debug, Clone, PartialEq, Eq)]
modified radicle/src/test/arbitrary.rs
@@ -19,7 +19,7 @@ use crate::identity::{
use crate::node::address::AddressType;
use crate::node::{Address, Alias};
use crate::storage;
-
use crate::storage::refs::{Refs, SignedRefs};
+
use crate::storage::refs::{Refs, RefsAt, SignedRefs};
use crate::test::storage::{MockRepository, MockStorage};
use crate::{cob, git};

@@ -206,6 +206,15 @@ impl Arbitrary for Refs {
    }
}

+
impl Arbitrary for RefsAt {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        Self {
+
            remote: PublicKey::arbitrary(g),
+
            at: oid(),
+
        }
+
    }
+
}
+

impl Arbitrary for MockStorage {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        let inventory = Arbitrary::arbitrary(g);
modified radicle/src/test/storage.rs
@@ -21,7 +21,7 @@ pub struct MockStorage {

    /// All refs keyed by RID.
    /// Each value is a map of refs keyed by node Id (public key).
-
    pub remotes: HashMap<Id, HashMap<NodeId, refs::SignedRefs<Verified>>>,
+
    pub remotes: HashMap<Id, HashMap<NodeId, refs::SignedRefsAt>>,
}

impl MockStorage {
@@ -39,16 +39,11 @@ impl MockStorage {
    }

    /// Add a remote `node` with `signed_refs` for the repo `rid`.
-
    pub fn insert_remote(
-
        &mut self,
-
        rid: Id,
-
        node: NodeId,
-
        signed_refs: refs::SignedRefs<Verified>,
-
    ) {
+
    pub fn insert_remote(&mut self, rid: Id, node: NodeId, refs: refs::SignedRefsAt) {
        self.remotes
            .entry(rid)
            .or_insert(HashMap::new())
-
            .insert(node, signed_refs);
+
            .insert(node, refs);
    }
}

@@ -113,7 +108,7 @@ impl WriteStorage for MockStorage {
pub struct MockRepository {
    id: Id,
    doc: DocAt,
-
    remotes: HashMap<NodeId, refs::SignedRefs<Verified>>,
+
    remotes: HashMap<NodeId, refs::SignedRefsAt>,
}

impl MockRepository {
@@ -136,7 +131,9 @@ impl RemoteRepository for MockRepository {
    fn remote(&self, id: &RemoteId) -> Result<Remote<Verified>, refs::Error> {
        self.remotes
            .get(id)
-
            .map(|refs| Remote { refs: refs.clone() })
+
            .map(|refs| Remote {
+
                refs: refs.sigrefs.clone(),
+
            })
            .ok_or(refs::Error::InvalidRef)
    }

@@ -144,7 +141,14 @@ impl RemoteRepository for MockRepository {
        Ok(self
            .remotes
            .iter()
-
            .map(|(id, refs)| (*id, Remote { refs: refs.clone() }))
+
            .map(|(id, refs)| {
+
                (
+
                    *id,
+
                    Remote {
+
                        refs: refs.sigrefs.clone(),
+
                    },
+
                )
+
            })
            .collect())
    }
}
@@ -210,10 +214,23 @@ impl ReadRepository for MockRepository {

    fn reference_oid(
        &self,
-
        _remote: &RemoteId,
-
        _reference: &git::Qualified,
+
        remote: &RemoteId,
+
        reference: &git::Qualified,
    ) -> Result<git_ext::Oid, git::raw::Error> {
-
        Ok(Oid::from_str("ffffffffffffffffffffffffffffffffffffffff").unwrap())
+
        let not_found = || {
+
            git::raw::Error::new(
+
                git::raw::ErrorCode::NotFound,
+
                git::raw::ErrorClass::Reference,
+
                format!("could not find {reference} for {remote}"),
+
            )
+
        };
+

+
        let refs = self.remotes.get(remote).ok_or_else(not_found)?;
+
        if reference == &*refs::SIGREFS_BRANCH {
+
            Ok(refs.at)
+
        } else {
+
            refs.sigrefs.get(reference).ok_or_else(not_found)
+
        }
    }

    fn references_of(&self, _remote: &RemoteId) -> Result<crate::storage::refs::Refs, Error> {