Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Complete Noise handshake integration
Alexis Sellier committed 3 years ago
commit 63e72488473741c516feb19a25b0e1753c3f1152
parent 1aba9812c701dbdddc84485cc8589e3f788bbd96
28 files changed +1121 -388
modified Cargo.lock
@@ -9,13 +9,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"

[[package]]
+
name = "aead"
+
version = "0.4.3"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877"
+
dependencies = [
+
 "generic-array",
+
]
+

+
[[package]]
+
name = "aead"
+
version = "0.5.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "5c192eb8f11fc081b0fe4259ba5af04217d4e0faddd02417310a927911abd7c8"
+
dependencies = [
+
 "crypto-common",
+
 "generic-array",
+
]
+

+
[[package]]
name = "aes"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "433cfd6710c9986c576a25ca913c39d66a6474107b406f34f91d4a8923395241"
dependencies = [
 "cfg-if",
-
 "cipher",
+
 "cipher 0.4.3",
 "cpufeatures",
]

@@ -30,9 +49,9 @@ dependencies = [

[[package]]
name = "amplify"
-
version = "4.0.0-beta.6"
+
version = "4.0.0-beta.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "4ff4d466b0cb5124991d3a2085c28f2aed6d86264c3c0bdbd7d1b64a49b4726b"
+
checksum = "1cc8da6203c5311cc0d74155341a5577564b8360976988eeb5ce7ab6903757f1"
dependencies = [
 "amplify_derive",
 "amplify_num",
@@ -292,7 +311,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e412e2cd0f2b2d93e02543ceae7917b3c70331573df19ee046bcbc35e45e87d7"
dependencies = [
 "byteorder",
-
 "cipher",
+
 "cipher 0.4.3",
]

[[package]]
@@ -338,6 +357,55 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"

[[package]]
+
name = "chacha20"
+
version = "0.8.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "5c80e5460aa66fe3b91d40bcbdab953a597b60053e34d684ac6903f863b680a6"
+
dependencies = [
+
 "cfg-if",
+
 "cipher 0.3.0",
+
 "cpufeatures",
+
 "zeroize",
+
]
+

+
[[package]]
+
name = "chacha20"
+
version = "0.9.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "c7fc89c7c5b9e7a02dfe45cd2367bae382f9ed31c61ca8debe5f827c420a2f08"
+
dependencies = [
+
 "cfg-if",
+
 "cipher 0.4.3",
+
 "cpufeatures",
+
]
+

+
[[package]]
+
name = "chacha20poly1305"
+
version = "0.9.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a18446b09be63d457bbec447509e85f662f32952b035ce892290396bc0b0cff5"
+
dependencies = [
+
 "aead 0.4.3",
+
 "chacha20 0.8.2",
+
 "cipher 0.3.0",
+
 "poly1305 0.7.2",
+
 "zeroize",
+
]
+

+
[[package]]
+
name = "chacha20poly1305"
+
version = "0.10.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35"
+
dependencies = [
+
 "aead 0.5.1",
+
 "chacha20 0.9.0",
+
 "cipher 0.4.3",
+
 "poly1305 0.8.0",
+
 "zeroize",
+
]
+

+
[[package]]
name = "chrono"
version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -354,12 +422,22 @@ dependencies = [

[[package]]
name = "cipher"
+
version = "0.3.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7"
+
dependencies = [
+
 "generic-array",
+
]
+

+
[[package]]
+
name = "cipher"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1873270f8f7942c191139cb8a40fd228da6c3fd2fc376d7e92d47aa14aeb59e"
dependencies = [
 "crypto-common",
 "inout",
+
 "zeroize",
]

[[package]]
@@ -488,6 +566,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
 "generic-array",
+
 "rand_core 0.6.4",
 "typenum",
]

@@ -513,7 +592,7 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"
dependencies = [
-
 "cipher",
+
 "cipher 0.4.3",
]

[[package]]
@@ -574,17 +653,37 @@ dependencies = [
]

[[package]]
-
name = "cyphernet"
+
name = "cypheraddr"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-cyphernet#2284beac543326b79705909b96efdcd2ecb008d3"
+
source = "git+https://github.com/cyphernet-wg/rust-cyphernet?rev=7ff92b1540d25658d4810d9ca274507685af0ba6#7ff92b1540d25658d4810d9ca274507685af0ba6"
dependencies = [
 "amplify",
 "base32",
+
 "cyphergraphy",
+
 "sha3",
+
]
+

+
[[package]]
+
name = "cyphergraphy"
+
version = "0.1.0"
+
source = "git+https://github.com/cyphernet-wg/rust-cyphernet?rev=7ff92b1540d25658d4810d9ca274507685af0ba6#7ff92b1540d25658d4810d9ca274507685af0ba6"
+
dependencies = [
+
 "amplify",
 "ed25519-compact",
 "multibase",
-
 "serde",
-
 "sha3",
-
 "socks",
+
 "sha2 0.10.6",
+
]
+

+
[[package]]
+
name = "cyphernet"
+
version = "0.1.0"
+
source = "git+https://github.com/cyphernet-wg/rust-cyphernet?rev=7ff92b1540d25658d4810d9ca274507685af0ba6#7ff92b1540d25658d4810d9ca274507685af0ba6"
+
dependencies = [
+
 "cypheraddr",
+
 "cyphergraphy",
+
 "eidolon",
+
 "noise-framework",
+
 "socks5-client",
]

[[package]]
@@ -720,6 +819,15 @@ dependencies = [
]

[[package]]
+
name = "eidolon"
+
version = "0.1.0"
+
source = "git+https://github.com/cyphernet-wg/rust-cyphernet?rev=7ff92b1540d25658d4810d9ca274507685af0ba6#7ff92b1540d25658d4810d9ca274507685af0ba6"
+
dependencies = [
+
 "amplify",
+
 "cyphergraphy",
+
]
+

+
[[package]]
name = "elliptic-curve"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1336,7 +1444,7 @@ dependencies = [
[[package]]
name = "io-reactor"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#d8660d8d0697bfd5b3983a5c1ea202ea26294639"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices?rev=e48011b10db9f9575af3ff1343e077ba9175028d#e48011b10db9f9575af3ff1343e077ba9175028d"
dependencies = [
 "amplify",
 "crossbeam-channel",
@@ -1586,17 +1694,31 @@ dependencies = [
[[package]]
name = "netservices"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#d8660d8d0697bfd5b3983a5c1ea202ea26294639"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices?rev=e48011b10db9f9575af3ff1343e077ba9175028d#e48011b10db9f9575af3ff1343e077ba9175028d"
dependencies = [
 "amplify",
+
 "chacha20 0.9.0",
+
 "chacha20poly1305 0.9.1",
 "cyphernet",
+
 "ed25519-compact",
 "io-reactor",
 "libc",
 "log",
+
 "rand 0.8.5",
 "socket2",
]

[[package]]
+
name = "noise-framework"
+
version = "0.1.0"
+
source = "git+https://github.com/cyphernet-wg/rust-cyphernet?rev=7ff92b1540d25658d4810d9ca274507685af0ba6#7ff92b1540d25658d4810d9ca274507685af0ba6"
+
dependencies = [
+
 "amplify",
+
 "chacha20poly1305 0.10.1",
+
 "cyphergraphy",
+
]
+

+
[[package]]
name = "nom"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1897,6 +2019,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"

[[package]]
+
name = "poly1305"
+
version = "0.7.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede"
+
dependencies = [
+
 "cpufeatures",
+
 "opaque-debug",
+
 "universal-hash 0.4.1",
+
]
+

+
[[package]]
+
name = "poly1305"
+
version = "0.8.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf"
+
dependencies = [
+
 "cpufeatures",
+
 "opaque-debug",
+
 "universal-hash 0.5.0",
+
]
+

+
[[package]]
name = "popol"
version = "1.0.0"
source = "git+https://github.com/Cyphernet-WG/popol?branch=api#0b78f5ef1c39741cfc67157b7d2c7a27064150b1"
@@ -2186,6 +2330,7 @@ dependencies = [
name = "radicle-node"
version = "0.2.0"
dependencies = [
+
 "amplify",
 "anyhow",
 "bloomy",
 "byteorder",
@@ -2478,7 +2623,7 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213"
dependencies = [
-
 "cipher",
+
 "cipher 0.4.3",
]

[[package]]
@@ -2736,14 +2881,12 @@ dependencies = [
]

[[package]]
-
name = "socks"
-
version = "0.3.4"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "f0c3dbbd9ae980613c6dd8e28a9407b50509d3803b57624d5dfe8315218cd58b"
+
name = "socks5-client"
+
version = "0.1.0"
+
source = "git+https://github.com/cyphernet-wg/rust-cyphernet?rev=7ff92b1540d25658d4810d9ca274507685af0ba6#7ff92b1540d25658d4810d9ca274507685af0ba6"
dependencies = [
-
 "byteorder",
-
 "libc",
-
 "winapi",
+
 "amplify",
+
 "cypheraddr",
]

[[package]]
@@ -3267,6 +3410,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"

[[package]]
+
name = "universal-hash"
+
version = "0.4.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05"
+
dependencies = [
+
 "generic-array",
+
 "subtle",
+
]
+

+
[[package]]
+
name = "universal-hash"
+
version = "0.5.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "7d3160b73c9a19f7e2939a2fdad446c57c1bbbbf4d919d3213ff1267a580d8b5"
+
dependencies = [
+
 "crypto-common",
+
 "subtle",
+
]
+

+
[[package]]
name = "url"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified Cargo.toml
@@ -35,14 +35,17 @@ version = "0.3.0"

[patch.crates-io.cyphernet]
git = "https://github.com/cyphernet-wg/rust-cyphernet"
+
rev = "7ff92b1540d25658d4810d9ca274507685af0ba6"
version = "0.1.0"

[patch.crates-io.io-reactor]
git = "https://github.com/cyphernet-wg/rust-netservices"
+
rev = "e48011b10db9f9575af3ff1343e077ba9175028d"
version = "0.1.0"

[patch.crates-io.netservices]
git = "https://github.com/cyphernet-wg/rust-netservices"
+
rev = "e48011b10db9f9575af3ff1343e077ba9175028d"
version = "0.1.0"

[patch.crates-io.radicle-git-ext]
modified radicle-crypto/src/lib.rs
@@ -2,6 +2,8 @@ use std::cmp::Ordering;
use std::sync::Arc;
use std::{fmt, ops::Deref, str::FromStr};

+
#[cfg(feature = "cyphernet")]
+
use cyphernet::{EcSigInvalid, EcSkInvalid, EcVerifyError};
use ed25519_compact as ed25519;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -24,19 +26,6 @@ pub struct Unverified;
/// Output of a Diffie-Hellman key exchange.
pub type SharedSecret = [u8; 32];

-
/// Trait alias used for Diffie–Hellman key exchange.
-
#[cfg(feature = "cyphernet")]
-
pub trait Negotiator:
-
    cyphernet::crypto::Ecdh<Pk = PublicKey, Secret = SharedSecret, Err = Error> + Clone + Send
-
{
-
}
-

-
#[cfg(feature = "cyphernet")]
-
impl<T> Negotiator for T where
-
    T: cyphernet::crypto::Ecdh<Pk = PublicKey, Secret = SharedSecret, Err = Error> + Clone + Send
-
{
-
}
-

/// Error returned if signing fails, eg. due to an HSM or KMS.
#[derive(Debug, Clone, Error)]
#[error(transparent)]
@@ -85,6 +74,12 @@ where
#[serde(into = "String", try_from = "String")]
pub struct Signature(pub ed25519::Signature);

+
impl AsRef<[u8]> for Signature {
+
    fn as_ref(&self) -> &[u8] {
+
        self.0.as_ref()
+
    }
+
}
+

impl fmt::Display for Signature {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let base = multibase::Base::Base58Btc;
@@ -171,19 +166,97 @@ impl PublicKey {
}

#[cfg(feature = "cyphernet")]
-
impl cyphernet::crypto::EcPk for PublicKey {
-
    // TODO: Change this once NoiseXK is working.
-
    fn generator() -> Self {
-
        use amplify::hex::FromHex;
-

-
        ed25519::PublicKey::from_slice(
-
            &Vec::<u8>::from_hex(
-
                "5866666666666666666666666666666666666666666666666666666666666666",
-
            )
-
            .unwrap(),
-
        )
-
        .unwrap()
-
        .into()
+
impl cyphernet::display::MultiDisplay<cyphernet::display::Encoding> for PublicKey {
+
    type Display = String;
+

+
    fn display_fmt(&self, _: &cyphernet::display::Encoding) -> Self::Display {
+
        self.to_string()
+
    }
+
}
+

+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::display::MultiDisplay<cyphernet::display::Encoding> for Signature {
+
    type Display = String;
+

+
    fn display_fmt(&self, _: &cyphernet::display::Encoding) -> Self::Display {
+
        self.to_string()
+
    }
+
}
+

+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::EcSk for SecretKey {
+
    type Pk = PublicKey;
+

+
    fn generate_keypair() -> (Self, Self::Pk)
+
    where
+
        Self: Sized,
+
    {
+
        let pair = KeyPair::generate();
+
        (pair.sk.into(), pair.pk.into())
+
    }
+

+
    fn to_pk(&self) -> Result<Self::Pk, EcSkInvalid> {
+
        Ok(self.public_key().into())
+
    }
+
}
+

+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::EcSign for SecretKey {
+
    type Sig = Signature;
+

+
    fn sign(&self, msg: impl AsRef<[u8]>) -> Self::Sig {
+
        self.0.sign(msg, None).into()
+
    }
+
}
+

+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::EcPk for PublicKey {
+
    const COMPRESSED_LEN: usize = 32;
+
    const CURVE_NAME: &'static str = "Ed25519";
+

+
    type Compressed = [u8; 32];
+

+
    fn base_point() -> Self {
+
        unimplemented!()
+
    }
+

+
    fn to_pk_compressed(&self) -> Self::Compressed {
+
        *self.0.deref()
+
    }
+

+
    fn from_pk_compressed(pk: Self::Compressed) -> Result<Self, cyphernet::EcPkInvalid> {
+
        Ok(PublicKey::from(pk))
+
    }
+

+
    fn from_pk_compressed_slice(slice: &[u8]) -> Result<Self, cyphernet::EcPkInvalid> {
+
        ed25519::PublicKey::from_slice(slice)
+
            .map_err(|_| cyphernet::EcPkInvalid::default())
+
            .map(Self)
+
    }
+
}
+

+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::EcSig for Signature {
+
    const COMPRESSED_LEN: usize = 64;
+
    type Pk = PublicKey;
+
    type Compressed = [u8; 64];
+

+
    fn to_sig_compressed(&self) -> Self::Compressed {
+
        *self.0.deref()
+
    }
+

+
    fn from_sig_compressed(sig: Self::Compressed) -> Result<Self, EcSigInvalid> {
+
        Ok(Signature::from(sig))
+
    }
+

+
    fn from_sig_compressed_slice(slice: &[u8]) -> Result<Self, EcSigInvalid> {
+
        ed25519::Signature::from_slice(slice)
+
            .map_err(|_| EcSigInvalid::default())
+
            .map(Signature)
+
    }
+

+
    fn verify(&self, pk: &Self::Pk, msg: impl AsRef<[u8]>) -> Result<(), EcVerifyError> {
+
        self.0.verify(pk, msg)
    }
}

modified radicle-crypto/src/ssh/keystore.rs
@@ -1,7 +1,10 @@
+
use std::ops::Deref;
use std::os::unix::fs::DirBuilderExt;
use std::path::{Path, PathBuf};
use std::{fs, io};

+
#[cfg(feature = "cyphernet")]
+
use cyphernet::{EcSign, EcSk, EcSkInvalid};
use thiserror::Error;
use zeroize::Zeroizing;

@@ -132,7 +135,7 @@ pub enum MemorySignerError {
/// so that signing never fails.
///
/// Can be created from a [`Keystore`] with the [`MemorySigner::load`] function.
-
#[derive(Debug, Clone)]
+
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MemorySigner {
    public: PublicKey,
    secret: Zeroizing<SecretKey>,
@@ -144,34 +147,37 @@ impl Signer for MemorySigner {
    }

    fn sign(&self, msg: &[u8]) -> Signature {
-
        Signature(self.secret.sign(msg, None))
+
        Signature(self.secret.deref().deref().sign(msg, None))
    }

    fn try_sign(&self, msg: &[u8]) -> Result<Signature, SignerError> {
-
        Ok(self.sign(msg))
+
        Ok(Signer::sign(self, msg))
    }
}

#[cfg(feature = "cyphernet")]
-
impl cyphernet::crypto::Ecdh for MemorySigner {
-
    type Secret = crate::SharedSecret;
-
    type Err = ed25519_compact::Error;
+
impl EcSk for MemorySigner {
+
    type Pk = PublicKey;

-
    fn ecdh(&self, other: &PublicKey) -> Result<crate::SharedSecret, ed25519_compact::Error> {
-
        let pk = ed25519_compact::x25519::PublicKey::from_ed25519(other)?;
-
        let sk = ed25519_compact::x25519::SecretKey::from_ed25519(&self.secret)?;
-
        let ss = pk.dh(&sk)?;
+
    fn generate_keypair() -> (Self, Self::Pk)
+
    where
+
        Self: Sized,
+
    {
+
        // TODO(cloudhead): Do we need `EcSk` on `MemorySigner`?
+
        todo!()
+
    }

-
        Ok(*ss)
+
    fn to_pk(&self) -> Result<Self::Pk, EcSkInvalid> {
+
        Ok(*self.public_key())
    }
}

#[cfg(feature = "cyphernet")]
-
impl cyphernet::crypto::EcSk for MemorySigner {
-
    type Pk = PublicKey;
+
impl EcSign for MemorySigner {
+
    type Sig = Signature;

-
    fn to_pk(&self) -> Self::Pk {
-
        self.public
+
    fn sign(&self, msg: impl AsRef<[u8]>) -> Self::Sig {
+
        Signer::sign(self, msg.as_ref())
    }
}

modified radicle-crypto/src/test/signer.rs
@@ -71,22 +71,3 @@ impl Signer for MockSigner {
        Ok(self.sign(msg))
    }
}
-

-
#[cfg(feature = "cyphernet")]
-
impl cyphernet::crypto::Ecdh for MockSigner {
-
    type Secret = crate::SharedSecret;
-
    type Err = crate::Error;
-

-
    fn ecdh(&self, _pk: &Self::Pk) -> Result<Self::Secret, Self::Err> {
-
        Ok([0; 32])
-
    }
-
}
-

-
#[cfg(feature = "cyphernet")]
-
impl cyphernet::crypto::EcSk for MockSigner {
-
    type Pk = PublicKey;
-

-
    fn to_pk(&self) -> Self::Pk {
-
        self.pk
-
    }
-
}
modified radicle-node/Cargo.toml
@@ -9,13 +9,14 @@ edition = "2021"
test = ["radicle/test", "radicle-crypto/test", "radicle-crypto/cyphernet", "qcheck"]

[dependencies]
+
amplify = { version = "4.0.0-beta.7" }
anyhow = { version = "1" }
bloomy = { version = "1.2" }
byteorder = { version = "1" }
chrono = { version = "0.4.0" }
colored = { version = "1.9.0" }
crossbeam-channel = { version = "0.5.6" }
-
cyphernet = { version = "0", features = ["serde", "tor", "dns", "ed25519"] }
+
cyphernet = { version = "0", features = ["tor", "dns", "ed25519"] }
fastrand = { version = "1.8.0" }
git-ref-format = { version = "0", features = ["serde", "macro"] }
lexopt = { version = "0.2.1" }
modified radicle-node/src/address/types.rs
@@ -9,7 +9,7 @@ use crate::collections::HashMap;
use crate::LocalTime;

/// A map with the ability to randomly select values.
-
#[derive(Debug)]
+
#[derive(Debug, Clone)]
pub struct AddressBook<K, V> {
    inner: HashMap<K, V>,
    rng: fastrand::Rng,
modified radicle-node/src/client.rs
@@ -1,13 +1,15 @@
use std::io;
use std::{net, thread, time};

-
use netservices::resources::NetAccept;
+
use cyphernet::{Cert, EcSign};
+
use netservices::resource::NetAccept;
use reactor::poller::popol;
use reactor::Reactor;
use thiserror::Error;

use crate::address;
use crate::control;
+
use crate::crypto::Signature;
use crate::node::NodeId;
use crate::service::{routing, tracking};
use crate::wire::Transport;
@@ -50,16 +52,16 @@ pub enum Error {
}

/// Holds join handles to the client threads, as well as a client handle.
-
pub struct Runtime<G: crypto::Signer + crypto::Negotiator> {
+
pub struct Runtime<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone> {
    pub id: NodeId,
    pub handle: Handle<Transport<routing::Table, address::Book, radicle::Storage, G>>,
    pub control: thread::JoinHandle<Result<(), control::Error>>,
-
    pub reactor: Reactor<Transport<service::routing::Table, address::Book, radicle::Storage, G>>,
+
    pub reactor: Reactor<Transport<routing::Table, address::Book, radicle::Storage, G>>,
    pub pool: WorkerPool,
    pub local_addrs: Vec<net::SocketAddr>,
}

-
impl<G: crypto::Signer + crypto::Negotiator + 'static> Runtime<G> {
+
impl<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone + 'static> Runtime<G> {
    /// Run the client.
    ///
    /// This function spawns threads.
@@ -72,7 +74,6 @@ impl<G: crypto::Signer + crypto::Negotiator + 'static> Runtime<G> {
    ) -> Result<Runtime<G>, Error> {
        let id = *profile.id();
        let node = profile.node();
-
        let negotiator = signer.clone();
        let network = config.network;
        let rng = fastrand::Rng::new();
        let clock = LocalTime::now();
@@ -99,14 +100,25 @@ impl<G: crypto::Signer + crypto::Negotiator + 'static> Runtime<G> {
            storage.clone(),
            addresses,
            tracking,
-
            signer,
+
            signer.clone(),
            rng,
        );

+
        let cert = Cert {
+
            pk: id,
+
            sig: EcSign::sign(&signer, id.as_slice()),
+
        };
+

        let (worker_send, worker_recv) = crossbeam_channel::unbounded::<WorkerReq<G>>();
-
        let pool = WorkerPool::with(10, time::Duration::from_secs(9), storage, worker_recv);
-
        let wire = Transport::new(service, worker_send, negotiator.clone(), proxy, clock);
-
        let reactor = Reactor::new(wire, popol::Poller::new())?;
+
        let pool = WorkerPool::with(
+
            10,
+
            time::Duration::from_secs(9),
+
            storage,
+
            worker_recv,
+
            id.to_human(),
+
        );
+
        let wire = Transport::new(service, worker_send, cert, signer, proxy, clock);
+
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
        let handle = Handle::from(reactor.controller());
        let control = thread::spawn({
            let handle = handle.clone();
@@ -116,7 +128,8 @@ impl<G: crypto::Signer + crypto::Negotiator + 'static> Runtime<G> {
        let mut local_addrs = Vec::new();

        for addr in listen {
-
            let listener = NetAccept::bind(addr, negotiator.clone())?;
+
            // TODO: Once the API supports it, we can pass an opaque type here.
+
            let listener = NetAccept::bind(&addr)?;
            let local_addr = listener.local_addr();

            local_addrs.push(local_addr);
modified radicle-node/src/client/handle.rs
@@ -6,7 +6,7 @@ use thiserror::Error;
use crate::identity::Id;
use crate::service;
use crate::service::{CommandError, FetchLookup, QueryState};
-
use crate::service::{NodeId, Session};
+
use crate::service::{NodeId, Sessions};

/// An error resulting from a handle method.
#[derive(Error, Debug)]
@@ -72,7 +72,7 @@ impl<T: reactor::Handler<Command = service::Command>> Handle<T> {
}

impl<T: reactor::Handler<Command = service::Command>> radicle::node::Handle for Handle<T> {
-
    type Session = Session;
+
    type Sessions = Sessions;
    type FetchLookup = FetchLookup;
    type Error = Error;

@@ -133,9 +133,19 @@ impl<T: reactor::Handler<Command = service::Command>> radicle::node::Handle for
        Ok(receiver)
    }

-
    fn sessions(&self) -> Result<chan::Receiver<(NodeId, Session)>, Error> {
-
        // TODO: This can be implemented once we have real peer sessions.
-
        todo!()
+
    fn sessions(&self) -> Result<Self::Sessions, Error> {
+
        let (sender, receiver) = chan::unbounded();
+
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            sender.send(state.sessions().clone()).ok();
+
            Ok(())
+
        });
+
        let (err_sender, err_receiver) = chan::bounded(1);
+
        self.command(service::Command::QueryState(query, err_sender))?;
+
        err_receiver.recv()??;
+

+
        let sessions = receiver.recv()?;
+

+
        Ok(sessions)
    }

    fn inventory(&self) -> Result<chan::Receiver<Id>, Error> {
modified radicle-node/src/lib.rs
@@ -14,7 +14,8 @@ pub mod tests;
pub mod wire;
pub mod worker;

-
pub use nakamoto_net::{Io, Link, LocalDuration, LocalTime};
+
pub use nakamoto_net::{Io, LocalDuration, LocalTime};
+
pub use netservices::LinkDirection as Link;
pub use radicle::{collections, crypto, git, identity, node, profile, rad, storage};

pub mod prelude {
modified radicle-node/src/main.rs
@@ -32,7 +32,7 @@ impl Options {
            match arg {
                Long("connect") => {
                    let peer: PeerAddr<NodeId, Address> = parser.value()?.parse()?;
-
                    connect.push((*peer.id(), peer.addr().clone()));
+
                    connect.push((peer.id, peer.addr.clone()));
                }
                Long("external-address") => {
                    let addr = parser.value()?.parse()?;
modified radicle-node/src/service.rs
@@ -1,4 +1,5 @@
#![allow(clippy::too_many_arguments)]
+
#![allow(clippy::collapsible_match)]
pub mod config;
pub mod filter;
pub mod message;
@@ -18,7 +19,6 @@ use fastrand::Rng;
use log::*;
use nakamoto::{LocalDuration, LocalTime};
use nakamoto_net as nakamoto;
-
use nakamoto_net::Link;
use nonempty::NonEmpty;
use radicle::node::{Address, Features};
use radicle::storage::{Namespaces, ReadStorage};
@@ -27,7 +27,7 @@ use crate::address;
use crate::address::AddressBook;
use crate::clock::Timestamp;
use crate::crypto;
-
use crate::crypto::{Negotiator, Signer, Verified};
+
use crate::crypto::{Signer, Verified};
use crate::git;
use crate::identity::{Doc, Id};
use crate::node;
@@ -37,6 +37,7 @@ use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::session::Protocol;
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, WriteStorage};
+
use crate::Link;

pub use crate::node::NodeId;
pub use crate::service::config::{Config, Network};
@@ -107,6 +108,8 @@ pub enum FetchError {
    Fetch(#[from] storage::FetchError),
    #[error(transparent)]
    Io(#[from] io::Error),
+
    #[error(transparent)]
+
    Project(#[from] storage::ProjectError),
}

/// Result of looking up seeds in our routing table.
@@ -139,6 +142,16 @@ pub enum FetchResult {
    Error { from: NodeId, error: FetchError },
}

+
impl FetchResult {
+
    /// Get the remote node id.
+
    pub fn remote(&self) -> &NodeId {
+
        match self {
+
            Self::Fetched { from, .. } => from,
+
            Self::Error { from, .. } => from,
+
        }
+
    }
+
}
+

/// Function used to query internal service state.
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;

@@ -248,7 +261,7 @@ where
    R: routing::Store,
    A: address::Store,
    S: WriteStorage + 'static,
-
    G: Signer + Negotiator,
+
    G: Signer,
{
    pub fn new(
        config: Config,
@@ -448,10 +461,9 @@ where

                    return;
                };
-
                log::debug!("Found {} seeds for {}", seeds.len(), id);
+
                log::debug!("Found {} seed(s) for {}", seeds.len(), id);

-
                // FIXME: Get results back to user.
-
                let (_, results) = chan::bounded(seeds.len());
+
                let (results_send, results) = chan::bounded(seeds.len());
                resp.send(FetchLookup::Found {
                    seeds: seeds.clone(),
                    results,
@@ -461,7 +473,7 @@ where
                // TODO: Limit the number of seeds we fetch from? Randomize?
                for seed in seeds {
                    let session = self.sessions.get_mut(&seed).unwrap();
-
                    if let Some(fetch) = session.fetch(id) {
+
                    if let Some(fetch) = session.fetch(id, results_send.clone()) {
                        self.reactor.write(session.id, fetch);
                        self.reactor
                            .fetch(session.id, id, Namespaces::default(), true);
@@ -509,9 +521,21 @@ where
        }
    }

-
    pub fn fetch_complete(&mut self, _result: FetchResult) {
+
    pub fn fetch_complete(&mut self, result: FetchResult) {
        // TODO(cloudhead): handle completed job with service business logic
        // TODO: Downgrade session to gossip protocol.
+
        if let Some(session) = self.sessions.get_mut(result.remote()) {
+
            if let session::State::Connected { protocol, .. } = &session.state {
+
                if let session::Protocol::Fetch {
+
                    results: Some(results),
+
                } = protocol
+
                {
+
                    results.send(result).unwrap();
+
                } else {
+
                    // Fetch initiated by remote, we don't need to report back.
+
                }
+
            }
+
        }
    }

    pub fn accepted(&mut self, _addr: net::SocketAddr) {
@@ -529,7 +553,7 @@ where
    }

    pub fn connected(&mut self, remote: NodeId, link: Link) {
-
        debug!("Connected to {} ({:?})", remote, link);
+
        info!("Connected to {} ({:?})", remote, link);

        // For outbound connections, we are the first to say "Hello".
        // For inbound connections, we wait for the remote to say "Hello" first.
@@ -799,7 +823,13 @@ where
        debug!("Received message {:?} from {}", &message, peer.id);

        match (&mut peer.state, message) {
-
            (session::State::Connected { protocol, .. }, _) if *protocol == Protocol::Fetch => {
+
            (
+
                session::State::Connected {
+
                    protocol: session::Protocol::Fetch { .. },
+
                    ..
+
                },
+
                _,
+
            ) => {
                // This should never happen if the service is properly configured, since all
                // incoming data is sent directly to the Git worker.
                log::error!("Received gossip message from {remote} during Git fetch");
@@ -882,8 +912,8 @@ where
                }
            }
            (session::State::Connected { protocol, .. }, Message::Fetch { repo }) => {
-
                *protocol = Protocol::Fetch;
-
                // All we need is to instruct the transport to handover to the worker
+
                *protocol = Protocol::Fetch { results: None };
+
                // Instruct the transport to handover the socket to the worker.
                self.reactor
                    .fetch(*remote, repo, Namespaces::default(), false);
            }
@@ -1218,7 +1248,7 @@ impl Node {
    }
}

-
#[derive(Debug)]
+
#[derive(Debug, Clone)]
/// Holds currently (or recently) connected peers.
pub struct Sessions(AddressBook<NodeId, Session>);

modified radicle-node/src/service/message.rs
@@ -391,7 +391,7 @@ impl fmt::Debug for Message {
            }
            Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {:?})", zeroes),
            Self::Pong { zeroes } => write!(f, "Pong({:?})", zeroes),
-
            Self::Fetch { repo } => write!(f, "Upgrade({repo})"),
+
            Self::Fetch { repo } => write!(f, "Fetch({repo})"),
        }
    }
}
modified radicle-node/src/service/reactor.rs
@@ -79,8 +79,12 @@ impl Reactor {
    }

    pub fn write_all(&mut self, remote: NodeId, msgs: impl IntoIterator<Item = Message>) {
-
        self.io
-
            .push_back(Io::Write(remote, msgs.into_iter().collect()));
+
        let msgs = msgs.into_iter().collect::<Vec<_>>();
+
        let len = msgs.len();
+
        for (no, msg) in msgs.iter().enumerate() {
+
            debug!("Write {no}/{len} {:?} message to {}", msg, remote);
+
        }
+
        self.io.push_back(Io::Write(remote, msgs));
    }

    pub fn wakeup(&mut self, after: LocalDuration) {
@@ -89,7 +93,7 @@ impl Reactor {

    pub fn fetch(&mut self, remote: NodeId, repo: Id, namespaces: Namespaces, initiated: bool) {
        if initiated {
-
            debug!("Fetch initiated for {} from {}..", repo, remote);
+
            debug!("Fetch initiated for {} with {}..", repo, remote);
        } else {
            debug!("Fetch requested for {} from {}..", repo, remote);
        }
modified radicle-node/src/service/session.rs
@@ -1,7 +1,9 @@
+
use crate::service::chan;
use crate::service::message;
use crate::service::message::Message;
-
use crate::service::storage;
-
use crate::service::{Id, Link, LocalTime, NodeId, Reactor, Rng};
+
use crate::service::{storage, FetchResult};
+
use crate::service::{Id, LocalTime, NodeId, Reactor, Rng};
+
use crate::Link;

#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PingState {
@@ -15,7 +17,7 @@ pub enum PingState {
}

/// Session protocol.
-
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+
#[derive(Debug, Default, Clone)]
pub enum Protocol {
    /// The default message-based gossip protocol.
    #[default]
@@ -23,7 +25,12 @@ pub enum Protocol {
    /// Git smart protocol. Used for fetching repository data.
    /// This protocol is used after a connection upgrade via the
    /// [`Message::Fetch`] message.
-
    Fetch,
+
    Fetch {
+
        /// Channel to send fetch results on. Set to `Some` when the fetch
+
        /// is locally initiated. Otherwise, no results need to be communicated
+
        /// back.
+
        results: Option<chan::Sender<FetchResult>>,
+
    },
}

#[derive(Debug, Clone)]
@@ -138,10 +145,12 @@ impl Session {
        self.attempts += 1;
    }

-
    pub fn fetch(&mut self, repo: Id) -> Option<Message> {
+
    pub fn fetch(&mut self, repo: Id, results: chan::Sender<FetchResult>) -> Option<Message> {
        if let State::Connected { protocol, .. } = &mut self.state {
-
            if *protocol == Protocol::Gossip {
-
                *protocol = Protocol::Fetch;
+
            if let Protocol::Gossip = protocol {
+
                *protocol = Protocol::Fetch {
+
                    results: Some(results),
+
                };
                return Some(Message::Fetch { repo });
            } else {
                log::error!(
modified radicle-node/src/test/handle.rs
@@ -18,7 +18,7 @@ pub struct Handle {

impl radicle::node::Handle for Handle {
    type Error = Error;
-
    type Session = service::Session;
+
    type Sessions = service::Sessions;
    type FetchLookup = FetchLookup;

    fn connect(&mut self, _node: NodeId, _addr: radicle::node::Address) -> Result<(), Error> {
@@ -55,7 +55,7 @@ impl radicle::node::Handle for Handle {
        unimplemented!();
    }

-
    fn sessions(&self) -> Result<chan::Receiver<(service::NodeId, service::Session)>, Error> {
+
    fn sessions(&self) -> Result<Self::Sessions, Error> {
        unimplemented!();
    }

modified radicle-node/src/test/logger.rs
@@ -14,14 +14,20 @@ impl Log for Logger {

        match record.target() {
            "test" => {
-
                println!("{} {}", "test:".cyan(), record.args().to_string().yellow())
+
                println!("{} {}", "test:".cyan(), record.args().to_string().cyan())
            }
            "sim" => {
                println!("{}  {}", "sim:".bold(), record.args().to_string().bold())
            }
            target => {
                if self.enabled(record.metadata()) {
-
                    let s = format!("{:<8} {}", format!("{}:", target), record.args());
+
                    let current = std::thread::current();
+
                    let msg = format!("{:<8} {}", format!("{}:", target), record.args());
+
                    let s = if let Some(name) = current.name() {
+
                        format!("{:<8} {msg}", name)
+
                    } else {
+
                        msg
+
                    };
                    match record.level() {
                        log::Level::Warn => {
                            println!("{}", s.yellow());
modified radicle-node/src/test/peer.rs
@@ -9,7 +9,7 @@ use crate::address;
use crate::address::Store;
use crate::clock::Timestamp;
use crate::crypto::test::signer::MockSigner;
-
use crate::crypto::{Negotiator, Signer};
+
use crate::crypto::Signer;
use crate::identity::Id;
use crate::node;
use crate::prelude::*;
@@ -22,7 +22,8 @@ use crate::storage::{RemoteId, WriteStorage};
use crate::test::arbitrary;
use crate::test::simulator;
use crate::test::storage::MockStorage;
-
use crate::{Link, LocalDuration, LocalTime};
+
use crate::Link;
+
use crate::{LocalDuration, LocalTime};

/// Service instantiation used for testing.
pub type Service<S, G> = service::Service<routing::Table, address::Book, S, G>;
@@ -42,7 +43,7 @@ pub struct Peer<S, G> {
impl<S, G> simulator::Peer<S, G> for Peer<S, G>
where
    S: WriteStorage + 'static,
-
    G: Signer + Negotiator + 'static,
+
    G: Signer + 'static,
{
    fn init(&mut self) {
        self.initialize()
@@ -103,7 +104,7 @@ impl Default for Config<MockSigner> {
impl<S, G> Peer<S, G>
where
    S: WriteStorage + 'static,
-
    G: Signer + Negotiator + 'static,
+
    G: Signer + 'static,
{
    pub fn config(
        name: &'static str,
modified radicle-node/src/test/simulator.rs
@@ -10,14 +10,15 @@ use std::sync::Arc;
use std::{fmt, io};

use log::*;
-
use nakamoto_net::{Link, LocalDuration, LocalTime};
+
use nakamoto_net::{LocalDuration, LocalTime};

-
use crate::crypto::{Negotiator, Signer};
+
use crate::crypto::Signer;
use crate::prelude::Address;
use crate::service::reactor::Io;
use crate::service::{DisconnectReason, Event, Message, NodeId};
use crate::storage::WriteStorage;
use crate::test::peer::Service;
+
use crate::Link;

/// Minimum latency between peers.
pub const MIN_LATENCY: LocalDuration = LocalDuration::from_millis(1);
@@ -190,7 +191,7 @@ pub struct Simulation<S, G> {
    signer: PhantomData<G>,
}

-
impl<S: WriteStorage + 'static, G: Signer + Negotiator> Simulation<S, G> {
+
impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
    /// Create a new simulation.
    pub fn new(time: LocalTime, rng: fastrand::Rng, opts: Options) -> Self {
        Self {
modified radicle-node/src/tests/e2e.rs
@@ -1,131 +1,319 @@
-
use std::path::{Path, PathBuf};
+
use std::path::Path;
use std::{
    collections::{BTreeMap, BTreeSet},
-
    net, thread,
+
    iter, net, thread,
+
    time::Duration,
};

use radicle::crypto::ssh::keystore::MemorySigner;
use radicle::git::refname;
use radicle::identity::Id;
use radicle::node::Handle;
-
use radicle::rad;
+
use radicle::storage::WriteStorage;
use radicle::test::fixtures;
use radicle::Profile;
use radicle::Storage;
+
use radicle::{assert_matches, rad};

use crate::address;
use crate::node::NodeId;
-
use crate::service::routing;
+
use crate::service::{routing, FetchLookup, FetchResult};
use crate::storage::git::transport;
+
use crate::test::logger;
use crate::wire::Transport;
use crate::{client, client::Runtime, service};

-
type TestHandle = (
-
    client::handle::Handle<Transport<routing::Table, address::Book, Storage, MemorySigner>>,
-
    thread::JoinHandle<Result<(), client::Error>>,
-
);
-

-
/// Populate a storage instance with a project.
-
fn populate(storage: &Storage, signer: &MemorySigner) {
-
    transport::local::register(storage.clone());
-

-
    let tmp = tempfile::tempdir().unwrap();
-
    let (repo, _) = fixtures::repository(tmp.path().join("acme"));
-

-
    rad::init(
-
        &repo,
-
        "acme",
-
        "Acme's Repo",
-
        refname!("master"),
-
        signer,
-
        storage,
-
    )
-
    .unwrap();
+
/// Represents a running node.
+
struct Node {
+
    id: NodeId,
+
    addr: net::SocketAddr,
+
    handle: client::handle::Handle<Transport<routing::Table, address::Book, Storage, MemorySigner>>,
+
    signer: MemorySigner,
+
    storage: Storage,
+
    #[allow(dead_code)]
+
    thread: thread::JoinHandle<Result<(), client::Error>>,
}

-
/// Create a node runtime.
-
fn runtime(home: &Path, config: service::Config) -> Runtime<MemorySigner> {
-
    let profile = Profile::init(home, "pasphrase".to_owned()).unwrap();
-
    let signer = MemorySigner::gen();
-
    let listen = vec![([0, 0, 0, 0], 0).into()];
-
    let proxy = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
+
impl Node {
+
    /// Spawn a node in its own thread.
+
    fn spawn(base: &Path, config: service::Config) -> Self {
+
        let home = base.join(
+
            iter::repeat_with(fastrand::alphanumeric)
+
                .take(8)
+
                .collect::<String>(),
+
        );

-
    populate(&profile.storage, &signer);
+
        let profile = Profile::init(home.as_path(), "pasphrase".to_owned()).unwrap();
+
        let signer = MemorySigner::load(&profile.keystore, "pasphrase".to_owned().into()).unwrap();
+
        let listen = vec![([0, 0, 0, 0], 0).into()];
+
        let proxy = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
+
        let storage = profile.storage.clone();

-
    Runtime::with(profile, config, listen, proxy, signer).unwrap()
-
}
+
        let rt = Runtime::with(profile, config, listen, proxy, signer.clone()).unwrap();

-
/// Create a network of nodes connected to each other.
-
fn network(
-
    configs: impl IntoIterator<Item = (service::Config, PathBuf)>,
-
) -> BTreeMap<(NodeId, net::SocketAddr), TestHandle> {
-
    let mut runtimes = BTreeMap::new();
-
    for (config, home) in configs.into_iter() {
-
        let rt = runtime(home.as_ref(), config);
-
        let id = rt.id;
        let addr = *rt.local_addrs.first().unwrap();
+
        let id = rt.id;
        let handle = rt.handle.clone();
-
        let join = thread::spawn(|| rt.run());
+
        let thread = thread::Builder::new()
+
            .name(id.to_string())
+
            .spawn(|| rt.run())
+
            .unwrap();

-
        runtimes.insert((id, addr), (handle, join));
+
        Self {
+
            id,
+
            addr,
+
            handle,
+
            signer,
+
            storage,
+
            thread,
+
        }
    }

-
    let mut connect = Vec::new();
-
    for (i, (from, _)) in runtimes.iter().enumerate() {
-
        let peers = runtimes
-
            .iter()
-
            .skip(i + 1)
-
            .map(|(p, _)| *p)
-
            .collect::<Vec<(NodeId, net::SocketAddr)>>();
-
        for to in peers {
-
            connect.push((*from, to));
+
    /// Connect this node to another node, and wait for the connection to be established both ways.
+
    fn connect(&mut self, remote: &Node) {
+
        self.handle.connect(remote.id, remote.addr.into()).unwrap();
+

+
        loop {
+
            let local_sessions = self.handle.sessions().unwrap();
+
            let remote_sessions = remote.handle.sessions().unwrap();
+

+
            let local_sessions = local_sessions
+
                .negotiated()
+
                .map(|(id, _)| id)
+
                .collect::<BTreeSet<_>>();
+
            let remote_sessions = remote_sessions
+
                .negotiated()
+
                .map(|(id, _)| id)
+
                .collect::<BTreeSet<_>>();
+

+
            if local_sessions.contains(&remote.id) && remote_sessions.contains(&self.id) {
+
                break;
+
            }
+
            thread::sleep(Duration::from_millis(100));
        }
    }

-
    for (from, (to_id, to_addr)) in connect {
-
        let (handle, _) = runtimes.get_mut(&from).unwrap();
-
        handle.connect(to_id, to_addr.into()).unwrap();
+
    /// Populate a storage instance with a project.
+
    fn project(&mut self, name: &str) -> Id {
+
        transport::local::register(self.storage.clone());
+

+
        let tmp = tempfile::tempdir().unwrap();
+
        let (repo, _) = fixtures::gen::repository(tmp.path());
+
        let description = iter::repeat_with(fastrand::alphabetic)
+
            .take(12)
+
            .collect::<String>();
+
        let id = rad::init(
+
            &repo,
+
            name,
+
            &description,
+
            refname!("master"),
+
            &self.signer,
+
            &self.storage,
+
        )
+
        .map(|(id, _, _)| id)
+
        .unwrap();
+

+
        log::debug!(target: "test", "Initialized project {id} for node {}", self.id);
+

+
        id
    }
-
    runtimes
}

/// Checks whether the nodes have converged in their routing tables.
#[track_caller]
-
fn check(
-
    nodes: impl IntoIterator<Item = ((NodeId, net::SocketAddr), TestHandle)>,
-
) -> BTreeSet<(Id, NodeId)> {
-
    let mut by_node = BTreeMap::<NodeId, BTreeSet<(Id, NodeId)>>::new();
-
    let mut all = BTreeSet::<(Id, NodeId)>::new();
-

-
    for ((id, _), (handle, _)) in nodes {
-
        let routing = handle.routing().unwrap();
-

-
        for (rid, node) in routing.try_iter() {
-
            all.insert((rid, node));
-
            by_node
-
                .entry(id)
-
                .or_insert_with(BTreeSet::new)
-
                .insert((rid, node));
+
fn check<'a>(nodes: impl IntoIterator<Item = &'a Node>) -> BTreeSet<(Id, NodeId)> {
+
    let nodes = nodes.into_iter().collect::<Vec<_>>();
+

+
    let mut all_routes = BTreeSet::<(Id, NodeId)>::new();
+
    let mut remaining = BTreeMap::from_iter(nodes.iter().map(|node| (node.id, node)));
+

+
    // First build the set of all routes.
+
    for node in &nodes {
+
        let routing = node.handle.routing().unwrap();
+

+
        for (rid, seed) in routing.try_iter() {
+
            all_routes.insert((rid, seed));
        }
    }

-
    for (node, routes) in by_node {
-
        assert_eq!(routes, all, "{node} failed to converge");
+
    // Then, while there are nodes remaining to converge, check each node to see if
+
    // its routing table has all routes. If so, remove it from the remaining nodes.
+
    while !remaining.is_empty() {
+
        remaining.retain(|_, node| {
+
            let routing = node.handle.routing().unwrap();
+
            let routes = BTreeSet::from_iter(routing.try_iter());
+

+
            if routes == all_routes {
+
                log::debug!(target: "test", "Node {} has converged", node.id);
+
                return false;
+
            }
+
            true
+
        });
+
        thread::sleep(Duration::from_millis(100));
    }
-
    all
+
    all_routes
}

#[test]
-
fn test_e2e() {
+
//
+
//     alice -- bob
+
//
+
fn test_inventory_sync_basic() {
+
    logger::init(log::Level::Debug);
+

    let tmp = tempfile::tempdir().unwrap();
-
    let base = tmp.path();
-
    let nodes = network(vec![
-
        (service::Config::default(), base.join("alice")),
-
        (service::Config::default(), base.join("bob")),
-
    ]);
-
    // TODO: Find a better way to wait for synchronization, eg. using events, or using a loop.
-
    thread::sleep(std::time::Duration::from_secs(3));
-

-
    let routes = check(nodes);
+

+
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
+
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
+

+
    alice.project("alice");
+
    bob.project("bob");
+
    alice.connect(&bob);
+

+
    let routes = check([&alice, &bob]);
    assert_eq!(routes.len(), 2);
}
+

+
#[test]
+
//
+
//     alice -- bob -- eve
+
//
+
fn test_inventory_sync_bridge() {
+
    logger::init(log::Level::Debug);
+

+
    let tmp = tempfile::tempdir().unwrap();
+

+
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
+
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
+
    let mut eve = Node::spawn(tmp.path(), service::Config::default());
+

+
    alice.project("alice");
+
    bob.project("bob");
+
    eve.project("eve");
+

+
    alice.connect(&bob);
+
    bob.connect(&eve);
+

+
    let routes = check([&alice, &bob, &eve]);
+
    assert_eq!(routes.len(), 3);
+
}
+

+
#[test]
+
//
+
//     alice -- bob
+
//       |       |
+
//     carol -- eve
+
//
+
fn test_inventory_sync_ring() {
+
    logger::init(log::Level::Debug);
+

+
    let tmp = tempfile::tempdir().unwrap();
+

+
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
+
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
+
    let mut eve = Node::spawn(tmp.path(), service::Config::default());
+
    let mut carol = Node::spawn(tmp.path(), service::Config::default());
+

+
    alice.project("alice");
+
    bob.project("bob");
+
    eve.project("eve");
+
    carol.project("carol");
+

+
    alice.connect(&bob);
+
    bob.connect(&eve);
+
    eve.connect(&carol);
+
    carol.connect(&alice);
+

+
    let routes = check([&alice, &bob, &eve, &carol]);
+
    assert_eq!(routes.len(), 4);
+
}
+

+
#[test]
+
//
+
//             dave
+
//              |
+
//     eve -- alice -- bob
+
//              |
+
//            carol
+
//
+
fn test_inventory_sync_star() {
+
    logger::init(log::Level::Debug);
+

+
    let tmp = tempfile::tempdir().unwrap();
+

+
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
+
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
+
    let mut eve = Node::spawn(tmp.path(), service::Config::default());
+
    let mut carol = Node::spawn(tmp.path(), service::Config::default());
+
    let mut dave = Node::spawn(tmp.path(), service::Config::default());
+

+
    alice.project("alice");
+
    bob.project("bob");
+
    eve.project("eve");
+
    carol.project("carol");
+
    dave.project("dave");
+

+
    bob.connect(&alice);
+
    eve.connect(&alice);
+
    carol.connect(&alice);
+
    dave.connect(&alice);
+

+
    let routes = check([&alice, &bob, &eve, &carol, &dave]);
+
    assert_eq!(routes.len(), 5);
+
}
+

+
#[test]
+
#[ignore]
+
fn test_replication() {
+
    logger::init(log::Level::Debug);
+

+
    let tmp = tempfile::tempdir().unwrap();
+
    let mut alice = Node::spawn(tmp.path(), service::Config::default());
+
    let mut bob = Node::spawn(tmp.path(), service::Config::default());
+
    let acme = bob.project("acme");
+

+
    alice.connect(&bob);
+
    check([&alice, &bob]);
+

+
    let inventory = alice.handle.inventory().unwrap();
+
    assert!(inventory.try_iter().next().is_none());
+

+
    let tracked = alice.handle.track_repo(acme).unwrap();
+
    assert!(tracked);
+

+
    let (seeds, results) = match alice.handle.fetch(acme).unwrap() {
+
        FetchLookup::Found { seeds, results } => (seeds, results),
+
        other => panic!("Fetch lookup failed, got {:?}", other),
+
    };
+
    assert_eq!(seeds, nonempty::NonEmpty::new(bob.id));
+

+
    let (from, updated) = match results.recv_timeout(Duration::from_secs(6)).unwrap() {
+
        FetchResult::Fetched { from, updated } => (from, updated),
+
        FetchResult::Error { from, error } => {
+
            panic!("Fetch failed from {from}: {error}");
+
        }
+
    };
+
    assert_eq!(from, bob.id);
+
    assert_eq!(updated, vec![]);
+

+
    let inventory = alice.handle.inventory().unwrap();
+
    assert_eq!(inventory.try_iter().next(), Some(acme));
+
    assert_eq!(
+
        alice
+
            .storage
+
            .repository(acme)
+
            .unwrap()
+
            .remotes()
+
            .unwrap()
+
            .map(|r| r.unwrap())
+
            .collect::<Vec<_>>(),
+
        bob.storage
+
            .repository(acme)
+
            .unwrap()
+
            .remotes()
+
            .unwrap()
+
            .map(|r| r.unwrap())
+
            .collect::<Vec<_>>(),
+
    );
+
    assert_matches!(alice.storage.repository(acme).unwrap().verify(), Ok(()));
+
}
modified radicle-node/src/wire.rs
@@ -11,6 +11,8 @@ use std::string::FromUtf8Error;
use std::{io, mem};

use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
+
use cyphernet::Sha256;
+
use netservices::session::{CypherReader, CypherSession, CypherWriter};

use crate::crypto::hash::Digest;
use crate::crypto::{PublicKey, Signature, Unverified};
@@ -31,6 +33,10 @@ use crate::storage::refs::SignedRefs;
/// Note that in certain cases, we may use a smaller type.
pub type Size = u16;

+
pub type WireSession<G> = CypherSession<G, Sha256>;
+
pub type WireReader = CypherReader<Sha256>;
+
pub type WireWriter<G> = CypherWriter<G, Sha256>;
+

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("i/o: {0}")]
modified radicle-node/src/wire/message.rs
@@ -1,7 +1,7 @@
use std::{io, mem, net};

use byteorder::{NetworkEndian, ReadBytesExt};
-
use cyphernet::addr::{Addr, HostAddr, NetAddr};
+
use cyphernet::addr::{Addr, HostName, NetAddr};
use radicle::node::Address;

use crate::prelude::*;
@@ -104,10 +104,10 @@ impl From<AddressType> for u8 {
impl From<&Address> for AddressType {
    fn from(a: &Address) -> Self {
        match a.host {
-
            HostAddr::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
-
            HostAddr::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
-
            HostAddr::Dns(_) => AddressType::Hostname,
-
            HostAddr::Tor(_) => AddressType::Onion,
+
            HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
+
            HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
+
            HostName::Dns(_) => AddressType::Hostname,
+
            HostName::Tor(_) => AddressType::Onion,
            _ => todo!(), // FIXME(cloudhead): Maxim will remove `non-exhaustive`
        }
    }
@@ -308,11 +308,11 @@ impl wire::Encode for Address {
        let mut n = 0;

        match self.host {
-
            HostAddr::Ip(net::IpAddr::V4(ip)) => {
+
            HostName::Ip(net::IpAddr::V4(ip)) => {
                n += u8::from(AddressType::Ipv4).encode(writer)?;
                n += ip.octets().encode(writer)?;
            }
-
            HostAddr::Ip(net::IpAddr::V6(ip)) => {
+
            HostName::Ip(net::IpAddr::V6(ip)) => {
                n += u8::from(AddressType::Ipv6).encode(writer)?;
                n += ip.octets().encode(writer)?;
            }
@@ -334,13 +334,13 @@ impl wire::Decode for Address {
                let octets: [u8; 4] = wire::Decode::decode(reader)?;
                let ip = net::Ipv4Addr::from(octets);

-
                HostAddr::Ip(net::IpAddr::V4(ip))
+
                HostName::Ip(net::IpAddr::V4(ip))
            }
            Ok(AddressType::Ipv6) => {
                let octets: [u8; 16] = wire::Decode::decode(reader)?;
                let ip = net::Ipv6Addr::from(octets);

-
                HostAddr::Ip(net::IpAddr::V6(ip))
+
                HostName::Ip(net::IpAddr::V6(ip))
            }
            Ok(AddressType::Hostname) => {
                todo!();
@@ -352,10 +352,7 @@ impl wire::Decode for Address {
        };
        let port = u16::decode(reader)?;

-
        Ok(Self::from(NetAddr {
-
            host,
-
            port: Some(port),
-
        }))
+
        Ok(Self::from(NetAddr { host, port }))
    }
}

modified radicle-node/src/wire/transport.rs
@@ -1,39 +1,40 @@
//! Implementation of the transport protocol.
//!
-
//! We use the Noise XK handshake pattern to establish an encrypted stream with a remote peer.
-
//! The handshake itself is implemented in the external [`netservices`] crate.
+
//! We use the Noise NN handshake pattern to establish an encrypted stream with a remote peer.
+
//! The handshake itself is implemented in the external [`cyphernet`] and [`netservices`] crates.
use std::collections::VecDeque;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::sync::Arc;
-
use std::time::{Instant, SystemTime};
+
use std::time::{Duration, SystemTime};
use std::{io, net};

+
use amplify::Wrapper as _;
use crossbeam_channel as chan;
-
use cyphernet::addr::{Addr as _, HostAddr, PeerAddr};
-
use nakamoto_net::{Link, LocalTime};
-
use netservices::noise::NoiseXk;
-
use netservices::resources::{ListenerEvent, NetAccept, NetResource, SessionEvent};
-
use netservices::NetSession;
+
use cyphernet::{Cert, Digest, EcSign, Sha256};
+
use nakamoto_net::LocalTime;
+
use netservices::resource::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
+
use netservices::session::ProtocolArtifact;
+
use netservices::{NetConnection, NetSession};

use radicle::collections::HashMap;
-
use radicle::crypto::Negotiator;
+
use radicle::crypto::Signature;
use radicle::node::NodeId;
use radicle::storage::WriteStorage;

use crate::crypto::Signer;
use crate::service::reactor::{Fetch, Io};
use crate::service::{routing, session, DisconnectReason, Message, Service};
-
use crate::wire::{Decode, Encode};
+
use crate::wire::{Decode, Encode, WireSession};
use crate::worker::{WorkerReq, WorkerResp};
+
use crate::Link;
use crate::{address, service};

/// Reactor action.
-
type Action<G> = reactor::Action<NetAccept<NoiseXk<G>>, NetResource<NoiseXk<G>>>;
+
type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;

/// Peer connection state machine.
-
#[derive(Debug)]
-
enum Peer<G: Negotiator> {
+
enum Peer<G: Signer + EcSign> {
    /// The initial state before handshake is completed.
    Connecting { link: Link },
    /// The state after handshake is completed.
@@ -42,7 +43,7 @@ enum Peer<G: Negotiator> {
    /// The state after a peer was disconnected, either during handshake,
    /// or once connected.
    Disconnected {
-
        id: NodeId,
+
        id: Option<NodeId>,
        reason: DisconnectReason,
    },
    /// The state after we've started the process of upgraded the peer for a fetch.
@@ -60,7 +61,34 @@ enum Peer<G: Negotiator> {
    },
}

-
impl<G: Negotiator> Peer<G> {
+
impl<G: Signer + EcSign> std::fmt::Debug for Peer<G> {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        match self {
+
            Self::Connecting { link } => write!(f, "Connecting({:?})", link),
+
            Self::Connected { link, id } => write!(f, "Connected({link:?}, {id})"),
+
            Self::Disconnected { reason, id } => write!(f, "Disconnected({reason}, {id:?})"),
+
            Self::Upgrading { fetch, link, id } => write!(
+
                f,
+
                "Upgrading(initiated={}, {link:?}, {id})",
+
                fetch.initiated
+
            ),
+
            Self::Upgraded { link, id, .. } => write!(f, "Upgraded({link:?}, {id})"),
+
        }
+
    }
+
}
+

+
impl<G: Signer + EcSign> Peer<G> {
+
    /// Return the peer's id, if any.
+
    fn id(&self) -> Option<&NodeId> {
+
        match self {
+
            Peer::Connected { id, .. }
+
            | Peer::Disconnected { id: Some(id), .. }
+
            | Peer::Upgrading { id, .. }
+
            | Peer::Upgraded { id, .. } => Some(id),
+
            _ => None,
+
        }
+
    }
+

    /// Return a new connecting peer.
    fn connecting(link: Link) -> Self {
        Self::Connecting { link }
@@ -78,9 +106,14 @@ impl<G: Negotiator> Peer<G> {
    /// Switch to disconnected state.
    fn disconnected(&mut self, reason: DisconnectReason) {
        if let Self::Connected { id, .. } = self {
-
            *self = Self::Disconnected { id: *id, reason };
+
            *self = Self::Disconnected {
+
                id: Some(*id),
+
                reason,
+
            };
+
        } else if let Self::Connecting { .. } = self {
+
            *self = Self::Disconnected { id: None, reason };
        } else {
-
            panic!("Peer::disconnected: session is not connected");
+
            panic!("Peer::disconnected: session is not connected ({:?})", self);
        }
    }

@@ -101,6 +134,7 @@ impl<G: Negotiator> Peer<G> {
    fn upgraded(&mut self, listener: chan::Receiver<WorkerResp<G>>) -> Fetch {
        if let Self::Upgrading { fetch, id, link } = self {
            let fetch = fetch.clone();
+
            log::debug!(target: "transport", "Peer {id} upgraded for fetch {}", fetch.repo);

            *self = Self::Upgraded {
                id: *id,
@@ -127,13 +161,15 @@ impl<G: Negotiator> Peer<G> {
}

/// Transport protocol implementation for a set of peers.
-
pub struct Transport<R, S, W, G: Negotiator> {
+
pub struct Transport<R, S, W, G: Signer + EcSign> {
    /// Backing service instance.
    service: Service<R, S, W, G>,
    /// Worker pool interface.
    worker: chan::Sender<WorkerReq<G>>,
-
    /// Used to performs X25519 key exchange.
-
    keypair: G,
+
    /// Used for authentication; keeps local identity.
+
    cert: Cert<Signature>,
+
    /// Used for authentication.
+
    signer: G,
    /// Internal queue of actions to send to the reactor.
    actions: VecDeque<Action<G>>,
    /// Peer sessions.
@@ -149,12 +185,13 @@ where
    R: routing::Store,
    S: address::Store,
    W: WriteStorage + 'static,
-
    G: Signer + Negotiator,
+
    G: Signer + EcSign,
{
    pub fn new(
        mut service: Service<R, S, W, G>,
        worker: chan::Sender<WorkerReq<G>>,
-
        keypair: G,
+
        cert: Cert<Signature>,
+
        signer: G,
        proxy: net::SocketAddr,
        clock: LocalTime,
    ) -> Self {
@@ -163,7 +200,8 @@ where
        Self {
            service,
            worker,
-
            keypair,
+
            cert,
+
            signer,
            proxy,
            actions: VecDeque::new(),
            peers: HashMap::default(),
@@ -171,11 +209,30 @@ where
        }
    }

-
    fn by_id(&self, id: &NodeId) -> RawFd {
-
        self.connected()
-
            .find(|(_, i)| *i == id)
-
            .map(|(fd, _)| fd)
-
            .unwrap()
+
    fn peer_mut_by_fd(&mut self, fd: RawFd) -> &mut Peer<G> {
+
        self.peers.get_mut(&fd).unwrap_or_else(|| {
+
            log::error!(target: "transport", "Peer with fd {fd} was not found");
+
            panic!("Peer with fd {fd} is not known");
+
        })
+
    }
+

+
    fn fd_by_id(&self, node_id: &NodeId) -> (RawFd, &Peer<G>) {
+
        self.peers
+
            .iter()
+
            .find(|(_, peer)| peer.id() == Some(node_id))
+
            .map(|(fd, peer)| (*fd, peer))
+
            .unwrap_or_else(|| panic!("Peer {node_id} was expected to be known to the transport"))
+
    }
+

+
    fn connected_fd_by_id(&self, node_id: &NodeId) -> RawFd {
+
        match self.fd_by_id(node_id) {
+
            (fd, Peer::Connected { .. }) => fd,
+
            (fd, peer) => {
+
                panic!(
+
                    "Peer {node_id} (fd={fd}) was expected to be in a connected state ({peer:?})"
+
                )
+
            }
+
        }
    }

    fn connected(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
@@ -189,43 +246,38 @@ where
    }

    fn disconnect(&mut self, fd: RawFd, reason: DisconnectReason) {
-
        let Some(peer) = self.peers.get_mut(&fd) else {
-
            log::error!(target: "transport", "Peer with fd {fd} was not found");
-
            return;
-
        };
+
        let peer = self.peer_mut_by_fd(fd);
        if let Peer::Disconnected { .. } = peer {
-
            log::error!(target: "transport", "Peer with fd {fd} is already disconnected");
+
            log::error!(target: "transport", "Peer (fd={fd}) is already disconnected");
            return;
        };
-
        log::debug!(target: "transport", "Disconnecting peer with fd {} ({})..", fd, reason);
+
        log::debug!(target: "transport", "Disconnecting peer (fd={fd}): {reason}");
        peer.disconnected(reason);

        self.actions.push_back(Action::UnregisterTransport(fd));
    }

    fn upgrade(&mut self, fd: RawFd, fetch: Fetch) {
-
        let Some(peer) = self.peers.get_mut(&fd) else {
-
            log::error!(target: "transport", "Peer with fd {fd} was not found");
-
            return;
-
        };
+
        let peer = self.peer_mut_by_fd(fd);
        if let Peer::Disconnected { .. } = peer {
-
            log::error!(target: "transport", "Peer with fd {fd} is already disconnected");
+
            log::error!(target: "transport", "Peer (fd={fd}) is already disconnected");
            return;
        };
-
        log::debug!(target: "transport", "Requesting transport handover from reactor for fd {fd}");
+
        log::debug!(target: "transport", "Requesting transport handover from reactor for peer (fd={fd})");
        peer.upgrading(fetch);

        self.actions.push_back(Action::UnregisterTransport(fd));
    }

-
    fn upgraded(&mut self, session: NetResource<NoiseXk<G>>) {
-
        let fd = session.as_raw_fd();
-
        let Some(peer) = self.peers.get_mut(&fd) else {
-
            log::error!(target: "transport", "Peer with fd {fd} was not found");
-
            return;
-
        };
+
    fn upgraded(&mut self, transport: NetTransport<WireSession<G>>) {
+
        let fd = transport.as_raw_fd();
+
        let peer = self.peer_mut_by_fd(fd);
        let (send, recv) = chan::bounded::<WorkerResp<G>>(1);
        let fetch = peer.upgraded(recv);
+
        let session = match transport.into_session() {
+
            Ok(session) => session,
+
            Err(_) => panic!("Transport::upgraded: peer write buffer not empty on upgrade"),
+
        };

        if self
            .worker
@@ -242,15 +294,25 @@ where
    }

    fn fetch_complete(&mut self, resp: WorkerResp<G>) {
+
        log::debug!(target: "transport", "Fetch completed: {:?}", resp.result);
+

        let session = resp.session;
-
        let fd = session.as_raw_fd();
-
        let Some(peer) = self.peers.get_mut(&fd) else {
-
            log::error!(target: "transport", "Peer with fd {fd} was not found");
-
            return;
-
        };
-
        if let Peer::Disconnected { .. } = peer {
+
        let fd = session.as_connection().as_raw_fd();
+
        let peer = self.peer_mut_by_fd(fd);
+

+
        let session = if let Peer::Disconnected { .. } = peer {
            log::error!(target: "transport", "Peer with fd {fd} is already disconnected");
            return;
+
        } else if let Peer::Upgraded { link, .. } = peer {
+
            match NetTransport::with_session(session, *link) {
+
                Ok(session) => session,
+
                Err(err) => {
+
                    log::error!(target: "transport", "Session downgrade failed: {err}");
+
                    return;
+
                }
+
            }
+
        } else {
+
            todo!();
        };
        peer.downgrade();

@@ -264,13 +326,13 @@ where
    R: routing::Store + Send,
    S: address::Store + Send,
    W: WriteStorage + Send + 'static,
-
    G: Signer + Negotiator + Send,
+
    G: Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone + Send,
{
-
    type Listener = NetAccept<NoiseXk<G>>;
-
    type Transport = NetResource<NoiseXk<G>>;
+
    type Listener = NetAccept<WireSession<G>>;
+
    type Transport = NetTransport<WireSession<G>>;
    type Command = service::Command;

-
    fn tick(&mut self, _time: Instant) {
+
    fn tick(&mut self, _time: Duration) {
        // FIXME: Change this once a proper timestamp is passed into the function.
        self.service.tick(LocalTime::from(SystemTime::now()));

@@ -294,23 +356,29 @@ where
    fn handle_listener_event(
        &mut self,
        socket_addr: net::SocketAddr,
-
        event: ListenerEvent<NoiseXk<G>>,
-
        _: Instant,
+
        event: ListenerEvent<WireSession<G>>,
+
        _: Duration,
    ) {
        match event {
-
            ListenerEvent::Accepted(session) => {
+
            ListenerEvent::Accepted(connection) => {
                log::debug!(
                    target: "transport",
-
                    "Accepted inbound peer connection from {}..",
-
                    session.transient_addr()
+
                    "Accepting inbound peer connection from {}..",
+
                    connection.remote_addr()
                );
                self.peers
-
                    .insert(session.as_raw_fd(), Peer::connecting(Link::Inbound));
+
                    .insert(connection.as_raw_fd(), Peer::connecting(Link::Inbound));

-
                let transport = match NetResource::<NoiseXk<G>>::new(session) {
+
                let session = WireSession::accept::<{ Sha256::OUTPUT_LEN }>(
+
                    connection,
+
                    self.cert,
+
                    vec![],
+
                    self.signer.clone(),
+
                );
+
                let transport = match NetTransport::with_session(session, Link::Inbound) {
                    Ok(transport) => transport,
                    Err(err) => {
-
                        log::error!(target: "transport", "Failed to upgrade accepted peer socket: {err}");
+
                        log::error!(target: "transport", "Failed to create transport for accepted connection: {err}");
                        return;
                    }
                };
@@ -324,9 +392,17 @@ where
        }
    }

-
    fn handle_transport_event(&mut self, fd: RawFd, event: SessionEvent<NoiseXk<G>>, _: Instant) {
+
    fn handle_transport_event(
+
        &mut self,
+
        fd: RawFd,
+
        event: SessionEvent<WireSession<G>>,
+
        _: Duration,
+
    ) {
        match event {
-
            SessionEvent::Established(node_id) => {
+
            SessionEvent::Established(ProtocolArtifact {
+
                state: Cert { pk: node_id, .. },
+
                ..
+
            }) => {
                log::debug!(target: "transport", "Session established with {node_id}");

                let conflicting = self
@@ -390,6 +466,7 @@ where
                }
            }
            SessionEvent::Terminated(err) => {
+
                log::debug!(target: "transport", "Session for fd {fd} terminated: {err}");
                self.disconnect(fd, DisconnectReason::Connection(Arc::new(err)));
            }
        }
@@ -401,7 +478,7 @@ where

    fn handle_error(
        &mut self,
-
        err: reactor::Error<NetAccept<NoiseXk<G>>, NetResource<NoiseXk<G>>>,
+
        err: reactor::Error<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>,
    ) {
        match &err {
            reactor::Error::ListenerUnknown(id) => {
@@ -455,9 +532,16 @@ where
                // Disconnect TCP stream.
                drop(transport);

-
                self.service.disconnected(*id, reason);
+
                if let Some(id) = id {
+
                    self.service.disconnected(*id, reason);
+
                } else {
+
                    // TODO: Handle this case by calling `disconnected` with the address instead of
+
                    // the node id.
+
                }
            }
            Some(Peer::Upgrading { .. }) => {
+
                log::debug!(target: "transport", "Received handover of transport with fd {fd}");
+

                self.upgraded(transport);
            }
            Some(_) => {
@@ -475,27 +559,23 @@ where
    R: routing::Store,
    S: address::Store,
    W: WriteStorage + 'static,
-
    G: Signer + Negotiator,
+
    G: Signer + EcSign<Pk = NodeId, Sig = Signature>,
{
-
    type Item = reactor::Action<NetAccept<NoiseXk<G>>, NetResource<NoiseXk<G>>>;
+
    type Item = Action<G>;

    fn next(&mut self) -> Option<Self::Item> {
-
        if let Some(event) = self.actions.pop_front() {
-
            return Some(event);
-
        }
-

        while let Some(ev) = self.service.next() {
            match ev {
                Io::Write(node_id, msgs) => {
                    log::debug!(
                        target: "transport", "Sending {} message(s) to {}", msgs.len(), node_id
                    );
-
                    let fd = self.by_id(&node_id);
+
                    let fd = self.connected_fd_by_id(&node_id);
                    let mut data = Vec::new();
                    for msg in msgs {
                        msg.encode(&mut data).expect("in-memory writes never fail");
                    }
-
                    return Some(reactor::Action::Send(fd, data));
+
                    self.actions.push_back(reactor::Action::Send(fd, data));
                }
                Io::Event(_e) => {
                    log::warn!(
@@ -503,12 +583,6 @@ where
                    );
                }
                Io::Connect(node_id, addr) => {
-
                    let socket_addr = match addr.host {
-
                        HostAddr::Ip(ip) => net::SocketAddr::new(ip, addr.port()),
-
                        HostAddr::Dns(_) => todo!(),
-
                        _ => self.proxy,
-
                    };
-

                    if self.connected().any(|(_, id)| id == &node_id) {
                        log::error!(
                            target: "transport",
@@ -517,18 +591,26 @@ where
                        break;
                    }

-
                    match NetResource::<NoiseXk<G>>::connect(
-
                        PeerAddr::new(node_id, socket_addr),
-
                        &self.keypair,
-
                        // TODO: Improve API to not require a boolean.
-
                        true,
-
                    ) {
+
                    match WireSession::connect_nonblocking::<{ Sha256::OUTPUT_LEN }>(
+
                        addr.to_inner(),
+
                        self.cert,
+
                        vec![node_id],
+
                        self.signer.clone(),
+
                        self.proxy.into(),
+
                        false,
+
                    )
+
                    .and_then(|session| {
+
                        NetTransport::<WireSession<G>>::with_session(session, Link::Outbound)
+
                    }) {
                        Ok(transport) => {
-
                            self.service.attempted(node_id, &socket_addr.into());
+
                            self.service.attempted(node_id, &addr);
+
                            // TODO: Keep track of peer address for when peer disconnects before
+
                            // handshake is complete.
                            self.peers
                                .insert(transport.as_raw_fd(), Peer::connecting(Link::Outbound));

-
                            return Some(reactor::Action::RegisterTransport(transport));
+
                            self.actions
+
                                .push_back(reactor::Action::RegisterTransport(transport));
                        }
                        Err(err) => {
                            self.service
@@ -538,19 +620,19 @@ where
                    }
                }
                Io::Disconnect(node_id, reason) => {
-
                    let fd = self.by_id(&node_id);
+
                    let fd = self.connected_fd_by_id(&node_id);
                    self.disconnect(fd, reason);
-

-
                    return self.actions.pop_back();
                }
-
                Io::Wakeup(d) => return Some(reactor::Action::SetTimer(d.into())),
+
                Io::Wakeup(d) => {
+
                    self.actions.push_back(reactor::Action::SetTimer(d.into()));
+
                }
                Io::Fetch(fetch) => {
                    // TODO: Check that the node_id is connected, queue request otherwise.
-
                    let fd = self.by_id(&fetch.remote);
+
                    let fd = self.connected_fd_by_id(&fetch.remote);
                    self.upgrade(fd, fetch);
                }
            }
        }
-
        None
+
        self.actions.pop_front()
    }
}
modified radicle-node/src/worker.rs
@@ -1,47 +1,44 @@
-
use core::time;
use std::io::prelude::*;
-
use std::process;
-
use std::thread;
use std::thread::JoinHandle;
-
use std::{io, net};
+
use std::{env, io, net, process, thread, time};

use crossbeam_channel as chan;
-
use netservices::noise::NoiseXk;
-
use netservices::resources::{NetReader, NetResource, NetWriter, SplitIo};
+
use cyphernet::EcSign;
use netservices::tunnel::Tunnel;
+
use netservices::{NetSession, SplitIo};

-
use radicle::crypto::Negotiator;
-
use radicle::storage::{ReadRepository, RefUpdate, WriteStorage};
+
use radicle::crypto::Signer;
+
use radicle::identity::Id;
+
use radicle::storage::{ReadRepository, RefUpdate, WriteRepository, WriteStorage};
use radicle::Storage;
use reactor::poller::popol;

use crate::service::reactor::Fetch;
use crate::service::{FetchError, FetchResult};
-

-
type Session<G> = NetResource<NoiseXk<G>>;
+
use crate::wire::{WireReader, WireSession, WireWriter};

/// Worker request.
-
pub struct WorkerReq<G: Negotiator> {
+
pub struct WorkerReq<G: Signer + EcSign> {
    pub fetch: Fetch,
-
    pub session: NetResource<NoiseXk<G>>,
+
    pub session: WireSession<G>,
    pub drain: Vec<u8>,
    pub channel: chan::Sender<WorkerResp<G>>,
}

/// Worker response.
-
pub struct WorkerResp<G: Negotiator> {
+
pub struct WorkerResp<G: Signer + EcSign> {
    pub result: FetchResult,
-
    pub session: Session<G>,
+
    pub session: WireSession<G>,
}

/// A worker that replicates git objects.
-
struct Worker<G: Negotiator> {
+
struct Worker<G: Signer + EcSign> {
    storage: Storage,
    tasks: chan::Receiver<WorkerReq<G>>,
    timeout: time::Duration,
}

-
impl<G: Negotiator + 'static> Worker<G> {
+
impl<G: Signer + EcSign> Worker<G> {
    /// Waits for tasks and runs them. Blocks indefinitely unless there is an error receiving
    /// the next task.
    fn run(self) -> Result<(), chan::RecvError> {
@@ -70,6 +67,8 @@ impl<G: Negotiator + 'static> Worker<G> {
                error,
            },
        };
+
        log::debug!(target: "worker", "Sending response back to service..");
+

        if channel.send(WorkerResp { result, session }).is_err() {
            log::error!("Unable to report fetch result: worker channel disconnected");
        }
@@ -79,9 +78,11 @@ impl<G: Negotiator + 'static> Worker<G> {
        &self,
        fetch: &Fetch,
        drain: Vec<u8>,
-
        session: Session<G>,
-
    ) -> (Session<G>, Result<Vec<RefUpdate>, FetchError>) {
+
        mut session: WireSession<G>,
+
    ) -> (WireSession<G>, Result<Vec<RefUpdate>, FetchError>) {
        if fetch.initiated {
+
            log::debug!(target: "worker", "Worker processing outgoing fetch for {}", fetch.repo);
+

            let mut tunnel = match Tunnel::with(session, net::SocketAddr::from(([0, 0, 0, 0], 0))) {
                Ok(tunnel) => tunnel,
                Err((session, err)) => return (session, Err(err.into())),
@@ -91,6 +92,11 @@ impl<G: Negotiator + 'static> Worker<G> {

            (session, result)
        } else {
+
            log::debug!(target: "worker", "Worker processing incoming fetch for {}", fetch.repo);
+

+
            if let Err(err) = session.as_connection_mut().set_nonblocking(false) {
+
                return (session, Err(err.into()));
+
            }
            let (mut stream_r, mut stream_w) = match session.split_io() {
                Ok((r, w)) => (r, w),
                Err(err) => {
@@ -98,7 +104,7 @@ impl<G: Negotiator + 'static> Worker<G> {
                }
            };
            let result = self.upload_pack(fetch, drain, &mut stream_r, &mut stream_w);
-
            let session = NetResource::from_split_io(stream_r, stream_w);
+
            let session = WireSession::from_split_io(stream_r, stream_w);

            (session, result)
        }
@@ -107,26 +113,50 @@ impl<G: Negotiator + 'static> Worker<G> {
    fn fetch(
        &self,
        fetch: &Fetch,
-
        tunnel: &mut Tunnel<Session<G>>,
+
        tunnel: &mut Tunnel<WireSession<G>>,
    ) -> Result<Vec<RefUpdate>, FetchError> {
-
        let tunnel_addr = tunnel.local_addr()?;
        let repo = self.storage.repository(fetch.repo)?;
-
        let child = process::Command::new("git")
-
            .current_dir(repo.path())
+
        let tunnel_addr = tunnel.local_addr()?;
+
        let mut cmd = process::Command::new("git");
+
        cmd.current_dir(repo.path())
+
            .env("GIT_PROTOCOL", "2")
+
            .env_clear()
+
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
            .arg("fetch")
-
            .arg("--atomic") // The path to the git repo must be exact.
-
            .arg(format!("git://{tunnel_addr}"))
+
            .arg("--atomic")
+
            .arg("--verbose")
+
            .arg(format!("git://{tunnel_addr}/{}", repo.id))
            .arg(fetch.namespaces.as_fetchspec())
-
            .arg(".")
            .stdout(process::Stdio::piped())
-
            .stdin(process::Stdio::piped())
-
            .spawn()?;
+
            .stderr(process::Stdio::piped())
+
            .stdin(process::Stdio::piped());
+

+
        log::debug!(target: "worker", "Running command: {:?}", cmd);
+

+
        let mut child = cmd.spawn()?;
+
        let mut stderr = child.stderr.take().unwrap();

        let _ = tunnel.tunnel_once(popol::Poller::new(), self.timeout)?;
-
        let output = child.wait_with_output()?;
+
        let status = child.wait()?;

        // TODO: Parse fetch output to return updates.
-
        log::debug!(target: "worker", "Fetch output for {}: {:?}", fetch.repo, output);
+
        log::debug!(target: "worker", "Fetch for {} exited with status {:?}", fetch.repo, status.code());
+

+
        if let Some(status) = status.code() {
+
            log::debug!(target: "worker", "Upload pack for {} exited with status {:?}", fetch.repo, status);
+
        } else {
+
            log::debug!(target: "worker", "Upload pack for {} exited with unknown status", fetch.repo);
+
        }
+

+
        if !status.success() {
+
            let mut err = Vec::new();
+
            stderr.read_to_end(&mut err)?;
+

+
            let err = String::from_utf8_lossy(&err);
+
            log::debug!(target: "worker", "Fetch for {}: stderr: {err}", fetch.repo);
+
        }
+
        let head = repo.set_head()?;
+
        log::debug!(target: "worker", "Setting head for {} to {head}", fetch.repo);

        Ok(vec![])
    }
@@ -135,26 +165,30 @@ impl<G: Negotiator + 'static> Worker<G> {
        &self,
        fetch: &Fetch,
        drain: Vec<u8>,
-
        stream_r: &mut NetReader<NoiseXk<G>>,
-
        stream_w: &mut NetWriter<NoiseXk<G>>,
+
        stream_r: &mut WireReader,
+
        stream_w: &mut WireWriter<G>,
    ) -> Result<Vec<RefUpdate>, FetchError> {
        let repo = self.storage.repository(fetch.repo)?;
        let mut child = process::Command::new("git")
            .current_dir(repo.path())
+
            .env_clear()
+
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
+
            .env("GIT_PROTOCOL", "2")
            .arg("upload-pack")
            .arg("--strict") // The path to the git repo must be exact.
            .arg(".")
            .stdout(process::Stdio::piped())
+
            .stderr(process::Stdio::piped())
            .stdin(process::Stdio::piped())
            .spawn()?;

        let mut stdin = child.stdin.take().unwrap();
        let mut stdout = child.stdout.take().unwrap();
+
        let mut stderr = child.stderr.take().unwrap();

        thread::scope(|scope| {
            let t = scope.spawn(move || {
                let mut buf = [0u8; 65535];
-

                // First drain the buffer of incoming data that was waiting.
                if stdin.write_all(&drain[..]).is_err() {
                    return;
@@ -162,6 +196,15 @@ impl<G: Negotiator + 'static> Worker<G> {
                // Then process any new data coming into the socket, and write it
                // to the standard input of the `upload-pack` process.
                while let Ok(n) = stream_r.read(&mut buf) {
+
                    if let Ok(line) = std::str::from_utf8(&buf[..n]) {
+
                        // FIXME: The git command could come in the drain object.
+
                        // FIXME: We should only call this once, before looping.
+
                        if let Some(cmd) = GitCommand::parse(line) {
+
                            // FIXME: Convert this into an error.
+
                            debug_assert_eq!(cmd.repo, fetch.repo);
+
                            continue;
+
                        }
+
                    }
                    if n == 0 {
                        break;
                    }
@@ -178,7 +221,21 @@ impl<G: Negotiator + 'static> Worker<G> {

            Ok::<_, FetchError>(())
        })?;
-
        child.wait()?;
+
        let status = child.wait()?;
+

+
        if let Some(status) = status.code() {
+
            log::debug!(target: "worker", "Upload pack for {} exited with status {:?}", fetch.repo, status);
+
        } else {
+
            log::debug!(target: "worker", "Upload pack for {} exited with unknown status", fetch.repo);
+
        }
+

+
        if !status.success() {
+
            let mut err = Vec::new();
+
            stderr.read_to_end(&mut err)?;
+

+
            let err = String::from_utf8_lossy(&err);
+
            log::debug!(target: "worker", "Upload pack for {}: stderr: {}", fetch.repo, err);
+
        }

        Ok(vec![])
    }
@@ -191,11 +248,12 @@ pub struct WorkerPool {

impl WorkerPool {
    /// Create a new worker pool with the given parameters.
-
    pub fn with<G: Negotiator + 'static>(
+
    pub fn with<G: Signer + EcSign + 'static>(
        capacity: usize,
        timeout: time::Duration,
        storage: Storage,
        tasks: chan::Receiver<WorkerReq<G>>,
+
        name: String,
    ) -> Self {
        let mut pool = Vec::with_capacity(capacity);
        for _ in 0..capacity {
@@ -204,7 +262,10 @@ impl WorkerPool {
                storage: storage.clone(),
                timeout,
            };
-
            let thread = thread::spawn(|| worker.run());
+
            let thread = thread::Builder::new()
+
                .name(name.clone())
+
                .spawn(|| worker.run())
+
                .unwrap();

            pool.push(thread);
        }
@@ -220,6 +281,63 @@ impl WorkerPool {
                log::error!(target: "pool", "Worker failed: {err}");
            }
        }
+
        log::debug!(target: "pool", "Worker pool shutting down..");
+

        Ok(())
    }
}
+

+
#[derive(Debug)]
+
pub struct GitCommand {
+
    pub repo: Id,
+
    pub path: String,
+
    pub host: Option<(String, Option<u16>)>,
+
    pub extra: Vec<(String, Option<String>)>,
+
}
+

+
impl GitCommand {
+
    /// Parse a Git command packet-line.
+
    ///
+
    /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
+
    ///
+
    fn parse(input: &str) -> Option<Self> {
+
        let (left, right) = input.split_at(4);
+
        let len = usize::from_str_radix(left, 16).ok()?;
+
        if len != input.len() {
+
            return None;
+
        }
+
        let mut parts = right
+
            .strip_prefix("git-upload-pack ")?
+
            .split_terminator('\0');
+

+
        let path = parts.next()?.to_owned();
+
        let repo = path.strip_prefix('/')?.parse().ok()?;
+
        let host = match parts.next() {
+
            None | Some("") => None,
+
            Some(host) => {
+
                let host = host.strip_prefix("host=")?;
+
                match host.split_once(':') {
+
                    None => Some((host.to_owned(), None)),
+
                    Some((host, port)) => {
+
                        let port = port.parse::<u16>().ok()?;
+
                        Some((host.to_owned(), Some(port)))
+
                    }
+
                }
+
            }
+
        };
+
        let extra = parts
+
            .skip_while(|part| part.is_empty())
+
            .map(|part| match part.split_once('=') {
+
                None => (part.to_owned(), None),
+
                Some((k, v)) => (k.to_owned(), Some(v.to_owned())),
+
            })
+
            .collect();
+

+
        Some(Self {
+
            repo,
+
            path,
+
            host,
+
            extra,
+
        })
+
    }
+
}
modified radicle/Cargo.toml
@@ -11,12 +11,13 @@ test = ["qcheck", "radicle-crypto/test"]
sql = ["sqlite"]

[dependencies]
-
amplify = { version = "4.0.0-beta.1", default-features = false, features = ["std"] }
+
amplify = { version = "4.0.0-beta.7", default-features = false, features = ["std"] }
base64 = { version= "0.13" }
byteorder = { version = "1.4" }
crossbeam-channel = { version = "0.5.6" }
ed25519-compact = { version = "2.0.2", features = ["pem"] }
-
cyphernet = { version = "0" }
+
# FIXME: Remove the extra cyphernet features once we're able to.
+
cyphernet = { version = "0", features = ["tor", "dns", "i2p", "nym", "ed25519"] }
fastrand = { version = "1.8.0" }
git-ref-format = { version = "0", features = ["serde", "macro"] }
multibase = { version = "0.9.1" }
modified radicle/src/node.rs
@@ -1,11 +1,12 @@
mod features;

+
use amplify::WrapperMut;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;
-
use std::{io, net, ops};
+
use std::{io, net};

-
use cyphernet::addr::{HostAddr, NetAddr};
+
use cyphernet::addr::{HostName, NetAddr};

use crate::crypto::PublicKey;
use crate::identity::Id;
@@ -23,32 +24,32 @@ pub const RESPONSE_OK: &str = "ok";
pub const RESPONSE_NOOP: &str = "noop";

/// Peer public protocol address.
-
#[derive(Wrapper, Clone, Eq, PartialEq, Debug, From)]
-
#[wrapper(Display, FromStr)]
-
pub struct Address(NetAddr<DEFAULT_PORT>);
+
#[derive(Wrapper, WrapperMut, Clone, Eq, PartialEq, Debug, From)]
+
#[wrapper(Deref, Display, FromStr)]
+
#[wrapper_mut(DerefMut)]
+
pub struct Address(NetAddr<HostName>);

-
impl cyphernet::addr::Addr for Address {
-
    fn port(&self) -> u16 {
-
        self.0.port()
+
impl cyphernet::addr::Host for Address {
+
    fn requires_proxy(&self) -> bool {
+
        self.0.requires_proxy()
    }
}

-
impl ops::Deref for Address {
-
    type Target = NetAddr<DEFAULT_PORT>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.0
+
impl cyphernet::addr::Addr for Address {
+
    fn port(&self) -> u16 {
+
        self.0.port()
    }
}

impl From<net::SocketAddr> for Address {
    fn from(addr: net::SocketAddr) -> Self {
        Address(NetAddr {
-
            host: HostAddr::Ip(addr.ip()),
-
            port: Some(addr.port()),
+
            host: HostName::Ip(addr.ip()),
+
            port: addr.port(),
        })
    }
}
+

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("failed to connect to node: {0}")]
@@ -63,8 +64,8 @@ pub enum Error {
pub trait Handle {
    /// The result of a fetch request.
    type FetchLookup;
-
    /// The peer session type.
-
    type Session;
+
    /// The peer sessions type.
+
    type Sessions;
    /// The error returned by all methods.
    type Error: std::error::Error;

@@ -88,7 +89,7 @@ pub trait Handle {
    /// Query the routing table entries.
    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Self::Error>;
    /// Query the peer session state.
-
    fn sessions(&self) -> Result<chan::Receiver<(NodeId, Self::Session)>, Self::Error>;
+
    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
    /// Query the inventory.
    fn inventory(&self) -> Result<chan::Receiver<Id>, Self::Error>;
}
@@ -128,7 +129,7 @@ impl Node {
}

impl Handle for Node {
-
    type Session = ();
+
    type Sessions = ();
    type FetchLookup = ();
    type Error = Error;

@@ -233,7 +234,7 @@ impl Handle for Node {
        todo!();
    }

-
    fn sessions(&self) -> Result<chan::Receiver<(NodeId, Self::Session)>, Error> {
+
    fn sessions(&self) -> Result<Self::Sessions, Error> {
        todo!();
    }

modified radicle/src/test/arbitrary.rs
@@ -198,8 +198,8 @@ impl Arbitrary for Address {
            net::IpAddr::V6(net::Ipv6Addr::from(octets))
        };
        Address::from(cyphernet::addr::NetAddr {
-
            host: cyphernet::addr::HostAddr::Ip(ip),
-
            port: Some(u16::arbitrary(g)),
+
            host: cyphernet::addr::HostName::Ip(ip),
+
            port: u16::arbitrary(g),
        })
    }
}
modified radicle/src/test/fixtures.rs
@@ -77,3 +77,41 @@ pub fn repository<P: AsRef<Path>>(path: P) -> (git2::Repository, git2::Oid) {

    (repo, oid)
}
+

+
/// Generate random fixtures.
+
pub mod gen {
+
    use super::*;
+

+
    /// Generate a random string of the given length.
+
    pub fn string(length: usize) -> String {
+
        std::iter::repeat_with(fastrand::alphabetic)
+
            .take(length)
+
            .collect::<String>()
+
    }
+

+
    /// Generate a random email.
+
    pub fn email() -> String {
+
        format!("{}@{}.xyz", string(6), string(6))
+
    }
+

+
    /// Creates a regular repository at the given path with a couple of commits.
+
    pub fn repository<P: AsRef<Path>>(path: P) -> (git2::Repository, git2::Oid) {
+
        let repo = git2::Repository::init(path).unwrap();
+
        let sig = git2::Signature::now(string(6).as_str(), email().as_str()).unwrap();
+
        let head = git::initial_commit(&repo, &sig).unwrap();
+
        let oid = git::commit(
+
            &repo,
+
            &head,
+
            git::refname!("refs/heads/master").as_refstr(),
+
            string(16).as_str(),
+
            &sig,
+
        )
+
        .unwrap()
+
        .id();
+

+
        // Look, I don't really understand why we have to do this, but we do.
+
        drop(head);
+

+
        (repo, oid)
+
    }
+
}