Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Move `routing` to `radicle` crate
Alexis Sellier committed 3 years ago
commit 40c58b45ed5b41b7776d1ecf920309f3a3aa1c39
parent b558c742e681345f5895e99d35338d90a10723cd
16 files changed +386 -384
modified Cargo.lock
@@ -1795,6 +1795,7 @@ dependencies = [
 "fastrand",
 "git-ref-format",
 "git2",
+
 "localtime",
 "log",
 "multibase",
 "nonempty 0.8.1",
modified radicle-node/src/address/store.rs
@@ -3,12 +3,12 @@ use std::{fmt, io};

use radicle::node;
use radicle::node::Address;
+
use radicle::prelude::Timestamp;
use sqlite as sql;
use thiserror::Error;

use crate::address::types;
use crate::address::{KnownAddress, Source};
-
use crate::clock::Timestamp;
use crate::service::NodeId;
use crate::sql::transaction;
use crate::wire::AddressType;
modified radicle-node/src/address/types.rs
@@ -3,8 +3,8 @@ use std::ops::{Deref, DerefMut};
use nonempty::NonEmpty;
use radicle::node;
use radicle::node::Address;
+
use radicle::prelude::Timestamp;

-
use crate::clock::Timestamp;
use crate::collections::HashMap;
use crate::LocalTime;

deleted radicle-node/src/clock.rs
@@ -1,2 +0,0 @@
-
/// Milliseconds since epoch.
-
pub type Timestamp = u64;
modified radicle-node/src/lib.rs
@@ -1,6 +1,5 @@
pub mod address;
pub mod bounded;
-
pub mod clock;
pub mod control;
pub mod deserializer;
pub mod logger;
@@ -17,12 +16,12 @@ pub mod worker;

pub use localtime::{LocalDuration, LocalTime};
pub use netservices::LinkDirection as Link;
+
pub use radicle::prelude::Timestamp;
pub use radicle::{collections, crypto, git, identity, node, profile, rad, storage};
pub use runtime::Runtime;

pub mod prelude {
    pub use crate::bounded::BoundedVec;
-
    pub use crate::clock::Timestamp;
    pub use crate::crypto::{PublicKey, Signature, Signer};
    pub use crate::deserializer::Deserializer;
    pub use crate::identity::{Did, Id};
@@ -31,5 +30,5 @@ pub mod prelude {
    pub use crate::service::{DisconnectReason, Event, Message, Network, NodeId};
    pub use crate::storage::refs::Refs;
    pub use crate::storage::WriteStorage;
-
    pub use crate::{LocalDuration, LocalTime};
+
    pub use crate::{LocalDuration, LocalTime, Timestamp};
}
modified radicle-node/src/runtime.rs
@@ -21,8 +21,8 @@ use radicle::Storage;
use crate::address;
use crate::control;
use crate::crypto::Signer;
-
use crate::node::NodeId;
-
use crate::service::{routing, tracking};
+
use crate::node::{routing, NodeId};
+
use crate::service::tracking;
use crate::wire;
use crate::wire::Wire;
use crate::worker;
modified radicle-node/src/service.rs
@@ -5,7 +5,6 @@ pub mod config;
pub mod filter;
pub mod message;
pub mod reactor;
-
pub mod routing;
pub mod session;
pub mod tracking;

@@ -22,12 +21,12 @@ use log::*;

use crate::address;
use crate::address::AddressBook;
-
use crate::clock::Timestamp;
use crate::crypto;
use crate::crypto::{Signer, Verified};
use crate::identity::IdentityError;
use crate::identity::{Doc, Id};
use crate::node;
+
use crate::node::routing;
use crate::node::{Address, Features, FetchResult, Seed, Seeds};
use crate::prelude::*;
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
deleted radicle-node/src/service/routing.rs
@@ -1,356 +0,0 @@
-
use std::collections::HashSet;
-
use std::fmt;
-
use std::path::Path;
-

-
use sqlite as sql;
-
use thiserror::Error;
-

-
use crate::{
-
    clock::Timestamp,
-
    prelude::{Id, NodeId},
-
};
-

-
/// An error occuring in peer-to-peer networking code.
-
#[derive(Error, Debug)]
-
pub enum Error {
-
    /// An Internal error.
-
    #[error("internal error: {0}")]
-
    Internal(#[from] sql::Error),
-
    /// Internal unit overflow.
-
    #[error("the unit overflowed")]
-
    UnitOverflow,
-
}
-

-
/// Persistent file storage for a routing table.
-
pub struct Table {
-
    db: sql::Connection,
-
}
-

-
impl fmt::Debug for Table {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-
        write!(f, "Table(..)")
-
    }
-
}
-

-
impl Table {
-
    const SCHEMA: &str = include_str!("routing/schema.sql");
-

-
    /// Open a routing file store at the given path. Creates a new empty store
-
    /// if an existing store isn't found.
-
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
-
        let db = sql::Connection::open(path)?;
-
        db.execute(Self::SCHEMA)?;
-

-
        Ok(Self { db })
-
    }
-

-
    /// Create a new in-memory routing table.
-
    pub fn memory() -> Result<Self, Error> {
-
        let db = sql::Connection::open(":memory:")?;
-
        db.execute(Self::SCHEMA)?;
-

-
        Ok(Self { db })
-
    }
-
}
-

-
/// Backing store for a routing table.
-
pub trait Store {
-
    /// Get the nodes seeding the given id.
-
    fn get(&self, id: &Id) -> Result<HashSet<NodeId>, Error>;
-
    /// Get the resources seeded by the given node.
-
    fn get_resources(&self, node_id: &NodeId) -> Result<HashSet<Id>, Error>;
-
    /// Get a specific entry.
-
    fn entry(&self, id: &Id, node: &NodeId) -> Result<Option<Timestamp>, Error>;
-
    /// Checks if any entries are available.
-
    fn is_empty(&self) -> Result<bool, Error> {
-
        Ok(self.len()? == 0)
-
    }
-
    /// Add a new node seeding the given id.
-
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error>;
-
    /// Remove a node for the given id.
-
    fn remove(&mut self, id: &Id, node: &NodeId) -> Result<bool, Error>;
-
    /// Iterate over all entries in the routing table.
-
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error>;
-
    /// Get the total number of routing entries.
-
    fn len(&self) -> Result<usize, Error>;
-
    /// Prune entries older than the given timestamp.
-
    fn prune(&mut self, oldest: Timestamp, limit: Option<usize>) -> Result<usize, Error>;
-
}
-

-
impl Store for Table {
-
    fn get(&self, id: &Id) -> Result<HashSet<NodeId>, Error> {
-
        let mut stmt = self
-
            .db
-
            .prepare("SELECT (node) FROM routing WHERE resource = ?")?;
-
        stmt.bind((1, id))?;
-

-
        let mut nodes = HashSet::new();
-
        for row in stmt.into_iter() {
-
            nodes.insert(row?.read::<NodeId, _>("node"));
-
        }
-
        Ok(nodes)
-
    }
-

-
    fn get_resources(&self, node: &NodeId) -> Result<HashSet<Id>, Error> {
-
        let mut stmt = self
-
            .db
-
            .prepare("SELECT resource FROM routing WHERE node = ?")?;
-
        stmt.bind((1, node))?;
-

-
        let mut resources = HashSet::new();
-
        for row in stmt.into_iter() {
-
            resources.insert(row?.read::<Id, _>("resource"));
-
        }
-
        Ok(resources)
-
    }
-

-
    fn entry(&self, id: &Id, node: &NodeId) -> Result<Option<Timestamp>, Error> {
-
        let mut stmt = self
-
            .db
-
            .prepare("SELECT (time) FROM routing WHERE resource = ? AND node = ?")?;
-

-
        stmt.bind((1, id))?;
-
        stmt.bind((2, node))?;
-

-
        if let Some(Ok(row)) = stmt.into_iter().next() {
-
            return Ok(Some(row.read::<i64, _>("time") as Timestamp));
-
        }
-
        Ok(None)
-
    }
-

-
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error> {
-
        let time: i64 = time.try_into().map_err(|_| Error::UnitOverflow)?;
-
        let mut stmt = self.db.prepare(
-
            "INSERT INTO routing (resource, node, time)
-
             VALUES (?, ?, ?)
-
             ON CONFLICT DO UPDATE
-
             SET time = ?3
-
             WHERE time < ?3",
-
        )?;
-

-
        stmt.bind((1, &id))?;
-
        stmt.bind((2, &node))?;
-
        stmt.bind((3, time))?;
-
        stmt.next()?;
-

-
        Ok(self.db.change_count() > 0)
-
    }
-

-
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error> {
-
        let mut stmt = self
-
            .db
-
            .prepare("SELECT resource, node FROM routing ORDER BY resource")?
-
            .into_iter();
-
        let mut entries = Vec::new();
-

-
        while let Some(Ok(row)) = stmt.next() {
-
            let id = row.read("resource");
-
            let node = row.read("node");
-

-
            entries.push((id, node));
-
        }
-
        Ok(Box::new(entries.into_iter()))
-
    }
-

-
    fn remove(&mut self, id: &Id, node: &NodeId) -> Result<bool, Error> {
-
        let mut stmt = self
-
            .db
-
            .prepare("DELETE FROM routing WHERE resource = ? AND node = ?")?;
-

-
        stmt.bind((1, id))?;
-
        stmt.bind((2, node))?;
-
        stmt.next()?;
-

-
        Ok(self.db.change_count() > 0)
-
    }
-

-
    fn len(&self) -> Result<usize, Error> {
-
        let stmt = self.db.prepare("SELECT COUNT(1) FROM routing")?;
-
        let count: i64 = stmt
-
            .into_iter()
-
            .next()
-
            .expect("COUNT will always return a single row")?
-
            .read(0);
-
        let count: usize = count.try_into().map_err(|_| Error::UnitOverflow)?;
-
        Ok(count)
-
    }
-

-
    fn prune(&mut self, oldest: Timestamp, limit: Option<usize>) -> Result<usize, Error> {
-
        let oldest: i64 = oldest.try_into().map_err(|_| Error::UnitOverflow)?;
-
        let limit: i64 = limit
-
            .unwrap_or(i64::MAX as usize)
-
            .try_into()
-
            .map_err(|_| Error::UnitOverflow)?;
-

-
        let mut stmt = self.db.prepare(
-
            "DELETE FROM routing WHERE rowid IN
-
            (SELECT rowid FROM routing WHERE time < ? LIMIT ?)",
-
        )?;
-
        stmt.bind((1, oldest))?;
-
        stmt.bind((2, limit))?;
-
        stmt.next()?;
-

-
        Ok(self.db.change_count())
-
    }
-
}
-

-
#[cfg(test)]
-
mod test {
-
    use localtime::LocalTime;
-

-
    use super::*;
-
    use crate::test::arbitrary;
-

-
    #[test]
-
    fn test_insert_and_get() {
-
        let ids = arbitrary::set::<Id>(5..10);
-
        let nodes = arbitrary::set::<NodeId>(5..10);
-
        let mut db = Table::open(":memory:").unwrap();
-

-
        for id in &ids {
-
            for node in &nodes {
-
                assert!(db.insert(*id, *node, 0).unwrap());
-
            }
-
        }
-

-
        for id in &ids {
-
            let seeds = db.get(id).unwrap();
-
            for node in &nodes {
-
                assert!(seeds.contains(node));
-
            }
-
        }
-
    }
-

-
    #[test]
-
    fn test_insert_and_get_resources() {
-
        let ids = arbitrary::set::<Id>(5..10);
-
        let nodes = arbitrary::set::<NodeId>(5..10);
-
        let mut db = Table::open(":memory:").unwrap();
-

-
        for id in &ids {
-
            for node in &nodes {
-
                assert!(db.insert(*id, *node, 0).unwrap());
-
            }
-
        }
-

-
        for node in &nodes {
-
            let projects = db.get_resources(node).unwrap();
-
            for id in &ids {
-
                assert!(projects.contains(id));
-
            }
-
        }
-
    }
-

-
    #[test]
-
    fn test_entries() {
-
        let ids = arbitrary::set::<Id>(6..9);
-
        let nodes = arbitrary::set::<NodeId>(6..9);
-
        let mut db = Table::open(":memory:").unwrap();
-

-
        for id in &ids {
-
            for node in &nodes {
-
                assert!(db.insert(*id, *node, 0).unwrap());
-
            }
-
        }
-

-
        let results = db.entries().unwrap().collect::<Vec<_>>();
-
        assert_eq!(results.len(), ids.len() * nodes.len());
-

-
        let mut results_ids = results.iter().map(|(id, _)| *id).collect::<Vec<_>>();
-
        results_ids.dedup();
-

-
        assert_eq!(results_ids.len(), ids.len(), "Entries are grouped by id");
-
    }
-

-
    #[test]
-
    fn test_insert_and_remove() {
-
        let ids = arbitrary::set::<Id>(5..10);
-
        let nodes = arbitrary::set::<NodeId>(5..10);
-
        let mut db = Table::open(":memory:").unwrap();
-

-
        for id in &ids {
-
            for node in &nodes {
-
                db.insert(*id, *node, 0).unwrap();
-
            }
-
        }
-
        for id in &ids {
-
            for node in &nodes {
-
                assert!(db.remove(id, node).unwrap());
-
            }
-
        }
-
        for id in &ids {
-
            assert!(db.get(id).unwrap().is_empty());
-
        }
-
    }
-

-
    #[test]
-
    fn test_insert_duplicate() {
-
        let id = arbitrary::gen::<Id>(1);
-
        let node = arbitrary::gen::<NodeId>(1);
-
        let mut db = Table::open(":memory:").unwrap();
-

-
        assert!(db.insert(id, node, 0).unwrap());
-
        assert!(!db.insert(id, node, 0).unwrap());
-
        assert!(!db.insert(id, node, 0).unwrap());
-
    }
-

-
    #[test]
-
    fn test_remove_redundant() {
-
        let id = arbitrary::gen::<Id>(1);
-
        let node = arbitrary::gen::<NodeId>(1);
-
        let mut db = Table::open(":memory:").unwrap();
-

-
        assert!(db.insert(id, node, 0).unwrap());
-
        assert!(db.remove(&id, &node).unwrap());
-
        assert!(!db.remove(&id, &node).unwrap());
-
    }
-

-
    #[test]
-
    fn test_len() {
-
        let mut db = Table::open(":memory:").unwrap();
-
        let ids = arbitrary::vec::<Id>(10);
-
        let node = arbitrary::gen(1);
-

-
        for id in ids {
-
            db.insert(id, node, LocalTime::now().as_millis()).unwrap();
-
        }
-

-
        assert_eq!(10, db.len().unwrap(), "correct number of rows in table");
-
    }
-

-
    #[test]
-
    fn test_prune() {
-
        let rng = fastrand::Rng::new();
-
        let now = LocalTime::now();
-
        let ids = arbitrary::vec::<Id>(10);
-
        let nodes = arbitrary::vec::<NodeId>(10);
-
        let mut db = Table::open(":memory:").unwrap();
-

-
        for id in &ids {
-
            for node in &nodes {
-
                let time = rng.u64(..now.as_millis());
-
                db.insert(*id, *node, time).unwrap();
-
            }
-
        }
-

-
        let ids = arbitrary::vec::<Id>(10);
-
        let nodes = arbitrary::vec::<NodeId>(10);
-

-
        for id in &ids {
-
            for node in &nodes {
-
                let time = rng.u64(now.as_millis()..i64::MAX as u64);
-
                db.insert(*id, *node, time).unwrap();
-
            }
-
        }
-

-
        let pruned = db.prune(now.as_millis(), None).unwrap();
-
        assert_eq!(pruned, ids.len() * nodes.len());
-

-
        for id in &ids {
-
            for node in &nodes {
-
                let t = db.entry(id, node).unwrap().unwrap();
-
                assert!(t >= now.as_millis());
-
            }
-
        }
-
    }
-
}
deleted radicle-node/src/service/routing/schema.sql
@@ -1,13 +0,0 @@
-
--
-
-- Routing table SQL schema.
-
--
-
create table if not exists "routing" (
-
  -- Resource being seeded.
-
  "resource"     text      not null,
-
  -- Node ID.
-
  "node"         text      not null,
-
  -- UNIX time at which this entry was added or refreshed.
-
  "time"         integer   not null,
-

-
  primary key ("resource", "node")
-
);
modified radicle-node/src/test/peer.rs
@@ -7,11 +7,11 @@ use log::*;

use crate::address;
use crate::address::Store;
-
use crate::clock::Timestamp;
use crate::crypto::test::signer::MockSigner;
use crate::crypto::Signer;
use crate::identity::Id;
use crate::node;
+
use crate::node::routing;
use crate::prelude::*;
use crate::service;
use crate::service::message::*;
modified radicle-node/src/wire/protocol.rs
@@ -21,12 +21,12 @@ use netservices::{NetConnection, NetProtocol, NetReader, NetSession, NetWriter};
use reactor::Timestamp;

use radicle::collections::HashMap;
-
use radicle::node::NodeId;
+
use radicle::node::{routing, NodeId};
use radicle::storage::WriteStorage;

use crate::crypto::Signer;
use crate::service::reactor::{Fetch, Io};
-
use crate::service::{routing, session, DisconnectReason, Message, Service};
+
use crate::service::{session, DisconnectReason, Message, Service};
use crate::wire;
use crate::wire::{Decode, Encode};
use crate::worker::{Task, TaskResult};
modified radicle/Cargo.toml
@@ -16,6 +16,7 @@ cyphernet = { version = "0.2.0", features = ["tor", "dns", "ed25519"] }
fastrand = { version = "1.8.0" }
git-ref-format = { version = "0", features = ["serde", "macro"] }
multibase = { version = "0.9.1" }
+
localtime = { version = "1.2.0" }
log = { version = "0.4.17", features = ["std"] }
nonempty = { version = "0.8.1", features = ["serialize"] }
once_cell = { version = "1.13" }
modified radicle/src/lib.rs
@@ -31,7 +31,7 @@ pub mod prelude {

    pub use crypto::{PublicKey, Signer, Verified};
    pub use identity::{project::Project, Did, Doc, Id};
-
    pub use node::NodeId;
+
    pub use node::{NodeId, Timestamp};
    pub use profile::Profile;
    pub use storage::{BranchName, ReadRepository, ReadStorage, WriteRepository, WriteStorage};
}
modified radicle/src/node.rs
@@ -1,4 +1,5 @@
mod features;
+
pub mod routing;
pub mod tracking;

use std::collections::BTreeSet;
@@ -32,6 +33,9 @@ pub const ADDRESS_DB_FILE: &str = "addresses.db";
/// Filename of tracking table database under the node directory.
pub const TRACKING_DB_FILE: &str = "tracking.db";

+
/// Milliseconds since epoch.
+
pub type Timestamp = u64;
+

/// Result of a command, on the node control socket.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status")]
added radicle/src/node/routing.rs
@@ -0,0 +1,356 @@
+
use std::collections::HashSet;
+
use std::fmt;
+
use std::path::Path;
+

+
use sqlite as sql;
+
use thiserror::Error;
+

+
use crate::{
+
    prelude::Timestamp,
+
    prelude::{Id, NodeId},
+
};
+

+
/// An error occuring in peer-to-peer networking code.
+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// An Internal error.
+
    #[error("internal error: {0}")]
+
    Internal(#[from] sql::Error),
+
    /// Internal unit overflow.
+
    #[error("the unit overflowed")]
+
    UnitOverflow,
+
}
+

+
/// Persistent file storage for a routing table.
+
pub struct Table {
+
    db: sql::Connection,
+
}
+

+
impl fmt::Debug for Table {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        write!(f, "Table(..)")
+
    }
+
}
+

