Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Implement routing table pruning functions
Alexis Sellier committed 3 years ago
commit 78545c2f0f60c07e8b7315213366468964f3dec4
parent b8c2ab53163e184bd8e730c445b0930cb2a006c6
2 files changed +92 -12
modified radicle-node/src/service.rs
@@ -756,7 +756,11 @@ where
    ) -> Result<(), routing::Error> {
        for proj_id in inventory {
            // TODO: Fire an event on routing update.
-
            if self.routing.insert(*proj_id, from)? && self.config.is_tracking(proj_id) {
+
            if self
+
                .routing
+
                .insert(*proj_id, from, self.clock.timestamp())?
+
                && self.config.is_tracking(proj_id)
+
            {
                self.storage.fetch(*proj_id, remote).unwrap();
            }
        }
modified radicle-node/src/service/routing.rs
@@ -5,7 +5,10 @@ use std::path::Path;
use sqlite as sql;
use thiserror::Error;

-
use crate::prelude::{Id, NodeId};
+
use crate::{
+
    clock::Timestamp,
+
    prelude::{Id, NodeId},
+
};

/// An error occuring in peer-to-peer networking code.
#[derive(Error, Debug)]
@@ -13,6 +16,9 @@ pub enum Error {
    /// An Internal error.
    #[error("internal error: {0}")]
    Internal(#[from] sql::Error),
+
    /// Internal clock time overflow.
+
    #[error("the time overflowed")]
+
    TimeOverflow,
}

/// Persistent file storage for a routing table.
@@ -51,12 +57,16 @@ impl Table {
pub trait Store {
    /// Get the nodes seeding the given id.
    fn get(&self, id: &Id) -> Result<HashSet<NodeId>, Error>;
+
    /// Get a specific entry.
+
    fn entry(&self, id: &Id, node: &NodeId) -> Result<Option<Timestamp>, Error>;
    /// Add a new node seeding the given id.
-
    fn insert(&mut self, id: Id, node: NodeId) -> Result<bool, Error>;
+
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error>;
    /// Remove a node for the given id.
    fn remove(&mut self, id: &Id, node: &NodeId) -> Result<bool, Error>;
    /// Iterate over all entries in the routing table.
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error>;
+
    /// Prune entries older than the given timestamp.
+
    fn prune(&mut self, oldest: Timestamp) -> Result<usize, Error>;
}

impl Store for Table {
@@ -74,7 +84,24 @@ impl Store for Table {
        Ok(nodes)
    }

-
    fn insert(&mut self, id: Id, node: NodeId) -> Result<bool, Error> {
+
    fn entry(&self, id: &Id, node: &NodeId) -> Result<Option<Timestamp>, Error> {
+
        let row = self
+
            .db
+
            .prepare("SELECT (time) FROM routing WHERE resource = ? AND node = ?")?
+
            .bind(1, id)?
+
            .bind(2, node)?
+
            .into_cursor()
+
            .next();
+

+
        if let Some(Ok(row)) = row {
+
            return Ok(Some(row.get::<i64, _>(0) as Timestamp));
+
        }
+
        Ok(None)
+
    }
+

+
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error> {
+
        let time: i64 = time.try_into().map_err(|_| Error::TimeOverflow)?;
+

        self.db
            .prepare(
                "INSERT INTO routing (resource, node, time)
@@ -83,7 +110,7 @@ impl Store for Table {
            )?
            .bind(1, &id)?
            .bind(2, &node)?
-
            .bind(3, 0)?
+
            .bind(3, time)?
            .next()?;

        Ok(self.db.change_count() > 0)
@@ -114,10 +141,23 @@ impl Store for Table {

        Ok(self.db.change_count() > 0)
    }
+

+
    fn prune(&mut self, oldest: Timestamp) -> Result<usize, Error> {
+
        let oldest: i64 = oldest.try_into().map_err(|_| Error::TimeOverflow)?;
+

+
        self.db
+
            .prepare("DELETE FROM routing WHERE time < ?")?
+
            .bind(1, oldest)?
+
            .next()?;
+

+
        Ok(self.db.change_count())
+
    }
}

#[cfg(test)]
mod test {
+
    use nakamoto_net::LocalTime;
+

    use super::*;
    use crate::test::arbitrary;

@@ -129,7 +169,7 @@ mod test {

        for id in &ids {
            for node in &nodes {
-
                assert!(db.insert(*id, *node).unwrap());
+
                assert!(db.insert(*id, *node, 0).unwrap());
            }
        }

@@ -149,7 +189,7 @@ mod test {

        for id in &ids {
            for node in &nodes {
-
                assert!(db.insert(*id, *node).unwrap());
+
                assert!(db.insert(*id, *node, 0).unwrap());
            }
        }

@@ -170,7 +210,7 @@ mod test {

        for id in &ids {
            for node in &nodes {
-
                db.insert(*id, *node).unwrap();
+
                db.insert(*id, *node, 0).unwrap();
            }
        }
        for id in &ids {
@@ -189,9 +229,9 @@ mod test {
        let node = arbitrary::gen::<NodeId>(1);
        let mut db = Table::open(":memory:").unwrap();

-
        assert!(db.insert(id, node).unwrap());
-
        assert!(!db.insert(id, node).unwrap());
-
        assert!(!db.insert(id, node).unwrap());
+
        assert!(db.insert(id, node, 0).unwrap());
+
        assert!(!db.insert(id, node, 0).unwrap());
+
        assert!(!db.insert(id, node, 0).unwrap());
    }

    #[test]
@@ -200,8 +240,44 @@ mod test {
        let node = arbitrary::gen::<NodeId>(1);
        let mut db = Table::open(":memory:").unwrap();

-
        assert!(db.insert(id, node).unwrap());
+
        assert!(db.insert(id, node, 0).unwrap());
        assert!(db.remove(&id, &node).unwrap());
        assert!(!db.remove(&id, &node).unwrap());
    }
+

+
    #[test]
+
    fn test_prune() {
+
        let rng = fastrand::Rng::new();
+
        let now = LocalTime::now();
+
        let ids = arbitrary::vec::<Id>(10);
+
        let nodes = arbitrary::vec::<NodeId>(10);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            for node in &nodes {
+
                let time = rng.u64(..now.as_secs());
+
                db.insert(*id, *node, time).unwrap();
+
            }
+
        }
+

+
        let ids = arbitrary::vec::<Id>(10);
+
        let nodes = arbitrary::vec::<NodeId>(10);
+

+
        for id in &ids {
+
            for node in &nodes {
+
                let time = rng.u64(now.as_secs()..i64::MAX as u64);
+
                db.insert(*id, *node, time).unwrap();
+
            }
+
        }
+

+
        let pruned = db.prune(now.as_secs()).unwrap();
+
        assert_eq!(pruned, ids.len() * nodes.len());
+

+
        for id in &ids {
+
            for node in &nodes {
+
                let t = db.entry(id, node).unwrap().unwrap();
+
                assert!(t >= now.as_secs());
+
            }
+
        }
+
    }
}