Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Include identity doc in fetch result
cloudhead committed 2 years ago
commit e90c6a49bcd3a75edafa06502f7d34277ea25f37
parent 991505ec9dadc58f8d8a21f5b6818577c10b8ca5
9 files changed +114 -41
modified radicle-node/src/service.rs
@@ -50,7 +50,7 @@ use crate::service::message::{
};
use crate::service::policy::{store::Write, Policy, Scope};
use crate::storage;
-
use crate::storage::{refs::RefsAt, Namespaces, ReadRepository, ReadStorage};
+
use crate::storage::{refs::RefsAt, Namespaces, ReadStorage};
use crate::worker::fetch;
use crate::worker::FetchError;
use crate::Link;
@@ -780,18 +780,32 @@ where
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
-
            Command::AnnounceRefs(id, resp) => match self.announce_own_refs(id) {
-
                Ok(refs) => match refs.as_slice() {
-
                    &[refs] => {
-
                        resp.send(refs).ok();
+
            Command::AnnounceRefs(id, resp) => {
+
                let doc = match self.storage.get(id) {
+
                    Ok(Some(doc)) => doc,
+
                    Ok(None) => {
+
                        error!(target: "service", "Error announcing refs: repository {id} not found");
+
                        return;
+
                    }
+
                    Err(e) => {
+
                        error!(target: "service", "Error announcing refs: doc error: {e}");
+
                        return;
+
                    }
+
                };
+

+
                match self.announce_own_refs(id, doc) {
+
                    Ok(refs) => match refs.as_slice() {
+
                        &[refs] => {
+
                            resp.send(refs).ok();
+
                        }
+
                        // SAFETY: Since we passed in one NID, we should get exactly one item back.
+
                        [..] => panic!("Service::command: unexpected refs returned"),
+
                    },
+
                    Err(err) => {
+
                        error!(target: "service", "Error announcing refs: {err}");
                    }
-
                    // SAFETY: Since we passed in one NID, we should get exactly one item back.
-
                    [..] => panic!("Service::command: unexpected refs returned"),
-
                },
-
                Err(err) => {
-
                    error!(target: "service", "Error announcing refs: {err}");
                }
-
            },
+
            }
            Command::AnnounceInventory => {
                if let Err(err) = self
                    .storage
@@ -954,6 +968,7 @@ where
                updated,
                namespaces,
                clone,
+
                doc,
            }) => {
                info!(target: "service", "Fetched {rid} from {remote} successfully");

@@ -974,6 +989,7 @@ where
                    updated,
                    namespaces,
                    clone,
+
                    doc,
                }
            }
            Err(err) => {
@@ -1013,6 +1029,7 @@ where
        if fetching.subscribers.is_empty() {
            trace!(target: "service", "No fetch requests found for {rid}..");

+
            // TODO: Combine this with the below.
            // We only announce refs here when the fetch wasn't user-requested. This is
            // because the user might want to announce his fork, once he has created one,
            // or may choose to not announce anything.
@@ -1020,9 +1037,12 @@ where
                FetchResult::Success {
                    updated,
                    namespaces,
+
                    doc,
                    ..
                } if !updated.is_empty() => {
-
                    if let Err(e) = self.announce_refs(rid, namespaces.iter().cloned()) {
+
                    if let Err(e) =
+
                        self.announce_refs(rid, doc.clone().into(), namespaces.iter().cloned())
+
                    {
                        error!(target: "service", "Failed to announce new refs: {e}");
                    }
                }
@@ -1031,8 +1051,14 @@ where
        }

        // Announce our new inventory if this fetch was a clone.
-
        if let FetchResult::Success { clone: true, .. } = result {
-
            self.storage.insert(rid);
+
        if let FetchResult::Success {
+
            doc, clone: true, ..
+
        } = result
+
        {
+
            // Only add inventory for public repositories.
+
            if doc.visibility.is_public() {
+
                self.storage.insert(rid);
+
            }
            self.sync_and_announce();
        }

@@ -1832,8 +1858,8 @@ where
    }

    /// Announce our own refs for the given repo.
-
    fn announce_own_refs(&mut self, rid: RepoId) -> Result<Vec<RefsAt>, Error> {
-
        let refs = self.announce_refs(rid, [self.node_id()])?;
+
    fn announce_own_refs(&mut self, rid: RepoId, doc: Doc<Verified>) -> Result<Vec<RefsAt>, Error> {
+
        let refs = self.announce_refs(rid, doc, [self.node_id()])?;

        // Update refs database with our signed refs branches.
        // This isn't strictly necessary for now, as we only use the database for fetches, and
@@ -1860,10 +1886,9 @@ where
    fn announce_refs(
        &mut self,
        rid: RepoId,
+
        doc: Doc<Verified>,
        remotes: impl IntoIterator<Item = NodeId>,
    ) -> Result<Vec<RefsAt>, Error> {
-
        let repo = self.storage.repository(rid)?;
-
        let doc = repo.identity_doc()?;
        let peers = self.sessions.connected().map(|(_, p)| p);
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;

modified radicle-node/src/test/arbitrary.rs
@@ -1,7 +1,10 @@
+
use std::collections::HashSet;
+

use bloomy::BloomFilter;
use qcheck::Arbitrary;

use crate::crypto;
+
use crate::identity::DocAt;
use crate::node::Alias;
use crate::prelude::{BoundedVec, NodeId, RepoId, Timestamp};
use crate::service::filter::{Filter, FILTER_SIZE_L, FILTER_SIZE_M, FILTER_SIZE_S};
@@ -10,6 +13,7 @@ use crate::service::message::{
    Subscribe, ZeroBytes,
};
use crate::wire::MessageType;
+
use crate::worker::fetch::FetchResult;

pub use radicle::test::arbitrary::*;

@@ -27,6 +31,17 @@ impl Arbitrary for Filter {
    }
}

+
impl Arbitrary for FetchResult {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        FetchResult {
+
            updated: vec![],
+
            namespaces: HashSet::arbitrary(g),
+
            clone: bool::arbitrary(g),
+
            doc: DocAt::arbitrary(g),
+
        }
+
    }
+
}
+

impl Arbitrary for Message {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        let type_id = g
modified radicle-node/src/test/handle.rs
@@ -5,8 +5,9 @@ use std::time;

use radicle::git;
use radicle::storage::refs::RefsAt;
+
use radicle::test::arbitrary;

-
use crate::identity::RepoId;
+
use crate::identity::{DocAt, RepoId};
use crate::node::{Alias, Config, ConnectOptions, ConnectResult, Event, FetchResult, Seeds};
use crate::runtime::HandleError;
use crate::service::policy;
@@ -62,6 +63,7 @@ impl radicle::node::Handle for Handle {
            updated: vec![],
            namespaces: HashSet::new(),
            clone: false,
+
            doc: arbitrary::gen::<DocAt>(1),
        })
    }

modified radicle-node/src/test/simulator.rs
@@ -19,7 +19,8 @@ use crate::prelude::{Address, RepoId};
use crate::service::io::Io;
use crate::service::{DisconnectReason, Event, Message, NodeId};
use crate::storage::Namespaces;
-
use crate::storage::WriteStorage;
+
use crate::storage::{ReadRepository, WriteStorage};
+
use crate::test::arbitrary;
use crate::test::peer::Service;
use crate::worker::{fetch, FetchError};
use crate::Link;
@@ -409,20 +410,30 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            .extend(msgs.into_iter().map(|m| (from, p.node_id(), m)));
                    }
                    Input::Fetched(rid, nid, result) => {
-
                        let result = Rc::try_unwrap(result).unwrap();
+
                        let mut result = Rc::try_unwrap(result).unwrap();
                        let repo = match p.storage().repository_mut(rid) {
                            Ok(repo) => repo,
                            Err(e) if e.is_not_found() => p.storage().create(rid).unwrap(),
                            Err(e) => panic!("Failed to open repository: {e}"),
                        };
-
                        match &result {
-
                            Ok(fetch::FetchResult { namespaces, .. }) => {
-
                                radicle::test::fetch(
-
                                    &repo,
-
                                    &nid,
-
                                    Namespaces::Followed(namespaces.clone()),
-
                                )
-
                                .unwrap();
+

+
                        match &mut result {
+
                            Ok(fetch::FetchResult {
+
                                namespaces,
+
                                updated,
+
                                doc,
+
                                ..
+
                            }) => {
+
                                *updated =
+
                                    radicle::test::fetch(&repo, &nid, Namespaces::All).unwrap();
+
                                *namespaces = updated
+
                                    .iter()
+
                                    .map(|r| {
+
                                        NodeId::from_namespaced(&r.name().to_namespaced().unwrap())
+
                                            .unwrap()
+
                                    })
+
                                    .collect();
+
                                *doc = repo.identity_doc().unwrap();
                            }
                            Err(err) => panic!("Error fetching: {err}"),
                        }
@@ -657,6 +668,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                                        Namespaces::All => HashSet::new(),
                                    },
                                    clone: true,
+
                                    doc: arbitrary::gen(1),
                                })),
                            ),
                        },