+
impl Table {
+
    const SCHEMA: &str = include_str!("routing/schema.sql");
+

+
    /// Open a routing file store at the given path. Creates a new empty store
+
    /// if an existing store isn't found.
+
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+
        let db = sql::Connection::open(path)?;
+
        db.execute(Self::SCHEMA)?;
+

+
        Ok(Self { db })
+
    }
+

+
    /// Create a new in-memory routing table.
+
    pub fn memory() -> Result<Self, Error> {
+
        let db = sql::Connection::open(":memory:")?;
+
        db.execute(Self::SCHEMA)?;
+

+
        Ok(Self { db })
+
    }
+
}
+

+
/// Backing store for a routing table.
+
pub trait Store {
+
    /// Get the nodes seeding the given id.
+
    fn get(&self, id: &Id) -> Result<HashSet<NodeId>, Error>;
+
    /// Get the resources seeded by the given node.
+
    fn get_resources(&self, node_id: &NodeId) -> Result<HashSet<Id>, Error>;
+
    /// Get a specific entry.
+
    fn entry(&self, id: &Id, node: &NodeId) -> Result<Option<Timestamp>, Error>;
+
    /// Checks if any entries are available.
+
    fn is_empty(&self) -> Result<bool, Error> {
+
        Ok(self.len()? == 0)
+
    }
+
    /// Add a new node seeding the given id.
+
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error>;
+
    /// Remove a node for the given id.
+
    fn remove(&mut self, id: &Id, node: &NodeId) -> Result<bool, Error>;
+
    /// Iterate over all entries in the routing table.
+
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error>;
+
    /// Get the total number of routing entries.
+
    fn len(&self) -> Result<usize, Error>;
+
    /// Prune entries older than the given timestamp.
+
    fn prune(&mut self, oldest: Timestamp, limit: Option<usize>) -> Result<usize, Error>;
+
}
+

