Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Implement routing table persistence
Alexis Sellier committed 3 years ago
commit 85ac8a1d4db5e0b4337e863485de94124858fa83
parent 15f525299e05d508380f0b12ae639612f79cd241
16 files changed +436 -76
modified Cargo.lock
@@ -3,6 +3,17 @@
version = 3

[[package]]
+
name = "ahash"
+
version = "0.7.6"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
+
dependencies = [
+
 "getrandom",
+
 "once_cell",
+
 "version_check",
+
]
+

+
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -296,6 +307,18 @@ dependencies = [
]

[[package]]
+
name = "fallible-iterator"
+
version = "0.2.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
+

+
[[package]]
+
name = "fallible-streaming-iterator"
+
version = "0.1.9"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
+

+
[[package]]
name = "fastrand"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -396,6 +419,18 @@ name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
+
dependencies = [
+
 "ahash",
+
]
+

+
[[package]]
+
name = "hashlink"
+
version = "0.8.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa"
+
dependencies = [
+
 "hashbrown",
+
]

[[package]]
name = "hermit-abi"
@@ -521,6 +556,17 @@ dependencies = [
]

[[package]]
+
name = "libsqlite3-sys"
+
version = "0.25.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9f0455f2c1bc9a7caa792907026e469c1d91761fb0ea37cbb16427c77280cf35"
+
dependencies = [
+
 "cc",
+
 "pkg-config",
+
 "vcpkg",
+
]
+

+
[[package]]
name = "libssh2-sys"
version = "0.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -791,6 +837,7 @@ dependencies = [
 "quickcheck_macros",
 "radicle-git-ext",
 "radicle-ssh",
+
 "rusqlite",
 "serde",
 "serde_json",
 "sha2 0.10.6",
@@ -834,6 +881,7 @@ dependencies = [
 "quickcheck",
 "quickcheck_macros",
 "radicle",
+
 "rusqlite",
 "serde",
 "serde_json",
 "tempfile",
@@ -901,6 +949,20 @@ dependencies = [
]

[[package]]
+
name = "rusqlite"
+
version = "0.28.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "01e213bc3ecb39ac32e81e51ebe31fd888a940515173e3a18a35f8c6e896422a"
+
dependencies = [
+
 "bitflags",
+
 "fallible-iterator",
+
 "fallible-streaming-iterator",
+
 "hashlink",
+
 "libsqlite3-sys",
+
 "smallvec",
+
]
+

+
[[package]]
name = "ryu"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -994,6 +1056,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"

[[package]]
+
name = "smallvec"
+
version = "1.10.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
+

+
[[package]]
name = "socket2"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified radicle-node/Cargo.toml
@@ -19,6 +19,7 @@ log = { version = "0.4.17", features = ["std"] }
nakamoto-net = { version = "0.3.0" }
nakamoto-net-poll = { version = "0.3.0" }
nonempty = { version = "0.8.0", features = ["serialize"] }
+
rusqlite = { version = "0.28.0", features = ["bundled"] }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = ["preserve_order"] }
tempfile = { version = "3.3.0" }
@@ -27,6 +28,7 @@ thiserror = { version = "1" }
[dependencies.radicle]
path = "../radicle"
version = "0.2.0"
+
features = ["sql"]

[dev-dependencies]
radicle = { path = "../radicle", version = "*", features = ["test"] }
modified radicle-node/src/client.rs
@@ -1,17 +1,38 @@
-
use std::net;
+
use std::{io, net};

use crossbeam_channel as chan;
use nakamoto_net::{LocalTime, Reactor};
+
use thiserror::Error;

use crate::clock::RefClock;
use crate::collections::HashMap;
use crate::profile::Profile;
use crate::service;
+
use crate::service::routing;
use crate::transport::Transport;
use crate::wire::Wire;

pub mod handle;

