Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Integrate new Noise_XK transport
Dr Maxim Orlovsky committed 3 years ago
commit aa02c52a2f8df67e8f069319bc56d9080eff0711
parent 909b99bb7cab6bc56ba3e120cc1630e1b40295aa
27 files changed +1009 -1224
modified Cargo.lock
@@ -218,6 +218,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"

[[package]]
+
name = "base32"
+
version = "0.4.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "23ce669cd6c8588f79e15cf450314f9638f967fc5770ff1c7c1deb0925ea7cfa"
+

+
[[package]]
name = "base64"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -276,6 +282,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
+
 "block-padding",
 "generic-array",
]

@@ -289,6 +296,12 @@ dependencies = [
]

[[package]]
+
name = "block-padding"
+
version = "0.2.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
+

+
[[package]]
name = "bloomy"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -433,6 +446,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cec318a675afcb6a1ea1d4340e2d377e56e47c266f28043ceccbf4412ddfdd3b"

[[package]]
+
name = "convert_case"
+
version = "0.4.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
+

+
[[package]]
name = "core-foundation-sys"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -504,6 +523,16 @@ dependencies = [
]

[[package]]
+
name = "crypto-mac"
+
version = "0.11.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b1d1a86f49236c215f271d40892d5fc950490551400b02ef360692c29815c714"
+
dependencies = [
+
 "generic-array",
+
 "subtle",
+
]
+

