Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Re-use inventory message timestamps
cloudhead committed 2 years ago
commit 7484d737d5f3aad5f466a302cbb998446970e0a3
parent 88a40297895d3c303a4df9e3e8ba5625ff96e9eb
6 files changed +132 -121
modified radicle-node/src/runtime.rs
@@ -21,9 +21,7 @@ use radicle::node::address::Store as _;
use radicle::node::notifications;
use radicle::node::Handle as _;
use radicle::profile::Home;
-
use radicle::storage;
-
use radicle::Storage;
-
use radicle::{cob, git};
+
use radicle::{cob, git, storage, Storage};

use crate::control;
use crate::crypto::Signer;
@@ -152,6 +150,7 @@ impl Runtime {
        let network = config.network;
        let rng = fastrand::Rng::new();
        let clock = LocalTime::now();
+
        let timestamp = clock.into();
        let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
        let scope = config.scope;
        let policy = config.policy;
@@ -200,7 +199,7 @@ impl Runtime {
            );
            ann
        } else {
-
            service::gossip::node(&config, clock.into())
+
            service::gossip::node(&config, timestamp)
                .solve(Default::default())
                .expect("Runtime::init: unable to solve proof-of-work puzzle")
        };
@@ -226,7 +225,6 @@ impl Runtime {
        let emitter: Emitter<Event> = Default::default();
        let mut service = service::Service::new(
            config.clone(),
-
            clock,
            stores,
            storage.clone(),
            policies,
modified radicle-node/src/service.rs
@@ -32,7 +32,7 @@ use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{ConnectOptions, Penalty, Severity};
use radicle::storage::refs::SIGREFS_BRANCH;
-
use radicle::storage::{Inventory, RepositoryError};
+
use radicle::storage::RepositoryError;

use crate::crypto;
use crate::crypto::{Signer, Verified};
@@ -380,6 +380,8 @@ pub struct Service<D, S, G> {
    outbox: Outbox,
    /// Cached local node announcement.
    node: NodeAnnouncement,
+
    /// Cached local inventory announcement.
+
    inventory: InventoryAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Ongoing fetches.
@@ -431,7 +433,6 @@ where
{
    pub fn new(
        config: Config,
-
        clock: LocalTime,
        db: Stores<D>,
        storage: S,
        policies: policy::Config<Write>,
@@ -442,6 +443,9 @@ where
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
+
        let last_timestamp = node.timestamp;
+
        let clock = LocalTime::default(); // Updated on initialize.
+
        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.

        Self {
            config,
@@ -449,6 +453,7 @@ where
            policies,
            signer,
            rng,
+
            inventory,
            node,
            clock,
            db,
@@ -461,9 +466,9 @@ where
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
-
            last_timestamp: Timestamp::MIN,
+
            last_timestamp,
            last_announce: LocalTime::default(),
-
            started_at: None,
+
            started_at: None, // Updated on initialize.
            emitter,
            listening: vec![],
        }
@@ -571,11 +576,17 @@ where
        })
    }

+
    /// Initialize service with current time. Call this once.
    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
        debug!(target: "service", "Init @{}", time.as_millis());
+
        assert_ne!(time, LocalTime::default());

        let nid = self.node_id();
+
        let inventory = self.storage.inventory()?;
+

        self.started_at = Some(time);
+
        self.clock = time;
+
        self.inventory = gossip::inventory(self.timestamp(), inventory.clone());

        // Populate refs database. This is only useful as part of the upgrade process for nodes
        // that have been online since before the refs database was created.
@@ -609,15 +620,16 @@ where
        // Ensure that our inventory is recorded in our routing table, and we are seeding
        // all of it. It can happen that inventory is not properly seeded if for eg. the
        // user creates a new repository while the node is stopped.
-
        let rids = self.storage.inventory()?;
-
        self.db.routing_mut().insert(&rids, nid, time.into())?;
+
        self.db
+
            .routing_mut()
+
            .insert(inventory.iter(), nid, time.into())?;

        let announced = self
            .db
            .seeds()
            .seeded_by(&nid)?
            .collect::<Result<HashMap<_, _>, _>>()?;
-
        for rid in rids {
+
        for rid in inventory {
            let repo = self.storage.repository(rid)?;

            // If we're not seeding this repo, just skip it.
@@ -723,11 +735,7 @@ where
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            trace!(target: "service", "Running 'announce' task...");

-
            if let Err(err) = self
-
                .storage
-
                .inventory()
-
                .and_then(|i| self.announce_inventory(i))
-
            {
+
            if let Err(err) = self.announce_inventory() {
                error!(target: "service", "Error announcing inventory: {err}");
            }
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
@@ -852,11 +860,7 @@ where
                }
            }
            Command::AnnounceInventory => {
-
                if let Err(err) = self
-
                    .storage
-
                    .inventory()
-
                    .and_then(|i| self.announce_inventory(i))
-
                {
+
                if let Err(err) = self.announce_inventory() {
                    error!(target: "service", "Error announcing inventory: {err}");
                }
            }
@@ -1788,18 +1792,8 @@ where

    /// Set of initial messages to send to a peer.
    fn initial(&mut self, _link: Link) -> Vec<Message> {
-
        let timestamp = self.timestamp();
        let now = self.clock();
        let filter = self.filter();
-
        let inventory = match self.storage.inventory() {
-
            Ok(i) => i,
-
            Err(e) => {
-
                // Other than crashing the node completely, there's nothing we can do
-
                // here besides returning an empty inventory and logging an error.
-
                error!(target: "service", "Error getting local inventory for initial messages: {e}");
-
                Default::default()
-
            }
-
        };

        // TODO: Only subscribe to outbound connections, otherwise we will consume too
        // much bandwidth.
@@ -1823,7 +1817,7 @@ where

        vec![
            Message::node(self.node.clone(), &self.signer),
-
            Message::inventory(gossip::inventory(timestamp, inventory), &self.signer),
+
            Message::inventory(self.inventory.clone(), &self.signer),
            Message::subscribe(filter, since, Timestamp::MAX),
        ]
    }
@@ -1840,7 +1834,9 @@ where
    /// Update our routing table with our local node's inventory.
    fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
        let inventory = self.storage.inventory()?;
-
        let result = self.sync_routing(inventory, self.node_id(), self.clock.into())?;
+
        let result = self.sync_routing(inventory.clone(), self.node_id(), self.clock.into())?;
+
        // Update cached inventory message.
+
        self.inventory = gossip::inventory(self.timestamp(), inventory);

        Ok(result)
    }
@@ -2001,11 +1997,7 @@ where
            Ok(synced) => {
                // Only announce if our inventory changed.
                if synced.added.len() + synced.removed.len() > 0 {
-
                    if let Err(e) = self
-
                        .storage
-
                        .inventory()
-
                        .and_then(|i| self.announce_inventory(i))
-
                    {
+
                    if let Err(e) = self.announce_inventory() {
                        error!(target: "service", "Failed to announce inventory: {e}");
                    }
                }
@@ -2134,16 +2126,15 @@ where
    ////////////////////////////////////////////////////////////////////////////

    /// Announce our inventory to all connected peers.
-
    fn announce_inventory(&mut self, inventory: Inventory) -> Result<(), storage::Error> {
-
        let time = self.timestamp();
-
        let msg = AnnouncementMessage::from(gossip::inventory(time, inventory));
+
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
+
        let msg = AnnouncementMessage::from(self.inventory.clone());

        self.outbox.announce(
            msg.signed(&self.signer),
            self.sessions.connected().map(|(_, p)| p),
            self.db.gossip_mut(),
        );
-
        self.last_announce = time.to_local_time();
+
        self.last_announce = self.clock;

        Ok(())
    }
modified radicle-node/src/service/gossip/store.rs
@@ -79,6 +79,11 @@ impl Store for Database {
    }

    fn announced(&mut self, nid: &NodeId, ann: &Announcement) -> Result<bool, Error> {
+
        assert_ne!(
+
            ann.timestamp(),
+
            Timestamp::MIN,
+
            "Timestamp of {ann:?} must not be zero"
+
        );
        let mut stmt = self.db.prepare(
            "INSERT INTO `announcements` (node, repo, type, message, signature, timestamp)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
@@ -291,3 +296,40 @@ impl TryFrom<&sql::Value> for GossipType {
        }
    }
}
+

+
#[cfg(test)]
+
#[allow(clippy::unwrap_used)]
+
mod test {
+
    use super::*;
+
    use crate::prelude::{BoundedVec, RepoId};
+
    use crate::test::arbitrary;
+
    use localtime::LocalTime;
+
    use radicle_crypto::test::signer::MockSigner;
+

+
    #[test]
+
    fn test_announced() {
+
        let mut db = Database::memory().unwrap();
+
        let nid = arbitrary::gen::<NodeId>(1);
+
        let rid = arbitrary::gen::<RepoId>(1);
+
        let timestamp = LocalTime::now().into();
+
        let signer = MockSigner::default();
+
        let refs = AnnouncementMessage::Refs(RefsAnnouncement {
+
            rid,
+
            refs: BoundedVec::new(),
+
            timestamp,
+
        })
+
        .signed(&signer);
+
        let inv = AnnouncementMessage::Inventory(InventoryAnnouncement {
+
            inventory: BoundedVec::new(),
+
            timestamp,
+
        })
+
        .signed(&signer);
+

+
        // Only the first announcement of each type is recognized as new.
+
        assert!(db.announced(&nid, &refs).unwrap());
+
        assert!(!db.announced(&nid, &refs).unwrap());
+

+
        assert!(db.announced(&nid, &inv).unwrap());
+
        assert!(!db.announced(&nid, &inv).unwrap());
+
    }
+
}
modified radicle-node/src/test/peer.rs
@@ -44,6 +44,7 @@ pub struct Peer<S, G> {
    pub service: Service<S, G>,
    pub id: NodeId,
    pub ip: net::IpAddr,
+
    pub local_time: LocalTime,
    pub rng: fastrand::Rng,
    pub local_addr: net::SocketAddr,
    pub tempdir: tempfile::TempDir,
@@ -56,9 +57,7 @@ where
    S: WriteStorage + 'static,
    G: Signer + 'static,
{
-
    fn init(&mut self) {
-
        self.initialize();
-
    }
+
    fn init(&mut self) {}

    fn addr(&self) -> Address {
        self.address()
@@ -85,7 +84,7 @@ impl<S, G> DerefMut for Peer<S, G> {

impl Peer<MockStorage, MockSigner> {
    pub fn new(name: &'static str, ip: impl Into<net::IpAddr>) -> Self {
-
        Self::with_storage(name, ip, MockStorage::empty())
+
        Self::with_storage(name, ip, MockStorage::empty()).initialized()
    }
}

@@ -94,7 +93,7 @@ where
    S: WriteStorage + 'static,
{
    pub fn with_storage(name: &'static str, ip: impl Into<net::IpAddr>, storage: S) -> Self {
-
        Self::config(name, ip, storage, Config::default())
+
        Self::config(name, ip, storage, Config::default()).initialized()
    }
}

@@ -167,18 +166,18 @@ where
        let id = *config.signer.public_key();
        let ip = ip.into();
        let local_addr = net::SocketAddr::new(ip, config.rng.u16(..));
+
        let inventory = storage.inventory().unwrap();

        // Make sure the peer address is advertized.
        config.config.external_addresses.push(local_addr.into());
-

-
        for rid in storage.inventory().unwrap() {
-
            policies.seed(&rid, Scope::Followed).unwrap();
+
        for rid in &inventory {
+
            policies.seed(rid, Scope::Followed).unwrap();
        }
-
        let announcement = service::gossip::node(&config.config, config.local_time.into());
+
        let announcement =
+
            service::gossip::node(&config.config, Timestamp::from(config.local_time) + 1);
        let emitter: Emitter<Event> = Default::default();
        let service = Service::new(
            config.config,
-
            config.local_time,
            config.db,
            storage,
            policies,
@@ -194,25 +193,29 @@ where
            id,
            ip,
            local_addr,
+
            local_time: config.local_time,
            rng: config.rng,
            initialized: false,
            tempdir: config.tmp,
        }
    }

-
    pub fn initialize(&mut self) -> bool {
-
        if !self.initialized {
-
            info!(
-
                target: "test",
-
                "{}: Initializing: id = {}, address = {}",
-
                self.name, self.id, self.ip
-
            );
+
    pub fn initialize(&mut self) -> &mut Self {
+
        info!(
+
            target: "test",
+
            "{}: Initializing: id = {}, address = {}",
+
            self.name, self.id, self.ip
+
        );
+
        assert_ne!(self.local_time, LocalTime::default());
+

+
        self.initialized = true;
+
        self.service.initialize(self.local_time).unwrap();
+
        self
+
    }

-
            self.initialized = true;
-
            self.service.initialize(LocalTime::now()).unwrap();
-
            return true;
-
        }
-
        false
+
    pub fn initialized(mut self) -> Self {
+
        self.initialize();
+
        self
    }

    pub fn restart(&mut self) {
@@ -222,7 +225,7 @@ where
            "{}: Restarting: id = {}, address = {}",
            self.name, self.id, self.ip
        );
-
        self.service.initialize(LocalTime::now()).unwrap();
+
        self.service.initialize(*self.service.clock()).unwrap();
    }

    pub fn address(&self) -> Address {
@@ -340,7 +343,6 @@ where
    pub fn connect_from(&mut self, peer: &Self) {
        let remote_id = simulator::Peer::<S, G>::id(peer);

-
        self.initialize();
        self.service
            .connected(remote_id, peer.address(), Link::Inbound);
        self.service
@@ -366,7 +368,6 @@ where
        let remote_id = simulator::Peer::<T, H>::id(peer);
        let remote_addr = simulator::Peer::<T, H>::addr(peer);

-
        self.initialize();
        self.service.command(Command::Connect(
            remote_id,
            remote_addr.clone(),
modified radicle-node/src/tests.rs
@@ -132,7 +132,7 @@ fn test_redundant_connect() {

    // Only one connection attempt is made.
    assert_matches!(
-
        alice.outbox().collect::<Vec<_>>().as_slice(),
+
        alice.outbox().filter(|o| matches!(o, Io::Connect { .. })).collect::<Vec<_>>().as_slice(),
        [Io::Connect(id, addr)]
        if *id == bob.id() && *addr == bob.addr()
    );
@@ -233,9 +233,8 @@ fn test_persistent_peer_connect() {
            },
            ..peer::Config::default()
        },
-
    );
-

-
    alice.initialize();
+
    )
+
    .initialized();

    let outbox = alice.outbox().collect::<Vec<_>>();
    outbox
@@ -251,15 +250,14 @@ fn test_persistent_peer_connect() {
#[test]
fn test_inventory_sync() {
    let tmp = tempfile::tempdir().unwrap();
-
    let mut alice = Peer::config(
+
    let mut alice = Peer::with_storage(
        "alice",
        [7, 7, 7, 7],
        Storage::open(tmp.path().join("alice"), fixtures::user()).unwrap(),
-
        peer::Config::default(),
    );
    let bob_signer = MockSigner::default();
    let bob_storage = fixtures::storage(tmp.path().join("bob"), &bob_signer).unwrap();
-
    let bob = Peer::config("bob", [8, 8, 8, 8], bob_storage, peer::Config::default());
+
    let bob = Peer::with_storage("bob", [8, 8, 8, 8], bob_storage);
    let now = LocalTime::now().into();
    let projs = bob.storage().inventory().unwrap();

@@ -349,7 +347,8 @@ fn test_inventory_pruning() {
                },
                ..peer::Config::default()
            },
-
        );
+
        )
+
        .initialized();

        let bob = Peer::config(
            "bob",
@@ -359,7 +358,8 @@ fn test_inventory_pruning() {
                local_time: alice.local_time(),
                ..peer::Config::default()
            },
-
        );
+
        )
+
        .initialized();

        // Tell Alice about the amazing projects available
        alice.connect_to(&bob);
@@ -597,14 +597,14 @@ fn test_announcement_relay() {
        Some(Message::Announcement(_))
    );

-
    alice.receive(bob.id(), bob.inventory_announcement());
+
    alice.receive(bob.id(), dbg!(bob.inventory_announcement()));
    assert!(
        alice.messages(eve.id()).next().is_none(),
        "Another inventory with the same timestamp is ignored"
    );

    bob.elapse(LocalDuration::from_mins(1));
-
    alice.receive(bob.id(), bob.inventory_announcement());
+
    alice.receive(bob.id(), dbg!(bob.inventory_announcement()));
    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
@@ -647,17 +647,15 @@ fn test_announcement_relay() {
#[test]
fn test_refs_announcement_relay() {
    let tmp = tempfile::tempdir().unwrap();
-
    let mut alice = Peer::config(
+
    let mut alice = Peer::with_storage(
        "alice",
        [7, 7, 7, 7],
        Storage::open(tmp.path().join("alice"), fixtures::user()).unwrap(),
-
        peer::Config::default(),
    );
-
    let eve = Peer::config(
+
    let eve = Peer::with_storage(
        "eve",
        [8, 8, 8, 8],
        Storage::open(tmp.path().join("eve"), fixtures::user()).unwrap(),
-
        peer::Config::default(),
    );

    let bob = {
@@ -675,6 +673,7 @@ fn test_refs_announcement_relay() {
                ..peer::Config::default()
            },
        )
+
        .initialized()
    };
    let bob_inv = bob
        .storage()
@@ -724,11 +723,10 @@ fn test_refs_announcement_fetch_trusted_no_inventory() {
    logger::init(log::Level::Debug);

    let tmp = tempfile::tempdir().unwrap();
-
    let mut alice = Peer::config(
+
    let mut alice = Peer::with_storage(
        "alice",
        [7, 7, 7, 7],
        Storage::open(tmp.path().join("alice"), fixtures::user()).unwrap(),
-
        peer::Config::default(),
    );
    let bob = {
        let mut rng = fastrand::Rng::new();
@@ -745,6 +743,7 @@ fn test_refs_announcement_fetch_trusted_no_inventory() {
                ..peer::Config::default()
            },
        )
+
        .initialized()
    };
    let bob_inv = bob.storage().inventory().unwrap();
    let rid = bob_inv.first().unwrap();
@@ -861,8 +860,9 @@ fn test_refs_announcement_offline() {
    bob.seed(rid, policy::Scope::All).unwrap();

    // Make sure alice's service wasn't initialized before.
-
    assert!(alice.initialize());
+
    assert_eq!(*alice.clock(), LocalTime::default());

+
    alice.initialize();
    alice.connect_to(&bob);
    alice.receive(bob.id, Message::Subscribe(Subscribe::all()));

@@ -1057,7 +1057,8 @@ fn test_persistent_peer_reconnect_attempt() {
            },
            ..peer::Config::default()
        },
-
    );
+
    )
+
    .initialized();

    let mut sim = Simulation::new(
        LocalTime::now(),
@@ -1100,12 +1101,7 @@ fn test_persistent_peer_reconnect_attempt() {
fn test_persistent_peer_reconnect_success() {
    use std::collections::HashSet;

-
    let bob = Peer::config(
-
        "bob",
-
        [9, 9, 9, 9],
-
        MockStorage::empty(),
-
        peer::Config::default(),
-
    );
+
    let bob = Peer::with_storage("bob", [9, 9, 9, 9], MockStorage::empty());
    let mut alice = Peer::config(
        "alice",
        [7, 7, 7, 7],
@@ -1117,7 +1113,8 @@ fn test_persistent_peer_reconnect_success() {
            },
            ..peer::Config::default()
        },
-
    );
+
    )
+
    .initialized();
    alice.connect_to(&bob);

    // A transient error such as this will cause Alice to attempt a reconnection.
@@ -1568,20 +1565,15 @@ fn test_init_and_seed() {
    )
    .unwrap();
    let (repo, _) = fixtures::repository(tempdir.path().join("working"));
-
    let mut alice = Peer::config(
-
        "alice",
-
        [7, 7, 7, 7],
-
        storage_alice,
-
        peer::Config::default(),
-
    );
+
    let mut alice = Peer::with_storage("alice", [7, 7, 7, 7], storage_alice);

    let storage_bob =
        Storage::open(tempdir.path().join("bob").join("storage"), fixtures::user()).unwrap();
-
    let mut bob = Peer::config("bob", [8, 8, 8, 8], storage_bob, peer::Config::default());
+
    let mut bob = Peer::with_storage("bob", [8, 8, 8, 8], storage_bob);

    let storage_eve =
        Storage::open(tempdir.path().join("eve").join("storage"), fixtures::user()).unwrap();
-
    let mut eve = Peer::config("eve", [9, 9, 9, 9], storage_eve, peer::Config::default());
+
    let mut eve = Peer::with_storage("eve", [9, 9, 9, 9], storage_eve);

    remote::mock::register(&alice.node_id(), alice.storage().path());
    remote::mock::register(&eve.node_id(), eve.storage().path());
@@ -1676,24 +1668,9 @@ fn test_init_and_seed() {
fn prop_inventory_exchange_dense() {
    fn property(alice_inv: MockStorage, bob_inv: MockStorage, eve_inv: MockStorage) {
        let rng = fastrand::Rng::new();
-
        let alice = Peer::config(
-
            "alice",
-
            [7, 7, 7, 7],
-
            alice_inv.clone(),
-
            peer::Config::default(),
-
        );
-
        let mut bob = Peer::config(
-
            "bob",
-
            [8, 8, 8, 8],
-
            bob_inv.clone(),
-
            peer::Config::default(),
-
        );
-
        let mut eve = Peer::config(
-
            "eve",
-
            [9, 9, 9, 9],
-
            eve_inv.clone(),
-
            peer::Config::default(),
-
        );
+
        let alice = Peer::with_storage("alice", [7, 7, 7, 7], alice_inv.clone());
+
        let mut bob = Peer::with_storage("bob", [8, 8, 8, 8], bob_inv.clone());
+
        let mut eve = Peer::with_storage("eve", [9, 9, 9, 9], eve_inv.clone());
        let mut routing = RandomMap::with_hasher(rng.clone().into());

        for (inv, peer) in &[
modified radicle/src/node/timestamp.rs
@@ -8,7 +8,9 @@ use localtime::LocalTime;
use sqlite as sql;

/// Milliseconds since epoch.
-
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+
#[derive(
+
    Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
+
)]
#[serde(transparent)]
pub struct Timestamp(u64);