+
impl Store for Table {
+
    fn get(&self, id: &Id) -> Result<HashSet<NodeId>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT (node) FROM routing WHERE resource = ?")?;
+
        stmt.bind((1, id))?;
+

+
        let mut nodes = HashSet::new();
+
        for row in stmt.into_iter() {
+
            nodes.insert(row?.read::<NodeId, _>("node"));
+
        }
+
        Ok(nodes)
+
    }
+

+
    fn get_resources(&self, node: &NodeId) -> Result<HashSet<Id>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT resource FROM routing WHERE node = ?")?;
+
        stmt.bind((1, node))?;
+

+
        let mut resources = HashSet::new();
+
        for row in stmt.into_iter() {
+
            resources.insert(row?.read::<Id, _>("resource"));
+
        }
+
        Ok(resources)
+
    }
+

+
    fn entry(&self, id: &Id, node: &NodeId) -> Result<Option<Timestamp>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT (time) FROM routing WHERE resource = ? AND node = ?")?;
+

+
        stmt.bind((1, id))?;
+
        stmt.bind((2, node))?;
+

+
        if let Some(Ok(row)) = stmt.into_iter().next() {
+
            return Ok(Some(row.read::<i64, _>("time") as Timestamp));
+
        }
+
        Ok(None)
+
    }