modified radicle-node/src/tests.rs
@@ -1344,6 +1344,7 @@ fn test_queued_fetch_max_capacity() {
    let rid1 = *repo_keys.next().unwrap();
    let rid2 = *repo_keys.next().unwrap();
    let rid3 = *repo_keys.next().unwrap();
+
    let doc = storage.repos.get(&rid1).unwrap().doc.clone();
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage);
    let bob = Peer::new("bob", [8, 8, 8, 8]);

@@ -1372,14 +1373,14 @@ fn test_queued_fetch_max_capacity() {
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
-
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::default()));
+
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::new(doc.clone())));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid2);
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
-
    alice.fetched(rid2, bob.id, Ok(fetch::FetchResult::default()));
+
    alice.fetched(rid2, bob.id, Ok(fetch::FetchResult::new(doc)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid3);
}
@@ -1453,6 +1454,7 @@ fn test_queued_fetch_from_ann_same_rid() {
            }],
            namespaces: [carol.id()].into_iter().collect(),
            clone: false,
+
            doc: arbitrary::gen(1),
        }),
    );
    // Now the 1st fetch is done, but the 2nd and 3rd fetches are redundant.
@@ -1496,14 +1498,14 @@ fn test_queued_fetch_from_command_same_rid() {
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
-
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::default()));
+
    alice.fetched(rid1, bob.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == eve.id);
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
-
    alice.fetched(rid1, eve.id, Ok(fetch::FetchResult::default()));