+
/// Directory in `$RAD_HOME` under which node-specific files are stored.
+
pub const NODE_DIR: &str = "node";
+
/// Filename of routing table database under [`NODE_DIR`].
+
pub const ROUTING_DB_FILE: &str = "routing.db";
+

+
/// A client error.
+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// A routing database error.
+
    #[error("routing database error: {0}")]
+
    Routing(#[from] routing::Error),
+
    /// An I/O error.
+
    #[error("i/o error: {0}")]
+
    Io(#[from] io::Error),
+
    /// A networking error.
+
    #[error("network error: {0}")]
+
    Net(#[from] nakamoto_net::error::Error),
+
}
+

/// Client configuration.
#[derive(Debug, Clone)]
pub struct Config {
@@ -55,7 +76,7 @@ pub struct Client<R: Reactor> {
}

impl<R: Reactor> Client<R> {
-
    pub fn new(profile: Profile) -> Result<Self, nakamoto_net::error::Error> {
+
    pub fn new(profile: Profile) -> Result<Self, Error> {
        let (handle, commands) = chan::unbounded::<service::Command>();
        let (shutdown, shutdown_recv) = chan::bounded(1);
        let (listening_send, listening) = chan::bounded(1);
@@ -73,24 +94,27 @@ impl<R: Reactor> Client<R> {
        })
    }

-
    pub fn run(mut self, config: Config) -> Result<(), nakamoto_net::error::Error> {
+
    pub fn run(mut self, config: Config) -> Result<(), Error> {
        let network = config.service.network;
        let rng = fastrand::Rng::new();
        let time = LocalTime::now();
        let storage = self.profile.storage;
        let signer = self.profile.signer;
        let addresses = HashMap::with_hasher(rng.clone().into());
+
        let routing = routing::Table::open(self.profile.home.join(NODE_DIR).join(ROUTING_DB_FILE))?;

        log::info!("Initializing client ({:?})..", network);

        let service = service::Service::new(
            config.service,
            RefClock::from(time),
+
            routing,
            storage,
            addresses,
            signer,
            rng,
        );
+

        self.reactor.run(
            &config.listen,
            Transport::new(Wire::new(service)),
modified radicle-node/src/client/handle.rs
@@ -90,11 +90,11 @@ impl<W: Waker> traits::Handle for Handle<W> {
        Ok(())
    }

-
    fn routing(&self) -> Result<chan::Receiver<(Id, Vec<NodeId>)>, Error> {
+
    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Error> {
        let (sender, receiver) = chan::unbounded();
        let query: Arc<QueryState> = Arc::new(move |state| {
-
            for (id, nodes) in state.routing().iter() {
-
                if sender.send((*id, nodes.iter().cloned().collect())).is_err() {
+
            for (id, node) in state.routing().entries()? {
+
                if sender.send((id, node)).is_err() {
                    break;
                }
            }
@@ -155,8 +155,8 @@ pub mod traits {
        fn command(&self, cmd: service::Command) -> Result<(), Error>;
        /// Ask the client to shutdown.
        fn shutdown(self) -> Result<(), Error>;
-
        /// Query the routing table.
-
        fn routing(&self) -> Result<chan::Receiver<(Id, Vec<NodeId>)>, Error>;
+
        /// Query the routing table entries.
+
        fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Error>;
        /// Query the peer session state.
        fn sessions(&self) -> Result<chan::Receiver<(NodeId, Session)>, Error>;
        /// Query the inventory.
modified radicle-node/src/control.rs
@@ -106,13 +106,8 @@ fn drain<H: Handle>(stream: &UnixStream, handle: &H) -> Result<(), DrainError> {
                    Ok(c) => {
                        let mut writer = LineWriter::new(stream);

-
                        for (id, seeds) in c.iter() {
-
                            let seeds = seeds
-
                                .into_iter()
-
                                .map(String::from)
-
                                .collect::<Vec<_>>()
-
                                .join(" ");
-
                            writeln!(writer, "{id} {seeds}",)?;
+
                        for (id, seed) in c.iter() {
+
                            writeln!(writer, "{id} {seed}",)?;
                        }
                    }
                    Err(e) => return Err(DrainError::Client(e)),
modified radicle-node/src/service.rs
@@ -3,6 +3,7 @@ pub mod filter;
pub mod message;
pub mod peer;
pub mod reactor;
+
pub mod routing;

use std::collections::BTreeMap;
use std::ops::{Deref, DerefMut};
@@ -22,7 +23,6 @@ use crate::address_book;
use crate::address_book::AddressBook;
use crate::address_manager::AddressManager;
use crate::clock::{RefClock, Timestamp};
-
use crate::collections::{HashMap, HashSet};
use crate::crypto;
use crate::crypto::{Signer, Verified};
use crate::git;
@@ -55,8 +55,6 @@ pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);

/// Network node identifier.
pub type NodeId = crypto::PublicKey;
-
/// Network routing table. Keeps track of where projects are hosted.
-
pub type Routing = HashMap<Id, HashSet<NodeId>>;

/// A service event.
#[derive(Debug, Clone)]
@@ -148,18 +146,20 @@ impl fmt::Debug for Command {
pub enum CommandError {
    #[error(transparent)]
    Storage(#[from] storage::Error),
+
    #[error(transparent)]
+
    Routing(#[from] routing::Error),
}

#[derive(Debug)]
-
pub struct Service<A, S, G> {
+
pub struct Service<R, A, S, G> {
    /// Service configuration.
    config: Config,
    /// Our cryptographic signer and key.
    signer: G,
    /// Project storage.
    storage: S,
-
    /// Tracks the location of projects.
-
    routing: Routing,
+
    /// Network routing table. Keeps track of where projects are located.
+
    routing: R,
    /// State relating to gossip.
    gossip: Gossip,
    /// Peer sessions, currently or recently connected.
@@ -188,7 +188,7 @@ pub struct Service<A, S, G> {
    start_time: LocalTime,
}

-
impl<A, S, G> Service<A, S, G>
+
impl<R, A, S, G> Service<R, A, S, G>
where
    G: crypto::Signer,
{
@@ -203,8 +203,9 @@ where
    }
}

-
impl<A, S, G> Service<A, S, G>
+
impl<R, A, S, G> Service<R, A, S, G>
where
+
    R: routing::Store,
    A: address_book::Store,
    S: WriteStorage + 'static,
    G: crypto::Signer,
@@ -212,13 +213,13 @@ where
    pub fn new(
        config: Config,
        clock: RefClock,
+
        routing: R,
        storage: S,
        addresses: A,
        signer: G,
        rng: Rng,
    ) -> Self {
        let addrmgr = AddressManager::new(addresses);
-
        let routing = HashMap::with_hasher(rng.clone().into());
        let sessions = Sessions::new(rng.clone());
        let network = config.network;

@@ -243,15 +244,14 @@ where
        }
    }

-
    pub fn seeds(&self, id: &Id) -> Box<dyn Iterator<Item = (&NodeId, &Session)> + '_> {
-
        if let Some(peers) = self.routing.get(id) {
-
            Box::new(
-
                peers
-
                    .iter()
-
                    .filter_map(|id| self.sessions.by_id(id).map(|p| (id, p))),
-
            )
+
    pub fn seeds(&self, id: &Id) -> Vec<(NodeId, &Session)> {
+
        if let Ok(seeds) = self.routing.get(id) {
+
            seeds
+
                .into_iter()
+
                .filter_map(|id| self.sessions.by_id(&id).map(|p| (id, p)))
+
                .collect()
        } else {
-
            Box::new(std::iter::empty())
+
            vec![]
        }
    }

@@ -313,14 +313,13 @@ where
        &mut self.reactor
    }

-
    pub fn lookup(&self, id: Id) -> Lookup {
-
        Lookup {
+
    pub fn lookup(&self, id: Id) -> Result<Lookup, routing::Error> {
+
        let remote = self.routing.get(&id)?.iter().cloned().collect();
+

+
        Ok(Lookup {
            local: self.storage.get(&self.node_id(), id).unwrap(),
-
            remote: self
-
                .routing
-
                .get(&id)
-
                .map_or(vec![], |r| r.iter().cloned().collect()),
-
        }
+
            remote,
+
        })
    }

    pub fn initialize(&mut self, time: LocalTime) {
@@ -387,7 +386,7 @@ where
                    return;
                }

-
                let seeds = self.seeds(&id).collect::<Vec<_>>();
+
                let seeds = self.seeds(&id);
                let seeds = if let Some(seeds) = NonEmpty::from_vec(seeds) {
                    seeds
                } else {
@@ -618,7 +617,8 @@ where
                } else {
                    return Ok(false);
                }
-
                self.process_inventory(&message.inventory, *node, git);
+
                self.process_inventory(&message.inventory, *node, git)
+
                    .unwrap();

                if relay {
                    return Ok(true);
@@ -747,18 +747,19 @@ where
    }

    /// Process a peer inventory announcement by updating our routing table.
-
    fn process_inventory(&mut self, inventory: &Inventory, from: NodeId, remote: &Url) {
+
    fn process_inventory(
+
        &mut self,
+
        inventory: &Inventory,
+
        from: NodeId,
+
        remote: &Url,
+
    ) -> Result<(), routing::Error> {
        for proj_id in inventory {
-
            let inventory = self
-
                .routing
-
                .entry(*proj_id)
-
                .or_insert_with(|| HashSet::with_hasher(self.rng.clone().into()));
-

            // TODO: Fire an event on routing update.
-
            if inventory.insert(from) && self.config.is_tracking(proj_id) {
+
            if self.routing.insert(*proj_id, from)? && self.config.is_tracking(proj_id) {
                self.storage.fetch(*proj_id, remote).unwrap();
            }
        }
+
        Ok(())
    }

    ////////////////////////////////////////////////////////////////////////////
@@ -808,11 +809,12 @@ pub trait ServiceState {
    /// Get service configuration.
    fn config(&self) -> &Config;
    /// Get reference to routing table.
-
    fn routing(&self) -> &Routing;
+
    fn routing(&self) -> &dyn routing::Store;
}

-
impl<A, S, G> ServiceState for Service<A, S, G>
+
impl<R, A, S, G> ServiceState for Service<R, A, S, G>
where
+
    R: routing::Store,
    G: Signer,
    S: ReadStorage,
{
@@ -836,7 +838,7 @@ where
        &self.config
    }

-
    fn routing(&self) -> &Routing {
+
    fn routing(&self) -> &dyn routing::Store {
        &self.routing
    }
}
@@ -871,7 +873,7 @@ impl fmt::Display for DisconnectReason {
    }
}

-
impl<A, S, G> Iterator for Service<A, S, G> {
+
impl<R, A, S, G> Iterator for Service<R, A, S, G> {
    type Item = reactor::Io;

    fn next(&mut self) -> Option<Self::Item> {
added radicle-node/src/service/routing.rs
@@ -0,0 +1,196 @@
+
use std::collections::HashSet;
+
use std::path::Path;
+

+
use rusqlite as sql;
+
use thiserror::Error;
+

+
use crate::prelude::{Id, NodeId};
+

+
/// An error occuring in peer-to-peer networking code.
+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// An Internal error.
+
    #[error("internal error: {0}")]
+
    Internal(#[from] sql::Error),
+
}
+

+
/// Persistent file storage for a routing table.
+
#[derive(Debug)]
+
pub struct Table {
+
    db: sql::Connection,
+
}
+

+
impl Table {
+
    const SCHEMA: &str = include_str!("routing/schema.sql");
+

+
    /// Open a routing file store at the given path. Creates a new empty store
+
    /// if an existing store isn't found.
+
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+
        let db = sql::Connection::open(path)?;
+
        db.execute(Self::SCHEMA, [])?;
+

+
        Ok(Self { db })
+
    }
+

+
    /// Create a new in-memory routing table.
+
    pub fn memory() -> Result<Self, Error> {
+
        let db = sql::Connection::open_in_memory()?;
+
        db.execute(Self::SCHEMA, [])?;
+

+
        Ok(Self { db })
+
    }
+
}
+

+
/// Backing store for a routing table.
+
pub trait Store {
+
    /// Get the nodes seeding the given id.
+
    fn get(&self, id: &Id) -> Result<HashSet<NodeId>, Error>;
+
    /// Add a new node seeding the given id.
+
    fn insert(&mut self, id: Id, node: NodeId) -> Result<bool, Error>;
+
    /// Remove a node for the given id.
+
    fn remove(&mut self, id: &Id, node: &NodeId) -> Result<bool, Error>;
+
    /// Iterate over all entries in the routing table.
+
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error>;
+
}
+

+
impl Store for Table {
+
    fn get(&self, id: &Id) -> Result<HashSet<NodeId>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT (node) FROM routing WHERE resource = ?")?;
+
        let mut rows = stmt.query([id])?;
+
        let mut nodes = HashSet::new();
+

+
        while let Ok(Some(row)) = rows.next() {
+
            let field = row.get(0)?;
+
            nodes.insert(field);
+
        }
+
        Ok(nodes)
+
    }
+

+
    fn insert(&mut self, id: Id, node: NodeId) -> Result<bool, Error> {
+
        let updated = self.db.execute(
+
            "INSERT INTO routing (resource, node, time) VALUES (?, ?, ?) ON CONFLICT DO NOTHING",
+
            (id, node, 0),
+
        )?;
+

+
        Ok(updated > 0)
+
    }
+

+
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (Id, NodeId)>>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT resource, node FROM routing ORDER BY resource")?;
+
        let mut rows = stmt.query([])?;
+
        let mut entries = Vec::new();
+

+
        while let Ok(Some(row)) = rows.next() {
+
            let id = row.get(0)?;
+
            let node = row.get(1)?;
+

+
            entries.push((id, node));
+
        }
+
        Ok(Box::new(entries.into_iter()))
+
    }
+

+
    fn remove(&mut self, id: &Id, node: &NodeId) -> Result<bool, Error> {
+
        let deleted = self.db.execute(
+
            "DELETE FROM routing WHERE resource = ? AND node = ?",
+
            (id, node),
+
        )?;
+

+
        Ok(deleted > 0)
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+
    use crate::test::arbitrary;
+

+
    #[test]
+
    fn test_insert_and_get() {
+
        let ids = arbitrary::set::<Id>(5..10);
+
        let nodes = arbitrary::set::<NodeId>(5..10);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            for node in &nodes {
+
                assert!(db.insert(*id, *node).unwrap());
+
            }
+
        }
+

+
        for id in &ids {
+
            let seeds = db.get(id).unwrap();
+
            for node in &nodes {
+
                assert!(seeds.contains(node));
+
            }
+
        }
+
    }
+

+
    #[test]
+
    fn test_iter() {
+
        let ids = arbitrary::set::<Id>(6..9);
+
        let nodes = arbitrary::set::<NodeId>(6..9);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            for node in &nodes {
+
                assert!(db.insert(*id, *node).unwrap());
+
            }
+
        }
+

+
        let results = db.entries().unwrap().collect::<Vec<_>>();
+
        assert_eq!(results.len(), ids.len() * nodes.len());
+

+
        let mut results_ids = results.iter().map(|(id, _)| *id).collect::<Vec<_>>();
+
        results_ids.dedup();
+

+
        assert_eq!(results_ids.len(), ids.len(), "Entries are grouped by id");
+
    }
+

+
    #[test]
+
    fn test_insert_and_remove() {
+
        let ids = arbitrary::set::<Id>(5..10);
+
        let nodes = arbitrary::set::<NodeId>(5..10);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            for node in &nodes {
+
                db.insert(*id, *node).unwrap();
+
            }
+
        }
+

+
        for id in &ids {
+
            for node in &nodes {
+
                assert!(db.remove(id, node).unwrap());
+
            }
+
        }
+

+
        for id in &ids {
+
            assert!(db.get(id).unwrap().is_empty());
+
        }
+
    }
+

+
    #[test]
+
    fn test_insert_duplicate() {
+
        let id = arbitrary::gen::<Id>(1);
+
        let node = arbitrary::gen::<NodeId>(1);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        assert!(db.insert(id, node).unwrap());
+
        assert!(!db.insert(id, node).unwrap());
+
        assert!(!db.insert(id, node).unwrap());
+
    }
+

+
    #[test]
+
    fn test_remove_redundant() {
+
        let id = arbitrary::gen::<Id>(1);
+
        let node = arbitrary::gen::<NodeId>(1);
+
        let mut db = Table::open(":memory:").unwrap();
+

+
        assert!(db.insert(id, node).unwrap());
+
        assert!(db.remove(&id, &node).unwrap());
+
        assert!(!db.remove(&id, &node).unwrap());
+
    }
+
}
added radicle-node/src/service/routing/schema.sql
@@ -0,0 +1,15 @@
+
--
+
-- Routing table SQL schema.
+
--
+
create table if not exists "routing" (
+
  "id"           integer   primary key,
+
  -- Resource being seeded.
+
  "resource"     text      not null,
+
  -- Node ID.
+
  "node"         text      not null,
+
  -- UNIX time at which this entry was added or refreshed.
+
  "time"         integer   not null,
+

+
  unique("resource", "node")
+
);
+
create index "routing_index" on routing ("resource", "node");
modified radicle-node/src/test/handle.rs
@@ -36,7 +36,7 @@ impl traits::Handle for Handle {
        Ok(())
    }

-
    fn routing(&self) -> Result<chan::Receiver<(Id, Vec<service::NodeId>)>, Error> {
+
    fn routing(&self) -> Result<chan::Receiver<(Id, service::NodeId)>, Error> {
        unimplemented!();
    }

modified radicle-node/src/test/peer.rs
@@ -9,6 +9,7 @@ use crate::clock::{RefClock, Timestamp};
use crate::collections::HashMap;
use crate::git;
use crate::git::Url;
+
use crate::prelude::NodeId;
use crate::service;
use crate::service::config::*;
use crate::service::message::*;
@@ -20,7 +21,8 @@ use crate::test::simulator;
use crate::{Link, LocalTime};

/// Service instantiation used for testing.
-
pub type Service<S> = service::Service<HashMap<net::IpAddr, KnownAddress>, S, MockSigner>;
+
pub type Service<S> =
+
    service::Service<routing::Table, HashMap<net::IpAddr, KnownAddress>, S, MockSigner>;

#[derive(Debug)]
pub struct Peer<S> {
@@ -101,7 +103,8 @@ where
        let local_time = LocalTime::now();
        let clock = RefClock::from(local_time);
        let signer = MockSigner::new(&mut rng);
-
        let service = Service::new(config, clock, storage, addrs, signer, rng.clone());
+
        let routing = routing::Table::memory().unwrap();
+
        let service = Service::new(config, clock, routing, storage, addrs, signer, rng.clone());
        let ip = ip.into();
        let local_addr = net::SocketAddr::new(ip, rng.u16(..));

modified radicle-node/src/test/tests.rs
@@ -527,7 +527,7 @@ fn prop_inventory_exchange_dense() {
        let alice = Peer::new("alice", [7, 7, 7, 7], alice_inv.clone());
        let mut bob = Peer::new("bob", [8, 8, 8, 8], bob_inv.clone());
        let mut eve = Peer::new("eve", [9, 9, 9, 9], eve_inv.clone());
-
        let mut routing = Routing::with_hasher(rng.clone().into());
+
        let mut routing = HashMap::with_hasher(rng.clone().into());

        for (inv, peer) in &[
            (alice_inv.inventory, alice.node_id()),
@@ -562,7 +562,7 @@ fn prop_inventory_exchange_dense() {

        for (proj_id, remotes) in &routing {
            for peer in peers.values() {
-
                let lookup = peer.lookup(*proj_id);
+
                let lookup = peer.lookup(*proj_id).unwrap();

                if lookup.local.is_some() {
                    peer.get(*proj_id)
modified radicle-node/src/transport.rs
@@ -8,6 +8,7 @@ use nakamoto_net::{Io, Link};
use crate::address_book;
use crate::collections::HashMap;
use crate::crypto;
+
use crate::service::routing;
use crate::service::{Command, DisconnectReason, Event, Service};
use crate::storage::WriteStorage;
use crate::wire::Wire;
@@ -18,13 +19,13 @@ struct Peer {
}

#[derive(Debug)]
-
pub struct Transport<S, T, G> {
+
pub struct Transport<R, S, T, G> {
    peers: HashMap<net::IpAddr, Peer>,
-
    inner: Wire<S, T, G>,
+
    inner: Wire<R, S, T, G>,
}

-
impl<S, T, G> Transport<S, T, G> {
-
    pub fn new(inner: Wire<S, T, G>) -> Self {
+
impl<R, S, T, G> Transport<R, S, T, G> {
+
    pub fn new(inner: Wire<R, S, T, G>) -> Self {
        Self {
            peers: HashMap::default(),
            inner,
@@ -32,8 +33,9 @@ impl<S, T, G> Transport<S, T, G> {
    }
}

-
impl<S, T, G> nakamoto::Protocol for Transport<S, T, G>
+
impl<R, S, T, G> nakamoto::Protocol for Transport<R, S, T, G>
where
+
    R: routing::Store,
    T: WriteStorage + 'static,
    S: address_book::Store,
    G: crypto::Signer,
@@ -84,7 +86,7 @@ where
    }
}

-
impl<S, T, G> Iterator for Transport<S, T, G> {
+
impl<R, S, T, G> Iterator for Transport<R, S, T, G> {
    type Item = Io<Event, DisconnectReason>;

    fn next(&mut self) -> Option<Self::Item> {
@@ -92,15 +94,15 @@ impl<S, T, G> Iterator for Transport<S, T, G> {
    }
}

-
impl<S, T, G> Deref for Transport<S, T, G> {
-
    type Target = Service<S, T, G>;
+
impl<R, S, T, G> Deref for Transport<R, S, T, G> {
+
    type Target = Service<R, S, T, G>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

-
impl<S, T, G> DerefMut for Transport<S, T, G> {
+
impl<R, S, T, G> DerefMut for Transport<R, S, T, G> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
modified radicle-node/src/wire.rs
@@ -19,8 +19,8 @@ use crate::git::fmt;
use crate::hash::Digest;
use crate::identity::Id;
use crate::service;
-
use crate::service::filter;
use crate::service::reactor::Io;
+
use crate::service::{filter, routing};
use crate::storage::refs::Refs;
use crate::storage::refs::SignedRefs;
use crate::storage::WriteStorage;
@@ -437,13 +437,13 @@ impl Decode for SignedRefs<Unverified> {
}

#[derive(Debug)]
-
pub struct Wire<S, T, G> {
+
pub struct Wire<R, S, T, G> {
    inboxes: HashMap<IpAddr, Decoder>,
-
    inner: service::Service<S, T, G>,
+
    inner: service::Service<R, S, T, G>,
}

-
impl<S, T, G> Wire<S, T, G> {
-
    pub fn new(inner: service::Service<S, T, G>) -> Self {
+
impl<R, S, T, G> Wire<R, S, T, G> {
+
    pub fn new(inner: service::Service<R, S, T, G>) -> Self {
        Self {
            inboxes: HashMap::new(),
            inner,
@@ -451,8 +451,9 @@ impl<S, T, G> Wire<S, T, G> {
    }
}

-
impl<S, T, G> Wire<S, T, G>
+
impl<R, S, T, G> Wire<R, S, T, G>
where
+
    R: routing::Store,
    S: address_book::Store,
    T: WriteStorage + 'static,
    G: Signer,
@@ -501,7 +502,7 @@ where
    }
}

-
impl<S, T, G> Iterator for Wire<S, T, G> {
+
impl<R, S, T, G> Iterator for Wire<R, S, T, G> {
    type Item = nakamoto::Io<service::Event, service::DisconnectReason>;

    fn next(&mut self) -> Option<Self::Item> {
@@ -526,15 +527,15 @@ impl<S, T, G> Iterator for Wire<S, T, G> {
    }
}

-
impl<S, T, G> Deref for Wire<S, T, G> {
-
    type Target = service::Service<S, T, G>;
+
impl<R, S, T, G> Deref for Wire<R, S, T, G> {
+
    type Target = service::Service<R, S, T, G>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

-
impl<S, T, G> DerefMut for Wire<S, T, G> {
+
impl<R, S, T, G> DerefMut for Wire<R, S, T, G> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
modified radicle/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
[features]
default = []
test = ["quickcheck"]
+
sql = ["rusqlite"]

[dependencies]
base64 = { version= "0.13" }
@@ -27,6 +28,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = ["preserve_order"] }
siphasher = { version = "0.3.10" }
radicle-git-ext = { version = "0", features = ["serde"] }
+
rusqlite = { version = "0.28.0", features = ["bundled"], optional = true }
nonempty = { version = "0.8.0", features = ["serialize"] }
tempfile = { version = "3.3.0" }
thiserror = { version = "1" }
modified radicle/src/lib.rs
@@ -8,6 +8,8 @@ pub mod node;
pub mod profile;
pub mod rad;
pub mod serde_ext;
+
#[cfg(feature = "sql")]
+
pub mod sql;
pub mod ssh;
pub mod storage;
#[cfg(feature = "test")]
added radicle/src/sql.rs
@@ -0,0 +1,48 @@
+
use std::str;
+
use std::str::FromStr;
+

+
use rusqlite as sql;
+
use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, ValueRef};
+

+
use crate::crypto::PublicKey;
+
use crate::identity::Id;
+

+
impl FromSql for Id {
+
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
+
        match value {
+
            ValueRef::Text(id) => {
+
                let id = str::from_utf8(id).map_err(|e| FromSqlError::Other(Box::new(e)))?;
+
                let id = Id::from_str(id).map_err(|e| FromSqlError::Other(Box::new(e)))?;
+

+
                Ok(id)
+
            }
+
            _ => Err(FromSqlError::InvalidType),
+
        }
+
    }
+
}
+

+
impl ToSql for Id {
+
    fn to_sql(&self) -> sql::Result<ToSqlOutput<'_>> {
+
        Ok(ToSqlOutput::from(self.to_string()))
+
    }
+
}
+

+
impl FromSql for PublicKey {
+
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
+
        match value {
+
            ValueRef::Text(pk) => {
+
                let pk = str::from_utf8(pk).map_err(|e| FromSqlError::Other(Box::new(e)))?;
+
                let pk = PublicKey::from_str(pk).map_err(|e| FromSqlError::Other(Box::new(e)))?;
+

+
                Ok(pk)
+
            }
+
            _ => Err(FromSqlError::InvalidType),
+
        }
+
    }
+
}
+

+
impl ToSql for PublicKey {
+
    fn to_sql(&self) -> sql::Result<ToSqlOutput<'_>> {
+
        Ok(ToSqlOutput::from(self.to_string()))
+
    }
+
}