+

+
    fn insert(&mut self, id: Id, node: NodeId, time: Timestamp) -> Result<bool, Error> {
+
        let time: i64 = time.try_into().map_err(|_| Error::UnitOverflow)?;
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO routing (resource, node, time)
+
             VALUES (?, ?, ?)
+
             ON CONFLICT DO UPDATE
+
             SET time = ?3
+
             WHERE time < ?3",
+
        )?;
+

+
        stmt.bind((1, &id))?;
+
        stmt.bind((2, &node))?;
+
        stmt.bind((3, time))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT resource, node FROM routing ORDER BY resource")?
+
            .into_iter();
+
        let mut entries = Vec::new();
+

+
        while let Some(Ok(row)) = stmt.next() {
+
            let id = row.read("resource");
+
            let node = row.read("node");
+

+
            entries.push((id, node));
+
        }
+
        Ok(Box::new(entries.into_iter()))
+
    }
+

+
    fn remove(&mut self, id: &Id, node: &NodeId) -> Result<bool, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("DELETE FROM routing WHERE resource = ? AND node = ?")?;
+

+
        stmt.bind((1, id))?;
+
        stmt.bind((2, node))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    fn len(&self) -> Result<usize, Error> {
+
        let stmt = self.db.prepare("SELECT COUNT(1) FROM routing")?;
+
        let count: i64 = stmt
+
            .into_iter()
+
            .next()
+
            .expect("COUNT will always return a single row")?
+
            .read(0);
+
        let count: usize = count.try_into().map_err(|_| Error::UnitOverflow)?;
+
        Ok(count)
+
    }
