Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Re-think inventory update code
cloudhead committed 2 years ago
commit 2bcb03b021af544142532e97441d2c9efcf4a688
parent 0e880e12e693e4868ade7279bf71c83e6bc12ed4
15 files changed +116 -56
modified radicle-cli/src/commands/init.rs
@@ -367,7 +367,7 @@ fn sync(
    let events = node.subscribe(time::Duration::from_secs(3))?;
    let sessions = node.sessions()?;

-
    node.sync_inventory()?;
+
    node.update_inventory(rid)?;
    spinner.message("Announcing..");

    if !sessions.iter().any(|s| s.is_connected()) {
modified radicle-cli/src/commands/sync.rs
@@ -531,7 +531,6 @@ pub fn announce_inventory(mut node: Node) -> anyhow::Result<()> {
    let peers = node.sessions()?.iter().filter(|s| s.is_connected()).count();
    let spinner = term::spinner(format!("Announcing inventory to {peers} peers.."));

-
    node.sync_inventory()?;
    node.announce_inventory()?;
    spinner.finish();

modified radicle-node/src/bounded.rs
@@ -1,4 +1,4 @@
-
use std::ops;
+
use std::{collections::BTreeSet, ops};

#[derive(thiserror::Error, Debug)]
pub enum Error {
@@ -189,6 +189,22 @@ impl<T, const N: usize> TryFrom<Vec<T>> for BoundedVec<T, N> {
    }
}

+
/// Fallible conversion from a `BTreeSet<T>` into a `BoundedVec<T, N>`.
///
/// Fails with `Error::InvalidSize` when the set holds more than `N` elements;
/// otherwise the elements are collected in the set's iteration order.
impl<T, const N: usize> TryFrom<BTreeSet<T>> for BoundedVec<T, N> {
+
    type Error = Error;
+

+
    fn try_from(value: BTreeSet<T>) -> Result<Self, Self::Error> {
+
        // Reject oversized sets before consuming `value`, reporting both the
        // bound and the offending length.
        if value.len() > N {
+
            return Err(Error::InvalidSize {
+
                expected: N,
+
                actual: value.len(),
+
            });
+
        }
+
        Ok(BoundedVec {
+
            v: value.into_iter().collect(),
+
        })
+
    }
+
}
+

impl<T, const N: usize> From<BoundedVec<T, N>> for Vec<T> {
    fn from(value: BoundedVec<T, N>) -> Self {
        value.v
modified radicle-node/src/control.rs
@@ -168,7 +168,7 @@ where
            }
            CommandResult::ok().to_writer(writer).ok();
        }
-
        Command::SyncInventory => match handle.sync_inventory() {
+
        Command::UpdateInventory { rid } => match handle.update_inventory(rid) {
            Ok(result) => {
                CommandResult::updated(result).to_writer(writer)?;
            }
modified radicle-node/src/runtime/handle.rs
@@ -243,9 +243,9 @@ impl radicle::node::Handle for Handle {
            .map_err(Error::from)
    }

-
    fn sync_inventory(&mut self) -> Result<bool, Error> {
+
    fn update_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::SyncInventory(sender))?;
+
        self.command(service::Command::UpdateInventory(rid, sender))?;
        receiver.recv().map_err(Error::from)
    }

modified radicle-node/src/service.rs
@@ -10,7 +10,7 @@ pub mod message;
pub mod session;

use std::collections::hash_map::Entry;
-
use std::collections::{HashMap, HashSet, VecDeque};
+
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, net, time};
@@ -31,7 +31,7 @@ use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{ConnectOptions, Penalty, Severity};
use radicle::storage::refs::SignedRefsUpdate;
-
use radicle::storage::RepositoryError;
+
use radicle::storage::{Inventory, RepositoryError};

use crate::crypto;
use crate::crypto::{Signer, Verified};
@@ -172,8 +172,8 @@ pub enum Command {
    AnnounceRefs(RepoId, chan::Sender<RefsAt>),
    /// Announce local repositories to peers.
    AnnounceInventory,
-
    /// Announce local inventory to peers.
-
    SyncInventory(chan::Sender<bool>),
+
    /// Update local inventory.
+
    UpdateInventory(RepoId, chan::Sender<bool>),
    /// Connect to node with the given address.
    Connect(NodeId, Address, ConnectOptions),
    /// Disconnect from node.
@@ -203,7 +203,7 @@ impl fmt::Debug for Command {
        match self {
            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
-
            Self::SyncInventory(_) => write!(f, "SyncInventory(..)"),
+
            Self::UpdateInventory(rid, _) => write!(f, "UpdateInventory({rid})"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
@@ -788,7 +788,8 @@ where
                    error!(target: "service", "Error announcing inventory: {err}");
                }
            }
-
            Command::SyncInventory(resp) => {
+
            Command::UpdateInventory(rid, resp) => {
+
                self.storage.insert(rid);
                let synced = self
                    .sync_inventory()
                    .expect("Service::command: error syncing inventory");
@@ -938,6 +939,7 @@ where
            Ok(fetch::FetchResult {
                updated,
                namespaces,
+
                clone,
            }) => {
                debug!(target: "service", "Fetched {rid} from {remote} successfully");

@@ -953,6 +955,7 @@ where
                FetchResult::Success {
                    updated,
                    namespaces,
+
                    clone,
                }
            }
            Err(err) => {
@@ -999,6 +1002,7 @@ where
                FetchResult::Success {
                    updated,
                    namespaces,
+
                    ..
                } if !updated.is_empty() => {
                    if let Err(e) = self.announce_refs(rid, namespaces.iter().cloned()) {
                        error!(target: "service", "Failed to announce new refs: {e}");
@@ -1007,14 +1011,12 @@ where
                _ => debug!(target: "service", "Nothing to announce, no refs were updated.."),
            }
        }
-
        // TODO: Since this fetch could be either a full clone
-
        // or simply a ref update, we need to either announce
-
        // new inventory, or new refs. Right now, we announce
-
        // both in some cases.
-
        //
-
        // Announce the newly fetched repository to the
-
        // network, if necessary.
-
        self.sync_and_announce();
+

+
        // Announce our new inventory if this fetch was a clone.
+
        if let FetchResult::Success { clone: true, .. } = result {
+
            self.storage.insert(rid);
+
            self.sync_and_announce();
+
        }

        // We can now try to dequeue another fetch.
        self.dequeue_fetch();
@@ -1311,7 +1313,11 @@ where
                    inventory: message.inventory.to_vec(),
                    timestamp: message.timestamp,
                });
-
                match self.sync_routing(&message.inventory, *announcer, message.timestamp) {
+
                match self.sync_routing(
+
                    message.inventory.iter().cloned(),
+
                    *announcer,
+
                    message.timestamp,
+
                ) {
                    Ok(synced) => {
                        if synced.is_empty() {
                            trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
@@ -1723,7 +1729,7 @@ where
                // Other than crashing the node completely, there's nothing we can do
                // here besides returning an empty inventory and logging an error.
                error!(target: "service", "Error getting local inventory for initial messages: {e}");
-
                vec![]
+
                Default::default()
            }
        };

@@ -1765,10 +1771,8 @@ where

    /// Update our routing table with our local node's inventory.
    fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
-
        self.storage.refresh()?; // Refresh storage inventory cache.
-

        let inventory = self.storage.inventory()?;
-
        let result = self.sync_routing(&inventory, self.node_id(), self.time())?;
+
        let result = self.sync_routing(inventory, self.node_id(), self.time())?;

        Ok(result)
    }
@@ -1778,14 +1782,18 @@ where
    /// given inventory.
    fn sync_routing(
        &mut self,
-
        inventory: &[RepoId],
+
        inventory: impl IntoIterator<Item = RepoId>,
        from: NodeId,
        timestamp: Timestamp,
    ) -> Result<SyncedRouting, Error> {
        let mut synced = SyncedRouting::default();
-
        let included: HashSet<&RepoId> = HashSet::from_iter(inventory);
+
        let included = inventory.into_iter().collect::<BTreeSet<_>>();

-
        for (rid, result) in self.db.routing_mut().insert(inventory, from, timestamp)? {
+
        for (rid, result) in self
+
            .db
+
            .routing_mut()
+
            .insert(included.iter(), from, timestamp)?
+
        {
            match result {
                InsertResult::SeedAdded => {
                    info!(target: "service", "Routing table updated for {rid} with seed {from}");
@@ -2014,7 +2022,7 @@ where
    ////////////////////////////////////////////////////////////////////////////

    /// Announce our inventory to all connected peers.
-
    fn announce_inventory(&mut self, inventory: Vec<RepoId>) -> Result<(), storage::Error> {
+
    fn announce_inventory(&mut self, inventory: Inventory) -> Result<(), storage::Error> {
        let time = if self.clock > self.last_announce {
            self.clock.as_millis()
        } else if self.last_announce - self.clock < LocalDuration::from_secs(1) {
modified radicle-node/src/service/gossip.rs
@@ -23,10 +23,12 @@ pub fn node(config: &Config, timestamp: Timestamp) -> NodeAnnouncement {
    }
}

-
pub fn inventory(timestamp: Timestamp, inventory: Vec<RepoId>) -> InventoryAnnouncement {
-
    type Inventory = BoundedVec<RepoId, INVENTORY_LIMIT>;
-

-
    if inventory.len() > Inventory::max() {
+
pub fn inventory(
+
    timestamp: Timestamp,
+
    inventory: impl IntoIterator<Item = RepoId>,
+
) -> InventoryAnnouncement {
+
    let inventory = inventory.into_iter().collect::<Vec<_>>();
+
    if inventory.len() > INVENTORY_LIMIT {
        error!(
            target: "service",
            "inventory announcement limit ({}) exceeded, other nodes will see only some of your projects",
modified radicle-node/src/test/handle.rs
@@ -61,6 +61,7 @@ impl radicle::node::Handle for Handle {
        Ok(FetchResult::Success {
            updated: vec![],
            namespaces: HashSet::new(),
+
            clone: false,
        })
    }

@@ -100,7 +101,7 @@ impl radicle::node::Handle for Handle {
        Ok(())
    }

-
    fn sync_inventory(&mut self) -> Result<bool, Self::Error> {
+
    fn update_inventory(&mut self, _rid: RepoId) -> Result<bool, Self::Error> {
        unimplemented!()
    }

modified radicle-node/src/test/simulator.rs
@@ -655,6 +655,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                                        Namespaces::Followed(hs) => hs,
                                        Namespaces::All => HashSet::new(),
                                    },
+
                                    clone: true,
                                })),
                            ),
                        },
modified radicle-node/src/tests.rs
@@ -674,7 +674,12 @@ fn test_refs_announcement_relay() {
            },
        )
    };
-
    let bob_inv = bob.storage().inventory().unwrap();
+
    let bob_inv = bob
+
        .storage()
+
        .inventory()
+
        .unwrap()
+
        .into_iter()
+
        .collect::<Vec<_>>();

    alice.seed(&bob_inv[0], policy::Scope::All).unwrap();
    alice.seed(&bob_inv[1], policy::Scope::All).unwrap();
@@ -740,13 +745,13 @@ fn test_refs_announcement_fetch_trusted_no_inventory() {
        )
    };
    let bob_inv = bob.storage().inventory().unwrap();
-
    let rid = bob_inv[0];
+
    let rid = bob_inv.first().unwrap();

    alice.seed(&rid, policy::Scope::Followed).unwrap();
    alice.connect_to(&bob);

    // Alice receives Bob's refs.
-
    alice.receive(bob.id(), bob.refs_announcement(rid));
+
    alice.receive(bob.id(), bob.refs_announcement(*rid));

    // Alice fetches Bob's refs as this is a new repo.
    assert_matches!(alice.outbox().next(), Some(Io::Fetch { .. }));
@@ -1426,6 +1431,7 @@ fn test_queued_fetch_from_ann_same_rid() {
                oid,
            }],
            namespaces: [carol.id()].into_iter().collect(),
+
            clone: false,
        }),
    );
    // Now the 1st fetch is done, but the 2nd and 3rd fetches are redundant.
@@ -1616,7 +1622,7 @@ fn test_push_and_pull() {
    // We now expect Eve to fetch Alice's project from Alice.
    // Then we expect Bob to fetch Alice's project from Eve.
    alice.elapse(LocalDuration::from_secs(1)); // Make sure our announcement is fresh.
-
    alice.command(service::Command::SyncInventory(send));
+
    alice.command(service::Command::UpdateInventory(proj_id, send));

    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

modified radicle-node/src/worker/fetch.rs
@@ -22,6 +22,8 @@ pub struct FetchResult {
    pub updated: Vec<RefUpdate>,
    /// The set of remote namespaces that were updated.
    pub namespaces: HashSet<PublicKey>,
+
    /// The fetch was a full clone.
+
    pub clone: bool,
}

pub enum Handle {
@@ -69,12 +71,12 @@ impl Handle {
        remote: PublicKey,
        refs_at: Option<Vec<SignedRefsUpdate>>,
    ) -> Result<FetchResult, error::Fetch> {
-
        let (result, notifs) = match self {
+
        let (result, clone, notifs) = match self {
            Self::Clone { mut handle, tmp } => {
                log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
                let result = radicle_fetch::clone(&mut handle, limit, remote)?;
                mv(tmp, storage, &rid)?;
-
                (result, None)
+
                (result, true, None)
            }
            Self::Pull {
                mut handle,
@@ -82,7 +84,7 @@ impl Handle {
            } => {
                log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
                let result = radicle_fetch::pull(&mut handle, limit, remote, refs_at)?;
-
                (result, Some(notifications))
+
                (result, false, Some(notifications))
            }
        };

@@ -125,6 +127,7 @@ impl Handle {
                Ok(FetchResult {
                    updated: applied.updated,
                    namespaces: remotes.into_iter().collect(),
+
                    clone,
                })
            }
        }
modified radicle/src/node.rs
@@ -429,8 +429,8 @@ pub enum Command {
    #[serde(rename_all = "camelCase")]
    AnnounceInventory,

-
    /// Sync local inventory with node.
-
    SyncInventory,
+
    /// Update node's inventory.
+
    UpdateInventory { rid: RepoId },

    /// Get the current node configuration.
    Config,
@@ -654,6 +654,7 @@ pub enum FetchResult {
    Success {
        updated: Vec<RefUpdate>,
        namespaces: HashSet<NodeId>,
+
        clone: bool,
    },
    // TODO: Create enum for reason.
    Failed {
@@ -671,6 +672,7 @@ impl FetchResult {
            Self::Success {
                updated,
                namespaces,
+
                ..
            } => Some((updated, namespaces)),
            _ => None,
        }
@@ -685,12 +687,13 @@ impl FetchResult {
    }
}

-
impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>), S>> for FetchResult {
-
    fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>), S>) -> Self {
+
impl<S: ToString> From<Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>> for FetchResult {
+
    fn from(value: Result<(Vec<RefUpdate>, HashSet<NodeId>, bool), S>) -> Self {
        match value {
-
            Ok((updated, namespaces)) => Self::Success {
+
            Ok((updated, namespaces, clone)) => Self::Success {
                updated,
                namespaces,
+
                clone,
            },
            Err(err) => Self::Failed {
                reason: err.to_string(),
@@ -725,6 +728,7 @@ impl FetchResults {
            if let FetchResult::Success {
                updated,
                namespaces,
+
                ..
            } = r
            {
                Some((nid, updated.as_slice(), namespaces.clone()))
@@ -849,8 +853,8 @@ pub trait Handle: Clone + Sync + Send {
    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Self::Error>;
    /// Announce local inventory.
    fn announce_inventory(&mut self) -> Result<(), Self::Error>;
-
    /// Notify the service that our inventory was updated.
-
    fn sync_inventory(&mut self) -> Result<bool, Self::Error>;
+
    /// Notify the service that our inventory was updated with the given repository.
+
    fn update_inventory(&mut self, rid: RepoId) -> Result<bool, Self::Error>;
    /// Ask the service to shutdown.
    fn shutdown(self) -> Result<(), Self::Error>;
    /// Query the peer session state.
@@ -1107,8 +1111,8 @@ impl Handle for Node {
        Ok(())
    }

-
    fn sync_inventory(&mut self) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::SyncInventory, DEFAULT_TIMEOUT)?;
+
    fn update_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
+
        let mut line = self.call::<Success>(Command::UpdateInventory { rid }, DEFAULT_TIMEOUT)?;
        let response = line.next().ok_or(Error::EmptyResponse {})??;

        Ok(response.updated)
modified radicle/src/storage.rs
@@ -1,7 +1,7 @@
pub mod git;
pub mod refs;

-
use std::collections::{hash_map, HashSet};
+
use std::collections::{hash_map, BTreeSet, HashSet};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::{fmt, io};
@@ -28,7 +28,7 @@ use self::git::UserInfo;
use self::refs::SignedRefs;

pub type BranchName = git::RefString;
-
pub type Inventory = Vec<RepoId>;
+
pub type Inventory = BTreeSet<RepoId>;

/// Describes one or more namespaces.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
@@ -374,6 +374,8 @@ pub trait ReadStorage {
    /// Get the inventory of repositories hosted under this storage.
    /// This function should typically only return public repositories.
    fn inventory(&self) -> Result<Inventory, Error>;
+
    /// Insert this repository into the inventory.
+
    fn insert(&self, rid: RepoId);
    /// Refresh storage inventory.
    fn refresh(&self) -> Result<(), Error>;
    /// Open or create a read-only repository.
@@ -626,6 +628,10 @@ where
        self.deref().path_of(rid)
    }

+
    /// Insert `rid` into the inventory by forwarding the call to the
    /// dereferenced storage.
    fn insert(&self, rid: RepoId) {
+
        self.deref().insert(rid)
+
    }
+

    fn contains(&self, rid: &RepoId) -> Result<bool, RepositoryError> {
        self.deref().contains(rid)
    }
modified radicle/src/storage/git.rs
@@ -89,7 +89,7 @@ impl<'a> TryFrom<git2::Reference<'a>> for Ref {
pub struct Storage {
    path: PathBuf,
    info: UserInfo,
-
    inventory: Arc<Mutex<Vec<RepoId>>>,
+
    inventory: Arc<Mutex<BTreeSet<RepoId>>>,
}

impl ReadStorage for Storage {
@@ -125,6 +125,18 @@ impl ReadStorage for Storage {
        }
    }

+
    /// Insert `rid` into the inventory set guarded by `self.inventory`.
    ///
    /// The insert is performed whether or not the mutex is poisoned, so this
    /// method has no error path.
    fn insert(&self, rid: RepoId) {
+
        match self.inventory.lock() {
+
            Ok(mut locked) => {
+
                locked.insert(rid);
+
            }
+
            Err(poisoned) => {
+
                // The lock is poisoned; take the guard out of the error and
                // insert anyway.
                let mut inv = poisoned.into_inner();
+
                inv.insert(rid);
+
            }
+
        }
+
    }
+

    fn refresh(&self) -> Result<(), Error> {
        let repos = self.repositories()?;
        let rids = repos
@@ -189,7 +201,7 @@ impl Storage {
        let storage = Self {
            path,
            info,
-
            inventory: Arc::new(Mutex::new(vec![])),
+
            inventory: Arc::new(Mutex::new(BTreeSet::new())),
        };
        storage.refresh()?;

modified radicle/src/test/storage.rs
@@ -1,4 +1,4 @@
-
use std::collections::HashMap;
+
use std::collections::{BTreeSet, HashMap};
use std::convert::Infallible;
use std::io;
use std::path::{Path, PathBuf};
@@ -76,9 +76,11 @@ impl ReadStorage for MockStorage {
    }

    fn inventory(&self) -> Result<Inventory, Error> {
-
        Ok(self.repos.keys().cloned().collect::<Vec<_>>())
+
        Ok(self.repos.keys().cloned().collect::<BTreeSet<_>>())
    }

+
    /// No-op: the mock ignores `_rid` and records nothing.
    fn insert(&self, _rid: RepoId) {}
+

    fn refresh(&self) -> Result<(), Error> {
        Ok(())
    }