Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Announce inventory only if it changed
Alexis Sellier committed 3 years ago
commit 267aba59016e312039a0724a1e98f89712b2adfc
parent 7cbfde1e9b3166900fd5c33a9d2dc208182a27a7
6 files changed +135 -76
modified radicle-node/src/address/store.rs
@@ -4,13 +4,13 @@ use std::{fmt, io};
use radicle::node;
use radicle::node::Address;
use radicle::prelude::Timestamp;
+
use radicle::sql::transaction;
use sqlite as sql;
use thiserror::Error;

use crate::address::types;
use crate::address::{KnownAddress, Source};
use crate::service::NodeId;
-
use crate::sql::transaction;
use crate::wire::AddressType;

#[derive(Error, Debug)]
modified radicle-node/src/lib.rs
@@ -6,7 +6,6 @@ pub mod logger;
pub mod runtime;
pub mod service;
pub mod signals;
-
pub mod sql;
#[cfg(any(test, feature = "test"))]
pub mod test;
#[cfg(test)]
modified radicle-node/src/service.rs
@@ -27,6 +27,7 @@ use crate::identity::IdentityError;
use crate::identity::{Doc, Id};
use crate::node;
use crate::node::routing;
+
use crate::node::routing::InsertResult;
use crate::node::{Address, Features, FetchResult, Seed, Seeds};
use crate::prelude::*;
use crate::runtime::Emitter;
@@ -102,6 +103,23 @@ pub enum Event {
    },
}

+
/// Result of syncing our routing table with a node's inventory.
+
#[derive(Default)]
+
struct SyncedRouting {
+
    /// Repo entries added.
+
    added: Vec<Id>,
+
    /// Repo entries removed.
+
    removed: Vec<Id>,
+
    /// Repo entries updated (time).
+
    updated: Vec<Id>,
+
}
+

+
impl SyncedRouting {
+
    fn is_empty(&self) -> bool {
+
        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
+
    }
+
}
+