+

+
    fn prune(&mut self, oldest: Timestamp, limit: Option<usize>) -> Result<usize, Error> {
+
        let oldest: i64 = oldest.try_into().map_err(|_| Error::UnitOverflow)?;
+
        let limit: i64 = limit
+
            .unwrap_or(i64::MAX as usize)
+
            .try_into()
+
            .map_err(|_| Error::UnitOverflow)?;
+

+
        let mut stmt = self.db.prepare(
+
            "DELETE FROM routing WHERE rowid IN
+
            (SELECT rowid FROM routing WHERE time < ? LIMIT ?)",
+
        )?;
+
        stmt.bind((1, oldest))?;
+
        stmt.bind((2, limit))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count())
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use localtime::LocalTime;
+

+
    use super::*;
+
    use crate::test::arbitrary;
+

+
    #[test]
+
    fn test_insert_and_get() {
+
        let ids = arbitrary::set::<Id>(5..10);
+
        let nodes = arbitrary::set::<NodeId>(5..10);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            for node in &nodes {
+
                assert!(db.insert(*id, *node, 0).unwrap());
+
            }
+
        }
+

+
        for id in &ids {
+
            let seeds = db.get(id).unwrap();
+
            for node in &nodes {
+
                assert!(seeds.contains(node));
+
            }
+
        }