+
    alice.fetched(rid1, eve.id, Ok(arbitrary::gen::<fetch::FetchResult>(1)));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, nid, _)) if rid == rid1 && nid == carol.id);
}
modified radicle-node/src/worker/fetch.rs
@@ -6,6 +6,7 @@ use std::str::FromStr;
use localtime::LocalTime;

use radicle::crypto::PublicKey;
+
use radicle::identity::DocAt;
use radicle::prelude::RepoId;
use radicle::storage::refs::RefsAt;
use radicle::storage::{
@@ -16,14 +17,27 @@ use radicle_fetch::{Allowed, BlockList, FetchLimit};

use super::channels::ChannelsFlush;

-
#[derive(Debug, Default)]
+
#[derive(Debug, Clone)]
pub struct FetchResult {
-
    /// The set of updates references.
+
    /// The set of updated references.
    pub updated: Vec<RefUpdate>,
    /// The set of remote namespaces that were updated.
    pub namespaces: HashSet<PublicKey>,
    /// The fetch was a full clone.
    pub clone: bool,
+
    /// Identity doc of fetched repo.
+
    pub doc: DocAt,
+
}
+

+
impl FetchResult {
+
    pub fn new(doc: DocAt) -> Self {
+
        Self {
+
            updated: vec![],
+
            namespaces: HashSet::new(),
+
            clone: false,
+
            doc,
+
        }
+
    }
}

pub enum Handle {
@@ -129,6 +143,7 @@ impl Handle {
                Ok(FetchResult {
                    updated: applied.updated,
                    namespaces: remotes.into_iter().collect(),
+
                    doc: repo.identity_doc()?,
                    clone,
                })
            }
modified radicle/src/identity/doc.rs
@@ -130,7 +130,7 @@ impl Deref for Payload {
}

/// A verified identity document at a specific commit.
-
#[derive(Debug, Clone, PartialEq, Eq)]
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DocAt {
    /// The commit at which this document exists.
    pub commit: Oid,
modified radicle/src/node.rs
@@ -29,7 +29,7 @@ use serde_json as json;

use crate::crypto::PublicKey;
use crate::git;
-
use crate::identity::RepoId;
+
use crate::identity::{DocAt, RepoId};
use crate::profile;
use crate::storage::refs::RefsAt;
use crate::storage::RefUpdate;
@@ -686,6 +686,7 @@ pub enum FetchResult {
        updated: Vec<RefUpdate>,
        namespaces: HashSet<NodeId>,
        clone: bool,
+
        doc: DocAt,
    },
    // TODO: Create enum for reason.
    Failed {
@@ -718,13 +719,14 @@ impl FetchResult {
    }
}

-
impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>> for FetchResult {
-
    fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>) -> Self {
+
impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>, bool, DocAt), S>> for FetchResult {
+
    fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>, bool, DocAt), S>) -> Self {
        match value {
-
            Ok((updated, namespaces, clone)) => Self::Success {
+
            Ok((updated, namespaces, clone, doc)) => Self::Success {
                updated,
                namespaces,
                clone,
+
                doc,
            },
            Err(err) => Self::Failed {
                reason: err.to_string(),
modified radicle/src/storage.rs
@@ -394,7 +394,7 @@ pub trait ReadStorage {
        match self.repository(rid) {
            Ok(repo) => Ok(Some(repo.identity_doc()?.into())),
            Err(e) if e.is_not_found() => Ok(None),
-
            Err(e) => Err(e.into()),
+
            Err(e) => Err(e),
        }
    }
}