Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: prune project routing table
Slack Coder committed 3 years ago
commit 5472931ff0ba947f62b4dd85bab7a32b8cc9c158
parent a74cb416c1661f5f9899fe34a02f26b26c85000a
5 files changed +198 -19
modified radicle-node/src/main.rs
@@ -2,6 +2,8 @@ use std::{env, net, process, thread};

use anyhow::Context as _;

+
use nakamoto_net::LocalDuration;
+

use radicle::profile;
use radicle_node::crypto::ssh::keystore::MemorySigner;
use radicle_node::logger;
@@ -13,8 +15,9 @@ type Reactor = nakamoto_net_poll::Reactor<net::TcpStream>;
#[derive(Debug)]
struct Options {
    connect: Vec<Address>,
-
    listen: Vec<net::SocketAddr>,
    external_addresses: Vec<Address>,
+
    limits: service::config::Limits,
+
    listen: Vec<net::SocketAddr>,
}

impl Options {
@@ -24,6 +27,7 @@ impl Options {
        let mut parser = lexopt::Parser::from_env();
        let mut connect = Vec::new();
        let mut external_addresses = Vec::new();
+
        let mut limits = service::config::Limits::default();
        let mut listen = Vec::new();

        while let Some(arg) = parser.next()? {
@@ -32,14 +36,21 @@ impl Options {
                    let addr = parser.value()?.parse()?;
                    connect.push(addr);
                }
-
                Long("listen") => {
-
                    let addr = parser.value()?.parse()?;
-
                    listen.push(addr);
-
                }
                Long("external-address") => {
                    let addr = parser.value()?.parse()?;
                    external_addresses.push(addr);
                }
+
                Long("limit-routing-max-age") => {
+
                    let secs: u64 = parser.value()?.parse()?;
+
                    limits.routing_max_age = LocalDuration::from_secs(secs);
+
                }
+
                Long("limit-routing-max-size") => {
+
                    limits.routing_max_size = parser.value()?.parse()?;
+
                }
+
                Long("listen") => {
+
                    let addr = parser.value()?.parse()?;
+
                    listen.push(addr);
+
                }
                Long("help") => {
                    println!("usage: radicle-node [--connect <addr>]..");
                    process::exit(0);
@@ -49,8 +60,9 @@ impl Options {
        }
        Ok(Self {
            connect,
-
            listen,
            external_addresses,
+
            limits,
+
            listen,
        })
    }
}
@@ -77,6 +89,7 @@ fn main() -> anyhow::Result<()> {
        service: service::Config {
            connect: options.connect,
            external_addresses: options.external_addresses,
+
            limits: options.limits,
            ..service::Config::default()
        },
        listen: options.listen,
modified radicle-node/src/service.rs
@@ -405,7 +405,7 @@ where
        if now - self.last_prune >= PRUNE_INTERVAL {
            debug!("Running 'prune' task...");

-
            if let Err(err) = self.prune_routing_entries() {
+
            if let Err(err) = self.prune_routing_entries(&now) {
                error!("Error pruning routing entries: {}", err);
            }
            self.reactor.wakeup(PRUNE_INTERVAL);
@@ -934,8 +934,17 @@ where
        Ok(())
    }

-
    fn prune_routing_entries(&mut self) -> Result<(), storage::Error> {
-
        // TODO
+
    fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
+
        let count = self.routing.len()?;
+
        if count <= self.config.limits.routing_max_size {
+
            return Ok(());
+
        }
+

+
        let delta = count - self.config.limits.routing_max_size;
+
        self.routing.prune(
+
            (*now - self.config.limits.routing_max_age).as_secs(),
+
            Some(delta),
+
        )?;
        Ok(())
    }

modified radicle-node/src/service/config.rs
@@ -1,3 +1,5 @@
+
use super::nakamoto::LocalDuration;
+

use crate::collections::HashSet;
use crate::identity::{Id, PublicKey};
use crate::service::filter::Filter;
@@ -40,6 +42,24 @@ pub enum RemoteTracking {
    Allowed(HashSet<PublicKey>),
}

+
/// Configuration parameters defining attributes of minima and maxima.
+
#[derive(Debug, Clone)]
+
pub struct Limits {
+
    /// Number of routing table entries before we start pruning.
+
    pub routing_max_size: usize,
+
    /// How long to keep a routing table entry before being pruned.
+
    pub routing_max_age: LocalDuration,
+
}
+

+
impl Default for Limits {
+
    fn default() -> Self {
+
        Self {
+
            routing_max_size: 1000,
+
            routing_max_age: LocalDuration::from_mins(7 * 24 * 60),
+
        }
+
    }
+
}
+

/// Service configuration.
#[derive(Debug, Clone)]
pub struct Config {
@@ -58,6 +78,7 @@ pub struct Config {
    pub relay: bool,
    /// List of addresses to listen on for protocol connections.
    pub listen: Vec<Address>,
+
    pub limits: Limits,
}

impl Default for Config {
@@ -70,6 +91,7 @@ impl Default for Config {
            remote_tracking: RemoteTracking::default(),
            relay: true,
            listen: vec![],
+
            limits: Limits::default(),
        }
    }
}
modified radicle-node/src/service/routing.rs
@@ -16,9 +16,9 @@ pub enum Error {
    /// An Internal error.
    #[error("internal error: {0}")]
    Internal(#[from] sql::Error),
-
    /// Internal clock time overflow.
-
    #[error("the time overflowed")]
-
    TimeOverflow,
+
    /// Internal unit overflow.
+
    #[error("the unit overflowed")]
+
    UnitOverflow,
}

/// Persistent file storage for a routing table.
@@ -61,14 +61,20 @@ pub trait Store {
    fn get_resources(&self, node_id: &NodeId) -> Result<HashSet<Id>, Error>;
    /// Get a specific entry.
    fn entry(&self, id: &Id, node: &NodeId) -> Result<Option<Timestamp>, Error>;
+
    /// Checks if any entries are available.
+
    fn is_empty(&self) -> Result<bool, Error> {
+
        Ok(self.len()? == 0)
+
    }
    /// Add a new node seeding the given id.
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error>;
    /// Remove a node for the given id.
    fn remove(&mut self, id: &Id, node: &NodeId) -> Result<bool, Error>;
    /// Iterate over all entries in the routing table.
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error>;
+
    /// Get the total number of routing entries.
+
    fn len(&self) -> Result<usize, Error>;
    /// Prune entries older than the given timestamp.
-
    fn prune(&mut self, oldest: Timestamp) -> Result<usize, Error>;
+
    fn prune(&mut self, oldest: Timestamp, limit: Option<usize>) -> Result<usize, Error>;
}

impl Store for Table {
@@ -113,11 +119,13 @@ impl Store for Table {
    }

    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error> {
-
        let time: i64 = time.try_into().map_err(|_| Error::TimeOverflow)?;
+
        let time: i64 = time.try_into().map_err(|_| Error::UnitOverflow)?;
        let mut stmt = self.db.prepare(
            "INSERT INTO routing (resource, node, time)
             VALUES (?, ?, ?)
-
             ON CONFLICT DO NOTHING",
+
             ON CONFLICT DO UPDATE
+
             SET time = ?3
+
             WHERE time < ?3",
        )?;

        stmt.bind(1, &id)?;
@@ -156,11 +164,30 @@ impl Store for Table {
        Ok(self.db.change_count() > 0)
    }

-
    fn prune(&mut self, oldest: Timestamp) -> Result<usize, Error> {
-
        let oldest: i64 = oldest.try_into().map_err(|_| Error::TimeOverflow)?;
-
        let mut stmt = self.db.prepare("DELETE FROM routing WHERE time < ?")?;
+
    fn len(&self) -> Result<usize, Error> {
+
        let stmt = self.db.prepare("SELECT COUNT(1) FROM routing")?;
+
        let count: i64 = stmt
+
            .into_cursor()
+
            .next()
+
            .expect("COUNT will always return a single row")?
+
            .get(0);
+
        let count: usize = count.try_into().map_err(|_| Error::UnitOverflow)?;
+
        Ok(count)
+
    }
+

+
    fn prune(&mut self, oldest: Timestamp, limit: Option<usize>) -> Result<usize, Error> {
+
        let oldest: i64 = oldest.try_into().map_err(|_| Error::UnitOverflow)?;
+
        let limit: i64 = limit
+
            .unwrap_or(i64::MAX as usize)
+
            .try_into()
+
            .map_err(|_| Error::UnitOverflow)?;

+
        let mut stmt = self.db.prepare(
+
            "DELETE FROM routing WHERE rowid IN
+
            (SELECT rowid FROM routing WHERE time < ? LIMIT ?)",
+
        )?;
        stmt.bind(1, oldest)?;
+
        stmt.bind(2, limit)?;
        stmt.next()?;

        Ok(self.db.change_count())
@@ -279,6 +306,19 @@ mod test {
    }

    #[test]
+
    fn test_len() {
+
        let mut db = Table::open(":memory:").unwrap();
+
        let ids = arbitrary::vec::<Id>(10);
+
        let node = arbitrary::gen(1);
+

+
        for id in ids {
+
            db.insert(id, node, LocalTime::now().as_secs()).unwrap();
+
        }
+

+
        assert_eq!(10, db.len().unwrap(), "correct number of rows in table");
+
    }
+

+
    #[test]
    fn test_prune() {
        let rng = fastrand::Rng::new();
        let now = LocalTime::now();
@@ -303,7 +343,7 @@ mod test {
            }
        }

-
        let pruned = db.prune(now.as_secs()).unwrap();
+
        let pruned = db.prune(now.as_secs(), None).unwrap();
        assert_eq!(pruned, ids.len() * nodes.len());

        for id in &ids {
modified radicle-node/src/tests.rs
@@ -7,6 +7,7 @@ use nakamoto_net as nakamoto;
use crate::address;
use crate::collections::{HashMap, HashSet};
use crate::crypto::test::signer::MockSigner;
+
use crate::identity::Id;
use crate::prelude::{LocalDuration, Timestamp};
use crate::service::config::*;
use crate::service::filter::Filter;
@@ -228,6 +229,100 @@ fn test_inventory_sync() {
}

#[test]
+
fn test_inventory_pruning() {
+
    struct Test {
+
        limits: Limits,
+
        /// Number of projects by peer
+
        peer_projects: Vec<usize>,
+
        wait_time: LocalDuration,
+
        expected_routing_table_size: usize,
+
    }
+
    let tests = [
+
        // All zero
+
        Test {
+
            limits: Limits {
+
                routing_max_size: 0,
+
                routing_max_age: LocalDuration::from_secs(0),
+
            },
+
            peer_projects: vec![10; 5],
+
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
+
            expected_routing_table_size: 0,
+
        },
+
        // All entries are too young to expire.
+
        Test {
+
            limits: Limits {
+
                routing_max_size: 0,
+
                routing_max_age: LocalDuration::from_mins(7 * 24 * 60),
+
            },
+
            peer_projects: vec![10; 5],
+
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
+
            expected_routing_table_size: 0,
+
        },
+
        // All entries remain because the table is unconstrained.
+
        Test {
+
            limits: Limits {
+
                routing_max_size: 50,
+
                routing_max_age: LocalDuration::from_mins(0),
+
            },
+
            peer_projects: vec![10; 5],
+
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
+
            expected_routing_table_size: 50,
+
        },
+
        // Some entries are pruned because the table is constrained.
+
        Test {
+
            limits: Limits {
+
                routing_max_size: 25,
+
                routing_max_age: LocalDuration::from_mins(7 * 24 * 60),
+
            },
+
            peer_projects: vec![10; 5],
+
            wait_time: LocalDuration::from_mins(7 * 24 * 60) + LocalDuration::from_secs(1),
+
            expected_routing_table_size: 25,
+
        },
+
    ];
+

+
    for test in tests {
+
        let mut alice = Peer::config(
+
            "alice",
+
            Config {
+
                limits: test.limits,
+
                ..Config::default()
+
            },
+
            [7, 7, 7, 7],
+
            MockStorage::empty(),
+
            address::Book::memory().unwrap(),
+
            MockSigner::default(),
+
            fastrand::Rng::new(),
+
        );
+

+
        let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
+

+
        // Tell Alice about the amazing projects available
+
        alice.connect_to(&bob);
+
        for num_projs in test.peer_projects {
+
            alice.receive(
+
                &bob.addr(),
+
                Message::inventory(
+
                    InventoryAnnouncement {
+
                        inventory: test::arbitrary::vec::<Id>(num_projs),
+
                        timestamp: bob.clock().timestamp(),
+
                    },
+
                    &MockSigner::default(),
+
                ),
+
            );
+
        }
+

+
        // Wait for things to happen
+
        assert!(test.wait_time > PRUNE_INTERVAL, "pruning must be triggered");
+
        alice.elapse(test.wait_time);
+

+
        assert_eq!(
+
            test.expected_routing_table_size,
+
            alice.routing().len().unwrap()
+
        );
+
    }
+
}
+

+
#[test]
fn test_tracking() {
    let mut alice = Peer::config(
        "alice",