+
    }
+

+
    #[test]
+
    fn test_insert_and_get_resources() {
+
        let ids = arbitrary::set::<Id>(5..10);
+
        let nodes = arbitrary::set::<NodeId>(5..10);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            for node in &nodes {
+
                assert!(db.insert(*id, *node, 0).unwrap());
+
            }
+
        }
+

+
        for node in &nodes {
+
            let projects = db.get_resources(node).unwrap();
+
            for id in &ids {
+
                assert!(projects.contains(id));
+
            }
+
        }
+
    }
+

+
    #[test]
+
    fn test_entries() {
+
        let ids = arbitrary::set::<Id>(6..9);
+
        let nodes = arbitrary::set::<NodeId>(6..9);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            for node in &nodes {
+
                assert!(db.insert(*id, *node, 0).unwrap());
+
            }
+
        }
+

+
        let results = db.entries().unwrap().collect::<Vec<_>>();
+
        assert_eq!(results.len(), ids.len() * nodes.len());
+

+
        let mut results_ids = results.iter().map(|(id, _)| *id).collect::<Vec<_>>();
+
        results_ids.dedup();
+

+
        assert_eq!(results_ids.len(), ids.len(), "Entries are grouped by id");
+
    }
+

+
    #[test]
+
    fn test_insert_and_remove() {
+
        let ids = arbitrary::set::<Id>(5..10);
+
        let nodes = arbitrary::set::<NodeId>(5..10);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            for node in &nodes {
+
                db.insert(*id, *node, 0).unwrap();
+
            }
+
        }