/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
@@ -527,10 +545,11 @@ where
                }
            }
            Command::SyncInventory(resp) => {
-
                let updated = self
+
                let synced = self
                    .sync_inventory()
                    .expect("Service::command: error syncing inventory");
-
                resp.send(!updated.is_empty()).ok();
+
                resp.send(synced.added.len() + synced.removed.len() > 0)
+
                    .ok();
            }
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
@@ -806,8 +825,8 @@ where
                }

                match self.sync_routing(&message.inventory, *announcer, message.timestamp) {
-
                    Ok(updated) => {
-
                        if updated.is_empty() {
+
                    Ok(synced) => {
+
                        if synced.is_empty() {
                            debug!(target: "service", "No routes updated by inventory announcement from {announcer}");
                            return Ok(false);
                        }
@@ -865,11 +884,11 @@ where

                // We update inventories when receiving ref announcements, as these could come
                // from a new repository being initialized.
-
                if let Ok(updated) = self
+
                if let Ok(result) = self
                    .routing
                    .insert(message.rid, *announcer, message.timestamp)
                {
-
                    if updated {
+
                    if let InsertResult::SeedAdded = result {
                        self.emitter.emit(Event::SeedDiscovered {
                            rid: message.rid,
                            nid: *relayer,
@@ -1113,11 +1132,11 @@ where
    }

    /// Update our routing table with our local node's inventory.
-
    fn sync_inventory(&mut self) -> Result<Vec<Id>, Error> {
+
    fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
        let inventory = self.storage.inventory()?;
-
        let updated = self.sync_routing(&inventory, self.node_id(), self.time())?;
+
        let result = self.sync_routing(&inventory, self.node_id(), self.time())?;

-
        Ok(updated)
+
        Ok(result)
    }

    /// Process a peer inventory announcement by updating our routing table.
@@ -1128,39 +1147,43 @@ where
        inventory: &[Id],
        from: NodeId,
        timestamp: Timestamp,
-
    ) -> Result<Vec<Id>, Error> {
-
        let mut updated = Vec::new();
+
    ) -> Result<SyncedRouting, Error> {
+
        let mut synced = SyncedRouting::default();
        let mut included = HashSet::new();

        for rid in inventory {
            included.insert(rid);
-
            if self.routing.insert(*rid, from, timestamp)? {
-
                info!(target: "service", "Routing table updated for {rid} with seed {from}");
-
                self.emitter.emit(Event::SeedDiscovered {
-
                    rid: *rid,
-
                    nid: from,
-
                });
-

-
                if self
-
                    .tracking
-
                    .is_repo_tracked(rid)
-
                    .expect("Service::process_inventory: error accessing tracking configuration")
-
                {
-
                    // TODO: We should fetch here if we're already connected, case this seed has
-
                    // refs we don't have.
+
            match self.routing.insert(*rid, from, timestamp)? {
+
                InsertResult::SeedAdded => {
+
                    info!(target: "service", "Routing table updated for {rid} with seed {from}");
+
                    self.emitter.emit(Event::SeedDiscovered {
+
                        rid: *rid,
+
                        nid: from,
+
                    });
+

+
                    if self.tracking.is_repo_tracked(rid).expect(
+
                        "Service::process_inventory: error accessing tracking configuration",
+
                    ) {
+
                        // TODO: We should fetch here if we're already connected, case this seed has
+
                        // refs we don't have.
+
                    }
+
                    synced.added.push(*rid);
                }
-
                updated.push(*rid);
+
                InsertResult::TimeUpdated => {
+
                    synced.updated.push(*rid);
+
                }
+
                InsertResult::NotUpdated => {}
            }
        }
        for rid in self.routing.get_resources(&from)?.into_iter() {
            if !included.contains(&rid) {
                if self.routing.remove(&rid, &from)? {
-
                    updated.push(rid);
+
                    synced.removed.push(rid);
                    self.emitter.emit(Event::SeedDropped { rid, nid: from });
                }
            }
        }
-
        Ok(updated)
+
        Ok(synced)
    }

    /// Announce local refs for given id.
@@ -1214,9 +1237,9 @@ where

    fn sync_and_announce(&mut self) {
        match self.sync_inventory() {
-
            // TODO: This will always be `true` because the local clock is ticking.
-
            Ok(updated) => {
-
                if !updated.is_empty() {
+
            Ok(synced) => {
+
                // Only announce if our inventory changed.
+
                if synced.added.len() + synced.removed.len() > 0 {
                    if let Err(e) = self
                        .storage
                        .inventory()
deleted radicle-node/src/sql.rs
@@ -1,21 +0,0 @@
-
use sqlite as sql;
-

-
/// Run an SQL query inside a transaction.
-
/// Commits the transaction on success, and rolls back on error.
-
pub fn transaction<T>(
-
    db: &sql::Connection,
-
    query: impl FnOnce(&sql::Connection) -> Result<T, sql::Error>,
-
) -> Result<T, sql::Error> {
-
    db.execute("BEGIN")?;
-

-
    match query(db) {
-
        Ok(result) => {
-
            db.execute("COMMIT")?;
-
            Ok(result)
-
        }
-
        Err(err) => {
-
            db.execute("ROLLBACK")?;
-
            Err(err)
-
        }
-
    }
-
}
modified radicle/src/node/routing.rs
@@ -8,6 +8,7 @@ use thiserror::Error;
use crate::{
    prelude::Timestamp,
    prelude::{Id, NodeId},
+
    sql::transaction,
};

/// How long to wait for the database lock to be released before failing a read.
@@ -15,6 +16,17 @@ const DB_READ_TIMEOUT: time::Duration = time::Duration::from_secs(3);
/// How long to wait for the database lock to be released before failing a write.
const DB_WRITE_TIMEOUT: time::Duration = time::Duration::from_secs(6);

+
/// Result of inserting into the routing table.
+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+
pub enum InsertResult {
+
    /// Nothing was updated.
+
    NotUpdated,
+
    /// The entry's timestamp was updated.
+
    TimeUpdated,
+
    /// A new entry was inserted.
+
    SeedAdded,
+
}
+

/// An error occuring in peer-to-peer networking code.
#[derive(Error, Debug)]
pub enum Error {
@@ -83,7 +95,7 @@ pub trait Store {
        Ok(self.len()? == 0)
    }
    /// Add a new node seeding the given id.
-
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error>;
+
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<InsertResult, Error>;
    /// Remove a node for the given id.
    fn remove(&mut self, id: &Id, node: &NodeId) -> Result<bool, Error>;
    /// Iterate over all entries in the routing table.
@@ -135,22 +147,37 @@ impl Store for Table {
        Ok(None)
    }

-
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error> {
+
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<InsertResult, Error> {
        let time: i64 = time.try_into().map_err(|_| Error::UnitOverflow)?;
-
        let mut stmt = self.db.prepare(
-
            "INSERT INTO routing (resource, node, time)
-
             VALUES (?, ?, ?)
-
             ON CONFLICT DO UPDATE
-
             SET time = ?3
-
             WHERE time < ?3",
-
        )?;

-
        stmt.bind((1, &id))?;
-
        stmt.bind((2, &node))?;
-
        stmt.bind((3, time))?;
-
        stmt.next()?;
-

-
        Ok(self.db.change_count() > 0)
+
        transaction(&self.db, |db| {
+
            let mut stmt =
+
                db.prepare("SELECT (time) FROM routing WHERE resource = ? AND node = ?")?;
+

+
            stmt.bind((1, &id))?;
+
            stmt.bind((2, &node))?;
+

+
            let existed = stmt.into_iter().next().is_some();
+
            let mut stmt = db.prepare(
+
                "INSERT INTO routing (resource, node, time)
+
                 VALUES (?, ?, ?)
+
                 ON CONFLICT DO UPDATE
+
                 SET time = ?3
+
                 WHERE time < ?3",
+
            )?;
+

+
            stmt.bind((1, &id))?;
+
            stmt.bind((2, &node))?;
+
            stmt.bind((3, time))?;
+
            stmt.next()?;
+

+
            Ok(match (self.db.change_count() > 0, existed) {
+
                (true, true) => InsertResult::TimeUpdated,
+
                (true, false) => InsertResult::SeedAdded,
+
                (false, _) => InsertResult::NotUpdated,
+
            })
+
        })
+
        .map_err(Error::from)
    }

    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error> {
@@ -226,7 +253,7 @@ mod test {

        for id in &ids {
            for node in &nodes {
-
                assert!(db.insert(*id, *node, 0).unwrap());
+
                assert_eq!(db.insert(*id, *node, 0).unwrap(), InsertResult::SeedAdded);
            }
        }

@@ -246,7 +273,7 @@ mod test {

        for id in &ids {
            for node in &nodes {
-
                assert!(db.insert(*id, *node, 0).unwrap());
+
                assert_eq!(db.insert(*id, *node, 0).unwrap(), InsertResult::SeedAdded);
            }
        }

@@ -266,7 +293,7 @@ mod test {

        for id in &ids {
            for node in &nodes {
-
                assert!(db.insert(*id, *node, 0).unwrap());
+
                assert_eq!(db.insert(*id, *node, 0).unwrap(), InsertResult::SeedAdded);
            }
        }

@@ -306,9 +333,20 @@ mod test {
        let node = arbitrary::gen::<NodeId>(1);
        let mut db = Table::open(":memory:").unwrap();

-
        assert!(db.insert(id, node, 0).unwrap());
-
        assert!(!db.insert(id, node, 0).unwrap());
-
        assert!(!db.insert(id, node, 0).unwrap());
+
        assert_eq!(db.insert(id, node, 0).unwrap(), InsertResult::SeedAdded);
+
        assert_eq!(db.insert(id, node, 0).unwrap(), InsertResult::NotUpdated);
+
        assert_eq!(db.insert(id, node, 0).unwrap(), InsertResult::NotUpdated);
+
    }
+

+
    #[test]
+
    fn test_insert_existing_updated_time() {
+
        let id = arbitrary::gen::<Id>(1);
+
        let node = arbitrary::gen::<NodeId>(1);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        assert_eq!(db.insert(id, node, 0).unwrap(), InsertResult::SeedAdded);
+
        assert_eq!(db.insert(id, node, 1).unwrap(), InsertResult::TimeUpdated);
+
        assert_eq!(db.entry(&id, &node).unwrap(), Some(1));
    }

    #[test]
@@ -317,7 +355,7 @@ mod test {
        let node = arbitrary::gen::<NodeId>(1);
        let mut db = Table::open(":memory:").unwrap();

-
        assert!(db.insert(id, node, 0).unwrap());
+
        assert_eq!(db.insert(id, node, 0).unwrap(), InsertResult::SeedAdded);
        assert!(db.remove(&id, &node).unwrap());
        assert!(!db.remove(&id, &node).unwrap());
    }
modified radicle/src/sql.rs
@@ -8,6 +8,26 @@ use crate::identity::Id;
use crate::node;
use crate::node::Address;

+
/// Run an SQL query inside a transaction.
+
/// Commits the transaction on success, and rolls back on error.
+
pub fn transaction<T>(
+
    db: &sql::Connection,
+
    query: impl FnOnce(&sql::Connection) -> Result<T, sql::Error>,
+
) -> Result<T, sql::Error> {
+
    db.execute("BEGIN")?;
+

+
    match query(db) {
+
        Ok(result) => {
+
            db.execute("COMMIT")?;
+
            Ok(result)
+
        }
+
        Err(err) => {
+
            db.execute("ROLLBACK")?;
+
            Err(err)
+
        }
+
    }
+
}
+

impl TryFrom<&Value> for Id {
    type Error = sql::Error;