+
[[package]]
name = "ct-codecs"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -588,11 +617,14 @@ dependencies = [
[[package]]
name = "cyphernet"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-cyphernet#762f48e1175f9490c635996c0c1bd07ff1287cfc"
+
source = "git+https://github.com/cyphernet-wg/rust-cyphernet#5568667b76089052a4dfa3dec00d5032b52f505f"
dependencies = [
 "amplify",
+
 "ed25519-compact",
 "multibase",
+
 "serde",
 "socks",
+
 "torut",
]

[[package]]
@@ -638,8 +670,10 @@ version = "0.99.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
dependencies = [
+
 "convert_case",
 "proc-macro2",
 "quote",
+
 "rustc_version",
 "syn",
]

@@ -709,6 +743,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a3d382e8464107391c8706b4c14b087808ecb909f6c15c34114bc42e53a9e4c"
dependencies = [
 "ct-codecs",
+
 "ed25519",
 "getrandom 0.2.8",
]

@@ -790,7 +825,7 @@ dependencies = [
 "regex",
 "serde",
 "serde_json",
-
 "sha3",
+
 "sha3 0.10.6",
 "thiserror",
 "uint",
]
@@ -1194,6 +1229,16 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"

[[package]]
name = "hmac"
+
version = "0.11.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b"
+
dependencies = [
+
 "crypto-mac",
+
 "digest 0.9.0",
+
]
+

+
[[package]]
+
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
@@ -1454,7 +1499,7 @@ dependencies = [
 "ecdsa",
 "elliptic-curve",
 "sha2 0.10.6",
-
 "sha3",
+
 "sha3 0.10.6",
]

[[package]]
@@ -1637,15 +1682,14 @@ dependencies = [
]

[[package]]
-
name = "nakamoto-net-poll"
-
version = "0.3.0"
-
source = "git+https://github.com/cloudhead/nakamoto?rev=90cc3eac67aa5cfd5f42cf7cb1e2b155af3214fb#90cc3eac67aa5cfd5f42cf7cb1e2b155af3214fb"
+
name = "netservices"
+
version = "0.1.0"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices#045ab140fd33ab3008a0ce8c86d8b9d01b08662c"
dependencies = [
-
 "crossbeam-channel",
+
 "amplify",
+
 "cyphernet",
 "libc",
-
 "log",
-
 "nakamoto-net",
-
 "popol",
+
 "poll-reactor",
 "socket2",
]

@@ -1973,10 +2017,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"

[[package]]
+
name = "poll-reactor"
+
version = "0.1.0"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices#045ab140fd33ab3008a0ce8c86d8b9d01b08662c"
+
dependencies = [
+
 "amplify",
+
 "crossbeam-channel",
+
 "libc",
+
 "popol",
+
 "socket2",
+
]
+

+
[[package]]
name = "popol"
-
version = "0.5.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "2dac9010e6fc2d991bbf3c362433214c28f7f1a82151b6b1de8dd69d27211495"
+
version = "1.0.0"
+
source = "git+https://github.com/Internet2-WG/popol?branch=api#02b8d4089bd234f3d9d46ef27f6e64cfbc45118a"
dependencies = [
 "libc",
]
@@ -2251,19 +2306,22 @@ dependencies = [
name = "radicle-node"
version = "0.2.0"
dependencies = [
+
 "amplify",
 "anyhow",
 "bloomy",
 "byteorder",
 "chrono",
 "colored",
 "crossbeam-channel",
+
 "cyphernet",
 "fastrand",
 "git-ref-format 0.1.0 (git+https://github.com/radicle-dev/radicle-git?rev=39ce2f934915a563f9420ac9c85480df8a591481)",
 "lexopt",
 "log",
 "nakamoto-net",
-
 "nakamoto-net-poll",
+
 "netservices",
 "nonempty 0.8.1",
+
 "poll-reactor",
 "qcheck",
 "qcheck-macros",
 "radicle",
@@ -2464,7 +2522,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb"
dependencies = [
 "crypto-bigint",
-
 "hmac",
+
 "hmac 0.12.1",
 "zeroize",
]

@@ -2517,6 +2575,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6"

[[package]]
+
name = "rustc_version"
+
version = "0.4.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
+
dependencies = [
+
 "semver",
+
]
+

+
[[package]]
name = "rustix"
version = "0.36.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2587,7 +2654,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f9e24d2b632954ded8ab2ef9fea0a0c769ea56ea98bddbafbad22caeeadf45d"
dependencies = [
-
 "hmac",
+
 "hmac 0.12.1",
 "pbkdf2",
 "salsa20",
 "sha2 0.10.6",
@@ -2608,6 +2675,12 @@ dependencies = [
]

[[package]]
+
name = "semver"
+
version = "1.0.14"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
+

+
[[package]]
name = "serde"
version = "1.0.149"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2700,6 +2773,18 @@ dependencies = [

[[package]]
name = "sha3"
+
version = "0.9.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "f81199417d4e5de3f04b1e871023acea7389672c4135918f05aa9cbf2f2fa809"
+
dependencies = [
+
 "block-buffer 0.9.0",
+
 "digest 0.9.0",
+
 "keccak",
+
 "opaque-debug",
+
]
+

+
[[package]]
+
name = "sha3"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdf0c33fae925bdc080598b84bc15c55e7b9a4a43b3c704da051f977469691c9"
@@ -2756,7 +2841,7 @@ dependencies = [
 "iri-string",
 "k256",
 "rand 0.8.5",
-
 "sha3",
+
 "sha3 0.10.6",
 "thiserror",
 "time 0.3.17",
]
@@ -3157,6 +3242,26 @@ dependencies = [
]

[[package]]
+
name = "torut"
+
version = "0.2.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "99febc413f26cf855b3a309c5872edff5c31e0ffe9c2fce5681868761df36f69"
+
dependencies = [
+
 "base32",
+
 "base64",
+
 "derive_more",
+
 "ed25519-dalek",
+
 "hex",
+
 "hmac 0.11.0",
+
 "rand 0.7.3",
+
 "serde",
+
 "serde_derive",
+
 "sha2 0.9.9",
+
 "sha3 0.9.1",
+
 "tokio",
+
]
+

+
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified Cargo.toml
@@ -27,15 +27,18 @@ git = "https://github.com/cloudhead/nakamoto"
rev = "90cc3eac67aa5cfd5f42cf7cb1e2b155af3214fb"
version = "0.3.0"

-
[patch.crates-io.nakamoto-net-poll]
-
git = "https://github.com/cloudhead/nakamoto"
-
rev = "90cc3eac67aa5cfd5f42cf7cb1e2b155af3214fb"
-
version = "0.3.0"
-

[patch.crates-io.cyphernet]
git = "https://github.com/cyphernet-wg/rust-cyphernet"
version = "0.1.0"

+
[patch.crates-io.poll-reactor]
+
git = "https://github.com/cyphernet-wg/rust-netservices"
+
version = "0.1.0"
+

+
[patch.crates-io.netservices]
+
git = "https://github.com/cyphernet-wg/rust-netservices"
+
version = "0.1.0"
+

[patch.crates-io.radicle-git-ext]
git = "https://github.com/radicle-dev/radicle-git"
rev = "d3115a22158c8395705babefdc89049f7510d32d"
deleted radicle-crypto/src/cyphernet.rs
@@ -1,43 +0,0 @@
-
use amplify::{From, Wrapper};
-
use cyphernet::crypto::{EcPk, EcSig, EcSk, Ecdh};
-
use ed25519_compact::x25519;
-

-
use crate::{PublicKey, SecretKey, Signature};
-

-
#[derive(Wrapper, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, From)]
-
#[wrapper(Deref)]
-
pub struct SharedSecret([u8; 32]);
-

-
impl EcPk for PublicKey {}
-

-
impl EcSk for SecretKey {
-
    type Pk = PublicKey;
-

-
    fn to_pk(&self) -> Self::Pk {
-
        self.public_key().into()
-
    }
-
}
-

-
impl Ecdh for SharedSecret {
-
    type Sk = SecretKey;
-
    type Err = ed25519_compact::Error;
-

-
    fn ecdh(sk: &Self::Sk, pk: &<Self::Sk as EcSk>::Pk) -> Result<Self, Self::Err> {
-
        let xpk = x25519::PublicKey::from_ed25519(&pk.0)?;
-
        let xsk = x25519::SecretKey::from_ed25519(&sk.0)?;
-
        let ss = xpk.dh(&xsk)?;
-
        Ok(Self(*ss))
-
    }
-
}
-

-
impl EcSig for Signature {
-
    type Sk = SecretKey;
-

-
    fn sign(self, sk: &SecretKey, msg: impl AsRef<[u8]>) -> Self {
-
        sk.0.sign(msg, None).into()
-
    }
-

-
    fn verify(self, pk: &PublicKey, msg: impl AsRef<[u8]>) -> bool {
-
        pk.0.verify(msg, &self.0).is_ok()
-
    }
-
}
modified radicle-crypto/src/lib.rs
@@ -7,15 +7,11 @@ use thiserror::Error;

pub use ed25519::{Error, KeyPair, Seed};

-
#[cfg(any(test, feature = "test"))]
-
pub mod test;
-

pub mod hash;
#[cfg(feature = "ssh")]
pub mod ssh;
-

-
#[cfg(feature = "cyphernet")]
-
mod cyphernet;
+
#[cfg(any(test, feature = "test"))]
+
pub mod test;

/// Verified (used as type witness).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize)]
@@ -27,6 +23,19 @@ pub struct Unverified;
/// Output of a Diffie-Hellman key exchange.
pub type SharedSecret = [u8; 32];

+
/// Trait alias used for Diffie–Hellman key exchange.
+
#[cfg(feature = "cyphernet")]
+
pub trait Negotiator:
+
    cyphernet::crypto::Ecdh<Pk = PublicKey, Secret = SharedSecret, Err = Error> + Clone
+
{
+
}
+

+
#[cfg(feature = "cyphernet")]
+
impl<T> Negotiator for T where
+
    T: cyphernet::crypto::Ecdh<Pk = PublicKey, Secret = SharedSecret, Err = Error> + Clone
+
{
+
}
+

/// Error returned if signing fails, eg. due to an HSM or KMS.
#[derive(Debug, Error)]
#[error(transparent)]
@@ -70,13 +79,6 @@ where
    }
}

-
/// A signer that can perform Elliptic-curve Diffie–Hellman.
-
pub trait Ecdh: Signer {
-
    /// Perform an ECDH key exchange. Takes the counter-party's public key,
-
    /// and returns a computed shared secret.
-
    fn ecdh(&self, other: &PublicKey) -> Result<SharedSecret, Error>;
-
}
-

/// Cryptographic signature.
#[derive(PartialEq, Eq, Hash, Copy, Clone, Serialize, Deserialize)]
#[serde(into = "String", try_from = "String")]
@@ -167,6 +169,9 @@ impl PublicKey {
    }
}

+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::crypto::EcPk for PublicKey {}
+

/// The private/signing key.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct SecretKey(ed25519::SecretKey);
modified radicle-crypto/src/ssh/keystore.rs
@@ -5,7 +5,7 @@ use std::{fs, io};
use thiserror::Error;
use zeroize::Zeroizing;

-
use crate::{keypair, Ecdh, PublicKey, SecretKey, SharedSecret, Signature, Signer, SignerError};
+
use crate::{keypair, PublicKey, SecretKey, SharedSecret, Signature, Signer, SignerError};

/// A secret key passphrase.
pub type Passphrase = Zeroizing<String>;
@@ -119,7 +119,7 @@ pub enum MemorySignerError {
/// so that signing never fails.
///
/// Can be created from a [`Keystore`] with the [`MemorySigner::load`] function.
-
#[derive(Debug)]
+
#[derive(Debug, Clone)]
pub struct MemorySigner {
    public: PublicKey,
    secret: Zeroizing<SecretKey>,
@@ -139,7 +139,12 @@ impl Signer for MemorySigner {
    }
}

-
impl Ecdh for MemorySigner {
+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::crypto::Ecdh for MemorySigner {
+
    type Pk = PublicKey;
+
    type Secret = SharedSecret;
+
    type Err = ed25519_compact::Error;
+

    fn ecdh(&self, other: &PublicKey) -> Result<SharedSecret, ed25519_compact::Error> {
        let pk = ed25519_compact::x25519::PublicKey::from_ed25519(other)?;
        let sk = ed25519_compact::x25519::SecretKey::from_ed25519(&self.secret)?;
modified radicle-node/Cargo.toml
@@ -9,19 +9,22 @@ edition = "2021"
test = ["radicle/test", "radicle-crypto/test", "qcheck"]

[dependencies]
+
amplify = { version = "4.0.0-beta.1", default-features = false, features = ["std"] }
anyhow = { version = "1" }
bloomy = { version = "1.2" }
byteorder = { version = "1" }
chrono = { version = "0.4.0" }
colored = { version = "1.9.0" }
crossbeam-channel = { version = "0.5.6" }
+
cyphernet = { version = "0", features = ["serde", "tor", "dns", "ed25519"] }
fastrand = { version = "1.8.0" }
git-ref-format = { version = "0", features = ["serde", "macro"] }
lexopt = { version = "0.2.1" }
log = { version = "0.4.17", features = ["std"] }
nakamoto-net = { version = "0.3.0" }
-
nakamoto-net-poll = { version = "0.3.0" }
+
netservices = { version = "0", features = ["poll-reactor", "socket2"] }
nonempty = { version = "0.8.1", features = ["serialize"] }
+
poll-reactor = { version = "0", features = ["popol", "socket2"] }
qcheck = { version = "1", default-features = false, optional = true }
sqlite = { version = "0.30.3" }
sqlite3-src = { version = "0.4.0", features = ["bundled"] } # Ensures static linking
modified radicle-node/src/address/store.rs
@@ -12,7 +12,7 @@ use crate::clock::Timestamp;
use crate::prelude::Address;
use crate::service::NodeId;
use crate::sql::transaction;
-
use crate::wire::message::AddressType;
+
use crate::wire::AddressType;

#[derive(Error, Debug)]
pub enum Error {
modified radicle-node/src/client.rs
@@ -1,16 +1,9 @@
-
use std::{io, net};
+
use std::io;

-
use crossbeam_channel as chan;
-
use nakamoto_net::{LocalTime, Reactor};
use thiserror::Error;

-
use radicle::crypto::Signer;
-

-
use crate::profile::Profile;
+
use crate::address;
use crate::service::{routing, tracking};
-
use crate::wire::transcode::NoHandshake;
-
use crate::wire::Wire;
-
use crate::{address, service};

pub mod handle;

@@ -42,128 +35,3 @@ pub enum Error {
    #[error("network error: {0}")]
    Net(#[from] nakamoto_net::error::Error),
}
-

-
/// Client configuration.
-
#[derive(Debug, Clone)]
-
pub struct Config {
-
    /// Client service configuration.
-
    pub service: service::Config,
-
    /// Client listen addresses.
-
    pub listen: Vec<net::SocketAddr>,
-
}
-

-
impl Config {
-
    /// Create a new configuration for the given network.
-
    pub fn new(network: service::Network) -> Self {
-
        Self {
-
            service: service::Config {
-
                network,
-
                ..service::Config::default()
-
            },
-
            ..Self::default()
-
        }
-
    }
-
}
-

-
impl Default for Config {
-
    fn default() -> Self {
-
        Self {
-
            service: service::Config::default(),
-
            listen: vec![([0, 0, 0, 0], 0).into()],
-
        }
-
    }
-
}
-

-
pub struct Client<R: Reactor> {
-
    reactor: R,
-

-
    handle: chan::Sender<service::Command>,
-
    commands: chan::Receiver<service::Command>,
-
    shutdown: chan::Sender<()>,
-
    listening: chan::Receiver<net::SocketAddr>,
-
    events: Events,
-
}
-

-
impl<R: Reactor> Client<R> {
-
    pub fn new() -> Result<Self, Error> {
-
        let (handle, commands) = chan::unbounded::<service::Command>();
-
        let (shutdown, shutdown_recv) = chan::bounded(1);
-
        let (listening_send, listening) = chan::bounded(1);
-
        let reactor = R::new(shutdown_recv, listening_send)?;
-
        let events = Events {};
-

-
        Ok(Self {
-
            reactor,
-
            handle,
-
            commands,
-
            listening,
-
            shutdown,
-
            events,
-
        })
-
    }
-

-
    pub fn run<G: Signer>(
-
        mut self,
-
        config: Config,
-
        profile: Profile,
-
        signer: G,
-
    ) -> Result<(), Error> {
-
        let network = config.service.network;
-
        let rng = fastrand::Rng::new();
-
        let time = LocalTime::now();
-
        let storage = profile.storage;
-
        let node_dir = profile.home.join(NODE_DIR);
-
        let address_db = node_dir.join(ADDRESS_DB_FILE);
-
        let routing_db = node_dir.join(ROUTING_DB_FILE);
-
        let tracking_db = node_dir.join(TRACKING_DB_FILE);
-

-
        log::info!("Opening address book {}..", address_db.display());
-
        let addresses = address::Book::open(address_db)?;
-

-
        log::info!("Opening routing table {}..", routing_db.display());
-
        let routing = routing::Table::open(routing_db)?;
-

-
        log::info!("Opening tracking policy table {}..", tracking_db.display());
-
        let tracking = tracking::Config::open(tracking_db)?;
-

-
        log::info!("Initializing client ({:?})..", network);
-

-
        let service = service::Service::new(
-
            config.service,
-
            time,
-
            routing,
-
            storage,
-
            addresses,
-
            tracking,
-
            signer,
-
            rng,
-
        );
-

-
        self.reactor.run(
-
            &config.listen,
-
            Wire::<_, _, _, _, NoHandshake>::new(service),
-
            self.events,
-
            self.commands,
-
        )?;
-

-
        Ok(())
-
    }
-

-
    /// Create a new handle to communicate with the client.
-
    pub fn handle(&self) -> handle::Handle<R::Waker> {
-
        handle::Handle {
-
            waker: self.reactor.waker(),
-
            commands: self.handle.clone(),
-
            shutdown: self.shutdown.clone(),
-
            listening: self.listening.clone(),
-
        }
-
    }
-
}
-

-
pub struct Events {}
-

-
impl nakamoto_net::Publisher<service::Event> for Events {
-
    fn publish(&mut self, e: service::Event) {
-
        log::info!("Received event {:?}", e);
-
    }
-
}
modified radicle-node/src/client/handle.rs
@@ -1,8 +1,6 @@
-
use std::net;
use std::sync::Arc;

use crossbeam_channel as chan;
-
use nakamoto_net::Waker;
use thiserror::Error;

use crate::identity::Id;
@@ -48,62 +46,56 @@ impl<T> From<chan::SendError<T>> for Error {
    }
}

-
pub struct Handle<W: Waker> {
-
    pub(crate) commands: chan::Sender<service::Command>,
-
    pub(crate) shutdown: chan::Sender<()>,
-
    pub(crate) listening: chan::Receiver<net::SocketAddr>,
-
    pub(crate) waker: W,
+
pub struct Handle {
+
    pub(crate) controller: reactor::Controller<service::Command>,
}

-
impl<W: Waker> Handle<W> {
+
impl From<reactor::Controller<service::Command>> for Handle {
+
    fn from(controller: reactor::Controller<service::Command>) -> Handle {
+
        Handle { controller }
+
    }
+
}
+

+
impl Handle {
    fn command(&self, cmd: service::Command) -> Result<(), Error> {
-
        self.commands.send(cmd)?;
-
        self.waker.wake()?;
+
        self.controller.send(cmd)?;

        Ok(())
    }
}

-
impl<W: Waker> radicle::node::Handle for Handle<W> {
+
impl radicle::node::Handle for Handle {
    type Session = Session;
    type FetchLookup = FetchLookup;
    type Error = Error;

-
    fn listening(&self) -> Result<net::SocketAddr, Error> {
-
        self.listening.recv().map_err(Error::from)
-
    }
-

    fn fetch(&mut self, id: Id) -> Result<Self::FetchLookup, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.commands.send(service::Command::Fetch(id, sender))?;
+
        self.command(service::Command::Fetch(id, sender))?;
        receiver.recv().map_err(Error::from)
    }

    fn track_node(&mut self, id: NodeId, alias: Option<String>) -> Result<bool, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.commands
-
            .send(service::Command::TrackNode(id, alias, sender))?;
+
        self.command(service::Command::TrackNode(id, alias, sender))?;
        receiver.recv().map_err(Error::from)
    }

    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.commands
-
            .send(service::Command::UntrackNode(id, sender))?;
+
        self.command(service::Command::UntrackNode(id, sender))?;
        receiver.recv().map_err(Error::from)
    }

    fn track_repo(&mut self, id: Id) -> Result<bool, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.commands
-
            .send(service::Command::TrackRepo(id, sender))?;
+
        self.command(service::Command::TrackRepo(id, sender))?;
        receiver.recv().map_err(Error::from)
    }

    fn untrack_repo(&mut self, id: Id) -> Result<bool, Error> {
        let (sender, receiver) = chan::bounded(1);
-
        self.commands
-
            .send(service::Command::UntrackRepo(id, sender))?;
+
        self.command(service::Command::UntrackRepo(id, sender))?;
        receiver.recv().map_err(Error::from)
    }

@@ -151,9 +143,6 @@ impl<W: Waker> radicle::node::Handle for Handle<W> {
    }

    fn shutdown(self) -> Result<(), Error> {
-
        self.shutdown.send(())?;
-
        self.waker.wake()?;
-

-
        Ok(())
+
        self.controller.shutdown().map_err(|_| Error::NotConnected)
    }
}
modified radicle-node/src/lib.rs
@@ -1,3 +1,6 @@
+
#[macro_use]
+
extern crate amplify;
+

pub mod address;
pub mod bounded;
pub mod client;
modified radicle-node/src/main.rs
@@ -1,22 +1,27 @@
use std::{env, net, process, thread};

use anyhow::Context as _;
-

-
use nakamoto_net::LocalDuration;
+
use cyphernet::addr::PeerAddr;
+
use nakamoto_net::{LocalDuration, LocalTime};
+
use reactor::poller::popol;
+
use reactor::Reactor;

use radicle::profile;
+
use radicle_node::client::handle::Handle;
+
use radicle_node::client::{ADDRESS_DB_FILE, NODE_DIR, ROUTING_DB_FILE, TRACKING_DB_FILE};
use radicle_node::crypto::ssh::keystore::MemorySigner;
-
use radicle_node::logger;
-
use radicle_node::prelude::Address;
-
use radicle_node::{client, control, service};
-

-
type Reactor = nakamoto_net_poll::Reactor<net::TcpStream>;
+
use radicle_node::prelude::{Address, NodeId};
+
use radicle_node::service::{routing, tracking};
+
use radicle_node::wire::Transport;
+
use radicle_node::{address, control, logger, service};

#[derive(Debug)]
struct Options {
-
    connect: Vec<Address>,
+
    connect: Vec<(NodeId, Address)>,
    external_addresses: Vec<Address>,
    limits: service::config::Limits,
+
    // FIXME(cloudhead): Listen on incoming connections.
+
    #[allow(dead_code)]
    listen: Vec<net::SocketAddr>,
}

@@ -33,8 +38,8 @@ impl Options {
        while let Some(arg) = parser.next()? {
            match arg {
                Long("connect") => {
-
                    let addr = parser.value()?.parse()?;
-
                    connect.push(addr);
+
                    let peer: PeerAddr<NodeId, Address> = parser.value()?.parse()?;
+
                    connect.push((*peer.id(), peer.addr().clone()));
                }
                Long("external-address") => {
                    let addr = parser.value()?.parse()?;
@@ -81,33 +86,48 @@ fn main() -> anyhow::Result<()> {
    let options = Options::from_env()?;
    let profile = radicle::Profile::load().context("Failed to load node profile")?;
    let node = profile.node();
-
    let client = client::Client::<Reactor>::new().context("Failed to initialize client")?;
-
    let signer = match profile.signer() {
-
        Ok(signer) => signer.boxed(),
-
        Err(err) => {
-
            let passphrase = env::var(profile::env::RAD_PASSPHRASE)
-
                .context("Either ssh-agent must be initialized, or `RAD_PASSPHRASE` must be set")
-
                .context(err)?
-
                .into();
-
            MemorySigner::load(&profile.keystore, passphrase)?.boxed()
-
        }
-
    };
-
    let handle = client.handle();
-
    let config = client::Config {
-
        service: service::Config {
-
            connect: options.connect,
-
            external_addresses: options.external_addresses,
-
            limits: options.limits,
-
            ..service::Config::default()
-
        },
-
        listen: options.listen,
+
    let passphrase = env::var(profile::env::RAD_PASSPHRASE)
+
        .context("`RAD_PASSPHRASE` is required to be set for the node to establish connections")?
+
        .into();
+
    let signer = MemorySigner::load(&profile.keystore, passphrase)?;
+
    let negotiator = signer.clone();
+
    let config = service::Config {
+
        connect: options.connect.into_iter().collect(),
+
        external_addresses: options.external_addresses,
+
        limits: options.limits,
+
        ..service::Config::default()
    };
+
    let proxy_addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
+
    let network = config.network;
+
    let rng = fastrand::Rng::new();
+
    let clock = LocalTime::now();
+
    let storage = profile.storage;
+
    let node_dir = profile.home.join(NODE_DIR);
+
    let address_db = node_dir.join(ADDRESS_DB_FILE);
+
    let routing_db = node_dir.join(ROUTING_DB_FILE);
+
    let tracking_db = node_dir.join(TRACKING_DB_FILE);
+

+
    log::info!("Opening address book {}..", address_db.display());
+
    let addresses = address::Book::open(address_db)?;
+

+
    log::info!("Opening routing table {}..", routing_db.display());
+
    let routing = routing::Table::open(routing_db)?;
+

+
    log::info!("Opening tracking policy table {}..", tracking_db.display());
+
    let tracking = tracking::Config::open(tracking_db)?;
+

+
    log::info!("Initializing service ({:?})..", network);
+
    let service = service::Service::new(
+
        config, clock, routing, storage, addresses, tracking, signer, rng,
+
    );

-
    let t1 = thread::spawn(move || control::listen(node, handle));
-
    let t2 = thread::spawn(move || client.run(config, profile, signer));
+
    let wire = Transport::new(service, negotiator, proxy_addr, clock);
+
    let reactor = Reactor::new(wire, popol::Poller::new());
+
    let handle = Handle::from(reactor.controller());
+
    let control = thread::spawn(move || control::listen(node, handle));

-
    t1.join().unwrap()?;
-
    t2.join().unwrap()?;
+
    control.join().unwrap()?;
+
    reactor.join().unwrap();

    Ok(())
}
modified radicle-node/src/service.rs
@@ -48,8 +48,6 @@ use self::reactor::Reactor;

/// Default radicle protocol port.
pub const DEFAULT_PORT: u16 = 8776;
-
/// Protocol version. Only updated for wire protocol changes.
-
pub const PROTOCOL_VERSION: u32 = 1;
/// Target number of peers to maintain connections to.
pub const TARGET_OUTBOUND_PEERS: usize = 8;
/// How often to run the "idle" task.
@@ -113,7 +111,7 @@ pub enum FetchError {
pub enum FetchLookup {
    /// Found seeds for the given project.
    Found {
-
        seeds: NonEmpty<net::SocketAddr>,
+
        seeds: NonEmpty<NodeId>,
        results: chan::Receiver<FetchResult>,
    },
    /// Can't fetch because no seeds were found for this project.
@@ -130,14 +128,11 @@ pub enum FetchLookup {
pub enum FetchResult {
    /// Successful fetch from a seed.
    Fetched {
-
        from: net::SocketAddr,
+
        from: NodeId,
        updated: Vec<RefUpdate>,
    },
    /// Error fetching the resource from a seed.
-
    Error {
-
        from: net::SocketAddr,
-
        error: FetchError,
-
    },
+
    Error { from: NodeId, error: FetchError },
}

/// Function used to query internal service state.
@@ -148,7 +143,7 @@ pub enum Command {
    /// Announce repository references for given project id to peers.
    AnnounceRefs(Id),
    /// Connect to node with the given address.
-
    Connect(net::SocketAddr),
+
    Connect(NodeId, Address),
    /// Fetch the given project from the network.
    Fetch(Id, chan::Sender<FetchLookup>),
    /// Track the given project.
@@ -167,7 +162,7 @@ impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AnnounceRefs(id) => write!(f, "AnnounceRefs({})", id),
-
            Self::Connect(addr) => write!(f, "Connect({})", addr),
+
            Self::Connect(id, addr) => write!(f, "Connect({}, {})", id, addr),
            Self::Fetch(id, _) => write!(f, "Fetch({})", id),
            Self::TrackRepo(id, _) => write!(f, "TrackRepo({})", id),
            Self::UntrackRepo(id, _) => write!(f, "UntrackRepo({})", id),
@@ -384,8 +379,8 @@ where

        // Connect to configured peers.
        let addrs = self.config.connect.clone();
-
        for addr in addrs {
-
            self.reactor.connect(addr);
+
        for (id, addr) in addrs {
+
            self.reactor.connect(id, addr);
        }
    }

@@ -440,7 +435,7 @@ where
        debug!("Command {:?}", cmd);

        match cmd {
-
            Command::Connect(addr) => self.reactor.connect(addr),
+
            Command::Connect(id, addr) => self.reactor.connect(id, addr),
            Command::Fetch(id, resp) => {
                if !self
                    .tracking
@@ -472,7 +467,7 @@ where

                let (results_, results) = chan::bounded(seeds.len());
                resp.send(FetchLookup::Found {
-
                    seeds: seeds.clone().map(|(_, peer)| peer.addr),
+
                    seeds: seeds.clone().map(|(_, peer)| peer.id),
                    results,
                })
                .ok();
@@ -483,7 +478,7 @@ where
                        Ok(updated) => {
                            results_
                                .send(FetchResult::Fetched {
-
                                    from: peer.addr,
+
                                    from: peer.id,
                                    updated,
                                })
                                .ok();
@@ -491,7 +486,7 @@ where
                        Err(err) => {
                            results_
                                .send(FetchResult::Error {
-
                                    from: peer.addr,
+
                                    from: peer.id,
                                    error: err.into(),
                                })
                                .ok();
@@ -536,38 +531,31 @@ where
        }
    }

-
    pub fn attempted(&mut self, addr: &net::SocketAddr) {
-
        let address = Address::from(*addr);
-
        let persistent = self.config.is_persistent(&address);
+
    pub fn accepted(&mut self, _addr: net::SocketAddr) {
+
        // Inbound connection attempt.
+
    }
+

+
    pub fn attempted(&mut self, id: NodeId, _addr: &Address) {
+
        let persistent = self.config.is_persistent(&id);
        let peer = self
            .sessions
-
            .entry(*addr)
-
            .or_insert_with(|| Session::new(*addr, Link::Outbound, persistent, self.rng.clone()));
+
            .entry(id)
+
            .or_insert_with(|| Session::new(id, Link::Outbound, persistent, self.rng.clone()));

        peer.attempted();
    }

-
    pub fn connecting(
-
        &mut self,
-
        _addr: net::SocketAddr,
-
        _local_addr: &net::SocketAddr,
-
        _link: Link,
-
    ) {
-
    }
-

-
    pub fn connected(&mut self, addr: net::SocketAddr, link: Link) {
-
        let address = addr.into();
-

-
        debug!("Connected to {} ({:?})", addr, link);
+
    pub fn connected(&mut self, remote: NodeId, link: Link) {
+
        debug!("Connected to {} ({:?})", remote, link);

        // For outbound connections, we are the first to say "Hello".
        // For inbound connections, we wait for the remote to say "Hello" first.
        // TODO: How should we deal with multiple peers connecting from the same IP address?
        if link.is_outbound() {
-
            if let Some(peer) = self.sessions.get_mut(&addr) {
+
            if let Some(peer) = self.sessions.get_mut(&remote) {
                if link.is_outbound() {
                    self.reactor.write_all(
-
                        addr,
+
                        remote,
                        gossip::handshake(
                            self.clock.as_secs(),
                            &self.storage,
@@ -581,11 +569,11 @@ where
            }
        } else {
            self.sessions.insert(
-
                addr,
+
                remote,
                Session::new(
-
                    addr,
+
                    remote,
                    Link::Inbound,
-
                    self.config.is_persistent(&address),
+
                    self.config.is_persistent(&remote),
                    self.rng.clone(),
                ),
            );
@@ -594,55 +582,57 @@ where

    pub fn disconnected(
        &mut self,
-
        addr: &net::SocketAddr,
+
        remote: NodeId,
        reason: &nakamoto::DisconnectReason<DisconnectReason>,
    ) {
        let since = self.local_time();
-
        let address = Address::from(*addr);

-
        debug!("Disconnected from {} ({})", addr, reason);
+
        debug!("Disconnected from {} ({})", remote, reason);

-
        if let Some(session) = self.sessions.get_mut(addr) {
+
        if let Some(session) = self.sessions.get_mut(&remote) {
            session.state = session::State::Disconnected { since };

            // Attempt to re-connect to persistent peers.
-
            if self.config.is_persistent(&address) && session.attempts() < MAX_CONNECTION_ATTEMPTS {
-
                if reason.is_dial_err() {
-
                    return;
-
                }
-
                if let nakamoto::DisconnectReason::Protocol(r) = reason {
-
                    if !r.is_transient() {
+
            if let Some(address) = self.config.peer(&remote) {
+
                if session.attempts() < MAX_CONNECTION_ATTEMPTS {
+
                    if reason.is_dial_err() {
                        return;
                    }
-
                }
-
                // TODO: Eventually we want a delay before attempting a reconnection,
-
                // with exponential back-off.
-
                debug!(
-
                    "Reconnecting to {} (attempts={})...",
-
                    addr,
-
                    session.attempts()
-
                );
+
                    if let nakamoto::DisconnectReason::Protocol(r) = reason {
+
                        if !r.is_transient() {
+
                            return;
+
                        }
+
                    }
+
                    // TODO: Eventually we want a delay before attempting a reconnection,
+
                    // with exponential back-off.
+
                    debug!(
+
                        "Reconnecting to {} (attempts={})...",
+
                        remote,
+
                        session.attempts()
+
                    );

-
                // TODO: Try to reconnect only if the peer was attempted. A disconnect without
-
                // even a successful attempt means that we're unlikely to be able to reconnect.
+
                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
+
                    // even a successful attempt means that we're unlikely to be able to reconnect.

-
                self.reactor.connect(*addr);
+
                    self.reactor.connect(remote, address.clone());
+
                }
            } else {
-
                self.sessions.remove(addr);
+
                self.sessions.remove(&remote);
                self.maintain_connections();
            }
        }
    }

-
    pub fn received_message(&mut self, addr: &net::SocketAddr, message: Message) {
-
        match self.handle_message(addr, message) {
-
            Err(session::Error::NotFound(ip)) => {
-
                error!("Session not found for {ip}");
+
    pub fn received_message(&mut self, remote: NodeId, message: Message) {
+
        match self.handle_message(&remote, message) {
+
            Err(session::Error::NotFound(id)) => {
+
                error!("Session not found for {id}");
            }
            Err(err) => {
                // If there's an error, stop processing messages from this peer.
                // However, we still relay messages returned up to this point.
-
                self.reactor.disconnect(*addr, DisconnectReason::Error(err));
+
                self.reactor
+
                    .disconnect(remote, DisconnectReason::Error(err));

                // FIXME: The peer should be set in a state such that we don'that
                // process further messages.
@@ -822,7 +812,7 @@ where

    pub fn handle_message(
        &mut self,
-
        remote: &net::SocketAddr,
+
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
        let Some(peer) = self.sessions.get_mut(remote) else {
@@ -830,18 +820,15 @@ where
        };
        peer.last_active = self.clock;

-
        debug!("Received {:?} from {}", &message, peer.ip());
+
        debug!("Received {:?} from {}", &message, peer.id);

        match (&mut peer.state, message) {
-
            (session::State::Initial, Message::Initialize { id, version, addrs }) => {
-
                if version != PROTOCOL_VERSION {
-
                    return Err(session::Error::WrongVersion(version));
-
                }
+
            (session::State::Initial, Message::Initialize {}) => {
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
                // extra "acknowledgment" message sent when the `Initialize` is well received.
                if peer.link.is_inbound() {
                    self.reactor.write_all(
-
                        peer.addr,
+
                        peer.id,
                        gossip::handshake(
                            self.clock.as_secs(),
                            &self.storage,
@@ -855,16 +842,15 @@ where
                // set after the first message is received only. Setting it here would
                // mean that messages received right after the handshake could be ignored.
                peer.state = session::State::Negotiated {
-
                    id,
+
                    id: *remote,
                    since: self.clock,
-
                    addrs: addrs.unbound(),
                    ping: Default::default(),
                };
            }
            (session::State::Initial, _) => {
                debug!(
                    "Disconnecting peer {} for sending us a message before handshake",
-
                    peer.ip()
+
                    peer.id
                );
                return Err(session::Error::Misbehavior);
            }
@@ -882,10 +868,9 @@ where
                    let relay_to = self
                        .sessions
                        .negotiated()
-
                        .filter(|(addr, _, _)| *addr != remote)
-
                        .filter(|(_, id, _)| **id != ann.node);
+
                        .filter(|(id, _)| *id != remote && *id != &ann.node);

-
                    self.reactor.relay(ann.clone(), relay_to.map(|(_, _, p)| p));
+
                    self.reactor.relay(ann.clone(), relay_to.map(|(_, p)| p));

                    return Ok(());
                }
@@ -895,14 +880,14 @@ where
                    .gossip
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
                {
-
                    self.reactor.write(peer.addr, msg);
+
                    self.reactor.write(peer.id, msg);
                }
                peer.subscribe = Some(subscribe);
            }
            (session::State::Negotiated { .. }, Message::Initialize { .. }) => {
                debug!(
                    "Disconnecting peer {} for sending us a redundant handshake message",
-
                    peer.ip()
+
                    peer.id
                );
                return Err(session::Error::Misbehavior);
            }
@@ -912,7 +897,7 @@ where
                    return Ok(());
                }
                self.reactor.write(
-
                    peer.addr,
+
                    peer.id,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
@@ -926,7 +911,7 @@ where
                }
            }
            (session::State::Disconnected { .. }, msg) => {
-
                debug!("Ignoring {:?} from disconnected peer {}", msg, peer.ip());
+
                debug!("Ignoring {:?} from disconnected peer {}", msg, peer.id);
            }
        }
        Ok(())
@@ -966,7 +951,7 @@ where
        let node = self.node_id();
        let repo = self.storage.repository(id)?;
        let remote = repo.remote(&node)?;
-
        let peers = self.sessions.negotiated().map(|(_, _, p)| p);
+
        let peers = self.sessions.negotiated().map(|(_, p)| p);
        let timestamp = self.clock.as_secs();

        if remote.refs.len() > Refs::max() {
@@ -976,7 +961,6 @@ where
            );
        }
        let refs = BoundedVec::collect_from(&mut remote.refs.iter().map(|(a, b)| (a.clone(), *b)));
-

        let msg = AnnouncementMessage::from(RefsAnnouncement {
            id,
            refs,
@@ -1001,8 +985,8 @@ where
            &self.signer,
        );

-
        for addr in self.sessions.negotiated().map(|(_, _, p)| p.addr) {
-
            self.reactor.write(addr, inv.clone());
+
        for id in self.sessions.negotiated().map(|(id, _)| id) {
+
            self.reactor.write(*id, inv.clone());
        }
        Ok(())
    }
@@ -1025,12 +1009,11 @@ where
        let stale = self
            .sessions
            .negotiated()
-
            .filter(|(_, _, session)| session.last_active < *now - STALE_CONNECTION_TIMEOUT);
-
        for (_, _, session) in stale {
-
            self.reactor.disconnect(
-
                session.addr,
-
                DisconnectReason::Error(session::Error::Timeout),
-
            );
+
            .filter(|(_, session)| session.last_active < *now - STALE_CONNECTION_TIMEOUT);
+

+
        for (_, session) in stale {
+
            self.reactor
+
                .disconnect(session.id, DisconnectReason::Error(session::Error::Timeout));
        }
    }

@@ -1046,16 +1029,18 @@ where
        }
    }

-
    fn choose_addresses(&mut self) -> Vec<Address> {
-
        let mut initializing: Vec<Address> = Vec::new();
+
    fn choose_addresses(&mut self) -> Vec<(NodeId, Address)> {
+
        // TODO(cloudhead): Remove once noise is implemented.
+
        let mut initializing: Vec<NodeId> = Vec::new();
        let mut negotiated: HashMap<NodeId, &Session> = HashMap::new();
        for s in self.sessions.values() {
            if !s.link.is_outbound() {
                continue;
            }
            match s.state {
+
                // TODO(cloudhead): Remove when we have noise handshake.
                session::State::Initial => {
-
                    initializing.push(s.addr.into());
+
                    initializing.push(s.id);
                }
                session::State::Negotiated { id, .. } => {
                    negotiated.insert(id, s);
@@ -1074,11 +1059,11 @@ where
        self.addresses
            .entries()
            .unwrap()
-
            .filter(|(node_id, s)| {
-
                !initializing.contains(&s.addr) && !negotiated.contains_key(node_id)
+
            .filter(|(node_id, _)| {
+
                !initializing.contains(node_id) && !negotiated.contains_key(node_id)
            })
            .take(wanted)
-
            .map(|(_, s)| s.addr)
+
            .map(|(n, s)| (n, s.addr))
            .collect()
    }

@@ -1087,8 +1072,8 @@ where
        if addrs.is_empty() {
            debug!("No eligible peers available to connect to");
        }
-
        for addr in addrs {
-
            self.reactor.connect(addr.clone());
+
        for (id, addr) in addrs {
+
            self.reactor.connect(id, addr.clone());
        }
    }
}
@@ -1149,6 +1134,7 @@ where
#[derive(Debug)]
pub enum DisconnectReason {
    User,
+
    Peer,
    Error(session::Error),
}

@@ -1156,6 +1142,7 @@ impl DisconnectReason {
    fn is_transient(&self) -> bool {
        match self {
            Self::User => false,
+
            Self::Peer => false,
            Self::Error(..) => false,
        }
    }
@@ -1171,6 +1158,7 @@ impl fmt::Display for DisconnectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::User => write!(f, "user"),
+
            Self::Peer => write!(f, "peer"),
            Self::Error(err) => write!(f, "error: {}", err),
        }
    }
@@ -1258,7 +1246,7 @@ impl Node {

#[derive(Debug)]
/// Holds currently (or recently) connected peers.
-
pub struct Sessions(AddressBook<net::SocketAddr, Session>);
+
pub struct Sessions(AddressBook<NodeId, Session>);

impl Sessions {
    pub fn new(rng: Rng) -> Self {
@@ -1270,25 +1258,23 @@ impl Sessions {
    }

    /// Iterator over fully negotiated peers.
-
    pub fn negotiated(
-
        &self,
-
    ) -> impl Iterator<Item = (&net::SocketAddr, &NodeId, &Session)> + Clone {
+
    pub fn negotiated(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.0
            .iter()
-
            .filter_map(move |(addr, sess)| match &sess.state {
-
                session::State::Negotiated { id, .. } => Some((addr, id, sess)),
+
            .filter_map(move |(_, sess)| match &sess.state {
+
                session::State::Negotiated { id, .. } => Some((id, sess)),
                _ => None,
            })
    }

    /// Iterator over mutable fully negotiated peers.
-
    pub fn negotiated_mut(&mut self) -> impl Iterator<Item = (&net::SocketAddr, &mut Session)> {
+
    pub fn negotiated_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
        self.0.iter_mut().filter(move |(_, p)| p.is_negotiated())
    }
}

impl Deref for Sessions {
-
    type Target = AddressBook<net::SocketAddr, Session>;
+
    type Target = AddressBook<NodeId, Session>;

    fn deref(&self) -> &Self::Target {
        &self.0
@@ -1350,14 +1336,7 @@ mod gossip {
        };

        let mut msgs = vec![
-
            Message::init(
-
                *signer.public_key(),
-
                config
-
                    .external_addresses
-
                    .clone()
-
                    .try_into()
-
                    .expect("external addresses are within the limit"),
-
            ),
+
            Message::init(),
            Message::inventory(gossip::inventory(timestamp, inventory), signer),
            Message::subscribe(filter, timestamp, Timestamp::MAX),
        ];
modified radicle-node/src/service/config.rs
@@ -1,6 +1,7 @@
use super::nakamoto::LocalDuration;

use crate::service::message::Address;
+
use crate::service::NodeId;

/// Peer-to-peer network.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
@@ -33,7 +34,7 @@ impl Default for Limits {
pub struct Config {
    /// Peers to connect to on startup.
    /// Connections to these peers will be maintained.
-
    pub connect: Vec<Address>,
+
    pub connect: Vec<(NodeId, Address)>,
    /// Specify the node's public addresses
    pub external_addresses: Vec<Address>,
    /// Peer-to-peer network.
@@ -50,7 +51,7 @@ impl Default for Config {
    fn default() -> Self {
        Self {
            connect: Vec::default(),
-
            external_addresses: Vec::default(),
+
            external_addresses: vec![],
            network: Network::default(),
            relay: true,
            listen: vec![],
@@ -67,8 +68,12 @@ impl Config {
        }
    }

-
    pub fn is_persistent(&self, addr: &Address) -> bool {
-
        self.connect.contains(addr)
+
    pub fn peer(&self, id: &NodeId) -> Option<&Address> {
+
        self.connect.iter().find(|(i, _)| i == id).map(|(_, a)| a)
+
    }
+

+
    pub fn is_persistent(&self, id: &NodeId) -> bool {
+
        self.connect.iter().any(|(i, _)| i == id)
    }

    pub fn alias(&self) -> [u8; 32] {
modified radicle-node/src/service/message.rs
@@ -1,7 +1,5 @@
-
use std::str::FromStr;
-
use std::{fmt, io, mem, net};
-

-
use thiserror::Error;
+
use cyphernet::addr::{HostAddr, NetAddr};
+
use std::{fmt, io, mem, net, ops};

use crate::crypto;
use crate::git;
@@ -9,7 +7,7 @@ use crate::identity::Id;
use crate::node;
use crate::prelude::BoundedVec;
use crate::service::filter::Filter;
-
use crate::service::{NodeId, Timestamp, PROTOCOL_VERSION};
+
use crate::service::{NodeId, Timestamp, DEFAULT_PORT};
use crate::wire;

/// Maximum number of addresses which can be announced to other nodes.
@@ -30,83 +28,30 @@ impl fmt::Display for Hostname {
}

/// Peer public protocol address.
-
#[derive(Debug, Clone, PartialEq, Eq)]
-
pub enum Address {
-
    Ipv4 {
-
        ip: net::Ipv4Addr,
-
        port: u16,
-
    },
-
    Ipv6 {
-
        ip: net::Ipv6Addr,
-
        port: u16,
-
    },
-
    Hostname {
-
        host: Hostname,
-
        port: u16,
-
    },
-
    /// Tor V3 onion address.
-
    Onion {
-
        key: crypto::PublicKey,
-
        port: u16,
-
        checksum: u16,
-
        version: u8,
-
    },
-
}
+
#[derive(Wrapper, Clone, Eq, PartialEq, Debug, From)]
+
#[wrapper(Display, FromStr)]
+
pub struct Address(NetAddr<DEFAULT_PORT>);

-
impl From<net::SocketAddr> for Address {
-
    fn from(other: net::SocketAddr) -> Self {
-
        let port = other.port();
-

-
        match other.ip() {
-
            net::IpAddr::V4(ip) => Self::Ipv4 { ip, port },
-
            net::IpAddr::V6(ip) => Self::Ipv6 { ip, port },
-
        }
+
impl cyphernet::addr::Addr for Address {
+
    fn port(&self) -> u16 {
+
        self.0.port()
    }
}

-
#[derive(Debug, Error)]
-
pub enum AddressParseError {
-
    #[error("unsupported address type `{0}`")]
-
    Unsupported(String),
-
}
+
impl ops::Deref for Address {
+
    type Target = NetAddr<DEFAULT_PORT>;

-
impl FromStr for Address {
-
    type Err = AddressParseError;
-

-
    fn from_str(s: &str) -> Result<Self, Self::Err> {
-
        if let Ok(addr) = net::SocketAddr::from_str(s) {
-
            match addr.ip() {
-
                net::IpAddr::V4(ip) => Ok(Self::Ipv4 {
-
                    ip,
-
                    port: addr.port(),
-
                }),
-
                net::IpAddr::V6(ip) => Ok(Self::Ipv6 {
-
                    ip,
-
                    port: addr.port(),
-
                }),
-
            }
-
        } else {
-
            Err(Self::Err::Unsupported(s.to_owned()))
-
        }
+
    fn deref(&self) -> &Self::Target {
+
        &self.0
    }
}

-
impl fmt::Display for Address {
-
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-
        match self {
-
            Self::Ipv4 { ip, port } => {
-
                write!(f, "{}:{}", ip, port)
-
            }
-
            Self::Ipv6 { ip, port } => {
-
                write!(f, "{}:{}", ip, port)
-
            }
-
            Self::Hostname { host, port } => {
-
                write!(f, "{}:{}", host, port)
-
            }
-
            Self::Onion { key, port, .. } => {
-
                write!(f, "{}:{}", key, port)
-
            }
-
        }
+
impl From<net::SocketAddr> for Address {
+
    fn from(addr: net::SocketAddr) -> Self {
+
        Address(NetAddr {
+
            host: HostAddr::Ip(addr.ip()),
+
            port: Some(addr.port()),
+
        })
    }
}

@@ -366,12 +311,7 @@ impl Announcement {
#[derive(Clone, PartialEq, Eq)]
pub enum Message {
    /// The first message sent to a peer after connection.
-
    Initialize {
-
        // TODO: This is currently untrusted.
-
        id: NodeId,
-
        version: u32,
-
        addrs: BoundedVec<Address, ADDRESS_LIMIT>,
-
    },
+
    Initialize {},

    /// Subscribe to gossip messages matching the filter and time range.
    Subscribe(Subscribe),
@@ -394,12 +334,8 @@ pub enum Message {
}

impl Message {
-
    pub fn init(id: NodeId, addrs: BoundedVec<Address, ADDRESS_LIMIT>) -> Self {
-
        Self::Initialize {
-
            id,
-
            version: PROTOCOL_VERSION,
-
            addrs,
-
        }
+
    pub fn init() -> Self {
+
        Self::Initialize {}
    }

    pub fn announcement(
@@ -471,7 +407,7 @@ impl From<Announcement> for Message {
impl fmt::Debug for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
-
            Self::Initialize { id, .. } => write!(f, "Initialize({})", id),
+
            Self::Initialize { .. } => write!(f, "Initialize(..)"),
            Self::Subscribe(Subscribe { since, until, .. }) => {
                write!(f, "Subscribe({}..{})", since, until)
            }
modified radicle-node/src/service/reactor.rs
@@ -1,5 +1,4 @@
use std::collections::VecDeque;
-
use std::net;

use log::*;

@@ -12,11 +11,11 @@ use super::message::{Announcement, AnnouncementMessage};
#[derive(Debug)]
pub enum Io {
    /// There are some messages ready to be sent to a peer.
-
    Write(net::SocketAddr, Vec<Message>),
+
    Write(NodeId, Vec<Message>),
    /// Connect to a peer.
-
    Connect(net::SocketAddr),
+
    Connect(NodeId, Address),
    /// Disconnect from a peer.
-
    Disconnect(net::SocketAddr, DisconnectReason),
+
    Disconnect(NodeId, DisconnectReason),
    /// Ask for a wakeup in a specified amount of time.
    Wakeup(LocalDuration),
    /// Emit an event.
@@ -37,35 +36,23 @@ impl Reactor {
    }

    /// Connect to a peer.
-
    pub fn connect(&mut self, addr: impl Into<Address>) {
+
    pub fn connect(&mut self, id: NodeId, addr: Address) {
        // TODO: Make sure we don't try to connect more than once to the same address.
-
        match addr.into() {
-
            Address::Ipv4 { ip, port } => {
-
                self.io
-
                    .push_back(Io::Connect(net::SocketAddr::new(net::IpAddr::V4(ip), port)));
-
            }
-
            Address::Ipv6 { ip, port } => {
-
                self.io
-
                    .push_back(Io::Connect(net::SocketAddr::new(net::IpAddr::V6(ip), port)));
-
            }
-
            other => {
-
                log::error!("Unsupported address type `{}`", other);
-
            }
-
        }
+
        self.io.push_back(Io::Connect(id, addr));
    }

    /// Disconnect a peer.
-
    pub fn disconnect(&mut self, addr: net::SocketAddr, reason: DisconnectReason) {
-
        self.io.push_back(Io::Disconnect(addr, reason));
+
    pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
+
        self.io.push_back(Io::Disconnect(id, reason));
    }

-
    pub fn write(&mut self, remote: net::SocketAddr, msg: Message) {
-
        debug!("Write {:?} to {}", &msg, remote.ip());
+
    pub fn write(&mut self, remote: NodeId, msg: Message) {
+
        debug!("Write {:?} to {}", &msg, remote);

        self.io.push_back(Io::Write(remote, vec![msg]));
    }

-
    pub fn write_all(&mut self, remote: net::SocketAddr, msgs: impl IntoIterator<Item = Message>) {
+
    pub fn write_all(&mut self, remote: NodeId, msgs: impl IntoIterator<Item = Message>) {
        self.io
            .push_back(Io::Write(remote, msgs.into_iter().collect()));
    }
@@ -81,7 +68,7 @@ impl Reactor {
        peers: impl IntoIterator<Item = &'a Session>,
    ) {
        for peer in peers {
-
            self.write(peer.addr, msg.clone().into());
+
            self.write(peer.id, msg.clone().into());
        }
    }

modified radicle-node/src/service/session.rs
@@ -1,6 +1,5 @@
use crate::service::message;
use crate::service::message::Message;
-
use crate::service::net;
use crate::service::storage;
use crate::service::{Link, LocalTime, NodeId, Reactor, Rng};

@@ -18,10 +17,8 @@ pub enum PingState {
#[derive(Debug, Default, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum State {
-
    /// Initial peer state. For outgoing peers this
-
    /// means we've attempted a connection. For incoming
-
    /// peers, this means they've successfully connected
-
    /// to us.
+
    /// Pre-handshake state.
+
    /// TODO(cloudhead): Remove once noise handshake is implemented.
    #[default]
    Initial,
    /// State after successful handshake.
@@ -29,8 +26,6 @@ pub enum State {
        /// The peer's unique identifier.
        id: NodeId,
        since: LocalTime,
-
        /// Addresses this peer is reachable on.
-
        addrs: Vec<message::Address>,
        ping: PingState,
    },
    /// When a peer is disconnected.
@@ -43,8 +38,8 @@ pub enum Error {
    WrongVersion(u32),
    #[error("invalid announcement timestamp: {0}")]
    InvalidTimestamp(u64),
-
    #[error("session not found for address `{0}`")]
-
    NotFound(net::SocketAddr),
+
    #[error("session not found for node `{0}`")]
+
    NotFound(NodeId),
    #[error("verification failed on fetch: {0}")]
    VerificationFailed(#[from] storage::VerifyError),
    #[error("peer misbehaved")]
@@ -58,8 +53,8 @@ pub enum Error {
/// A peer session. Each connected peer will have one session.
#[derive(Debug, Clone)]
pub struct Session {
-
    /// Peer address.
-
    pub addr: net::SocketAddr,
+
    /// Peer id.
+
    pub id: NodeId,
    /// Connection direction.
    pub link: Link,
    /// Whether we should attempt to re-connect
@@ -82,10 +77,10 @@ pub struct Session {
}

impl Session {
-
    pub fn new(addr: net::SocketAddr, link: Link, persistent: bool, rng: Rng) -> Self {
+
    pub fn new(id: NodeId, link: Link, persistent: bool, rng: Rng) -> Self {
        Self {
-
            addr,
-
            state: State::default(),
+
            id,
+
            state: State::Initial,
            link,
            subscribe: None,
            persistent,
@@ -95,10 +90,6 @@ impl Session {
        }
    }

-
    pub fn ip(&self) -> net::IpAddr {
-
        self.addr.ip()
-
    }
-

    pub fn is_negotiated(&self) -> bool {
        matches!(self.state, State::Negotiated { .. })
    }
@@ -120,7 +111,7 @@ impl Session {
            let msg = message::Ping::new(&mut self.rng);
            *ping = PingState::AwaitingResponse(msg.ponglen);

-
            reactor.write(self.addr, Message::Ping(msg));
+
            reactor.write(self.id, Message::Ping(msg));
        }
        Ok(())
    }
modified radicle-node/src/test/arbitrary.rs
@@ -1,6 +1,7 @@
use std::net;

use bloomy::BloomFilter;
+
use cyphernet::addr::{HostAddr, NetAddr};
use qcheck::Arbitrary;

use crate::crypto;
@@ -10,7 +11,7 @@ use crate::service::message::{
    Address, Announcement, InventoryAnnouncement, Message, NodeAnnouncement, Ping,
    RefsAnnouncement, Subscribe, ZeroBytes,
};
-
use crate::wire::message::MessageType;
+
use crate::wire::MessageType;

pub use radicle::test::arbitrary::*;

@@ -104,19 +105,16 @@ impl Arbitrary for Message {

impl Arbitrary for Address {
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
-
        if bool::arbitrary(g) {
-
            Address::Ipv4 {
-
                ip: net::Ipv4Addr::from(u32::arbitrary(g)),
-
                port: u16::arbitrary(g),
-
            }
+
        let ip = if bool::arbitrary(g) {
+
            net::IpAddr::V4(net::Ipv4Addr::from(u32::arbitrary(g)))
        } else {
            let octets: [u8; 16] = Arbitrary::arbitrary(g);
-

-
            Address::Ipv6 {
-
                ip: net::Ipv6Addr::from(octets),
-
                port: u16::arbitrary(g),
-
            }
-
        }
+
            net::IpAddr::V6(net::Ipv6Addr::from(octets))
+
        };
+
        Address::from(NetAddr {
+
            host: HostAddr::Ip(ip),
+
            port: Some(u16::arbitrary(g)),
+
        })
    }
}

modified radicle-node/src/test/handle.rs
@@ -21,10 +21,6 @@ impl radicle::node::Handle for Handle {
    type Session = service::Session;
    type FetchLookup = FetchLookup;

-
    fn listening(&self) -> Result<std::net::SocketAddr, Error> {
-
        unimplemented!()
-
    }
-

    fn fetch(&mut self, _id: Id) -> Result<FetchLookup, Error> {
        Ok(FetchLookup::NotFound)
    }
modified radicle-node/src/test/peer.rs
@@ -31,6 +31,7 @@ pub type Service<S, G> = service::Service<routing::Table, address::Book, S, G>;
pub struct Peer<S, G> {
    pub name: &'static str,
    pub service: Service<S, G>,
+
    pub id: NodeId,
    pub ip: net::IpAddr,
    pub rng: fastrand::Rng,
    pub local_addr: net::SocketAddr,
@@ -47,8 +48,12 @@ where
        self.initialize()
    }

-
    fn addr(&self) -> net::SocketAddr {
-
        net::SocketAddr::new(self.ip, DEFAULT_PORT)
+
    fn addr(&self) -> Address {
+
        self.address()
+
    }
+

+
    fn id(&self) -> NodeId {
+
        self.id
    }
}

@@ -108,6 +113,7 @@ where
    ) -> Self {
        let routing = routing::Table::memory().unwrap();
        let tracking = tracking::Config::memory().unwrap();
+
        let id = *config.signer.public_key();
        let service = Service::new(
            config.config,
            config.local_time,
@@ -124,6 +130,7 @@ where
        Self {
            name,
            service,
+
            id,
            ip,
            local_addr,
            rng: config.rng,
@@ -141,7 +148,7 @@ where
    }

    pub fn address(&self) -> Address {
-
        simulator::Peer::addr(self).into()
+
        Address::from(net::SocketAddr::from((self.ip, 8776)))
    }

    pub fn import_addresses<P>(&mut self, peers: P)
@@ -180,7 +187,7 @@ where
        self.service.node_id()
    }

-
    pub fn receive(&mut self, peer: &net::SocketAddr, msg: Message) {
+
    pub fn receive(&mut self, peer: NodeId, msg: Message) {
        self.service.received_message(peer, msg);
    }

@@ -225,20 +232,15 @@ where
    }

    pub fn connect_from(&mut self, peer: &Self) {
-
        let remote = simulator::Peer::<S, G>::addr(peer);
-
        let local = net::SocketAddr::new(self.ip, self.rng.u16(..));
+
        let remote_id = simulator::Peer::<S, G>::id(peer);

        self.initialize();
-
        self.service.connecting(remote, &local, Link::Inbound);
-
        self.service.connected(remote, Link::Inbound);
-
        self.receive(
-
            &remote,
-
            Message::init(peer.node_id(), Some(Address::from(remote)).into()),
-
        );
+
        self.service.connected(remote_id, Link::Inbound);
+
        self.receive(remote_id, Message::init());

-
        let mut msgs = self.messages(&remote);
+
        let mut msgs = self.messages(remote_id);
        msgs.find(|m| matches!(m, Message::Initialize { .. }))
-
            .expect("`initialize` is sent");
+
            .expect("`initialize` must be sent");
        msgs.find(|m| {
            matches!(
                m,
@@ -248,21 +250,20 @@ where
                })
            )
        })
-
        .expect("`inventory-announcement` is sent");
+
        .expect("`inventory-announcement` must be sent");
    }

    pub fn connect_to(&mut self, peer: &Self) {
-
        let remote = simulator::Peer::<S, G>::addr(peer);
+
        let remote_id = simulator::Peer::<S, G>::id(peer);
+
        let remote_addr = simulator::Peer::<S, G>::addr(peer);

        self.initialize();
-
        self.service.attempted(&remote);
-
        self.service
-
            .connecting(remote, &self.local_addr, Link::Outbound);
-
        self.service.connected(remote, Link::Outbound);
+
        self.service.attempted(remote_id, &remote_addr);
+
        self.service.connected(remote_id, Link::Outbound);

-
        let mut msgs = self.messages(&remote);
+
        let mut msgs = self.messages(remote_id);
        msgs.find(|m| matches!(m, Message::Initialize { .. }))
-
            .expect("`initialize` is sent");
+
            .expect("`initialize` must be sent");
        msgs.find(|m| {
            matches!(
                m,
@@ -272,19 +273,9 @@ where
                })
            )
        })
-
        .expect("`inventory-announcement` is sent");
-

-
        self.receive(
-
            &remote,
-
            Message::init(
-
                peer.node_id(),
-
                peer.config()
-
                    .listen
-
                    .clone()
-
                    .try_into()
-
                    .expect("within bound limits"),
-
            ),
-
        );
+
        .expect("`inventory-announcement` must be sent");
+

+
        self.receive(remote_id, Message::init());
    }

    pub fn elapse(&mut self, duration: LocalDuration) {
@@ -293,11 +284,11 @@ where
    }

    /// Drain outgoing messages sent from this peer to the remote address.
-
    pub fn messages(&mut self, remote: &net::SocketAddr) -> impl Iterator<Item = Message> {
+
    pub fn messages(&mut self, remote: NodeId) -> impl Iterator<Item = Message> {
        let mut msgs = Vec::new();

        self.service.reactor().outbox().retain(|o| match o {
-
            Io::Write(a, messages) if a == remote => {
+
            Io::Write(a, messages) if *a == remote => {
                msgs.extend(messages.clone());
                false
            }
modified radicle-node/src/test/simulator.rs
@@ -6,15 +6,16 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut, Range};
use std::rc::Rc;
-
use std::{fmt, io, net};
+
use std::{fmt, io};

use log::*;
use nakamoto_net as nakamoto;
use nakamoto_net::{Link, LocalDuration, LocalTime};

use crate::crypto::Signer;
+
use crate::prelude::Address;
use crate::service::reactor::Io;
-
use crate::service::{DisconnectReason, Event, Message};
+
use crate::service::{DisconnectReason, Event, Message, NodeId};
use crate::storage::WriteStorage;
use crate::test::peer::Service;

@@ -23,10 +24,6 @@ pub const MIN_LATENCY: LocalDuration = LocalDuration::from_millis(1);
/// Maximum number of events buffered per peer.
pub const MAX_EVENTS: usize = 2048;

-
/// Identifier for a simulated node/peer.
-
/// The simulator requires each peer to have a distinct IP address.
-
type NodeId = std::net::IpAddr;
-

/// A simulated peer. Service instances have to be wrapped in this type to be simulated.
pub trait Peer<S, G>:
    Deref<Target = Service<S, G>> + DerefMut<Target = Service<S, G>> + 'static
@@ -35,7 +32,9 @@ pub trait Peer<S, G>:
    /// current time.
    fn init(&mut self);
    /// Get the peer address.
-
    fn addr(&self) -> net::SocketAddr;
+
    fn addr(&self) -> Address;
+
    /// Get the peer id.
+
    fn id(&self) -> NodeId;
}

/// Simulated service input.
@@ -43,25 +42,22 @@ pub trait Peer<S, G>:
pub enum Input {
    /// Connection attempt underway.
    Connecting {
-
        /// Remote peer address.
-
        addr: net::SocketAddr,
+
        /// Remote peer id.
+
        id: NodeId,
+
        /// Address used to connect.
+
        addr: Address,
    },
    /// New connection with a peer.
    Connected {
        /// Remote peer id.
-
        addr: net::SocketAddr,
-
        /// Local peer id.
-
        local_addr: net::SocketAddr,
+
        id: NodeId,
        /// Link direction.
        link: Link,
    },
    /// Disconnected from peer.
-
    Disconnected(
-
        net::SocketAddr,
-
        Rc<nakamoto::DisconnectReason<DisconnectReason>>,
-
    ),
+
    Disconnected(NodeId, Rc<nakamoto::DisconnectReason<DisconnectReason>>),
    /// Received a message from a remote peer.
-
    Received(net::SocketAddr, Vec<Message>),
+
    Received(NodeId, Vec<Message>),
    /// Used to advance the state machine after some wall time has passed.
    Wake,
}
@@ -73,7 +69,7 @@ pub struct Scheduled {
    pub node: NodeId,
    /// The remote peer from which this input originates.
    /// If the input originates from the local node, this should be set to the zero address.
-
    pub remote: net::SocketAddr,
+
    pub remote: NodeId,
    /// The input being scheduled.
    pub input: Input,
}
@@ -88,19 +84,17 @@ impl fmt::Display for Scheduled {
                Ok(())
            }
            Input::Connected {
-
                addr,
-
                local_addr,
+
                id: addr,
                link: Link::Inbound,
                ..
-
            } => write!(f, "{} <== {}: Connected", local_addr, addr),
+
            } => write!(f, "{} <== {}: Connected", self.node, addr),
            Input::Connected {
-
                local_addr,
-
                addr,
+
                id: addr,
                link: Link::Outbound,
                ..
-
            } => write!(f, "{} ==> {}: Connected", local_addr, addr),
-
            Input::Connecting { addr } => {
-
                write!(f, "{} => {}: Connecting", self.node, addr)
+
            } => write!(f, "{} ==> {}: Connected", self.node, addr),
+
            Input::Connecting { id, .. } => {
+
                write!(f, "{} => {}: Connecting", self.node, id)
            }
            Input::Disconnected(addr, reason) => {
                write!(f, "{} =/= {}: Disconnected: {}", self.node, addr, reason)
@@ -139,7 +133,7 @@ impl Inbox {
    }

    /// Get the last message sent between two peers. Only checks one direction.
-
    fn last(&self, node: &NodeId, remote: &net::SocketAddr) -> Option<(&LocalTime, &Scheduled)> {
+
    fn last(&self, node: &NodeId, remote: &NodeId) -> Option<(&LocalTime, &Scheduled)> {
        self.messages
            .iter()
            .rev()
@@ -179,7 +173,7 @@ pub struct Simulation<S, G> {
    /// Network partitions between two nodes.
    partitions: BTreeSet<(NodeId, NodeId)>,
    /// Set of existing connections between nodes.
-
    connections: BTreeMap<(NodeId, NodeId), u16>,
+
    connections: BTreeSet<(NodeId, NodeId)>,
    /// Set of connection attempts.
    attempts: BTreeSet<(NodeId, NodeId)>,
    /// Simulation options.
@@ -207,7 +201,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
            priority: VecDeque::new(),
            partitions: BTreeSet::new(),
            latencies: BTreeMap::new(),
-
            connections: BTreeMap::new(),
+
            connections: BTreeSet::new(),
            attempts: BTreeSet::new(),
            opts,
            start_time: time,
@@ -287,7 +281,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
    ) where
        P: Peer<S, G>,
    {
-
        let mut nodes: BTreeMap<_, _> = peers.into_iter().map(|p| (p.addr().ip(), p)).collect();
+
        let mut nodes: BTreeMap<_, _> = peers.into_iter().map(|p| (p.id(), p)).collect();

        while self.step_(&mut nodes) {
            if !pred(self) {
@@ -300,7 +294,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
    /// This function should be called until it returns `false`, or some desired state is reached.
    /// Returns `true` if there are more messages to process.
    pub fn step<'a, P: Peer<S, G>>(&mut self, peers: impl IntoIterator<Item = &'a mut P>) -> bool {
-
        let mut nodes: BTreeMap<_, _> = peers.into_iter().map(|p| (p.addr().ip(), p)).collect();
+
        let mut nodes: BTreeMap<_, _> = peers.into_iter().map(|p| (p.id(), p)).collect();
        self.step_(&mut nodes)
    }

@@ -340,10 +334,10 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {

        // Schedule any messages in the pipes.
        for peer in nodes.values_mut() {
-
            let ip = peer.addr().ip();
+
            let id = peer.id();

            for o in peer.by_ref() {
-
                self.schedule(&ip, o);
+
                self.schedule(&id, o);
            }
        }
        // Next high-priority message.
@@ -370,42 +364,38 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                p.tick(time);

                match input {
-
                    Input::Connecting { addr } => {
-
                        if self.attempts.insert((node, addr.ip())) {
-
                            p.attempted(&addr);
+
                    Input::Connecting { id, addr } => {
+
                        if self.attempts.insert((node, id)) {
+
                            // TODO: Also call `inbound` for inbound attempts.
+
                            p.attempted(id, &addr);
                        }
                    }
-
                    Input::Connected {
-
                        addr,
-
                        local_addr,
-
                        link,
-
                    } => {
-
                        let conn = (node, addr.ip());
+
                    Input::Connected { id, link } => {
+
                        let conn = (node, id);

                        let attempted = link.is_outbound() && self.attempts.remove(&conn);
                        if attempted || link.is_inbound() {
-
                            if self.connections.insert(conn, local_addr.port()).is_none() {
-
                                p.connecting(addr, &local_addr, link);
-
                                p.connected(addr, link);
+
                            if self.connections.insert(conn) {
+
                                p.connected(id, link);
                            }
                        }
                    }
-
                    Input::Disconnected(addr, reason) => {
-
                        let conn = (node, addr.ip());
+
                    Input::Disconnected(id, reason) => {
+
                        let conn = (node, id);
                        let attempt = self.attempts.remove(&conn);
-
                        let connection = self.connections.remove(&conn).is_some();
+
                        let connection = self.connections.remove(&conn);

                        // Can't be both attempting and connected.
                        assert!(!(attempt && connection));

                        if attempt || connection {
-
                            p.disconnected(&addr, &reason);
+
                            p.disconnected(id, &reason);
                        }
                    }
                    Input::Wake => p.wake(),
-
                    Input::Received(addr, msgs) => {
+
                    Input::Received(id, msgs) => {
                        for msg in msgs {
-
                            p.received_message(&addr, msg);
+
                            p.received_message(id, msg);
                        }
                    }
                }
@@ -433,14 +423,12 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                }
                // If the other end has disconnected the sender with some latency, there may not be
                // a connection remaining to use.
-
                let port = if let Some(port) = self.connections.get(&(node, receiver.ip())) {
-
                    *port
-
                } else {
+
                if self.connections.get(&(node, receiver)).is_none() {
                    return;
-
                };
+
                }
+
                let sender = node;

-
                let sender: net::SocketAddr = (node, port).into();
-
                if self.is_partitioned(sender.ip(), receiver.ip()) {
+
                if self.is_partitioned(sender, receiver) {
                    // Drop message if nodes are partitioned.
                    info!(
                        target: "sim",
@@ -452,10 +440,10 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {

                // Schedule message in the future, ensuring messages don't arrive out-of-order
                // between two peers.
-
                let latency = self.latency(node, receiver.ip());
+
                let latency = self.latency(node, receiver);
                let time = self
                    .inbox
-
                    .last(&receiver.ip(), &sender)
+
                    .last(&receiver, &sender)
                    .map(|(k, _)| *k)
                    .unwrap_or_else(|| self.time);
                let time = time + latency;
@@ -473,30 +461,28 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    time,
                    Scheduled {
                        remote: sender,
-
                        node: receiver.ip(),
+
                        node: receiver,
                        input: Input::Received(sender, msgs),
                    },
                );
            }
-
            Io::Connect(remote) => {
-
                assert!(remote.ip() != node, "self-connections are not allowed");
+
            Io::Connect(remote, addr) => {
+
                assert!(remote != node, "self-connections are not allowed");

-
                // Create an ephemeral sockaddr for the connecting (local) node.
-
                let local_addr: net::SocketAddr = net::SocketAddr::new(node, self.rng.u16(8192..));
-
                let latency = self.latency(node, remote.ip());
+
                let latency = self.latency(node, remote);

                self.inbox.insert(
                    self.time + MIN_LATENCY,
                    Scheduled {
                        node,
                        remote,
-
                        input: Input::Connecting { addr: remote },
+
                        input: Input::Connecting { id: remote, addr },
                    },
                );

                // Fail to connect if the nodes are partitioned.
-
                if self.is_partitioned(node, remote.ip()) {
-
                    log::info!(target: "sim", "{} -/-> {} (partitioned)", node, remote.ip());
+
                if self.is_partitioned(node, remote) {
+
                    log::info!(target: "sim", "{} -/-> {} (partitioned)", node, remote);

                    // Sometimes, the service gets a failure input, other times it just hangs.
                    if self.rng.bool() {
@@ -521,11 +507,10 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    // The remote will get the connection attempt with some latency.
                    self.time + latency,
                    Scheduled {
-
                        node: remote.ip(),
-
                        remote: local_addr,
+
                        node: remote,
+
                        remote: node,
                        input: Input::Connected {
-
                            addr: local_addr,
-
                            local_addr: remote,
+
                            id: node,
                            link: Link::Inbound,
                        },
                    },
@@ -537,8 +522,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        remote,
                        node,
                        input: Input::Connected {
-
                            addr: remote,
-
                            local_addr,
+
                            id: remote,
                            link: Link::Outbound,
                        },
                    },
@@ -558,21 +542,20 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                //
                // It's also possible that the connection was only attempted and never succeeded,
                // in which case we would return here.
-
                let Some(port) = self.connections.get(&(node, remote.ip())) else {
+
                if !self.connections.contains(&(node, remote)) {
                    debug!(target: "sim", "Ignoring disconnect of {remote} from {node}");
                    return;
                };
-
                let local_addr: net::SocketAddr = (node, *port).into();
-
                let latency = self.latency(node, remote.ip());
+
                let latency = self.latency(node, remote);

                // The remote node receives the disconnection with some delay.
                self.inbox.insert(
                    self.time + latency,
                    Scheduled {
-
                        node: remote.ip(),
-
                        remote: local_addr,
+
                        node: remote,
+
                        remote: node,
                        input: Input::Disconnected(
-
                            local_addr,
+
                            node,
                            Rc::new(nakamoto::DisconnectReason::ConnectionError(
                                io::Error::from(io::ErrorKind::ConnectionReset).into(),
                            )),
@@ -595,7 +578,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        Scheduled {
                            node,
                            // The remote is not applicable for this type of output.
-
                            remote: ([0, 0, 0, 0], 0).into(),
+
                            remote: [0; 32].into(),
                            input: Input::Wake,
                        },
                    );
modified radicle-node/src/tests.rs
@@ -64,28 +64,28 @@ fn test_ping_response() {

    alice.connect_to(&bob);
    alice.receive(
-
        &bob.addr(),
+
        bob.id(),
        Message::Ping(Ping {
            ponglen: Ping::MAX_PONG_ZEROES,
            zeroes: ZeroBytes::new(42),
        }),
    );
    assert_matches!(
-
        alice.messages(&bob.addr()).next(),
+
        alice.messages(bob.id()).next(),
        Some(Message::Pong { zeroes }) if zeroes.len() == Ping::MAX_PONG_ZEROES as usize,
        "respond with correctly formatted pong",
    );

    alice.connect_to(&eve);
    alice.receive(
-
        &eve.addr(),
+
        eve.id(),
        Message::Ping(Ping {
            ponglen: Ping::MAX_PONG_ZEROES + 1,
            zeroes: ZeroBytes::new(42),
        }),
    );
    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        None,
        "ignore unsupported ping message",
    );
@@ -101,7 +101,7 @@ fn test_disconnecting_unresponsive_peer() {
    alice.elapse(STALE_CONNECTION_TIMEOUT + LocalDuration::from_secs(1));
    alice
        .outbox()
-
        .find(|m| matches!(m, &Io::Disconnect(addr, _) if addr == bob.addr()))
+
        .find(|m| matches!(m, &Io::Disconnect(addr, _) if addr == bob.id()))
        .expect("disconnect an unresponsive bob");
}

@@ -117,7 +117,7 @@ fn test_connection_kept_alive() {
    )
    .initialize([&mut alice, &mut bob]);

-
    alice.command(service::Command::Connect(bob.addr()));
+
    alice.command(service::Command::Connect(bob.id(), bob.address()));
    sim.run_while([&mut alice, &mut bob], |s| !s.is_settled());
    assert_eq!(1, alice.sessions().negotiated().count(), "bob connects");

@@ -148,11 +148,11 @@ fn test_outbound_connection() {
        .service
        .sessions()
        .negotiated()
-
        .map(|(ip, _, _)| *ip)
+
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();

-
    assert!(peers.contains(&eve.addr()));
-
    assert!(peers.contains(&bob.addr()));
+
    assert!(peers.contains(&eve.id()));
+
    assert!(peers.contains(&bob.id()));
}

#[test]
@@ -168,11 +168,11 @@ fn test_inbound_connection() {
        .service
        .sessions()
        .negotiated()
-
        .map(|(ip, _, _)| *ip)
+
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();

-
    assert!(peers.contains(&eve.addr()));
-
    assert!(peers.contains(&bob.addr()));
+
    assert!(peers.contains(&eve.id()));
+
    assert!(peers.contains(&bob.id()));
}

#[test]
@@ -185,7 +185,7 @@ fn test_persistent_peer_connect() {
        MockStorage::empty(),
        peer::Config {
            config: Config {
-
                connect: vec![bob.address(), eve.address()],
+
                connect: vec![(bob.id(), bob.address()), (eve.id(), eve.address())],
                ..Config::default()
            },
            ..peer::Config::default()
@@ -195,24 +195,12 @@ fn test_persistent_peer_connect() {
    alice.initialize();

    let mut outbox = alice.outbox();
-
    assert_matches!(outbox.next(), Some(Io::Connect(a)) if a == bob.addr());
-
    assert_matches!(outbox.next(), Some(Io::Connect(a)) if a == eve.addr());
+
    assert_matches!(outbox.next(), Some(Io::Connect(a, _)) if a == bob.id());
+
    assert_matches!(outbox.next(), Some(Io::Connect(a, _)) if a == eve.id());
    assert_matches!(outbox.next(), None);
}

#[test]
-
#[ignore]
-
fn test_wrong_peer_version() {
-
    // TODO
-
}
-

-
#[test]
-
#[ignore]
-
fn test_wrong_peer_magic() {
-
    // TODO
-
}
-

-
#[test]
fn test_inventory_sync() {
    let tmp = tempfile::tempdir().unwrap();
    let mut alice = Peer::config(
@@ -229,7 +217,7 @@ fn test_inventory_sync() {

    alice.connect_to(&bob);
    alice.receive(
-
        &bob.addr(),
+
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: projs.clone().try_into().unwrap(),
@@ -325,7 +313,7 @@ fn test_inventory_pruning() {
        alice.connect_to(&bob);
        for num_projs in test.peer_projects {
            alice.receive(
-
                &bob.addr(),
+
                bob.id(),
                Message::inventory(
                    InventoryAnnouncement {
                        inventory: test::arbitrary::vec::<Id>(num_projs).try_into().unwrap(),
@@ -380,7 +368,7 @@ fn test_inventory_relay_bad_timestamp() {

    alice.connect_to(&bob);
    alice.receive(
-
        &bob.addr(),
+
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: BoundedVec::new(),
@@ -392,7 +380,7 @@ fn test_inventory_relay_bad_timestamp() {
    assert_matches!(
        alice.outbox().next(),
        Some(Io::Disconnect(addr, DisconnectReason::Error(session::Error::InvalidTimestamp(t))))
-
        if addr == bob.addr() && t == timestamp
+
        if addr == bob.id() && t == timestamp
    );
}

@@ -406,12 +394,12 @@ fn test_announcement_rebroadcast() {

    let received = test::gossip::messages(6, alice.local_time(), MAX_TIME_DELTA);
    for msg in received.iter().cloned() {
-
        alice.receive(&bob.addr(), msg);
+
        alice.receive(bob.id(), msg);
    }

    alice.connect_from(&eve);
    alice.receive(
-
        &eve.addr(),
+
        eve.id(),
        Message::Subscribe(Subscribe {
            filter: Filter::default(),
            since: Timestamp::MIN,
@@ -419,7 +407,7 @@ fn test_announcement_rebroadcast() {
        }),
    );

-
    let relayed = alice.messages(&eve.addr()).collect::<Vec<_>>();
+
    let relayed = alice.messages(eve.id()).collect::<Vec<_>>();
    assert_eq!(relayed, received);
}

@@ -443,13 +431,13 @@ fn test_announcement_rebroadcast_timestamp_filtered() {
        .chain(third.iter())
        .cloned()
    {
-
        alice.receive(&bob.addr(), msg);
+
        alice.receive(bob.id(), msg);
    }

    // Eve subscribes to messages within the period of the second batch only.
    alice.connect_from(&eve);
    alice.receive(
-
        &eve.addr(),
+
        eve.id(),
        Message::Subscribe(Subscribe {
            filter: Filter::default(),
            since: alice.local_time().as_secs(),
@@ -457,7 +445,7 @@ fn test_announcement_rebroadcast_timestamp_filtered() {
        }),
    );

-
    let relayed = alice.messages(&eve.addr()).collect::<Vec<_>>();
+
    let relayed = alice.messages(eve.id()).collect::<Vec<_>>();
    assert_eq!(relayed.len(), second.len());
    assert_eq!(relayed, second);
}
@@ -470,56 +458,56 @@ fn test_announcement_relay() {

    alice.connect_to(&bob);
    alice.connect_to(&eve);
-
    alice.receive(&bob.addr(), bob.inventory_announcement());
+
    alice.receive(bob.id(), bob.inventory_announcement());

    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_))
    );

-
    alice.receive(&bob.addr(), bob.inventory_announcement());
+
    alice.receive(bob.id(), bob.inventory_announcement());
    assert!(
-
        alice.messages(&eve.addr()).next().is_none(),
+
        alice.messages(eve.id()).next().is_none(),
        "Another inventory with the same timestamp is ignored"
    );

    bob.elapse(LocalDuration::from_mins(1));
-
    alice.receive(&bob.addr(), bob.inventory_announcement());
+
    alice.receive(bob.id(), bob.inventory_announcement());
    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "Another inventory with a fresher timestamp is relayed"
    );

-
    alice.receive(&bob.addr(), bob.node_announcement());
+
    alice.receive(bob.id(), bob.node_announcement());
    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "A node announcement with the same timestamp as the inventory is relayed"
    );

-
    alice.receive(&bob.addr(), bob.node_announcement());
-
    assert!(alice.messages(&eve.addr()).next().is_none(), "Only once");
+
    alice.receive(bob.id(), bob.node_announcement());
+
    assert!(alice.messages(eve.id()).next().is_none(), "Only once");

-
    alice.receive(&eve.addr(), eve.node_announcement());
+
    alice.receive(eve.id(), eve.node_announcement());
    assert_matches!(
-
        alice.messages(&bob.addr()).next(),
+
        alice.messages(bob.id()).next(),
        Some(Message::Announcement(_)),
        "A node announcement from Eve is relayed to Bob"
    );
    assert!(
-
        alice.messages(&eve.addr()).next().is_none(),
+
        alice.messages(eve.id()).next().is_none(),
        "But not back to Eve"
    );

    eve.elapse(LocalDuration::from_mins(1));
-
    alice.receive(&bob.addr(), eve.node_announcement());
+
    alice.receive(bob.id(), eve.node_announcement());
    assert!(
-
        alice.messages(&bob.addr()).next().is_none(),
+
        alice.messages(bob.id()).next().is_none(),
        "Bob already know about this message, since he sent it"
    );
    assert!(
-
        alice.messages(&eve.addr()).next().is_none(),
+
        alice.messages(eve.id()).next().is_none(),
        "Eve already know about this message, since she signed it"
    );
}
@@ -563,31 +551,31 @@ fn test_refs_announcement_relay() {
    alice.track_repo(&bob_inv[2], tracking::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.connect_to(&eve);
-
    alice.receive(&eve.addr(), Message::Subscribe(Subscribe::all()));
+
    alice.receive(eve.id(), Message::Subscribe(Subscribe::all()));

-
    alice.receive(&bob.addr(), bob.refs_announcement(bob_inv[0]));
+
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[0]));
    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "A refs announcement from Bob is relayed to Eve"
    );

-
    alice.receive(&bob.addr(), bob.refs_announcement(bob_inv[0]));
+
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[0]));
    assert!(
-
        alice.messages(&eve.addr()).next().is_none(),
+
        alice.messages(eve.id()).next().is_none(),
        "The same ref announement is not relayed"
    );

-
    alice.receive(&bob.addr(), bob.refs_announcement(bob_inv[1]));
+
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[1]));
    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "But a different one is"
    );

-
    alice.receive(&bob.addr(), bob.refs_announcement(bob_inv[2]));
+
    alice.receive(bob.id(), bob.refs_announcement(bob_inv[2]));
    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
        "And a third one is as well"
    );
@@ -603,9 +591,9 @@ fn test_refs_announcement_no_subscribe() {
    alice.track_repo(&id, tracking::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.connect_to(&eve);
-
    alice.receive(&bob.addr(), bob.refs_announcement(id));
+
    alice.receive(bob.id(), bob.refs_announcement(id));

-
    assert!(alice.messages(&eve.addr()).next().is_none());
+
    assert!(alice.messages(eve.id()).next().is_none());
}

#[test]
@@ -621,7 +609,7 @@ fn test_inventory_relay() {
    alice.connect_to(&bob);
    alice.connect_from(&eve);
    alice.receive(
-
        &bob.addr(),
+
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: inv.clone(),
@@ -631,7 +619,7 @@ fn test_inventory_relay() {
        ),
    );
    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(Announcement {
            node,
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
@@ -640,13 +628,13 @@ fn test_inventory_relay() {
        if node == bob.node_id() && timestamp == now
    );
    assert_matches!(
-
        alice.messages(&bob.addr()).next(),
+
        alice.messages(bob.id()).next(),
        None,
        "The inventory is not sent back to Bob"
    );

    alice.receive(
-
        &bob.addr(),
+
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: inv.clone(),
@@ -656,13 +644,13 @@ fn test_inventory_relay() {
        ),
    );
    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        None,
        "Sending the same inventory again doesn't trigger a relay"
    );

    alice.receive(
-
        &bob.addr(),
+
        bob.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: inv.clone(),
@@ -672,7 +660,7 @@ fn test_inventory_relay() {
        ),
    );
    assert_matches!(
-
        alice.messages(&eve.addr()).next(),
+
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(Announcement {
            node,
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
@@ -684,7 +672,7 @@ fn test_inventory_relay() {

    // Inventory from Eve relayed to Bob.
    alice.receive(
-
        &eve.addr(),
+
        eve.id(),
        Message::inventory(
            InventoryAnnouncement {
                inventory: inv,
@@ -694,7 +682,7 @@ fn test_inventory_relay() {
        ),
    );
    assert_matches!(
-
        alice.messages(&bob.addr()).next(),
+
        alice.messages(bob.id()).next(),
        Some(Message::Announcement(Announcement {
            node,
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
@@ -714,7 +702,7 @@ fn test_persistent_peer_reconnect() {
        MockStorage::empty(),
        peer::Config {
            config: Config {
-
                connect: vec![bob.address(), eve.address()],
+
                connect: vec![(bob.id(), bob.address()), (eve.id(), eve.address())],
                ..Config::default()
            },
            ..peer::Config::default()
@@ -733,10 +721,10 @@ fn test_persistent_peer_reconnect() {
    let ips = alice
        .sessions()
        .negotiated()
-
        .map(|(ip, _, _)| *ip)
+
        .map(|(id, _)| *id)
        .collect::<Vec<_>>();
-
    assert!(ips.contains(&bob.addr()));
-
    assert!(ips.contains(&eve.addr()));
+
    assert!(ips.contains(&bob.id()));
+
    assert!(ips.contains(&eve.id()));

    // ... Negotiated ...
    //
@@ -748,25 +736,25 @@ fn test_persistent_peer_reconnect() {
    // A non-transient disconnect, such as one requested by the user will not trigger
    // a reconnection.
    alice.disconnected(
-
        &eve.addr(),
+
        eve.id(),
        &nakamoto::DisconnectReason::DialError(error.clone()),
    );
    assert_matches!(alice.outbox().next(), None);

    for _ in 0..MAX_CONNECTION_ATTEMPTS {
        alice.disconnected(
-
            &bob.addr(),
+
            bob.id(),
            &nakamoto::DisconnectReason::ConnectionError(error.clone()),
        );
-
        assert_matches!(alice.outbox().next(), Some(Io::Connect(a)) if a == bob.addr());
+
        assert_matches!(alice.outbox().next(), Some(Io::Connect(a, _)) if a == bob.id());
        assert_matches!(alice.outbox().next(), None);

-
        alice.attempted(&bob.addr());
+
        alice.attempted(bob.id(), &bob.address());
    }

    // After the max connection attempts, a disconnect doesn't trigger a reconnect.
    alice.disconnected(
-
        &bob.addr(),
+
        bob.id(),
        &nakamoto::DisconnectReason::ConnectionError(error),
    );
    assert_matches!(alice.outbox().next(), None);
@@ -803,19 +791,19 @@ fn test_maintain_connections() {
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
    for peer in connected.iter() {
        alice.disconnected(
-
            &peer.addr(),
+
            peer.id(),
            &nakamoto::DisconnectReason::ConnectionError(error.clone()),
        );

-
        let addr = alice
+
        let id = alice
            .outbox()
            .find_map(|o| match o {
-
                Io::Connect(addr) => Some(addr),
+
                Io::Connect(id, _) => Some(id),
                _ => None,
            })
            .expect("Alice connects to a new peer");
-
        assert!(addr != peer.addr());
-
        unconnected.retain(|p| p.addr() != addr);
+
        assert!(id != peer.id());
+
        unconnected.retain(|p| p.id() != id);
    }
    assert!(
        unconnected.is_empty(),
@@ -848,8 +836,8 @@ fn test_push_and_pull() {
    local::register(alice.storage().clone());

    // Alice and Bob connect to Eve.
-
    alice.command(service::Command::Connect(eve.addr()));
-
    bob.command(service::Command::Connect(eve.addr()));
+
    alice.command(service::Command::Connect(eve.id(), eve.address()));
+
    bob.command(service::Command::Connect(eve.id(), eve.address()));

    // Alice creates a new project.
    let (proj_id, _, _) = rad::init(
@@ -902,7 +890,7 @@ fn test_push_and_pull() {
        .unwrap()
        .is_some());
    assert_matches!(
-
        sim.events(&bob.ip).next(),
+
        sim.events(&bob.id).next(),
        Some(service::Event::RefsFetched { from, .. })
        if from == eve.node_id(),
        "Bob fetched from Eve"
@@ -947,10 +935,10 @@ fn prop_inventory_exchange_dense() {
        }

        // Fully-connected.
-
        bob.command(Command::Connect(alice.addr()));
-
        bob.command(Command::Connect(eve.addr()));
-
        eve.command(Command::Connect(alice.addr()));
-
        eve.command(Command::Connect(bob.addr()));
+
        bob.command(Command::Connect(alice.id(), alice.address()));
+
        bob.command(Command::Connect(eve.id(), eve.address()));
+
        eve.command(Command::Connect(alice.id(), alice.address()));
+
        eve.command(Command::Connect(bob.id(), bob.address()));

        let mut peers: HashMap<_, _> = [
            (alice.node_id(), alice),
modified radicle-node/src/wire.rs
@@ -1,33 +1,27 @@
-
pub mod message;
-
pub mod transcode;
+
mod message;
+
mod transport;

-
use std::collections::{BTreeMap, HashMap, VecDeque};
+
pub use message::{AddressType, MessageType};
+
pub use transport::Transport;
+

+
use std::collections::BTreeMap;
use std::convert::TryFrom;
-
use std::net;
use std::ops::Deref;
use std::string::FromUtf8Error;
use std::{io, mem};

use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
-
use nakamoto_net as nakamoto;
-
use nakamoto_net::{Link, LocalTime};

-
use crate::address;
use crate::crypto::hash::Digest;
-
use crate::crypto::{PublicKey, Signature, Signer, Unverified};
-
use crate::deserializer::Deserializer;
+
use crate::crypto::{PublicKey, Signature, Unverified};
use crate::git;
use crate::git::fmt;
use crate::identity::Id;
use crate::node;
use crate::prelude::*;
-
use crate::service;
-
use crate::service::reactor::Io;
-
use crate::service::{filter, routing, session};
+
use crate::service::filter;
use crate::storage::refs::Refs;
use crate::storage::refs::SignedRefs;
-
use crate::storage::WriteStorage;
-
use crate::wire::transcode::{Framer, Handshake, HandshakeResult, MuxMsg, Transcode};

/// The default type we use to represent sizes on the wire.
///
@@ -474,194 +468,6 @@ impl Decode for node::Features {
    }
}

-
#[derive(Debug)]
-
pub struct Inbox<T: Transcode> {
-
    pub pipeline: Framer<T>,
-
    pub deserializer: Deserializer,
-
}
-

-
#[derive(Debug)]
-
pub struct Wire<R, S, W, G, H: Handshake> {
-
    handshakes: HashMap<net::SocketAddr, H>,
-
    inner_queue: VecDeque<nakamoto::Io<service::Event, service::DisconnectReason>>,
-
    inboxes: HashMap<net::SocketAddr, Inbox<H::Transcoder>>,
-
    inner: service::Service<R, S, W, G>,
-
}
-

-
impl<R, S, W, G, H: Handshake> Wire<R, S, W, G, H> {
-
    pub fn new(inner: service::Service<R, S, W, G>) -> Self {
-
        Self {
-
            handshakes: HashMap::new(),
-
            inner_queue: Default::default(),
-
            inboxes: HashMap::new(),
-
            inner,
-
        }
-
    }
-
}
-

-
impl<R, S, W, G, H> nakamoto::Protocol for Wire<R, S, W, G, H>
-
where
-
    R: routing::Store,
-
    S: address::Store,
-
    W: WriteStorage + 'static,
-
    G: Signer,
-
    H: Handshake,
-
{
-
    type Event = service::Event;
-
    type Command = service::Command;
-
    type DisconnectReason = service::DisconnectReason;
-

-
    fn initialize(&mut self, time: LocalTime) {
-
        self.inner.initialize(time)
-
    }
-

-
    fn tick(&mut self, now: nakamoto::LocalTime) {
-
        self.inner.tick(now)
-
    }
-

-
    fn wake(&mut self) {
-
        self.inner.wake()
-
    }
-

-
    fn command(&mut self, cmd: Self::Command) {
-
        self.inner.command(cmd)
-
    }
-

-
    fn attempted(&mut self, addr: &std::net::SocketAddr) {
-
        self.inner.attempted(addr)
-
    }
-

-
    fn connected(&mut self, addr: net::SocketAddr, local_addr: &net::SocketAddr, link: Link) {
-
        self.handshakes.insert(addr, H::new(link));
-
        self.inner.connecting(addr, local_addr, link)
-
    }
-

-
    fn disconnected(
-
        &mut self,
-
        addr: &net::SocketAddr,
-
        reason: nakamoto::DisconnectReason<service::DisconnectReason>,
-
    ) {
-
        self.handshakes.remove(addr);
-
        self.inboxes.remove(addr);
-
        self.inner.disconnected(addr, &reason)
-
    }
-

-
    fn received_bytes(&mut self, addr: &net::SocketAddr, raw_bytes: &[u8]) {
-
        if let Some(handshake) = self.handshakes.remove(addr) {
-
            debug_assert!(!self.inboxes.contains_key(addr));
-

-
            match handshake.step(raw_bytes) {
-
                HandshakeResult::Next(handshake, reply) => {
-
                    self.handshakes.insert(*addr, handshake);
-
                    if !reply.is_empty() {
-
                        self.inner_queue
-
                            .push_back(nakamoto::Io::Write(*addr, reply));
-
                    }
-
                    return;
-
                }
-
                HandshakeResult::Complete(transcoder, reply, link) => {
-
                    log::debug!("handshake with peer {} is complete", addr);
-
                    if !reply.is_empty() {
-
                        self.inner_queue
-
                            .push_back(nakamoto::Io::Write(*addr, reply));
-
                    }
-
                    let pipeline = Framer::new(transcoder);
-
                    self.inboxes.insert(
-
                        *addr,
-
                        Inbox {
-
                            pipeline,
-
                            deserializer: Deserializer::new(256),
-
                        },
-
                    );
-
                    self.inner.connected(*addr, link);
-
                }
-
                HandshakeResult::Error(err) => {
-
                    log::error!("invalid handshake input. Details: {}", err);
-
                    self.inner_queue.push_back(nakamoto::Io::Disconnect(
-
                        *addr,
-
                        service::DisconnectReason::Error(session::Error::Handshake(
-
                            err.to_string(),
-
                        )),
-
                    ));
-
                    return;
-
                }
-
            }
-
        }
-

-
        if let Some(Inbox {
-
            pipeline,
-
            deserializer,
-
        }) = self.inboxes.get_mut(addr)
-
        {
-
            pipeline.input(raw_bytes);
-
            for frame in pipeline {
-
                let Ok(msg) = MuxMsg::try_from(frame) else {
-
                    // TODO: Disconnect peer.
-
                    log::error!("Message frame with invalid channel structure from {}", addr);
-
                    return;
-
                };
-
                match msg.channel {
-
                    0 => deserializer.input(&msg.data),
-
                    1 => { /* TODO: Send to git worker */ }
-
                    wrong_channel => {
-
                        // TODO: Disconnect peer.
-
                        log::error!("Wrong message channel {} from peer {}", wrong_channel, addr);
-
                        return;
-
                    }
-
                };
-
            }
-

-
            for message in deserializer {
-
                match message {
-
                    Ok(msg) => self.inner.received_message(addr, msg),
-
                    Err(err) => {
-
                        // TODO: Disconnect peer.
-
                        log::error!("Invalid message received from {}: {}", addr, err);
-

-
                        return;
-
                    }
-
                }
-
            }
-
        } else {
-
            log::debug!("Received message from unknown peer {}", addr);
-
        }
-
    }
-
}
-

-
impl<R, S, W, G, H: Handshake> Iterator for Wire<R, S, W, G, H> {
-
    type Item = nakamoto::Io<service::Event, service::DisconnectReason>;
-

-
    fn next(&mut self) -> Option<Self::Item> {
-
        if let Some(event) = self.inner_queue.pop_front() {
-
            return Some(event);
-
        }
-

-
        match self.inner.next() {
-
            Some(Io::Write(addr, msgs)) => {
-
                let mut buf = Vec::new();
-
                for msg in msgs {
-
                    log::debug!("Write {:?} to {}", &msg, addr.ip());
-

-
                    msg.encode(&mut buf)
-
                        .expect("writing to an in-memory buffer doesn't fail");
-
                }
-
                let Inbox { pipeline, .. } = self.inboxes.get_mut(&addr).expect(
-
                    "broken handshake implementation: data sent before handshake was complete",
-
                );
-
                let data = pipeline.frame(buf).expect("oversized data for a frame");
-
                let msg = MuxMsg { channel: 0, data };
-
                Some(nakamoto::Io::Write(addr, msg.into()))
-
            }
-
            Some(Io::Event(e)) => Some(nakamoto::Io::Event(e)),
-
            Some(Io::Connect(a)) => Some(nakamoto::Io::Connect(a)),
-
            Some(Io::Disconnect(a, r)) => Some(nakamoto::Io::Disconnect(a, r)),
-
            Some(Io::Wakeup(d)) => Some(nakamoto::Io::Wakeup(d)),
-

-
            None => None,
-
        }
-
    }
-
}
-

#[cfg(test)]
mod tests {
    use super::*;
modified radicle-node/src/wire/message.rs
@@ -1,11 +1,12 @@
use std::{io, mem, net};

use byteorder::{NetworkEndian, ReadBytesExt};
+
use cyphernet::addr::{Addr, HostAddr, NetAddr};

use crate::prelude::*;
-
use crate::service;
use crate::service::message::*;
use crate::wire;
+
use crate::wire::{Decode, Encode};

/// Message type.
#[repr(u16)]
@@ -64,6 +65,22 @@ impl Message {
    }
}

+
impl netservices::Frame for Message {
+
    type Error = wire::Error;
+

+
    fn unmarshall(mut reader: impl io::Read) -> Result<Option<Self>, Self::Error> {
+
        match Message::decode(&mut reader) {
+
            Ok(msg) => Ok(Some(msg)),
+
            Err(wire::Error::Io(_)) => Ok(None),
+
            Err(err) => Err(err),
+
        }
+
    }
+

+
    fn marshall(&self, mut writer: impl io::Write) -> Result<usize, Self::Error> {
+
        self.encode(&mut writer).map_err(wire::Error::from)
+
    }
+
}
+

/// Address type.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -82,11 +99,12 @@ impl From<AddressType> for u8 {

impl From<&Address> for AddressType {
    fn from(a: &Address) -> Self {
-
        match a {
-
            Address::Ipv4 { .. } => AddressType::Ipv4,
-
            Address::Ipv6 { .. } => AddressType::Ipv6,
-
            Address::Hostname { .. } => AddressType::Hostname,
-
            Address::Onion { .. } => AddressType::Onion,
+
        match a.host {
+
            HostAddr::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
+
            HostAddr::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
+
            HostAddr::Dns(_) => AddressType::Hostname,
+
            HostAddr::Tor(_) => AddressType::Onion,
+
            _ => todo!(), // FIXME(cloudhead): Maxim will remove `non-exhaustive`
        }
    }
}
@@ -169,11 +187,7 @@ impl wire::Encode for Message {
        let mut n = self.type_id().encode(writer)?;

        match self {
-
            Self::Initialize { id, version, addrs } => {
-
                n += id.encode(writer)?;
-
                n += version.encode(writer)?;
-
                n += addrs.encode(writer)?;
-
            }
+
            Self::Initialize {} => {}
            Self::Subscribe(Subscribe {
                filter,
                since,
@@ -216,13 +230,7 @@ impl wire::Decode for Message {
        let type_id = reader.read_u16::<NetworkEndian>()?;

        match MessageType::try_from(type_id) {
-
            Ok(MessageType::Initialize) => {
-
                let id = NodeId::decode(reader)?;
-
                let version = u32::decode(reader)?;
-
                let addrs = BoundedVec::<Address, { service::ADDRESS_LIMIT }>::decode(reader)?;
-

-
                Ok(Self::Initialize { id, version, addrs })
-
            }
+
            Ok(MessageType::Initialize) => Ok(Self::Initialize {}),
            Ok(MessageType::Subscribe) => {
                let filter = Filter::decode(reader)?;
                let since = Timestamp::decode(reader)?;
@@ -288,20 +296,21 @@ impl wire::Encode for Address {
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
        let mut n = 0;

-
        match self {
-
            Self::Ipv4 { ip, port } => {
+
        match self.host {
+
            HostAddr::Ip(net::IpAddr::V4(ip)) => {
                n += u8::from(AddressType::Ipv4).encode(writer)?;
                n += ip.octets().encode(writer)?;
-
                n += port.encode(writer)?;
            }
-
            Self::Ipv6 { ip, port } => {
+
            HostAddr::Ip(net::IpAddr::V6(ip)) => {
                n += u8::from(AddressType::Ipv6).encode(writer)?;
                n += ip.octets().encode(writer)?;
-
                n += port.encode(writer)?;
            }
-
            Self::Hostname { .. } => todo!(),
-
            Self::Onion { .. } => todo!(),
+
            _ => {
+
                todo!();
+
            }
        }
+
        n += self.port().encode(writer)?;
+

        Ok(n)
    }
}
@@ -309,21 +318,18 @@ impl wire::Encode for Address {
impl wire::Decode for Address {
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
        let addrtype = reader.read_u8()?;
-

-
        match AddressType::try_from(addrtype) {
+
        let host = match AddressType::try_from(addrtype) {
            Ok(AddressType::Ipv4) => {
                let octets: [u8; 4] = wire::Decode::decode(reader)?;
                let ip = net::Ipv4Addr::from(octets);
-
                let port = u16::decode(reader)?;

-
                Ok(Self::Ipv4 { ip, port })
+
                HostAddr::Ip(net::IpAddr::V4(ip))
            }
            Ok(AddressType::Ipv6) => {
                let octets: [u8; 16] = wire::Decode::decode(reader)?;
                let ip = net::Ipv6Addr::from(octets);
-
                let port = u16::decode(reader)?;

-
                Ok(Self::Ipv6 { ip, port })
+
                HostAddr::Ip(net::IpAddr::V6(ip))
            }
            Ok(AddressType::Hostname) => {
                todo!();
@@ -331,8 +337,14 @@ impl wire::Decode for Address {
            Ok(AddressType::Onion) => {
                todo!();
            }
-
            Err(other) => Err(wire::Error::UnknownAddressType(other)),
-
        }
+
            Err(other) => return Err(wire::Error::UnknownAddressType(other)),
+
        };
+
        let port = u16::decode(reader)?;
+

+
        Ok(Self::from(NetAddr {
+
            host,
+
            port: Some(port),
+
        }))
    }
}

deleted radicle-node/src/wire/transcode.rs
@@ -1,223 +0,0 @@
-
use std::collections::VecDeque;
-
use std::convert::Infallible;
-
use std::io;
-
use std::io::Read;
-

-
use nakamoto_net::Link;
-

-
// TODO: Implement Try trait once stabilized
-
/// Result of a state-machine transition.
-
pub enum HandshakeResult<H: Handshake, T: Transcode> {
-
    /// Handshake is not completed; we proceed to the next handshake stage.
-
    Next(H, Vec<u8>),
-
    /// Handshake is completed; we now can communicate in a secure way.
-
    Complete(T, Vec<u8>, Link),
-
    /// Handshake has failed with some error.
-
    Error(H::Error),
-
}
-

-
/// State machine implementation of a handshake protocol which can be run by
-
/// peers.
-
pub trait Handshake: Sized {
-
    /// The resulting transcoder which will be constructed upon a successful
-
    /// handshake
-
    type Transcoder: Transcode;
-
    /// Errors which may happen during the handshake.
-
    type Error: std::error::Error;
-

-
    /// Create a new handshake state-machine.
-
    fn new(link: Link) -> Self;
-
    /// Advance the state-machine to the next state.
-
    fn step(self, input: &[u8]) -> HandshakeResult<Self, Self::Transcoder>;
-
    /// Returns direction of the handshake protocol.
-
    fn link(&self) -> Link;
-
}
-

-
/// Dumb handshake structure which runs void protocol.
-
#[derive(Debug)]
-
pub struct NoHandshake(Link);
-

-
impl Handshake for NoHandshake {
-
    type Transcoder = PlainTranscoder;
-
    type Error = Infallible;
-

-
    fn new(link: Link) -> Self {
-
        NoHandshake(link)
-
    }
-

-
    fn step(self, _input: &[u8]) -> HandshakeResult<Self, Self::Transcoder> {
-
        HandshakeResult::Complete(PlainTranscoder, vec![], self.0)
-
    }
-

-
    fn link(&self) -> Link {
-
        self.0
-
    }
-
}
-

-
/// Trait allowing transcoding a stream using some form of stream encryption and/or encoding.
-
pub trait Transcode {
-
    /// Decodes data received from the remote peer and update the internal state
-
    /// of the transcoder, if necessary.
-
    fn decode(&mut self, data: &[u8]) -> Vec<u8>;
-

-
    /// Encodes data before sending it to the remote peer.
-
    fn encode(&mut self, data: Vec<u8>) -> Vec<u8>;
-
}
-

-
/// Transcoder which does nothing.
-
#[derive(Debug, Default)]
-
pub struct PlainTranscoder;
-

-
impl Transcode for PlainTranscoder {
-
    fn decode(&mut self, data: &[u8]) -> Vec<u8> {
-
        data.to_vec()
-
    }
-

-
    fn encode(&mut self, data: Vec<u8>) -> Vec<u8> {
-
        data
-
    }
-
}
-

-
pub type Frame = Vec<u8>;
-

-
#[derive(Copy, Clone, Debug)]
-
pub struct OversizedData(usize);
-

-
#[derive(Debug, Default)]
-
pub struct Framer<T: Transcode> {
-
    input: VecDeque<u8>,
-
    inner: T,
-
}
-

-
impl<T: Transcode> Framer<T> {
-
    pub fn new(inner: T) -> Self {
-
        Framer {
-
            input: Default::default(),
-
            inner,
-
        }
-
    }
-

-
    pub fn input(&mut self, encoded: &[u8]) {
-
        self.input.extend(self.inner.decode(encoded));
-
    }
-

-
    pub fn frame(&mut self, decoded: Vec<u8>) -> Result<Frame, OversizedData> {
-
        let mut data = self.inner.encode(decoded);
-
        let len = data.len();
-
        let len = u8::try_from(len).map_err(|_| OversizedData(len))?;
-
        let len = len.to_be_bytes();
-
        let mut buf = Vec::with_capacity(data.len() + 2);
-

-
        buf.extend(len);
-
        buf.append(&mut data);
-

-
        Ok(buf)
-
    }
-
}
-

-
impl<T: Transcode> Iterator for Framer<T> {
-
    type Item = Frame;
-

-
    fn next(&mut self) -> Option<Self::Item> {
-
        if self.input.len() < 2 {
-
            return None;
-
        }
-
        let mut len = [0u8; 2];
-
        self.input
-
            .read_exact(&mut len)
-
            .expect("the length is checked");
-

-
        let len = u16::from_be_bytes(len) as usize;
-
        if self.input.len() < 2 + len {
-
            return None;
-
        }
-
        self.input.pop_front();
-
        self.input.pop_front();
-

-
        let reminder = self.input.split_off(len);
-
        let mut data = vec![0u8; len];
-

-
        self.input.read_exact(&mut data).expect("checked length");
-
        self.input = reminder;
-

-
        Some(data)
-
    }
-
}
-

-
#[derive(Copy, Clone, Debug)]
-
pub struct ChannelError;
-

-
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
-
pub struct MuxMsg {
-
    pub channel: u16,
-
    pub data: Vec<u8>,
-
}
-

-
impl From<MuxMsg> for Frame {
-
    fn from(mut msg: MuxMsg) -> Self {
-
        let channel = msg.channel.to_be_bytes();
-
        let mut data = Vec::with_capacity(msg.data.len() + 2);
-

-
        data.extend(channel);
-
        data.append(&mut msg.data);
-
        data
-
    }
-
}
-

-
impl TryFrom<Frame> for MuxMsg {
-
    type Error = ChannelError;
-

-
    fn try_from(frame: Frame) -> Result<Self, Self::Error> {
-
        if frame.len() < 2 {
-
            return Err(ChannelError);
-
        }
-
        let mut channel = [0u8; 2];
-
        let mut cursor = io::Cursor::new(frame);
-

-
        cursor
-
            .read_exact(&mut channel)
-
            .expect("the length is checked");
-

-
        let channel = u16::from_be_bytes(channel);
-

-
        Ok(MuxMsg {
-
            channel,
-
            data: cursor.into_inner(),
-
        })
-
    }
-
}
-

-
#[cfg(test)]
-
mod test {
-
    use super::*;
-
    use crate::deserializer::Deserializer;
-

-
    #[test]
-
    fn decode() {
-
        let mut pipeline = Framer::new(PlainTranscoder);
-
        let mut deser = Deserializer::<String>::new(512);
-

-
        let data = [
-
            0x00, 0x04, 0x00, 0x00, b'a', b'b', 0x00, 0x07, 0x00, 0x01, b'M', b'a', b'x', b'i',
-
            b'm',
-
        ];
-
        let mut expected_payloads = [(0u16, b"ab".to_vec()), (1, b"Maxim".to_vec())].into_iter();
-
        let mut expected_msgs = ["ab", "Maxim"].into_iter();
-

-
        for byte in data {
-
            // Writing data byte by byte, ensuring that the reading is not broken
-
            pipeline.input(&[byte]);
-
            for frame in &mut pipeline {
-
                let msg = MuxMsg::try_from(frame).unwrap();
-
                let (channel, data) = expected_payloads.next().unwrap();
-
                deser.input(&data);
-
                assert_eq!(msg, MuxMsg { channel, data });
-
            }
-
        }
-

-
        for msg in deser {
-
            let msg = msg.unwrap();
-
            assert_eq!(msg, expected_msgs.next().unwrap());
-
        }
-
    }
-
}
added radicle-node/src/wire/transport.rs
@@ -0,0 +1,384 @@
+
//! Implementation of the transport protocol.
+
//!
+
//! We use the Noise XK handshake pattern to establish an encrypted stream with a remote peer.
+
//! The handshake itself is implemented in the external [`netservices`] crate.
+
use std::collections::VecDeque;
+
use std::os::unix::io::AsRawFd;
+
use std::os::unix::prelude::RawFd;
+
use std::sync::Arc;
+
use std::time::Duration;
+
use std::{io, net};
+

+
use cyphernet::addr::{Addr as _, HostAddr, PeerAddr};
+
use nakamoto_net::{DisconnectReason, Link, LocalTime};
+
use netservices::noise::NoiseXk;
+
use netservices::wire::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
+
use netservices::NetSession;
+

+
use radicle::collections::HashMap;
+
use radicle::crypto::Negotiator;
+
use radicle::node::NodeId;
+
use radicle::storage::WriteStorage;
+

+
use crate::crypto::Signer;
+
use crate::service::reactor::Io;
+
use crate::service::{routing, session, Message, Service};
+
use crate::{address, service};
+

+
/// The peer session type.
+
pub type Session<N> = NoiseXk<N>;
+

+
/// Reactor action.
+
type Action<G> = reactor::Action<NetAccept<Session<G>>, NetTransport<Session<G>, Message>>;
+

+
/// Peer connection state machine.
+
#[derive(Debug)]
+
enum Peer {
+
    /// The initial state before handshake is completed.
+
    Connecting { link: Link },
+
    /// The state after handshake is completed.
+
    /// Peers in this state are handled by the underlying service.
+
    Connected { link: Link, id: NodeId },
+
    /// The state after a peer was disconnected, either during handshake,
+
    /// or once connected.
+
    Disconnected {
+
        id: NodeId,
+
        reason: DisconnectReason<service::DisconnectReason>,
+
    },
+
}
+

+
impl Peer {
+
    /// Return a new connecting peer.
+
    fn connecting(link: Link) -> Self {
+
        Self::Connecting { link }
+
    }
+

+
    /// Switch to connected state.
+
    fn connected(&mut self, id: NodeId) {
+
        if let Self::Connecting { link } = self {
+
            *self = Self::Connected { link: *link, id };
+
        } else {
+
            panic!("Peer::connected: session for {} is already established", id);
+
        }
+
    }
+

+
    /// Switch to disconnected state.
+
    fn disconnected(&mut self, reason: DisconnectReason<service::DisconnectReason>) {
+
        if let Self::Connected { id, .. } = self {
+
            *self = Self::Disconnected { id: *id, reason };
+
        } else {
+
            panic!("Peer::disconnected: session is not connected");
+
        }
+
    }
+
}
+

+
/// Transport protocol implementation for a set of peers.
+
pub struct Transport<R, S, W, G: Negotiator> {
+
    /// Backing service instance.
+
    service: Service<R, S, W, G>,
+
    /// Used to performs X25519 key exchange.
+
    keypair: G,
+
    /// Internal queue of actions to send to the reactor.
+
    actions: VecDeque<Action<G>>,
+
    /// Peer sessions.
+
    peers: HashMap<RawFd, Peer>,
+
    /// SOCKS5 proxy address.
+
    proxy: net::SocketAddr,
+
}
+

+
impl<R, S, W, G> Transport<R, S, W, G>
+
where
+
    R: routing::Store,
+
    S: address::Store,
+
    W: WriteStorage + 'static,
+
    G: Signer + Negotiator,
+
{
+
    pub fn new(
+
        mut service: Service<R, S, W, G>,
+
        keypair: G,
+
        proxy: net::SocketAddr,
+
        clock: LocalTime,
+
    ) -> Self {
+
        service.initialize(clock);
+

+
        Self {
+
            service,
+
            keypair,
+
            proxy,
+
            actions: VecDeque::new(),
+
            peers: HashMap::default(),
+
        }
+
    }
+

+
    fn by_id(&self, id: &NodeId) -> RawFd {
+
        self.connected()
+
            .find(|(_, i)| *i == id)
+
            .map(|(fd, _)| fd)
+
            .unwrap()
+
    }
+

+
    fn connected(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
+
        self.peers.iter().filter_map(|(fd, peer)| {
+
            if let Peer::Connected { id, .. } = peer {
+
                Some((*fd, id))
+
            } else {
+
                None
+
            }
+
        })
+
    }
+

+
    fn disconnect(&mut self, fd: RawFd, reason: DisconnectReason<service::DisconnectReason>) {
+
        let Some(peer) = self.peers.get_mut(&fd) else {
+
            log::error!(target: "transport", "Peer with fd {fd} was not found");
+
            return;
+
        };
+
        if let Peer::Disconnected { .. } = peer {
+
            log::error!(target: "transport", "Peer with fd {fd} is already disconnected");
+
            return;
+
        };
+
        log::debug!(target: "transport", "Disconnecting peer with fd {} ({})..", fd, reason);
+
        peer.disconnected(reason);
+

+
        self.actions.push_back(Action::UnregisterTransport(fd));
+
    }
+
}
+

+
impl<R, S, W, G> reactor::Handler for Transport<R, S, W, G>
+
where
+
    R: routing::Store + Send,
+
    S: address::Store + Send,
+
    W: WriteStorage + Send + 'static,
+
    G: Signer + Negotiator + Send,
+
{
+
    type Listener = NetAccept<Session<G>>;
+
    type Transport = NetTransport<Session<G>, Message>;
+
    type Command = service::Command;
+

+
    fn handle_wakeup(&mut self) {
+
        self.service.wake()
+
    }
+

+
    fn handle_listener_event(
+
        &mut self,
+
        socket_addr: net::SocketAddr,
+
        event: ListenerEvent<Session<G>>,
+
        duration: Duration,
+
    ) {
+
        self.service.tick(LocalTime::from_secs(duration.as_secs()));
+

+
        match event {
+
            ListenerEvent::Accepted(session) => {
+
                log::debug!(
+
                    target: "transport",
+
                    "Accepted inbound peer connection from {}..",
+
                    session.transition_addr()
+
                );
+
                self.peers
+
                    .insert(session.as_raw_fd(), Peer::connecting(Link::Inbound));
+

+
                let transport = match NetTransport::<Session<G>, Message>::upgrade(session) {
+
                    Ok(transport) => transport,
+
                    Err(err) => {
+
                        log::error!(target: "transport", "Failed to upgrade accepted peer socket: {err}");
+
                        return;
+
                    }
+
                };
+
                self.service.accepted(socket_addr);
+
                self.actions
+
                    .push_back(reactor::Action::RegisterTransport(transport))
+
            }
+
            ListenerEvent::Error(err) => {
+
                log::error!(target: "transport", "Error listening for inbound connections: {err}");
+
            }
+
        }
+
    }
+

+
    fn handle_transport_event(
+
        &mut self,
+
        fd: RawFd,
+
        event: SessionEvent<Session<G>, Message>,
+
        duration: Duration,
+
    ) {
+
        self.service.tick(LocalTime::from_secs(duration.as_secs()));
+

+
        match event {
+
            SessionEvent::SessionEstablished(node_id) => {
+
                log::debug!(target: "transport", "Session established with {node_id}");
+

+
                let conflicting = self
+
                    .connected()
+
                    .filter(|(_, id)| **id == node_id)
+
                    .map(|(fd, _)| fd)
+
                    .collect::<Vec<_>>();
+

+
                for fd in conflicting {
+
                    log::warn!(
+
                        target: "transport", "Closing conflicting session with {node_id} (fd={fd})"
+
                    );
+
                    self.disconnect(
+
                        fd,
+
                        DisconnectReason::DialError(
+
                            io::Error::from(io::ErrorKind::AlreadyExists).into(),
+
                        ),
+
                    );
+
                }
+

+
                let Some(peer) = self.peers.get_mut(&fd) else {
+
                    log::error!(target: "transport", "Session not found for fd {fd}");
+
                    return;
+
                };
+
                let Peer::Connecting { link } = peer else {
+
                    log::error!(
+
                        target: "transport",
+
                        "Session for {node_id} was either not found, or in an invalid state"
+
                    );
+
                    return;
+
                };
+
                let link = *link;
+

+
                peer.connected(node_id);
+
                self.service.connected(node_id, link);
+
            }
+
            SessionEvent::Message(msg) => {
+
                if let Some(Peer::Connected { link, id }) = self.peers.get(&fd) {
+
                    log::debug!(
+
                        target: "transport", "Received message {:?} from {} ({:?})", msg, id, link
+
                    );
+
                    self.service.received_message(*id, msg);
+
                } else {
+
                    log::warn!(target: "transport", "Dropping message from unconnected peer with fd {fd}");
+
                }
+
            }
+
            SessionEvent::FrameFailure(_err) => {
+
                // TODO(cloudhead): Include error in reason.
+
                self.disconnect(
+
                    fd,
+
                    DisconnectReason::Protocol(service::DisconnectReason::Error(
+
                        session::Error::Misbehavior,
+
                    )),
+
                );
+
            }
+
            SessionEvent::ConnectionFailure(err) => {
+
                self.disconnect(fd, DisconnectReason::ConnectionError(Arc::new(err)));
+
            }
+
            SessionEvent::Disconnected => {
+
                self.disconnect(
+
                    fd,
+
                    DisconnectReason::Protocol(service::DisconnectReason::Peer),
+
                );
+
            }
+
        }
+
    }
+

+
    fn handle_command(&mut self, cmd: Self::Command) {
+
        self.service.command(cmd)
+
    }
+

+
    fn handle_error(&mut self, err: reactor::Error<net::SocketAddr, RawFd>) {
+
        match err {
+
            reactor::Error::ListenerUnknown(id) => {
+
                log::error!(target: "transport", "Received error: unknown listener {}", id);
+
            }
+
            reactor::Error::PeerUnknown(id) => {
+
                log::error!(target: "transport", "Received error: unknown peer {}", id);
+
            }
+
            reactor::Error::PeerDisconnected(fd, err) => {
+
                log::error!(target: "transport", "Received error: peer {} disconnected: {}", fd, err);
+

+
                self.actions.push_back(Action::UnregisterTransport(fd));
+
            }
+
        }
+
    }
+

+
    fn handover_listener(&mut self, _listener: Self::Listener) {
+
        panic!("Transport::handover_listener: listener handover is not supported");
+
    }
+

+
    fn handover_transport(&mut self, transport: Self::Transport) {
+
        let fd = transport.as_raw_fd();
+

+
        if let Some(Peer::Disconnected { id, reason }) = self.peers.get(&fd) {
+
            // Disconnect TCP stream.
+
            drop(transport);
+

+
            self.service.disconnected(*id, reason);
+
        } else {
+
            todo!("The handover protocol is not implemented");
+
        }
+
    }
+
}
+

+
impl<R, S, W, G> Iterator for Transport<R, S, W, G>
+
where
+
    R: routing::Store,
+
    S: address::Store,
+
    W: WriteStorage + 'static,
+
    G: Signer + Negotiator,
+
{
+
    type Item = reactor::Action<NetAccept<Session<G>>, NetTransport<Session<G>, Message>>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        if let Some(event) = self.actions.pop_front() {
+
            return Some(event);
+
        }
+

+
        while let Some(ev) = self.service.next() {
+
            match ev {
+
                Io::Write(node_id, msgs) => {
+
                    log::debug!(
+
                        target: "transport", "Sending {} message(s) to {}", msgs.len(), node_id
+
                    );
+
                    let fd = self.by_id(&node_id);
+

+
                    return Some(reactor::Action::Send(fd, msgs));
+
                }
+
                Io::Event(_e) => {
+
                    log::warn!(
+
                        target: "transport", "Events are not currently supported"
+
                    );
+
                }
+
                Io::Connect(node_id, addr) => {
+
                    let socket_addr = match addr.host {
+
                        HostAddr::Ip(ip) => net::SocketAddr::new(ip, addr.port()),
+
                        HostAddr::Dns(_) => todo!(),
+
                        _ => self.proxy,
+
                    };
+

+
                    if self.connected().any(|(_, id)| id == &node_id) {
+
                        log::error!(
+
                            target: "transport",
+
                            "Attempt to connect to already connected peer {node_id}"
+
                        );
+
                        break;
+
                    }
+

+
                    match NetTransport::<Session<G>, Message>::connect(
+
                        PeerAddr::new(node_id, socket_addr),
+
                        &self.keypair,
+
                    ) {
+
                        Ok(transport) => {
+
                            self.service.attempted(node_id, &socket_addr.into());
+
                            self.peers
+
                                .insert(transport.as_raw_fd(), Peer::connecting(Link::Outbound));
+

+
                            return Some(reactor::Action::RegisterTransport(transport));
+
                        }
+
                        Err(err) => {
+
                            self.service
+
                                .disconnected(node_id, &DisconnectReason::DialError(Arc::new(err)));
+
                            break;
+
                        }
+
                    }
+
                }
+
                Io::Disconnect(node_id, reason) => {
+
                    let fd = self.by_id(&node_id);
+
                    self.disconnect(fd, DisconnectReason::Protocol(reason));
+

+
                    return self.actions.pop_back();
+
                }
+
                Io::Wakeup(d) => return Some(reactor::Action::Wakeup(d.into())),
+
            }
+
        }
+
        None
+
    }
+
}
modified radicle/Cargo.toml
@@ -15,7 +15,7 @@ base64 = { version= "0.13" }
byteorder = { version = "1.4" }
crossbeam-channel = { version = "0.5.6" }
ed25519-compact = { version = "2.0.2", features = ["pem"] }
-
cyphernet = { version = "0", optional = true }
+
cyphernet = { version = "0" }
fastrand = { version = "1.8.0" }
git-ref-format = { version = "0", features = ["serde", "macro"] }
multibase = { version = "0.9.1" }
@@ -49,7 +49,7 @@ version = "0"
[dependencies.radicle-crypto]
path = "../radicle-crypto"
version = "0"
-
features = ["git-ref-format", "ssh", "sqlite"]
+
features = ["git-ref-format", "ssh", "sqlite", "cyphernet"]

[dependencies.radicle-ssh]
path = "../radicle-ssh"
modified radicle/src/node.rs
@@ -1,9 +1,9 @@
mod features;

+
use std::io;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;
-
use std::{io, net};

use crate::crypto::PublicKey;
use crate::identity::Id;
@@ -37,8 +37,6 @@ pub trait Handle {
    /// The error returned by all methods.
    type Error: std::error::Error;

-
    /// Wait for the node's listening socket to be bound.
-
    fn listening(&self) -> Result<net::SocketAddr, Self::Error>;
    /// Retrieve or update the project from network.
    fn fetch(&mut self, id: Id) -> Result<Self::FetchLookup, Self::Error>;
    /// Start tracking the given project. Doesn't do anything if the project is already
@@ -202,10 +200,6 @@ impl Handle for Node {
        todo!();
    }

-
    fn listening(&self) -> Result<net::SocketAddr, Error> {
-
        todo!();
-
    }
-

    fn inventory(&self) -> Result<chan::Receiver<Id>, Error> {
        todo!();
    }