+
        for id in &ids {
+
            for node in &nodes {
+
                assert!(db.remove(id, node).unwrap());
+
            }
+
        }
+
        for id in &ids {
+
            assert!(db.get(id).unwrap().is_empty());
+
        }
+
    }
+

+
    #[test]
+
    fn test_insert_duplicate() {
+
        let id = arbitrary::gen::<Id>(1);
+
        let node = arbitrary::gen::<NodeId>(1);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        assert!(db.insert(id, node, 0).unwrap());
+
        assert!(!db.insert(id, node, 0).unwrap());
+
        assert!(!db.insert(id, node, 0).unwrap());
+
    }
+

+
    #[test]
+
    fn test_remove_redundant() {
+
        let id = arbitrary::gen::<Id>(1);
+
        let node = arbitrary::gen::<NodeId>(1);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        assert!(db.insert(id, node, 0).unwrap());
+
        assert!(db.remove(&id, &node).unwrap());
+
        assert!(!db.remove(&id, &node).unwrap());
+
    }
+

+
    #[test]
+
    fn test_len() {
+
        let mut db = Table::open(":memory:").unwrap();
+
        let ids = arbitrary::vec::<Id>(10);
+
        let node = arbitrary::gen(1);
+

+
        for id in ids {
+
            db.insert(id, node, LocalTime::now().as_millis()).unwrap();
+
        }
+

+
        assert_eq!(10, db.len().unwrap(), "correct number of rows in table");
+
    }
