Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement E2E node test
Alexis Sellier committed 3 years ago
commit 438ae8430433153bcdb98e84837fe90ac636b67d
parent 7962ac0a8e9304f39ae20c8095a2511a601f2ff4
33 files changed +710 -339
modified Cargo.lock
@@ -264,7 +264,6 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
-
 "block-padding",
 "generic-array",
]

@@ -278,12 +277,6 @@ dependencies = [
]

[[package]]
-
name = "block-padding"
-
version = "0.2.1"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
-

-
[[package]]
name = "bloomy"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -428,12 +421,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cec318a675afcb6a1ea1d4340e2d377e56e47c266f28043ceccbf4412ddfdd3b"

[[package]]
-
name = "convert_case"
-
version = "0.4.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
-

-
[[package]]
name = "core-foundation-sys"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -505,16 +492,6 @@ dependencies = [
]

[[package]]
-
name = "crypto-mac"
-
version = "0.11.1"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "b1d1a86f49236c215f271d40892d5fc950490551400b02ef360692c29815c714"
-
dependencies = [
-
 "generic-array",
-
 "subtle",
-
]
-

-
[[package]]
name = "ct-codecs"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -599,14 +576,15 @@ dependencies = [
[[package]]
name = "cyphernet"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-cyphernet#5568667b76089052a4dfa3dec00d5032b52f505f"
+
source = "git+https://github.com/cyphernet-wg/rust-cyphernet#2284beac543326b79705909b96efdcd2ecb008d3"
dependencies = [
 "amplify",
+
 "base32",
 "ed25519-compact",
 "multibase",
 "serde",
+
 "sha3",
 "socks",
-
 "torut",
]

[[package]]
@@ -652,10 +630,8 @@ version = "0.99.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
dependencies = [
-
 "convert_case",
 "proc-macro2",
 "quote",
-
 "rustc_version",
 "syn",
]

@@ -801,7 +777,7 @@ dependencies = [
 "regex",
 "serde",
 "serde_json",
-
 "sha3 0.10.6",
+
 "sha3",
 "thiserror",
 "uint",
]
@@ -1164,16 +1140,6 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"

[[package]]
name = "hmac"
-
version = "0.11.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b"
-
dependencies = [
-
 "crypto-mac",
-
 "digest 0.9.0",
-
]
-

-
[[package]]
-
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
@@ -1370,11 +1336,12 @@ dependencies = [
[[package]]
name = "io-reactor"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#32b3d308566228ebeb04b6aea4d6f17a9c4dfe6c"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices#d8660d8d0697bfd5b3983a5c1ea202ea26294639"
dependencies = [
 "amplify",
 "crossbeam-channel",
 "libc",
+
 "log",
 "popol",
 "socket2",
]
@@ -1446,7 +1413,7 @@ dependencies = [
 "ecdsa",
 "elliptic-curve",
 "sha2 0.10.6",
-
 "sha3 0.10.6",
+
 "sha3",
]

[[package]]
@@ -1537,6 +1504,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
 "cfg-if",
+
 "value-bag",
]

[[package]]
@@ -1618,12 +1586,13 @@ dependencies = [
[[package]]
name = "netservices"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#32b3d308566228ebeb04b6aea4d6f17a9c4dfe6c"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices#d8660d8d0697bfd5b3983a5c1ea202ea26294639"
dependencies = [
 "amplify",
 "cyphernet",
 "io-reactor",
 "libc",
+
 "log",
 "socket2",
]

@@ -1930,7 +1899,7 @@ checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
[[package]]
name = "popol"
version = "1.0.0"
-
source = "git+https://github.com/Internet2-WG/popol?branch=api#02b8d4089bd234f3d9d46ef27f6e64cfbc45118a"
+
source = "git+https://github.com/Cyphernet-WG/popol?branch=api#0b78f5ef1c39741cfc67157b7d2c7a27064150b1"
dependencies = [
 "libc",
]
@@ -2044,6 +2013,7 @@ dependencies = [
name = "radicle"
version = "0.2.0"
dependencies = [
+
 "amplify",
 "base64",
 "byteorder",
 "crossbeam-channel",
@@ -2216,7 +2186,6 @@ dependencies = [
name = "radicle-node"
version = "0.2.0"
dependencies = [
-
 "amplify",
 "anyhow",
 "bloomy",
 "byteorder",
@@ -2425,7 +2394,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb"
dependencies = [
 "crypto-bigint",
-
 "hmac 0.12.1",
+
 "hmac",
 "zeroize",
]

@@ -2478,15 +2447,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6"

[[package]]
-
name = "rustc_version"
-
version = "0.4.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
-
dependencies = [
-
 "semver",
-
]
-

-
[[package]]
name = "rustix"
version = "0.36.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2557,7 +2517,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f9e24d2b632954ded8ab2ef9fea0a0c769ea56ea98bddbafbad22caeeadf45d"
dependencies = [
-
 "hmac 0.12.1",
+
 "hmac",
 "pbkdf2",
 "salsa20",
 "sha2 0.10.6",
@@ -2578,12 +2538,6 @@ dependencies = [
]

[[package]]
-
name = "semver"
-
version = "1.0.14"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
-

-
[[package]]
name = "serde"
version = "1.0.149"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2676,18 +2630,6 @@ dependencies = [

[[package]]
name = "sha3"
-
version = "0.9.1"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "f81199417d4e5de3f04b1e871023acea7389672c4135918f05aa9cbf2f2fa809"
-
dependencies = [
-
 "block-buffer 0.9.0",
-
 "digest 0.9.0",
-
 "keccak",
-
 "opaque-debug",
-
]
-

-
[[package]]
-
name = "sha3"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdf0c33fae925bdc080598b84bc15c55e7b9a4a43b3c704da051f977469691c9"
@@ -2744,7 +2686,7 @@ dependencies = [
 "iri-string",
 "k256",
 "rand 0.8.5",
-
 "sha3 0.10.6",
+
 "sha3",
 "thiserror",
 "time 0.3.17",
]
@@ -3145,26 +3087,6 @@ dependencies = [
]

[[package]]
-
name = "torut"
-
version = "0.2.1"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "99febc413f26cf855b3a309c5872edff5c31e0ffe9c2fce5681868761df36f69"
-
dependencies = [
-
 "base32",
-
 "base64",
-
 "derive_more",
-
 "ed25519-dalek",
-
 "hex",
-
 "hmac 0.11.0",
-
 "rand 0.7.3",
-
 "serde",
-
 "serde_derive",
-
 "sha2 0.9.9",
-
 "sha3 0.9.1",
-
 "tokio",
-
]
-

-
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3362,6 +3284,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"

[[package]]
+
name = "value-bag"
+
version = "1.0.0-alpha.9"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55"
+
dependencies = [
+
 "ctor",
+
 "version_check",
+
]
+

+
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified radicle-crypto/src/lib.rs
@@ -171,7 +171,21 @@ impl PublicKey {
}

#[cfg(feature = "cyphernet")]
-
impl cyphernet::crypto::EcPk for PublicKey {}
+
impl cyphernet::crypto::EcPk for PublicKey {
+
    // TODO: Change this once NoiseXK is working.
+
    fn generator() -> Self {
+
        use amplify::hex::FromHex;
+

+
        ed25519::PublicKey::from_slice(
+
            &Vec::<u8>::from_hex(
+
                "5866666666666666666666666666666666666666666666666666666666666666",
+
            )
+
            .unwrap(),
+
        )
+
        .unwrap()
+
        .into()
+
    }
+
}

/// The private/signing key.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
modified radicle-crypto/src/ssh/keystore.rs
@@ -154,7 +154,6 @@ impl Signer for MemorySigner {

#[cfg(feature = "cyphernet")]
impl cyphernet::crypto::Ecdh for MemorySigner {
-
    type Pk = PublicKey;
    type Secret = crate::SharedSecret;
    type Err = ed25519_compact::Error;

@@ -167,6 +166,15 @@ impl cyphernet::crypto::Ecdh for MemorySigner {
    }
}

+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::crypto::EcSk for MemorySigner {
+
    type Pk = PublicKey;
+

+
    fn to_pk(&self) -> Self::Pk {
+
        self.public
+
    }
+
}
+

impl MemorySigner {
    /// Load this signer from a keystore, given a secret key passphrase.
    pub fn load(keystore: &Keystore, passphrase: Passphrase) -> Result<Self, MemorySignerError> {
@@ -184,6 +192,18 @@ impl MemorySigner {
    pub fn boxed(self) -> Box<dyn Signer> {
        Box::new(self)
    }
+

+
    /// Generate a new memory signer.
+
    pub fn gen() -> Self {
+
        let seed = crate::Seed::generate();
+
        let keypair = KeyPair::from_seed(seed);
+
        let sk = keypair.sk;
+

+
        Self {
+
            public: sk.public_key().into(),
+
            secret: Zeroizing::new(sk.into()),
+
        }
+
    }
}

impl TryFrom<ssh_key::PublicKey> for PublicKey {
modified radicle-crypto/src/test/signer.rs
@@ -74,7 +74,6 @@ impl Signer for MockSigner {

#[cfg(feature = "cyphernet")]
impl cyphernet::crypto::Ecdh for MockSigner {
-
    type Pk = PublicKey;
    type Secret = crate::SharedSecret;
    type Err = crate::Error;

@@ -82,3 +81,12 @@ impl cyphernet::crypto::Ecdh for MockSigner {
        Ok([0; 32])
    }
}
+

+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::crypto::EcSk for MockSigner {
+
    type Pk = PublicKey;
+

+
    fn to_pk(&self) -> Self::Pk {
+
        self.pk
+
    }
+
}
modified radicle-node/Cargo.toml
@@ -9,7 +9,6 @@ edition = "2021"
test = ["radicle/test", "radicle-crypto/test", "radicle-crypto/cyphernet", "qcheck"]

[dependencies]
-
amplify = { version = "4.0.0-beta.1", default-features = false, features = ["std"] }
anyhow = { version = "1" }
bloomy = { version = "1.2" }
byteorder = { version = "1" }
@@ -22,9 +21,9 @@ git-ref-format = { version = "0", features = ["serde", "macro"] }
lexopt = { version = "0.2.1" }
log = { version = "0.4.17", features = ["std"] }
nakamoto-net = { version = "0.3.0" }
-
netservices = { version = "0", features = ["io-reactor", "socket2"] }
+
netservices = { version = "0", features = ["io-reactor", "socket2", "log"] }
nonempty = { version = "0.8.1", features = ["serialize"] }
-
io-reactor = { version = "0", features = ["popol", "socket2"] }
+
io-reactor = { version = "0", features = ["popol", "socket2", "log"] }
qcheck = { version = "1", default-features = false, optional = true }
sqlite = { version = "0.30.3" }
sqlite3-src = { version = "0.4.0", features = ["bundled"] } # Ensures static linking
modified radicle-node/src/address/store.rs
@@ -1,15 +1,14 @@
use std::path::Path;
-
use std::str::FromStr;
use std::{fmt, io};

use radicle::node;
+
use radicle::node::Address;
use sqlite as sql;
use thiserror::Error;

use crate::address::types;
use crate::address::{KnownAddress, Source};
use crate::clock::Timestamp;
-
use crate::prelude::Address;
use crate::service::NodeId;
use crate::sql::transaction;
use crate::wire::AddressType;
@@ -229,29 +228,6 @@ pub trait Store {
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error>;
}

-
impl TryFrom<&sql::Value> for Address {
-
    type Error = sql::Error;
-

-
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
-
        match value {
-
            sql::Value::String(s) => Address::from_str(s.as_str()).map_err(|e| sql::Error {
-
                code: None,
-
                message: Some(e.to_string()),
-
            }),
-
            _ => Err(sql::Error {
-
                code: None,
-
                message: Some("sql: invalid type for address".to_owned()),
-
            }),
-
        }
-
    }
-
}
-

-
impl sql::BindableWithIndex for Address {
-
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
-
        self.to_string().bind(stmt, i)
-
    }
-
}
-

impl TryFrom<&sql::Value> for Source {
    type Error = sql::Error;

modified radicle-node/src/address/types.rs
@@ -2,10 +2,10 @@ use std::ops::{Deref, DerefMut};

use nonempty::NonEmpty;
use radicle::node;
+
use radicle::node::Address;

use crate::clock::Timestamp;
use crate::collections::HashMap;
-
use crate::service::message::Address;
use crate::LocalTime;

/// A map with the ability to randomly select values.
modified radicle-node/src/client.rs
@@ -1,11 +1,21 @@
use std::io;
+
use std::{net, thread, time};

+
use netservices::resources::NetAccept;
+
use reactor::poller::popol;
+
use reactor::Reactor;
use thiserror::Error;

use crate::address;
+
use crate::control;
+
use crate::node::NodeId;
use crate::service::{routing, tracking};
+
use crate::wire::Transport;
+
use crate::worker::{WorkerPool, WorkerReq};
+
use crate::{crypto, profile, service, LocalTime};

pub mod handle;
+
use handle::Handle;

/// Directory in `$RAD_HOME` under which node-specific files are stored.
pub const NODE_DIR: &str = "node";
@@ -34,4 +44,104 @@ pub enum Error {
    /// A networking error.
    #[error("network error: {0}")]
    Net(#[from] nakamoto_net::error::Error),
+
    /// A control socket error.
+
    #[error("control socket error: {0}")]
+
    Control(#[from] control::Error),
+
}
+

+
/// Holds join handles to the client threads, as well as a client handle.
+
pub struct Runtime<G: crypto::Signer + crypto::Negotiator> {
+
    pub id: NodeId,
+
    pub handle: Handle<Transport<routing::Table, address::Book, radicle::Storage, G>>,
+
    pub control: thread::JoinHandle<Result<(), control::Error>>,
+
    pub reactor: Reactor<Transport<service::routing::Table, address::Book, radicle::Storage, G>>,
+
    pub pool: WorkerPool,
+
    pub local_addrs: Vec<net::SocketAddr>,
+
}
+

+
impl<G: crypto::Signer + crypto::Negotiator + 'static> Runtime<G> {
+
    /// Run the client.
+
    ///
+
    /// This function spawns threads.
+
    pub fn with(
+
        profile: profile::Profile,
+
        config: service::Config,
+
        listen: Vec<net::SocketAddr>,
+
        proxy: net::SocketAddr,
+
        signer: G,
+
    ) -> Result<Runtime<G>, Error> {
+
        let id = *profile.id();
+
        let node = profile.node();
+
        let negotiator = signer.clone();
+
        let network = config.network;
+
        let rng = fastrand::Rng::new();
+
        let clock = LocalTime::now();
+
        let storage = profile.storage;
+
        let node_dir = profile.home.join(NODE_DIR);
+
        let address_db = node_dir.join(ADDRESS_DB_FILE);
+
        let routing_db = node_dir.join(ROUTING_DB_FILE);
+
        let tracking_db = node_dir.join(TRACKING_DB_FILE);
+

+
        log::info!("Opening address book {}..", address_db.display());
+
        let addresses = address::Book::open(address_db)?;
+

+
        log::info!("Opening routing table {}..", routing_db.display());
+
        let routing = routing::Table::open(routing_db)?;
+

+
        log::info!("Opening tracking policy table {}..", tracking_db.display());
+
        let tracking = tracking::Config::open(tracking_db)?;
+

+
        log::info!("Initializing service ({:?})..", network);
+
        let service = service::Service::new(
+
            config,
+
            clock,
+
            routing,
+
            storage.clone(),
+
            addresses,
+
            tracking,
+
            signer,
+
            rng,
+
        );
+

+
        let (worker_send, worker_recv) = crossbeam_channel::unbounded::<WorkerReq<G>>();
+
        let pool = WorkerPool::with(10, time::Duration::from_secs(9), storage, worker_recv);
+
        let wire = Transport::new(service, worker_send, negotiator.clone(), proxy, clock);
+
        let reactor = Reactor::new(wire, popol::Poller::new())?;
+
        let handle = Handle::from(reactor.controller());
+
        let control = thread::spawn({
+
            let handle = handle.clone();
+
            move || control::listen(node, handle)
+
        });
+
        let controller = reactor.controller();
+
        let mut local_addrs = Vec::new();
+

+
        for addr in listen {
+
            let listener = NetAccept::bind(addr, negotiator.clone())?;
+
            let local_addr = listener.local_addr();
+

+
            local_addrs.push(local_addr);
+
            controller.register_listener(listener)?;
+

+
            log::info!("Listening on {local_addr}..");
+
        }
+

+
        Ok(Runtime {
+
            id,
+
            control,
+
            reactor,
+
            handle,
+
            pool,
+
            local_addrs,
+
        })
+
    }
+

+
    pub fn run(self) -> Result<(), Error> {
+
        log::info!("Running node {}..", self.id);
+

+
        self.pool.run().unwrap();
+
        self.control.join().unwrap()?;
+
        self.reactor.join().unwrap();
+

+
        Ok(())
+
    }
}
modified radicle-node/src/client/handle.rs
@@ -50,6 +50,14 @@ pub struct Handle<T: reactor::Handler> {
    pub(crate) controller: reactor::Controller<T>,
}

+
impl<T: reactor::Handler> Clone for Handle<T> {
+
    fn clone(&self) -> Self {
+
        Self {
+
            controller: self.controller.clone(),
+
        }
+
    }
+
}
+

impl<T: reactor::Handler> From<reactor::Controller<T>> for Handle<T> {
    fn from(controller: reactor::Controller<T>) -> Handle<T> {
        Handle { controller }
@@ -68,6 +76,12 @@ impl<T: reactor::Handler<Command = service::Command>> radicle::node::Handle for
    type FetchLookup = FetchLookup;
    type Error = Error;

+
    fn connect(&mut self, node: NodeId, addr: radicle::node::Address) -> Result<(), Error> {
+
        self.command(service::Command::Connect(node, addr))?;
+

+
        Ok(())
+
    }
+

    fn fetch(&mut self, id: Id) -> Result<Self::FetchLookup, Error> {
        let (sender, receiver) = chan::bounded(1);
        self.command(service::Command::Fetch(id, sender))?;
modified radicle-node/src/lib.rs
@@ -1,6 +1,3 @@
-
#[macro_use]
-
extern crate amplify;
-

pub mod address;
pub mod bounded;
pub mod client;
@@ -27,8 +24,8 @@ pub mod prelude {
    pub use crate::crypto::{PublicKey, Signature, Signer};
    pub use crate::deserializer::Deserializer;
    pub use crate::identity::{Did, Id};
+
    pub use crate::node::Address;
    pub use crate::service::filter::Filter;
-
    pub use crate::service::message::Address;
    pub use crate::service::{DisconnectReason, Event, Message, Network, NodeId};
    pub use crate::storage::refs::Refs;
    pub use crate::storage::WriteStorage;
modified radicle-node/src/main.rs
@@ -1,29 +1,20 @@
-
use std::{env, net, process, thread};
+
use std::{env, net, process};

use anyhow::Context as _;
use cyphernet::addr::PeerAddr;
-
use nakamoto_net::{LocalDuration, LocalTime};
-
use netservices::wire::NetAccept;
-
use reactor::poller::popol;
-
use reactor::Reactor;
+
use nakamoto_net::LocalDuration;

use radicle::profile;
-
use radicle_node::client::handle::Handle;
-
use radicle_node::client::{ADDRESS_DB_FILE, NODE_DIR, ROUTING_DB_FILE, TRACKING_DB_FILE};
+
use radicle_node::client::Runtime;
use radicle_node::crypto::ssh::keystore::MemorySigner;
use radicle_node::prelude::{Address, NodeId};
-
use radicle_node::service::{routing, tracking};
-
use radicle_node::wire::Transport;
-
use radicle_node::worker::{WorkerPool, WorkerReq};
-
use radicle_node::{address, control, logger, service};
+
use radicle_node::{logger, service};

#[derive(Debug)]
struct Options {
    connect: Vec<(NodeId, Address)>,
    external_addresses: Vec<Address>,
    limits: service::config::Limits,
-
    // FIXME(cloudhead): Listen on incoming connections.
-
    #[allow(dead_code)]
    listen: Vec<net::SocketAddr>,
}

@@ -87,63 +78,20 @@ fn main() -> anyhow::Result<()> {

    let options = Options::from_env()?;
    let profile = radicle::Profile::load().context("Failed to load node profile")?;
-
    let node = profile.node();
    let passphrase = env::var(profile::env::RAD_PASSPHRASE)
        .context("`RAD_PASSPHRASE` is required to be set for the node to establish connections")?
        .into();
    let signer = MemorySigner::load(&profile.keystore, passphrase)?;
-
    let negotiator = signer.clone();
    let config = service::Config {
        connect: options.connect.into_iter().collect(),
        external_addresses: options.external_addresses,
        limits: options.limits,
        ..service::Config::default()
    };
-
    let proxy_addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
-
    let network = config.network;
-
    let rng = fastrand::Rng::new();
-
    let clock = LocalTime::now();
-
    let storage = profile.storage;
-
    let node_dir = profile.home.join(NODE_DIR);
-
    let address_db = node_dir.join(ADDRESS_DB_FILE);
-
    let routing_db = node_dir.join(ROUTING_DB_FILE);
-
    let tracking_db = node_dir.join(TRACKING_DB_FILE);
+
    let proxy = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
+
    let runtime = Runtime::with(profile, config, options.listen, proxy, signer)?;

-
    log::info!("Opening address book {}..", address_db.display());
-
    let addresses = address::Book::open(address_db)?;
-

-
    log::info!("Opening routing table {}..", routing_db.display());
-
    let routing = routing::Table::open(routing_db)?;
-

-
    log::info!("Opening tracking policy table {}..", tracking_db.display());
-
    let tracking = tracking::Config::open(tracking_db)?;
-

-
    log::info!("Initializing service ({:?})..", network);
-
    let worker_storage = storage.clone();
-
    let service = service::Service::new(
-
        config, clock, routing, storage, addresses, tracking, signer, rng,
-
    );
-

-
    let (worker_send, worker_recv) = crossbeam_channel::unbounded::<WorkerReq<MemorySigner>>();
-
    let pool = WorkerPool::with(10, worker_storage, worker_recv);
-
    let wire = Transport::new(service, worker_send, negotiator.clone(), proxy_addr, clock);
-
    let reactor =
-
        Reactor::new(wire, popol::Poller::new()).expect("unable to instantiate P2P reactor");
-
    let controller = reactor.controller();
-

-
    for socket in options.listen {
-
        let listener = NetAccept::bind(socket, negotiator.clone())?;
-
        controller.register_listener(listener)?;
-

-
        log::info!("Listening on {socket}..");
-
    }
-

-
    let handle = Handle::from(controller);
-
    let control = thread::spawn(move || control::listen(node, handle));
-

-
    pool.join().unwrap();
-
    control.join().unwrap()?;
-
    reactor.join().unwrap();
+
    runtime.run()?;

    Ok(())
}
modified radicle-node/src/service.rs
@@ -11,7 +11,7 @@ use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
-
use std::{fmt, net, str};
+
use std::{fmt, io, net, str};

use crossbeam_channel as chan;
use fastrand::Rng;
@@ -20,7 +20,7 @@ use nakamoto::{LocalDuration, LocalTime};
use nakamoto_net as nakamoto;
use nakamoto_net::Link;
use nonempty::NonEmpty;
-
use radicle::node::Features;
+
use radicle::node::{Address, Features};
use radicle::storage::{Namespaces, ReadStorage};

use crate::address;
@@ -32,8 +32,9 @@ use crate::git;
use crate::identity::{Doc, Id};
use crate::node;
use crate::prelude::*;
-
use crate::service::message::{Address, Announcement, AnnouncementMessage, Ping};
+
use crate::service::message::{Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
+
use crate::service::session::Protocol;
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, WriteStorage};

@@ -46,8 +47,6 @@ use self::gossip::Gossip;
use self::message::InventoryAnnouncement;
use self::reactor::Reactor;

-
/// Default radicle protocol port.
-
pub const DEFAULT_PORT: u16 = 8776;
/// Target number of peers to maintain connections to.
pub const TARGET_OUTBOUND_PEERS: usize = 8;
/// How often to run the "idle" task.
@@ -104,6 +103,8 @@ pub enum FetchError {
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Fetch(#[from] storage::FetchError),
+
    #[error(transparent)]
+
    Io(#[from] io::Error),
}

/// Result of looking up seeds in our routing table.
@@ -422,7 +423,7 @@ where
    }

    pub fn command(&mut self, cmd: Command) {
-
        debug!("Command {:?}", cmd);
+
        debug!("Received command {:?}", cmd);

        match cmd {
            Command::Connect(id, addr) => self.reactor.connect(id, addr),
@@ -458,12 +459,12 @@ where
                // TODO: Limit the number of seeds we fetch from? Randomize?
                for seed in seeds {
                    let session = self.sessions.get_mut(&seed).unwrap();
-
                    if let Some(upgrade) = session.upgrade(id) {
-
                        self.reactor.write(session.id, upgrade);
+
                    if let Some(fetch) = session.fetch(id) {
+
                        self.reactor.write(session.id, fetch);
                        self.reactor
                            .fetch(session.id, id, Namespaces::default(), true);
                    } else {
-
                        // TODO: If we can't upgrade, it's because we're already fetching from
+
                        // TODO: If we can't fetch, it's because we're already fetching from
                        // this peer. So we need to queue the request, or find another peer.
                        todo!();
                    }
@@ -508,13 +509,16 @@ where

    pub fn fetch_complete(&mut self, _result: FetchResult) {
        // TODO(cloudhead): handle completed job with service business logic
+
        // TODO: Downgrade session to gossip protocol.
    }

    pub fn accepted(&mut self, _addr: net::SocketAddr) {
        // Inbound connection attempt.
    }

-
    pub fn attempted(&mut self, id: NodeId, _addr: &Address) {
+
    pub fn attempted(&mut self, id: NodeId, addr: &Address) {
+
        debug!("Attempted connection to {id} ({addr})");
+

        let persistent = self.config.is_persistent(&id);
        let peer = self.sessions.entry(id).or_insert_with(|| {
            Session::new(id, Link::Outbound, persistent, self.rng.clone(), self.clock)
@@ -792,14 +796,21 @@ where
        };
        peer.last_active = self.clock;

-
        debug!("Received {:?} from {}", &message, peer.id);
+
        debug!("Received message {:?} from {}", &message, peer.id);

        match (&mut peer.state, message) {
+
            (session::State::Connected { protocol, .. }, _) if *protocol == Protocol::Fetch => {
+
                // This should never happen if the service is properly configured, since all
+
                // incoming data is sent directly to the Git worker.
+
                log::error!("Received gossip message from {remote} during Git fetch");
+

+
                return Err(session::Error::Misbehavior);
+
            }
            (session::State::Connected { initialized, .. }, Message::Initialize {}) => {
                // Already initialized!
                if *initialized {
                    debug!(
-
                        "Disconnecting peer {} for sending us a message before initializing",
+
                        "Disconnecting peer {} for initializing already initialized session",
                        peer.id
                    );
                    return Err(session::Error::Misbehavior);
@@ -870,7 +881,8 @@ where
                    }
                }
            }
-
            (session::State::Connected { .. }, Message::Upgrade { repo }) => {
+
            (session::State::Connected { protocol, .. }, Message::Fetch { repo }) => {
+
                *protocol = Protocol::Fetch;
                // All we need is to instruct the transport to handover to the worker
                self.reactor
                    .fetch(*remote, repo, Namespaces::default(), false);
modified radicle-node/src/service/config.rs
@@ -1,6 +1,7 @@
use super::nakamoto::LocalDuration;

-
use crate::service::message::Address;
+
use radicle::node::Address;
+

use crate::service::NodeId;

/// Peer-to-peer network.
@@ -41,8 +42,6 @@ pub struct Config {
    pub network: Network,
    /// Whether or not our node should relay inventories.
    pub relay: bool,
-
    /// List of addresses to listen on for protocol connections.
-
    pub listen: Vec<Address>,
    /// Configured service limits.
    pub limits: Limits,
}
@@ -54,7 +53,6 @@ impl Default for Config {
            external_addresses: vec![],
            network: Network::default(),
            relay: true,
-
            listen: vec![],
            limits: Limits::default(),
        }
    }
modified radicle-node/src/service/message.rs
@@ -1,13 +1,13 @@
-
use cyphernet::addr::{HostAddr, NetAddr};
-
use std::{fmt, io, mem, net, ops};
+
use std::{fmt, io, mem};

use crate::crypto;
use crate::git;
use crate::identity::Id;
use crate::node;
+
use crate::node::Address;
use crate::prelude::BoundedVec;
use crate::service::filter::Filter;
-
use crate::service::{NodeId, Timestamp, DEFAULT_PORT};
+
use crate::service::{NodeId, Timestamp};
use crate::wire;

/// Maximum number of addresses which can be announced to other nodes.
@@ -27,34 +27,6 @@ impl fmt::Display for Hostname {
    }
}

-
/// Peer public protocol address.
-
#[derive(Wrapper, Clone, Eq, PartialEq, Debug, From)]
-
#[wrapper(Display, FromStr)]
-
pub struct Address(NetAddr<DEFAULT_PORT>);
-

-
impl cyphernet::addr::Addr for Address {
-
    fn port(&self) -> u16 {
-
        self.0.port()
-
    }
-
}
-

-
impl ops::Deref for Address {
-
    type Target = NetAddr<DEFAULT_PORT>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
-
    }
-
}
-

-
impl From<net::SocketAddr> for Address {
-
    fn from(addr: net::SocketAddr) -> Self {
-
        Address(NetAddr {
-
            host: HostAddr::Ip(addr.ip()),
-
            port: Some(addr.port()),
-
        })
-
    }
-
}
-

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    /// Subscribe to events matching this filter.
@@ -332,8 +304,8 @@ pub enum Message {
        zeroes: ZeroBytes,
    },

-
    /// Upgrade session to Git protocol for the given repository.
-
    Upgrade { repo: Id },
+
    /// Upgrade session to Git protocol and fetch the given repository.
+
    Fetch { repo: Id },
}

impl Message {
@@ -419,7 +391,7 @@ impl fmt::Debug for Message {
            }
            Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {:?})", zeroes),
            Self::Pong { zeroes } => write!(f, "Pong({:?})", zeroes),
-
            Self::Upgrade { repo } => write!(f, "Upgrade({repo})"),
+
            Self::Fetch { repo } => write!(f, "Upgrade({repo})"),
        }
    }
}
modified radicle-node/src/service/reactor.rs
@@ -32,10 +32,10 @@ pub struct Fetch {
    pub repo: Id,
    /// Namespaces to fetch.
    pub namespaces: Namespaces,
-
    /// Connection we are fetching from
+
    /// Remote peer we are interacting with.
    pub remote: NodeId,
-
    /// Indicates whether the fetch request is initiated by a local party
-
    pub initiate: bool,
+
    /// Indicates whether the fetch request was initiated by us.
+
    pub initiated: bool,
}

/// Result of a fetch request from a specific seed.
@@ -87,14 +87,17 @@ impl Reactor {
        self.io.push_back(Io::Wakeup(after));
    }

-
    pub fn fetch(&mut self, remote: NodeId, repo: Id, namespaces: Namespaces, initiate: bool) {
-
        debug!("Fetch {} from {}..", repo, remote);
-

+
    pub fn fetch(&mut self, remote: NodeId, repo: Id, namespaces: Namespaces, initiated: bool) {
+
        if initiated {
+
            debug!("Fetch initiated for {} from {}..", repo, remote);
+
        } else {
+
            debug!("Fetch requested for {} from {}..", repo, remote);
+
        }
        self.io.push_back(Io::Fetch(Fetch {
            repo,
            namespaces,
            remote,
-
            initiate,
+
            initiated,
        }));
    }

modified radicle-node/src/service/session.rs
@@ -22,8 +22,8 @@ pub enum Protocol {
    Gossip,
    /// Git smart protocol. Used for fetching repository data.
    /// This protocol is used after a connection upgrade via the
-
    /// [`Message::Upgrade`] message.
-
    Git,
+
    /// [`Message::Fetch`] message.
+
    Fetch,
}

#[derive(Debug, Clone)]
@@ -119,11 +119,11 @@ impl Session {
        self.attempts += 1;
    }

-
    pub fn upgrade(&mut self, repo: Id) -> Option<Message> {
+
    pub fn fetch(&mut self, repo: Id) -> Option<Message> {
        if let State::Connected { protocol, .. } = &mut self.state {
            if *protocol == Protocol::Gossip {
-
                *protocol = Protocol::Git;
-
                return Some(Message::Upgrade { repo });
+
                *protocol = Protocol::Fetch;
+
                return Some(Message::Fetch { repo });
            } else {
                log::error!(
                    "Attempted to upgrade protocol for {} which was already upgraded",
modified radicle-node/src/test/arbitrary.rs
@@ -1,15 +1,12 @@
-
use std::net;
-

use bloomy::BloomFilter;
-
use cyphernet::addr::{HostAddr, NetAddr};
use qcheck::Arbitrary;

use crate::crypto;
use crate::prelude::{BoundedVec, Id, NodeId, Refs, Timestamp};
use crate::service::filter::{Filter, FILTER_SIZE_L, FILTER_SIZE_M, FILTER_SIZE_S};
use crate::service::message::{
-
    Address, Announcement, InventoryAnnouncement, Message, NodeAnnouncement, Ping,
-
    RefsAnnouncement, Subscribe, ZeroBytes,
+
    Announcement, InventoryAnnouncement, Message, NodeAnnouncement, Ping, RefsAnnouncement,
+
    Subscribe, ZeroBytes,
};
use crate::wire::MessageType;

@@ -103,21 +100,6 @@ impl Arbitrary for Message {
    }
}

-
impl Arbitrary for Address {
-
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
-
        let ip = if bool::arbitrary(g) {
-
            net::IpAddr::V4(net::Ipv4Addr::from(u32::arbitrary(g)))
-
        } else {
-
            let octets: [u8; 16] = Arbitrary::arbitrary(g);
-
            net::IpAddr::V6(net::Ipv6Addr::from(octets))
-
        };
-
        Address::from(NetAddr {
-
            host: HostAddr::Ip(ip),
-
            port: Some(u16::arbitrary(g)),
-
        })
-
    }
-
}
-

impl Arbitrary for ZeroBytes {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
        ZeroBytes::new(u16::arbitrary(g))
modified radicle-node/src/test/handle.rs
@@ -21,6 +21,10 @@ impl radicle::node::Handle for Handle {
    type Session = service::Session;
    type FetchLookup = FetchLookup;

+
    fn connect(&mut self, _node: NodeId, _addr: radicle::node::Address) -> Result<(), Error> {
+
        unimplemented!();
+
    }
+

    fn fetch(&mut self, _id: Id) -> Result<FetchLookup, Error> {
        Ok(FetchLookup::NotFound)
    }
modified radicle-node/src/test/peer.rs
@@ -210,8 +210,7 @@ where
                features: node::Features::SEED,
                timestamp: self.timestamp(),
                alias,
-
                addresses: Some(net::SocketAddr::from((self.ip, service::DEFAULT_PORT)).into())
-
                    .into(),
+
                addresses: Some(net::SocketAddr::from((self.ip, node::DEFAULT_PORT)).into()).into(),
                nonce: 0,
            }
            .solve(),
modified radicle-node/src/tests.rs
@@ -1,3 +1,5 @@
+
mod e2e;
+

use std::default::*;
use std::io;
use std::sync::Arc;
added radicle-node/src/tests/e2e.rs
@@ -0,0 +1,131 @@
+
use std::path::{Path, PathBuf};
+
use std::{
+
    collections::{BTreeMap, BTreeSet},
+
    net, thread,
+
};
+

+
use radicle::crypto::ssh::keystore::MemorySigner;
+
use radicle::git::refname;
+
use radicle::identity::Id;
+
use radicle::node::Handle;
+
use radicle::rad;
+
use radicle::test::fixtures;
+
use radicle::Profile;
+
use radicle::Storage;
+

+
use crate::address;
+
use crate::node::NodeId;
+
use crate::service::routing;
+
use crate::storage::git::transport;
+
use crate::wire::Transport;
+
use crate::{client, client::Runtime, service};
+

+
type TestHandle = (
+
    client::handle::Handle<Transport<routing::Table, address::Book, Storage, MemorySigner>>,
+
    thread::JoinHandle<Result<(), client::Error>>,
+
);
+

+
/// Populate a storage instance with a project.
+
fn populate(storage: &Storage, signer: &MemorySigner) {
+
    transport::local::register(storage.clone());
+

+
    let tmp = tempfile::tempdir().unwrap();
+
    let (repo, _) = fixtures::repository(tmp.path().join("acme"));
+

+
    rad::init(
+
        &repo,
+
        "acme",
+
        "Acme's Repo",
+
        refname!("master"),
+
        signer,
+
        storage,
+
    )
+
    .unwrap();
+
}
+

+
/// Create a node runtime.
+
fn runtime(home: &Path, config: service::Config) -> Runtime<MemorySigner> {
+
    let profile = Profile::init(home, "pasphrase".to_owned()).unwrap();
+
    let signer = MemorySigner::gen();
+
    let listen = vec![([0, 0, 0, 0], 0).into()];
+
    let proxy = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
+

+
    populate(&profile.storage, &signer);
+

+
    Runtime::with(profile, config, listen, proxy, signer).unwrap()
+
}
+

+
/// Create a network of nodes connected to each other.
+
fn network(
+
    configs: impl IntoIterator<Item = (service::Config, PathBuf)>,
+
) -> BTreeMap<(NodeId, net::SocketAddr), TestHandle> {
+
    let mut runtimes = BTreeMap::new();
+
    for (config, home) in configs.into_iter() {
+
        let rt = runtime(home.as_ref(), config);
+
        let id = rt.id;
+
        let addr = *rt.local_addrs.first().unwrap();
+
        let handle = rt.handle.clone();
+
        let join = thread::spawn(|| rt.run());
+

+
        runtimes.insert((id, addr), (handle, join));
+
    }
+

+
    let mut connect = Vec::new();
+
    for (i, (from, _)) in runtimes.iter().enumerate() {
+
        let peers = runtimes
+
            .iter()
+
            .skip(i + 1)
+
            .map(|(p, _)| *p)
+
            .collect::<Vec<(NodeId, net::SocketAddr)>>();
+
        for to in peers {
+
            connect.push((*from, to));
+
        }
+
    }
+

+
    for (from, (to_id, to_addr)) in connect {
+
        let (handle, _) = runtimes.get_mut(&from).unwrap();
+
        handle.connect(to_id, to_addr.into()).unwrap();
+
    }
+
    runtimes
+
}
+

+
/// Checks whether the nodes have converged in their routing tables.
+
#[track_caller]
+
fn check(
+
    nodes: impl IntoIterator<Item = ((NodeId, net::SocketAddr), TestHandle)>,
+
) -> BTreeSet<(Id, NodeId)> {
+
    let mut by_node = BTreeMap::<NodeId, BTreeSet<(Id, NodeId)>>::new();
+
    let mut all = BTreeSet::<(Id, NodeId)>::new();
+

+
    for ((id, _), (handle, _)) in nodes {
+
        let routing = handle.routing().unwrap();
+

+
        for (rid, node) in routing.try_iter() {
+
            all.insert((rid, node));
+
            by_node
+
                .entry(id)
+
                .or_insert_with(BTreeSet::new)
+
                .insert((rid, node));
+
        }
+
    }
+

+
    for (node, routes) in by_node {
+
        assert_eq!(routes, all, "{node} failed to converge");
+
    }
+
    all
+
}
+

+
#[test]
+
fn test_e2e() {
+
    let tmp = tempfile::tempdir().unwrap();
+
    let base = tmp.path();
+
    let nodes = network(vec![
+
        (service::Config::default(), base.join("alice")),
+
        (service::Config::default(), base.join("bob")),
+
    ]);
+
    // TODO: Find a better way to wait for synchronization, eg. using events, or using a loop.
+
    thread::sleep(std::time::Duration::from_secs(3));
+

+
    let routes = check(nodes);
+
    assert_eq!(routes.len(), 2);
+
}
modified radicle-node/src/wire/message.rs
@@ -2,6 +2,7 @@ use std::{io, mem, net};

use byteorder::{NetworkEndian, ReadBytesExt};
use cyphernet::addr::{Addr, HostAddr, NetAddr};
+
use radicle::node::Address;

use crate::prelude::*;
use crate::service::message::*;
@@ -19,7 +20,7 @@ pub enum MessageType {
    Subscribe = 8,
    Ping = 10,
    Pong = 12,
-
    Upgrade = 14,
+
    Fetch = 14,
}

impl From<MessageType> for u16 {
@@ -40,7 +41,7 @@ impl TryFrom<u16> for MessageType {
            8 => Ok(MessageType::Subscribe),
            10 => Ok(MessageType::Ping),
            12 => Ok(MessageType::Pong),
-
            14 => Ok(MessageType::Upgrade),
+
            14 => Ok(MessageType::Fetch),
            _ => Err(other),
        }
    }
@@ -62,7 +63,7 @@ impl Message {
            },
            Self::Ping { .. } => MessageType::Ping,
            Self::Pong { .. } => MessageType::Pong,
-
            Self::Upgrade { .. } => MessageType::Upgrade,
+
            Self::Fetch { .. } => MessageType::Fetch,
        }
        .into()
    }
@@ -216,7 +217,7 @@ impl wire::Encode for Message {
            Self::Pong { zeroes } => {
                n += zeroes.encode(writer)?;
            }
-
            Self::Upgrade { repo } => {
+
            Self::Fetch { repo } => {
                n += repo.encode(writer)?;
            }
        }
@@ -293,9 +294,9 @@ impl wire::Decode for Message {
                let zeroes = ZeroBytes::decode(reader)?;
                Ok(Self::Pong { zeroes })
            }
-
            Ok(MessageType::Upgrade) => {
+
            Ok(MessageType::Fetch) => {
                let repo = Id::decode(reader)?;
-
                Ok(Self::Upgrade { repo })
+
                Ok(Self::Fetch { repo })
            }
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
        }
modified radicle-node/src/wire/transport.rs
@@ -6,14 +6,14 @@ use std::collections::VecDeque;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::sync::Arc;
-
use std::time::Instant;
+
use std::time::{Instant, SystemTime};
use std::{io, net};

use crossbeam_channel as chan;
use cyphernet::addr::{Addr as _, HostAddr, PeerAddr};
use nakamoto_net::{Link, LocalTime};
use netservices::noise::NoiseXk;
-
use netservices::wire::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
+
use netservices::resources::{ListenerEvent, NetAccept, NetResource, SessionEvent};
use netservices::NetSession;

use radicle::collections::HashMap;
@@ -29,7 +29,7 @@ use crate::worker::{WorkerReq, WorkerResp};
use crate::{address, service};

/// Reactor action.
-
type Action<G> = reactor::Action<NetAccept<NoiseXk<G>>, NetTransport<NoiseXk<G>>>;
+
type Action<G> = reactor::Action<NetAccept<NoiseXk<G>>, NetResource<NoiseXk<G>>>;

/// Peer connection state machine.
#[derive(Debug)]
@@ -218,7 +218,7 @@ where
        self.actions.push_back(Action::UnregisterTransport(fd));
    }

-
    fn upgraded(&mut self, session: NetTransport<NoiseXk<G>>) {
+
    fn upgraded(&mut self, session: NetResource<NoiseXk<G>>) {
        let fd = session.as_raw_fd();
        let Some(peer) = self.peers.get_mut(&fd) else {
            log::error!(target: "transport", "Peer with fd {fd} was not found");
@@ -267,13 +267,12 @@ where
    G: Signer + Negotiator + Send,
{
    type Listener = NetAccept<NoiseXk<G>>;
-
    type Transport = NetTransport<NoiseXk<G>>;
+
    type Transport = NetResource<NoiseXk<G>>;
    type Command = service::Command;

-
    fn tick(&mut self, time: Instant) {
-
        // FIXME: Ensure that the time correctly converted.
-
        self.service
-
            .tick(LocalTime::from_secs(time.elapsed().as_secs()));
+
    fn tick(&mut self, _time: Instant) {
+
        // FIXME: Change this once a proper timestamp is passed into the function.
+
        self.service.tick(LocalTime::from(SystemTime::now()));

        let mut completed = Vec::new();
        for peer in self.peers.values() {
@@ -303,12 +302,12 @@ where
                log::debug!(
                    target: "transport",
                    "Accepted inbound peer connection from {}..",
-
                    session.transition_addr()
+
                    session.transient_addr()
                );
                self.peers
                    .insert(session.as_raw_fd(), Peer::connecting(Link::Inbound));

-
                let transport = match NetTransport::<NoiseXk<G>>::accept(session) {
+
                let transport = match NetResource::<NoiseXk<G>>::new(session) {
                    Ok(transport) => transport,
                    Err(err) => {
                        log::error!(target: "transport", "Failed to upgrade accepted peer socket: {err}");
@@ -365,17 +364,12 @@ where
                self.service.connected(node_id, link);
            }
            SessionEvent::Data(data) => {
-
                if let Some(Peer::Connected { link, id }) = self.peers.get(&fd) {
+
                if let Some(Peer::Connected { id, .. }) = self.peers.get(&fd) {
                    self.read_queue.extend(data);

                    loop {
                        match Message::decode(&mut self.read_queue) {
-
                            Ok(msg) => {
-
                                log::debug!(
-
                                    target: "transport", "Received message {:?} from {} ({:?})", msg, id, link
-
                                );
-
                                self.service.received_message(*id, msg)
-
                            }
+
                            Ok(msg) => self.service.received_message(*id, msg),
                            Err(err) if err.is_eof() => {
                                // Buffer is empty, or message isn't complete.
                                break;
@@ -405,22 +399,47 @@ where
        self.service.command(cmd);
    }

-
    fn handle_error(&mut self, err: reactor::Error<net::SocketAddr, RawFd>) {
-
        match err {
+
    fn handle_error(
+
        &mut self,
+
        err: reactor::Error<NetAccept<NoiseXk<G>>, NetResource<NoiseXk<G>>>,
+
    ) {
+
        match &err {
            reactor::Error::ListenerUnknown(id) => {
+
                // TODO: What are we supposed to do here? Remove this error.
                log::error!(target: "transport", "Received error: unknown listener {}", id);
            }
-
            reactor::Error::PeerUnknown(id) => {
+
            reactor::Error::TransportUnknown(id) => {
+
                // TODO: What are we supposed to do here? Remove this error.
                log::error!(target: "transport", "Received error: unknown peer {}", id);
            }
-
            reactor::Error::PeerDisconnected(fd, err) => {
-
                log::error!(target: "transport", "Received error: peer {} disconnected: {}", fd, err);
-

-
                self.actions.push_back(Action::UnregisterTransport(fd));
-
            }
            reactor::Error::Poll(err) => {
+
                // TODO: This should be a fatal error, there's nothing we can do here.
                log::error!(target: "transport", "Can't poll connections: {}", err);
            }
+
            reactor::Error::ListenerPollError(id, err) => {
+
                // TODO: This should be a fatal error, there's nothing we can do here.
+
                log::error!(target: "transport", "Received error: listener {} disconnected: {}", id, err);
+
                self.actions.push_back(Action::UnregisterListener(*id));
+
            }
+
            reactor::Error::ListenerDisconnect(id, _, err) => {
+
                // TODO: This should be a fatal error, there's nothing we can do here.
+
                log::error!(target: "transport", "Received error: listener {} disconnected: {}", id, err);
+
            }
+
            reactor::Error::TransportPollError(id, err) => {
+
                log::error!(target: "transport", "Received error: peer {} disconnected: {}", id, err);
+
                self.actions.push_back(Action::UnregisterTransport(*id));
+
            }
+
            reactor::Error::TransportDisconnect(id, _, err) => {
+
                log::error!(target: "transport", "Received error: peer {} disconnected: {}", id, err);
+
            }
+
            reactor::Error::WriteFailure(id, err) => {
+
                // TODO: Disconnect peer?
+
                log::error!(target: "transport", "Error during writing to peer {id}: {err}")
+
            }
+
            reactor::Error::WriteLogicError(id, _) => {
+
                // TODO: We shouldn't be receiving this error. There's nothing we can do.
+
                log::error!(target: "transport", "Write logic error for peer {id}: {err}")
+
            }
        }
    }

@@ -458,7 +477,7 @@ where
    W: WriteStorage + 'static,
    G: Signer + Negotiator,
{
-
    type Item = reactor::Action<NetAccept<NoiseXk<G>>, NetTransport<NoiseXk<G>>>;
+
    type Item = reactor::Action<NetAccept<NoiseXk<G>>, NetResource<NoiseXk<G>>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(event) = self.actions.pop_front() {
@@ -498,9 +517,11 @@ where
                        break;
                    }

-
                    match NetTransport::<NoiseXk<G>>::connect(
+
                    match NetResource::<NoiseXk<G>>::connect(
                        PeerAddr::new(node_id, socket_addr),
                        &self.keypair,
+
                        // TODO: Improve API to not require a boolean.
+
                        true,
                    ) {
                        Ok(transport) => {
                            self.service.attempted(node_id, &socket_addr.into());
modified radicle-node/src/worker.rs
@@ -1,20 +1,29 @@
-
use crossbeam_channel as chan;
-
use netservices::noise::NoiseXk;
-
use netservices::wire::NetTransport;
+
use core::time;
+
use std::io::prelude::*;
+
use std::process;
use std::thread;
use std::thread::JoinHandle;
+
use std::{io, net};
+

+
use crossbeam_channel as chan;
+
use netservices::noise::NoiseXk;
+
use netservices::resources::{NetReader, NetResource, NetWriter, SplitIo};
+
use netservices::tunnel::Tunnel;

use radicle::crypto::Negotiator;
-
use radicle::storage::WriteStorage;
+
use radicle::storage::{ReadRepository, RefUpdate, WriteStorage};
use radicle::Storage;
+
use reactor::poller::popol;

use crate::service::reactor::Fetch;
-
use crate::service::FetchResult;
+
use crate::service::{FetchError, FetchResult};
+

+
type Session<G> = NetResource<NoiseXk<G>>;

/// Worker request.
pub struct WorkerReq<G: Negotiator> {
    pub fetch: Fetch,
-
    pub session: NetTransport<NoiseXk<G>>,
+
    pub session: NetResource<NoiseXk<G>>,
    pub drain: Vec<u8>,
    pub channel: chan::Sender<WorkerResp<G>>,
}
@@ -22,69 +31,192 @@ pub struct WorkerReq<G: Negotiator> {
/// Worker response.
pub struct WorkerResp<G: Negotiator> {
    pub result: FetchResult,
-
    pub session: NetTransport<NoiseXk<G>>,
+
    pub session: Session<G>,
}

-
pub struct Worker<G: Negotiator> {
+
/// A worker that replicates git objects.
+
struct Worker<G: Negotiator> {
    storage: Storage,
    tasks: chan::Receiver<WorkerReq<G>>,
+
    timeout: time::Duration,
}

-
impl<G: Negotiator> Worker<G> {
-
    pub fn run(self) -> Result<(), chan::RecvError> {
+
impl<G: Negotiator + 'static> Worker<G> {
+
    /// Waits for tasks and runs them. Blocks indefinitely unless there is an error receiving
+
    /// the next task.
+
    fn run(self) -> Result<(), chan::RecvError> {
        loop {
            let task = self.tasks.recv()?;
            self.process(task);
        }
    }

-
    pub fn process(&self, task: WorkerReq<G>) {
+
    fn process(&self, task: WorkerReq<G>) {
        let WorkerReq {
            fetch,
            session,
-
            // TODO: Implement logic.
-
            drain: _drain,
+
            drain,
            channel,
        } = task;
-
        let result = match self.storage.repository(fetch.repo) {
-
            Ok(_) => todo!(),
-
            Err(err) => FetchResult::Error {
+

+
        let (session, result) = self._process(&fetch, drain, session);
+
        let result = match result {
+
            Ok(updated) => FetchResult::Fetched {
                from: fetch.remote,
-
                error: err.into(),
+
                updated,
+
            },
+
            Err(error) => FetchResult::Error {
+
                from: fetch.remote,
+
                error,
            },
        };
        if channel.send(WorkerResp { result, session }).is_err() {
            log::error!("Unable to report fetch result: worker channel disconnected");
        }
    }
+

+
    fn _process(
+
        &self,
+
        fetch: &Fetch,
+
        drain: Vec<u8>,
+
        session: Session<G>,
+
    ) -> (Session<G>, Result<Vec<RefUpdate>, FetchError>) {
+
        if fetch.initiated {
+
            let mut tunnel = match Tunnel::with(session, net::SocketAddr::from(([0, 0, 0, 0], 0))) {
+
                Ok(tunnel) => tunnel,
+
                Err((session, err)) => return (session, Err(err.into())),
+
            };
+
            let result = self.fetch(fetch, &mut tunnel);
+
            let session = tunnel.into_session();
+

+
            (session, result)
+
        } else {
+
            let (mut stream_r, mut stream_w) = match session.split_io() {
+
                Ok((r, w)) => (r, w),
+
                Err(err) => {
+
                    return (err.original, Err(err.error.into()));
+
                }
+
            };
+
            let result = self.upload_pack(fetch, drain, &mut stream_r, &mut stream_w);
+
            let session = NetResource::from_split_io(stream_r, stream_w);
+

+
            (session, result)
+
        }
+
    }
+

+
    fn fetch(
+
        &self,
+
        fetch: &Fetch,
+
        tunnel: &mut Tunnel<Session<G>>,
+
    ) -> Result<Vec<RefUpdate>, FetchError> {
+
        let tunnel_addr = tunnel.local_addr()?;
+
        let repo = self.storage.repository(fetch.repo)?;
+
        let child = process::Command::new("git")
+
            .current_dir(repo.path())
+
            .arg("fetch")
+
            .arg("--atomic") // The path to the git repo must be exact.
+
            .arg(format!("git://{tunnel_addr}"))
+
            .arg(fetch.namespaces.as_fetchspec())
+
            .arg(".")
+
            .stdout(process::Stdio::piped())
+
            .stdin(process::Stdio::piped())
+
            .spawn()?;
+

+
        let _ = tunnel.tunnel_once(popol::Poller::new(), self.timeout)?;
+
        let output = child.wait_with_output()?;
+

+
        // TODO: Parse fetch output to return updates.
+
        log::debug!(target: "worker", "Fetch output for {}: {:?}", fetch.repo, output);
+

+
        Ok(vec![])
+
    }
+

+
    fn upload_pack(
+
        &self,
+
        fetch: &Fetch,
+
        drain: Vec<u8>,
+
        stream_r: &mut NetReader<NoiseXk<G>>,
+
        stream_w: &mut NetWriter<NoiseXk<G>>,
+
    ) -> Result<Vec<RefUpdate>, FetchError> {
+
        let repo = self.storage.repository(fetch.repo)?;
+
        let mut child = process::Command::new("git")
+
            .current_dir(repo.path())
+
            .arg("upload-pack")
+
            .arg("--strict") // The path to the git repo must be exact.
+
            .arg(".")
+
            .stdout(process::Stdio::piped())
+
            .stdin(process::Stdio::piped())
+
            .spawn()?;
+

+
        let mut stdin = child.stdin.take().unwrap();
+
        let mut stdout = child.stdout.take().unwrap();
+

+
        thread::scope(|scope| {
+
            let t = scope.spawn(move || {
+
                let mut buf = [0u8; 65535];
+

+
                // First drain the buffer of incoming data that was waiting.
+
                if stdin.write_all(&drain[..]).is_err() {
+
                    return;
+
                }
+
                // Then process any new data coming into the socket, and write it
+
                // to the standard input of the `upload-pack` process.
+
                while let Ok(n) = stream_r.read(&mut buf) {
+
                    if n == 0 {
+
                        break;
+
                    }
+
                    if stdin.write_all(&buf[..n]).is_err() {
+
                        break;
+
                    }
+
                }
+
            });
+
            // Output of `upload-pack` is sent back to the remote peer.
+
            io::copy(&mut stdout, stream_w)?;
+
            // SAFETY: The thread does not panic, unless the implementations of read/write
+
            // internally panic.
+
            t.join().unwrap();
+

+
            Ok::<_, FetchError>(())
+
        })?;
+
        child.wait()?;
+

+
        Ok(vec![])
+
    }
}

+
/// A pool of workers. One thread is allocated for each worker.
pub struct WorkerPool {
    pool: Vec<JoinHandle<Result<(), chan::RecvError>>>,
}

impl WorkerPool {
+
    /// Create a new worker pool with the given parameters.
    pub fn with<G: Negotiator + 'static>(
        capacity: usize,
+
        timeout: time::Duration,
        storage: Storage,
        tasks: chan::Receiver<WorkerReq<G>>,
    ) -> Self {
        let mut pool = Vec::with_capacity(capacity);
        for _ in 0..capacity {
-
            let runtime = Worker {
+
            let worker = Worker {
                tasks: tasks.clone(),
                storage: storage.clone(),
+
                timeout,
            };
-
            let thread = thread::spawn(|| runtime.run());
+
            let thread = thread::spawn(|| worker.run());
+

            pool.push(thread);
        }
        Self { pool }
    }

-
    pub fn join(self) -> thread::Result<()> {
+
    /// Run the worker pool.
+
    ///
+
    /// Blocks until all worker threads have exited.
+
    pub fn run(self) -> thread::Result<()> {
        for worker in self.pool {
-
            let result = worker.join()?;
-
            if let Err(err) = result {
+
            if let Err(err) = worker.join()? {
                log::error!(target: "pool", "Worker failed: {err}");
            }
        }
modified radicle/Cargo.toml
@@ -11,6 +11,7 @@ test = ["qcheck", "radicle-crypto/test"]
sql = ["sqlite"]

[dependencies]
+
amplify = { version = "4.0.0-beta.1", default-features = false, features = ["std"] }
base64 = { version= "0.13" }
byteorder = { version = "1.4" }
crossbeam-channel = { version = "0.5.6" }
modified radicle/src/lib.rs
@@ -5,6 +5,9 @@

pub extern crate radicle_crypto as crypto;

+
#[macro_use]
+
extern crate amplify;
+

pub mod cob;
pub mod collections;
pub mod git;
modified radicle/src/node.rs
@@ -1,9 +1,11 @@
mod features;

-
use std::io;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;
+
use std::{io, net, ops};
+

+
use cyphernet::addr::{HostAddr, NetAddr};

use crate::crypto::PublicKey;
use crate::identity::Id;
@@ -13,11 +15,40 @@ pub use features::Features;

/// Default name for control socket file.
pub const DEFAULT_SOCKET_NAME: &str = "radicle.sock";
+
/// Default radicle protocol port.
+
pub const DEFAULT_PORT: u16 = 8776;
/// Response on node socket indicating that a command was carried out successfully.
pub const RESPONSE_OK: &str = "ok";
/// Response on node socket indicating that a command had no effect.
pub const RESPONSE_NOOP: &str = "noop";

+
/// Peer public protocol address.
+
#[derive(Wrapper, Clone, Eq, PartialEq, Debug, From)]
+
#[wrapper(Display, FromStr)]
+
pub struct Address(NetAddr<DEFAULT_PORT>);
+

+
impl cyphernet::addr::Addr for Address {
+
    fn port(&self) -> u16 {
+
        self.0.port()
+
    }
+
}
+

+
impl ops::Deref for Address {
+
    type Target = NetAddr<DEFAULT_PORT>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.0
+
    }
+
}
+

+
impl From<net::SocketAddr> for Address {
+
    fn from(addr: net::SocketAddr) -> Self {
+
        Address(NetAddr {
+
            host: HostAddr::Ip(addr.ip()),
+
            port: Some(addr.port()),
+
        })
+
    }
+
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("failed to connect to node: {0}")]
@@ -37,6 +68,8 @@ pub trait Handle {
    /// The error returned by all methods.
    type Error: std::error::Error;

+
    /// Connect to a peer.
+
    fn connect(&mut self, node: NodeId, addr: Address) -> Result<(), Self::Error>;
    /// Retrieve or update the project from network.
    fn fetch(&mut self, id: Id) -> Result<Self::FetchLookup, Self::Error>;
    /// Start tracking the given project. Doesn't do anything if the project is already
@@ -99,6 +132,10 @@ impl Handle for Node {
    type FetchLookup = ();
    type Error = Error;

+
    fn connect(&mut self, _node: NodeId, _addr: Address) -> Result<(), Error> {
+
        todo!()
+
    }
+

    fn fetch(&mut self, id: Id) -> Result<(), Error> {
        for line in self.call("fetch", &[id])? {
            let line = line?;
modified radicle/src/profile.rs
@@ -10,8 +10,8 @@
//!     node/
//!       radicle.sock                           # Node control socket
//!
-
use std::io;
use std::path::{Path, PathBuf};
+
use std::{fs, io};

use thiserror::Error;

@@ -68,10 +68,15 @@ pub struct Profile {
impl Profile {
    pub fn init(home: impl AsRef<Path>, passphrase: impl Into<Passphrase>) -> Result<Self, Error> {
        let home = home.as_ref().to_path_buf();
-
        let storage = Storage::open(home.join("storage"))?;
-
        let keystore = Keystore::new(&home.join("keys"));
+
        let paths = Paths {
+
            home: home.as_path(),
+
        };
+
        let storage = Storage::open(paths.storage())?;
+
        let keystore = Keystore::new(&paths.keys());
        let public_key = keystore.init("radicle", passphrase)?;

+
        fs::create_dir_all(paths.node()).ok();
+

        transport::local::register(storage.clone());

        Ok(Profile {
modified radicle/src/sql.rs
@@ -6,6 +6,7 @@ use sqlite::Value;

use crate::identity::Id;
use crate::node;
+
use crate::node::Address;

impl TryFrom<&Value> for Id {
    type Error = sql::Error;
@@ -49,3 +50,26 @@ impl TryFrom<&Value> for node::Features {
        }
    }
}
+

+
impl TryFrom<&sql::Value> for Address {
+
    type Error = sql::Error;
+

+
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        match value {
+
            sql::Value::String(s) => Address::from_str(s.as_str()).map_err(|e| sql::Error {
+
                code: None,
+
                message: Some(e.to_string()),
+
            }),
+
            _ => Err(sql::Error {
+
                code: None,
+
                message: Some("sql: invalid type for address".to_owned()),
+
            }),
+
        }
+
    }
+
}
+

+
impl sql::BindableWithIndex for Address {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        self.to_string().bind(stmt, i)
+
    }
+
}
modified radicle/src/storage.rs
@@ -36,6 +36,15 @@ pub enum Namespaces {
    One(PublicKey),
}

+
impl Namespaces {
+
    pub fn as_fetchspec(&self) -> String {
+
        match self {
+
            Self::All => String::from("refs/namespaces/*:refs/namespaces/*"),
+
            Self::One(pk) => format!("refs/namespaces/{pk}/refs/*:refs/namespaces/{pk}/refs/*"),
+
        }
+
    }
+
}
+

impl From<PublicKey> for Namespaces {
    fn from(pk: PublicKey) -> Self {
        Self::One(pk)
modified radicle/src/storage/git.rs
@@ -680,6 +680,8 @@ impl WriteRepository for Repository {
pub mod trailers {
    use std::str::FromStr;

+
    use thiserror::Error;
+

    use super::*;
    use crypto::{PublicKey, PublicKeyError};
    use crypto::{Signature, SignatureError};
modified radicle/src/storage/git/transport/local.rs
@@ -7,8 +7,6 @@ use std::process;
use std::str::FromStr;
use std::sync::Once;

-
use once_cell::sync::OnceCell;
-

use crate::storage;
use crate::storage::git::Storage;

@@ -17,7 +15,7 @@ use super::ChildStream;
thread_local! {
    /// Stores a storage instance per thread.
    /// This avoids race conditions when used in a multi-threaded context.
-
    static THREAD_STORAGE: OnceCell<Storage> = OnceCell::default();
+
    static THREAD_STORAGE: RefCell<Option<Storage>> = RefCell::default();
}

/// Local git transport over the filesystem.
@@ -44,7 +42,8 @@ impl git2::transport::SmartSubtransport for Local {
        };
        let git_dir = THREAD_STORAGE
            .with(|t| {
-
                t.get()
+
                t.borrow()
+
                    .as_ref()
                    .map(|s| storage::git::paths::repository(&s, &url.repo))
            })
            .ok_or_else(|| git2::Error::from_str("local transport storage was not registered"))?;
@@ -100,7 +99,7 @@ pub fn register(storage: Storage) {
    static REGISTER: Once = Once::new();

    THREAD_STORAGE.with(|s| {
-
        s.set(storage).ok();
+
        *s.borrow_mut() = Some(storage);
    });

    REGISTER.call_once(|| unsafe {
modified radicle/src/test/arbitrary.rs
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, HashSet};
use std::hash::Hash;
-
use std::iter;
use std::ops::RangeBounds;
+
use std::{iter, net};

use crypto::test::signer::MockSigner;
use crypto::{PublicKey, Signer, Unverified, Verified};
@@ -15,6 +15,7 @@ use crate::identity::{
    project::Project,
    Did,
};
+
use crate::node::Address;
use crate::storage;
use crate::storage::refs::{Refs, SignedRefs};
use crate::test::storage::MockStorage;
@@ -187,3 +188,18 @@ impl Arbitrary for Id {
        Id::from(oid)
    }
}
+

+
impl Arbitrary for Address {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        let ip = if bool::arbitrary(g) {
+
            net::IpAddr::V4(net::Ipv4Addr::from(u32::arbitrary(g)))
+
        } else {
+
            let octets: [u8; 16] = Arbitrary::arbitrary(g);
+
            net::IpAddr::V6(net::Ipv6Addr::from(octets))
+
        };
+
        Address::from(cyphernet::addr::NetAddr {
+
            host: cyphernet::addr::HostAddr::Ip(ip),
+
            port: Some(u16::arbitrary(g)),
+
        })
+
    }
+
}