+

+
    #[test]
+
    fn test_prune() {
+
        let rng = fastrand::Rng::new();
+
        let now = LocalTime::now();
+
        let ids = arbitrary::vec::<Id>(10);
+
        let nodes = arbitrary::vec::<NodeId>(10);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            for node in &nodes {
+
                let time = rng.u64(..now.as_millis());
+
                db.insert(*id, *node, time).unwrap();
+
            }
+
        }
+

+
        let ids = arbitrary::vec::<Id>(10);
+
        let nodes = arbitrary::vec::<NodeId>(10);
+

+
        for id in &ids {
+
            for node in &nodes {
+
                let time = rng.u64(now.as_millis()..i64::MAX as u64);
+
                db.insert(*id, *node, time).unwrap();
+
            }
+
        }
+

+
        let pruned = db.prune(now.as_millis(), None).unwrap();
+
        assert_eq!(pruned, ids.len() * nodes.len());
+

+
        for id in &ids {
+
            for node in &nodes {
+
                let t = db.entry(id, node).unwrap().unwrap();
+
                assert!(t >= now.as_millis());
+
            }
+
        }
+
    }
+
}
added radicle/src/node/routing/schema.sql
@@ -0,0 +1,13 @@
+
--
+
-- Routing table SQL schema.
+
--
+
create table if not exists "routing" (
+
  -- Resource being seeded.
+
  "resource"     text      not null,
+
  -- Node ID.
+
  "node"         text      not null,
+
  -- UNIX time at which this entry was added or refreshed.
+
  "time"         integer   not null,
+

+
  primary key ("resource", "node")
+
);