Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Use `gix_packetline`
Archived lorenz opened 2 months ago

gix-packetline already is in the dependency closure of radicle-node. Use the well-tested and -used reader instead of ours.

cargo: Add feature for Tor support

node: Explicit default for AddressConfig

The intent to drop outgoing connections is modeled as Option::None which is brittle and easy to miss.

Extend enum AddressConfig with a variant that is more explicit.

Use development version of cyphernet

node: Introduce Announcers

node/test: Add timeouts to functions that wait

The two functions connect and converge used in tests potentially loop forever. Add timeout mechanisms to both.

node/reactor: Rewrite Runtime::run

Make this function clearer, and also have it log in case the service becomes too slow.

hooks: Enable typos, fix reported errors

I2P Support

Co-authored-by: ps

mega

cli-test: Refactor Path Handling

In the previous refactoring in commit 4894657b, the order of entries in $PATH was changed unintentionally. Revisit the order and use nicer APIs to handle paths.

34 files changed +479 -255 84320919 eff0b613
modified .codespellrc
@@ -1,4 +1,4 @@
[codespell]
-
skip = .git*,*.lock,.codespellrc
+
skip = .git*,*.lock,.codespellrc,target,.jj
check-hidden = true
-
ignore-words-list = ser,noes
+
ignore-words-list = set,noes
added .typos.toml
@@ -0,0 +1,16 @@
+
[default]
+
extend-ignore-re = [
+
    "[0-9a-f]{7}\\.\\.\\.?[0-9a-f]{7}", # Git range between two short commit IDs
+
    "[0-9a-f]{7}\\[\\.\\.\\]", # Shortened commit IDs as written in tests
+
    "did:key:z6Mk[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]{44}",
+
    "rad://z[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]{28}",
+
    "rad:z[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]{28}",
+
    "z6Mk[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]{44}",
+
]
+

+
[default.extend-identifiers]
+
"typ" = "typ" # We may write "typ" instead of "type". The latter is a Rust keyword.
+

+
[type.codespell]
+
check-file = false
+
extend-glob = [".codespellrc"]
modified Cargo.lock
@@ -676,8 +676,7 @@ dependencies = [
[[package]]
name = "cypheraddr"
version = "0.4.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "ba5c54d2ad4ab9941383519471b75d12abc1a7b4779265e233168f2703a730d9"
+
source = "git+https://github.com/lorenzleutgeb/cyphernet.rs.git?branch=push-ooltwtzkpvlk#ca0fedcac9f1075d516ce1f0129f4aa9b73ee81e"
dependencies = [
 "amplify",
 "base32",
@@ -689,8 +688,7 @@ dependencies = [
[[package]]
name = "cyphergraphy"
version = "0.3.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "b67c16c8ef5ddcdab57aab83fd8e770540ea3682ccdae09642c63575b0da2184"
+
source = "git+https://github.com/lorenzleutgeb/cyphernet.rs.git?branch=push-ooltwtzkpvlk#ca0fedcac9f1075d516ce1f0129f4aa9b73ee81e"
dependencies = [
 "amplify",
 "ec25519",
@@ -700,8 +698,7 @@ dependencies = [
[[package]]
name = "cyphernet"
version = "0.5.2"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "ac949369884a7a1d802cc669821269c707be8cec4d65043382e253733d2e62e1"
+
source = "git+https://github.com/lorenzleutgeb/cyphernet.rs.git?branch=push-ooltwtzkpvlk#ca0fedcac9f1075d516ce1f0129f4aa9b73ee81e"
dependencies = [
 "cypheraddr",
 "cyphergraphy",
@@ -2388,8 +2385,7 @@ dependencies = [
[[package]]
name = "noise-framework"
version = "0.4.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "b57e96e713d599dc58755d0e5bb2238908a63e13f624f70c8345fdb7d8b51bae"
+
source = "git+https://github.com/lorenzleutgeb/cyphernet.rs.git?branch=push-ooltwtzkpvlk#ca0fedcac9f1075d516ce1f0129f4aa9b73ee81e"
dependencies = [
 "amplify",
 "chacha20poly1305",
@@ -3133,6 +3129,7 @@ dependencies = [
 "crossbeam-channel",
 "cyphernet",
 "fastrand",
+
 "gix-packetline",
 "lexopt",
 "log",
 "mio 1.0.4",
@@ -3895,8 +3892,7 @@ dependencies = [
[[package]]
name = "socks5-client"
version = "0.4.1"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "ffc7dcf6fab1d65d82d633006a4cc658d76ce436e01cf1a7c71873c0eeba324c"
+
source = "git+https://github.com/lorenzleutgeb/cyphernet.rs.git?branch=push-ooltwtzkpvlk#ca0fedcac9f1075d516ce1f0129f4aa9b73ee81e"
dependencies = [
 "amplify",
 "cypheraddr",
modified Cargo.toml
@@ -31,6 +31,7 @@ dunce = "1.0.5"
fastrand = { version = "2.0.0", default-features = false }
git2 = { version = "0.19.0", default-features = false, features = ["vendored-libgit2"] }
gix-hash = { version = "0.22.1", default-features = false, features = ["sha1"] }
+
gix-packetline = { version = "0.21.1", default-features = false }
human-panic = "2.0.6"
itertools = "0.14"
lexopt = "0.3.0"
@@ -97,3 +98,7 @@ clippy.must_use_candidate = "deny"
inherits = "release"
debug = true
incremental = false
+

+
[patch.crates-io]
+
cyphernet = { git = "https://github.com/lorenzleutgeb/cyphernet.rs.git", branch = "push-ooltwtzkpvlk" }
+
cypheraddr = { git = "https://github.com/lorenzleutgeb/cyphernet.rs.git", branch = "push-ooltwtzkpvlk" }
modified build.rs
@@ -31,7 +31,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
        // x.y.z, with efe10f95be being a unique prefix of the OID of
        // `HEAD`, and the working directory was dirty.
        // If this is a build pointing to a commit that has release tag, this
-
        // will just return the tag name itelf, e.g. `releases/x.y.z`.
+
        // will just return the tag name itself, e.g. `releases/x.y.z`.
        // If all fails, we just use `hash`, which, in the worst case is
        // still "unknown" (see above) but in most cases will just be
        // the short OID of `HEAD`.
modified crates/radicle-cli-test/src/lib.rs
@@ -4,17 +4,12 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync;
-
use std::{env, ffi, fs, io, mem};
+
use std::{env, fs, io, mem};

use snapbox::cmd::{Command, OutputAssert};
use snapbox::{Assert, Substitutions};
use thiserror::Error;

-
#[cfg(windows)]
-
const PATH_SEPARATOR: char = ';';
-
#[cfg(not(windows))]
-
const PATH_SEPARATOR: char = ':';
-

/// Used to ensure the build task is only run once.
static BUILD: sync::Once = sync::Once::new();

@@ -462,11 +457,7 @@ impl TestFormula {
                    vec![]
                };

-
                let bins = bins(self.cwd.clone())
-
                    .iter()
-
                    .map(|p| p.as_os_str())
-
                    .collect::<Vec<_>>()
-
                    .join(ffi::OsStr::new(&PATH_SEPARATOR.to_string()));
+
                let bins = std::env::join_paths(bins(self.cwd.clone())).unwrap();

                let command = Command::new(cmd.clone())
                    .env_clear()
@@ -531,13 +522,17 @@ impl TestFormula {
    }
}

-
/// Get the list of binary paths to use as `PATH` for the tests,
+
/// Get the list of binary paths to use as `$PATH` for the tests,
/// starting with the current working directory.
fn bins(cwd: PathBuf) -> Vec<PathBuf> {
-
    let mut bins: Vec<PathBuf> = env::var("PATH")
-
        .map(|env_path| env_path.split(PATH_SEPARATOR).map(PathBuf::from).collect())
-
        .unwrap_or_default();
+
    let mut bins: Vec<PathBuf> = Vec::new();
+

+
    // Add current working directory to `$PATH`,
+
    // this makes it more convenient to execute scripts during testing.
+
    bins.push(cwd);

+
    // If we're running in a cargo build environment, add the target
+
    // directory to `$PATH` as well.
    if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") {
        let profile = cfg!(debug_assertions)
            .then_some("debug")
@@ -551,6 +546,11 @@ fn bins(cwd: PathBuf) -> Vec<PathBuf> {
        )
    }

+
    // Add the "real" `$PATH`.
+
    if let Ok(path) = env::var("PATH") {
+
        bins.extend(env::split_paths(&path));
+
    }
+

    #[cfg(windows)]
    {
        // Radicle CLI tests rely on various Unix coreutils
@@ -562,9 +562,6 @@ fn bins(cwd: PathBuf) -> Vec<PathBuf> {
        bins.push(PathBuf::from(r#"C:\Program Files\Git\usr\bin"#));
    }

-
    // Add current working directory to `$PATH`,
-
    // this makes it more convenient to execute scripts during testing.
-
    bins.push(cwd);
    bins
}

modified crates/radicle-cli/Cargo.toml
@@ -13,6 +13,11 @@ rust-version.workspace = true
name = "rad"
path = "src/main.rs"

+
[features]
+
default = ["i2p", "tor"]
+
i2p = ["radicle/i2p"]
+
tor = ["radicle/tor"]
+

[dependencies]
anyhow = "1"
chrono = { workspace = true, features = ["clock", "std"] }
modified crates/radicle-cli/src/terminal/format.rs
@@ -37,6 +37,7 @@ pub fn addr_compact(address: &Address) -> Paint<String> {
    let host = match address.host() {
        HostName::Ip(ip) => ip.to_string(),
        HostName::Dns(dns) => dns.clone(),
+
        #[cfg(feature = "tor")]
        HostName::Tor(onion) => {
            let onion = onion.to_string();
            let start = onion.chars().take(8).collect::<String>();
@@ -46,6 +47,11 @@ pub fn addr_compact(address: &Address) -> Paint<String> {
                .collect::<String>();
            format!("{start}…{end}")
        }
+
        #[cfg(feature = "i2p")]
+
        HostName::I2p(i2p) => {
+
            // TODO: Base32 addresses can be shortened like onion addresses.
+
            i2p.to_string()
+
        }
        _ => unreachable!(),
    };

modified crates/radicle-crypto/src/lib.rs
@@ -172,7 +172,18 @@ impl TryFrom<String> for Signature {
        ]),
    ),
)]
-
pub struct PublicKey(pub ed25519::PublicKey);
+
pub struct PublicKey(pub amplify::Bytes32);
+

+
impl PublicKey {
+
    /// Verify the signature for a given payload.
+
    pub fn verify(
+
        &self,
+
        payload: impl AsRef<[u8]>,
+
        signature: &ed25519::Signature,
+
    ) -> Result<(), ed25519::Error> {
+
        ed25519::PublicKey::new(self.0.to_byte_array()).verify(payload, signature)
+
    }
+
}

#[cfg(feature = "cyphernet")]
impl cyphernet::display::MultiDisplay<cyphernet::display::Encoding> for PublicKey {
@@ -186,7 +197,9 @@ impl cyphernet::display::MultiDisplay<cyphernet::display::Encoding> for PublicKe
#[cfg(feature = "ssh")]
impl From<PublicKey> for ssh_key::PublicKey {
    fn from(key: PublicKey) -> Self {
-
        ssh_key::PublicKey::from(ssh_key::public::Ed25519PublicKey(**key))
+
        ssh_key::PublicKey::from(ssh_key::public::Ed25519PublicKey(
+
            key.deref().to_byte_array(),
+
        ))
    }
}

@@ -195,24 +208,24 @@ impl cyphernet::EcPk for PublicKey {
    const COMPRESSED_LEN: usize = 32;
    const CURVE_NAME: &'static str = "Edwards25519";

-
    type Compressed = [u8; 32];
+
    type Compressed = amplify::Bytes32;

    fn base_point() -> Self {
        unimplemented!()
    }

    fn to_pk_compressed(&self) -> Self::Compressed {
-
        *self.0.deref()
+
        amplify::Bytes32::from_byte_array(self.deref().to_byte_array())
    }

    fn from_pk_compressed(pk: Self::Compressed) -> Result<Self, cyphernet::EcPkInvalid> {
-
        Ok(PublicKey::from(pk))
+
        Ok(PublicKey::from(pk.to_byte_array()))
    }

    fn from_pk_compressed_slice(slice: &[u8]) -> Result<Self, cyphernet::EcPkInvalid> {
        ed25519::PublicKey::from_slice(slice)
            .map_err(|_| cyphernet::EcPkInvalid::default())
-
            .map(Self)
+
            .map(Self::from)
    }
}

@@ -224,7 +237,8 @@ impl SecretKey {
    /// Elliptic-curve Diffie-Hellman.
    pub fn ecdh(&self, pk: &PublicKey) -> Result<[u8; 32], ed25519::Error> {
        let scalar = self.seed().scalar();
-
        let ge = edwards25519::GeP3::from_bytes_vartime(pk).ok_or(Error::InvalidPublicKey)?;
+
        let ge = edwards25519::GeP3::from_bytes_vartime(&pk.deref().to_byte_array())
+
            .ok_or(Error::InvalidPublicKey)?;

        Ok(edwards25519::ge_scalarmult(&scalar, &ge).to_bytes())
    }
@@ -333,13 +347,19 @@ impl fmt::Debug for PublicKey {

impl From<ed25519::PublicKey> for PublicKey {
    fn from(other: ed25519::PublicKey) -> Self {
-
        Self(other)
+
        Self(amplify::Bytes32::from_byte_array(*other.deref()))
+
    }
+
}
+

+
impl From<PublicKey> for ed25519::PublicKey {
+
    fn from(val: PublicKey) -> Self {
+
        ed25519::PublicKey::new(val.0.to_byte_array())
    }
}

impl From<[u8; 32]> for PublicKey {
    fn from(other: [u8; 32]) -> Self {
-
        Self(ed25519::PublicKey::new(other))
+
        Self(amplify::Bytes32::from_byte_array(other))
    }
}

@@ -347,7 +367,7 @@ impl TryFrom<&[u8]> for PublicKey {
    type Error = ed25519::Error;

    fn try_from(other: &[u8]) -> Result<Self, Self::Error> {
-
        ed25519::PublicKey::from_slice(other).map(Self)
+
        ed25519::PublicKey::from_slice(other).map(Self::from)
    }
}

@@ -362,7 +382,7 @@ impl PublicKey {
    pub fn to_human(&self) -> String {
        let mut buf = [0; 2 + ed25519::PublicKey::BYTES];
        buf[..2].copy_from_slice(&Self::MULTICODEC_TYPE);
-
        buf[2..].copy_from_slice(self.0.deref());
+
        buf[2..].copy_from_slice(self.0.to_byte_array().as_slice());

        multibase::encode(multibase::Base::Base58Btc, buf)
    }
@@ -397,7 +417,7 @@ impl FromStr for PublicKey {
        if let Some(bytes) = bytes.strip_prefix(&Self::MULTICODEC_TYPE) {
            let key = ed25519::PublicKey::from_slice(bytes)?;

-
            Ok(Self(key))
+
            Ok(key.into())
        } else {
            Err(PublicKeyError::Multicodec(Self::MULTICODEC_TYPE))
        }
@@ -413,7 +433,7 @@ impl TryFrom<String> for PublicKey {
}

impl Deref for PublicKey {
-
    type Target = ed25519::PublicKey;
+
    type Target = amplify::Bytes32;

    fn deref(&self) -> &Self::Target {
        &self.0
modified crates/radicle-crypto/src/ssh.rs
@@ -51,7 +51,9 @@ impl ExtendedSignature {
    /// Convert to OpenSSH standard PEM format.
    pub fn to_pem(&self) -> Result<String, ExtendedSignatureError> {
        ssh_key::SshSig::new(
-
            ssh_key::public::KeyData::from(ssh_key::public::Ed25519PublicKey(**self.key)),
+
            ssh_key::public::KeyData::from(ssh_key::public::Ed25519PublicKey(
+
                (*self.key).to_byte_array(),
+
            )),
            String::from("radicle"),
            ssh_key::HashAlg::Sha256,
            ssh_key::Signature::new(ssh_key::Algorithm::Ed25519, **self.sig)?,
modified crates/radicle-crypto/src/ssh/keystore.rs
@@ -335,7 +335,7 @@ impl MemorySigner {
            .ok_or_else(|| MemorySignerError::NotFound(public_path.to_path_buf()))?;

        secret
-
            .validate_public_key(&public)
+
            .validate_public_key(&public.into())
            .map_err(|_| MemorySignerError::KeyMismatch {
                secret: keystore.secret_key_path().to_path_buf(),
                public: public_path.to_path_buf(),
@@ -348,7 +348,7 @@ impl MemorySigner {
    /// the public key from the secret key.
    pub fn from_secret(secret: Zeroizing<SecretKey>) -> Self {
        Self {
-
            public: PublicKey(secret.public_key()),
+
            public: PublicKey((*secret.public_key()).into()),
            secret,
        }
    }
modified crates/radicle-crypto/src/test/arbitrary.rs
@@ -18,6 +18,6 @@ impl Arbitrary for PublicKey {
        let seed = Seed::new(bytes);
        let keypair = KeyPair::from_seed(seed);

-
        PublicKey(keypair.pk)
+
        keypair.pk.into()
    }
}
modified crates/radicle-node/Cargo.toml
@@ -10,9 +10,11 @@ build = "build.rs"
rust-version.workspace = true

[features]
-
default = ["backtrace", "systemd", "structured-logger", "socket2"]
+
default = ["backtrace", "i2p", "systemd", "structured-logger", "socket2", "tor"]
+
i2p = ["cyphernet/i2p", "radicle/i2p", "radicle-protocol/i2p"]
systemd = ["dep:radicle-systemd"]
test = ["radicle/test", "radicle-crypto/test", "radicle-crypto/cyphernet", "radicle-protocol/test", "qcheck", "snapbox"]
+
tor = ["cyphernet/tor", "radicle/tor", "radicle-protocol/tor"]

[dependencies]
backtrace = { version = "0.3.75", optional = true }
@@ -21,8 +23,9 @@ bytes = { workspace = true }
chrono = { workspace = true, features = ["clock"] }
colored = { workspace = true }
crossbeam-channel = { workspace = true }
-
cyphernet = { workspace = true, features = ["tor", "dns", "ed25519", "p2p-ed25519", "noise-framework", "noise_sha2"] }
+
cyphernet = { workspace = true, features = ["dns", "ed25519", "p2p-ed25519", "noise-framework", "noise_sha2"] }
fastrand = { workspace = true }
+
gix-packetline = { workspace = true, features = ["blocking-io"] }
lexopt = { workspace = true }
log = { workspace = true, features = ["kv", "std"] }
mio = { version = "1", features = ["net", "os-poll"] }
modified crates/radicle-node/src/fingerprint.rs
@@ -68,7 +68,7 @@ impl Fingerprint {
        home: &Home,
        secret_key: &impl std::ops::Deref<Target = crypto::SecretKey>,
    ) -> Result<(), Error> {
-
        let public_key = crypto::PublicKey(secret_key.deref().public_key());
+
        let public_key = secret_key.deref().public_key().into();
        let mut file = std::fs::OpenOptions::new()
            .create_new(true)
            .write(true)
@@ -86,7 +86,7 @@ impl Fingerprint {
        &self,
        secret_key: &impl std::ops::Deref<Target = crypto::SecretKey>,
    ) -> FingerprintVerification {
-
        let public_key = crypto::PublicKey(secret_key.deref().public_key());
+
        let public_key = secret_key.deref().public_key().into();
        if crypto::ssh::fmt::fingerprint(&public_key) == self.0 {
            FingerprintVerification::Match
        } else {
modified crates/radicle-node/src/reactor.rs
@@ -34,6 +34,12 @@ const SECONDS_IN_AN_HOUR: u64 = 60 * 60;
/// Maximum amount of time to wait for I/O.
const WAIT_TIMEOUT: Duration = Duration::from_secs(SECONDS_IN_AN_HOUR);

+
/// Maximum duration we accept the service to spend handling events (and errors,
+
/// ticking, etc.) without warning. We set this to be warned whenever the
+
/// service becomes so slow that we would not be able to handle at least 100
+
/// "requests" per second, i.e. `1s / 100 = 10ms`.
+
const LAG_TIMEOUT: Duration = Duration::from_millis(10);
+

/// A resource which can be managed by the reactor.
pub trait EventHandler {
    /// The type of reactions which this resource may generate upon receiving
@@ -371,10 +377,9 @@ impl<H: ReactionHandler> Runtime<H> {

    fn run(mut self) {
        loop {
-
            let before_poll = Instant::now();
            let timeout = self
                .timeouts
-
                .next_expiring_from(before_poll)
+
                .next_expiring_from(Instant::now())
                .unwrap_or(WAIT_TIMEOUT);

            self.register_interests()
@@ -384,31 +389,38 @@ impl<H: ReactionHandler> Runtime<H> {

            let mut events = Events::with_capacity(1024);

-
            // Blocking
+
            // Block and wait for I/O events or timeout.
            let res = self.poll.poll(&mut events, Some(timeout));

-
            self.service.tick();
+
            // This instant allows us to measure the time spent by the service
+
            // to handle the result of polling.
            let tick = Instant::now();

-
            // The way this is currently used basically ignores which keys have
-
            // timed out. So as long as *something* timed out, we wake the service.
-
            let timers_fired = self.timeouts.remove_expired_by(tick);
-
            if timers_fired > 0 {
-
                log::trace!(target: "reactor", "Timer has fired");
-
                self.service.timer_reacted();
-
            }
+
            // We inform the service that time has advanced.
+
            self.service.tick();

+
            // We inform the service about errors during polling.
            if let Err(err) = res {
                log::warn!(target: "reactor", "Failure during polling: {err}");
                self.service.handle_error(Error::Poll(err));
            }

-
            let awoken = self.handle_events(tick, events);
+
            // We inform the service that some timers have reacted.
+
            // The way this is currently used basically ignores which
+
            // timers have expired. As long as *something* timed out,
+
            // we inform the service.
+
            let timers_fired = self.timeouts.remove_expired_by(tick);
+
            if timers_fired > 0 {
+
                log::trace!(target: "reactor", "Timer has fired");
+
                self.service.timer_reacted();
+
            }

            log::trace!(target: "reactor", "Duration between tick and events handled: {:?}", Instant::now().duration_since(tick));

            // Process the commands only if we awoken by the waker.
-
            if awoken {
+
            if events.is_empty() {
+
                log::trace!(target: "reactor", "Woke up from poll without events");
+
            } else if self.handle_events(tick, events) {
                loop {
                    match self.receiver.try_recv() {
                        Err(TryRecvError::Empty) => break,
@@ -421,6 +433,11 @@ impl<H: ReactionHandler> Runtime<H> {
                }
            }

+
            let duration = Instant::now().duration_since(tick);
+
            if duration > LAG_TIMEOUT {
+
                log::warn!(target: "reactor", "Service took {:?} to tick and handle errors and events, which exceeds the timeout of {:?}", duration, LAG_TIMEOUT);
+
            }
+

            self.handle_actions(tick);
        }
    }
modified crates/radicle-node/src/test/node.rs
@@ -7,6 +7,7 @@ use std::{
    collections::{BTreeMap, BTreeSet},
    fs, io, iter, net, process, thread, time,
    time::Duration,
+
    time::SystemTime,
};

use crossbeam_channel as chan;
@@ -107,22 +108,37 @@ impl<G: Signer<Signature> + cyphernet::Ecdh> NodeHandle<G> {
            .connect(remote.id, remote.addr.into(), ConnectOptions::default())
            .ok();

-
        local_events
-
            .iter()
-
            .find(|e| {
-
                matches!(
-
                    e, Event::PeerConnected { nid } if nid == &remote.id
-
                )
-
            })
-
            .unwrap();
-
        remote_events
-
            .iter()
-
            .find(|e| {
-
                matches!(
-
                    e, Event::PeerConnected { nid } if nid == &self.id
-
                )
-
            })
-
            .unwrap();
+
        // This timeout is rather arbitrary. The main point is to avoid waiting
+
        // indefinitely, but to also give slow CI enough time to settle.
+
        const TIMEOUT: Duration = Duration::from_secs(60);
+

+
        log::debug!(target: "test", "Waiting {:?} for node {} to be connected to peer {} (direction 1/2).", TIMEOUT, self.id, remote.id);
+
        if let Err(err) = local_events.wait(
+
            |event| match event {
+
                Event::PeerConnected { nid } if *nid == remote.id => Some(()),
+
                _ => None,
+
            },
+
            TIMEOUT,
+
        ) {
+
            panic!(
+
                "Failed to connect node {} to peer {} within {:?}: {err}",
+
                self.id, remote.id, TIMEOUT
+
            );
+
        }
+

+
        log::debug!(target: "test", "Waiting {:?} for node {} to be connected to peer {} (direction 2/2).", TIMEOUT, remote.id, self.id);
+
        if let Err(err) = remote_events.wait(
+
            |event| match event {
+
                Event::PeerConnected { nid } if *nid == self.id => Some(()),
+
                _ => None,
+
            },
+
            TIMEOUT,
+
        ) {
+
            panic!(
+
                "Failed to connect node {} to peer {} within {:?}: {err}",
+
                remote.id, self.id, TIMEOUT
+
            );
+
        }

        self
    }
@@ -578,6 +594,12 @@ pub fn converge<'a, G: Signer<Signature> + cyphernet::Ecdh + 'static>(
        }
    }

+
    // This timeout is rather arbitrary. The main point is to avoid waiting
+
    // indefinitely, but to also give slow CI enough time to settle.
+
    const TIMEOUT_DURATION: Duration = Duration::from_secs(30);
+

+
    let timeout = SystemTime::now() + TIMEOUT_DURATION;
+

    // Then, while there are nodes remaining to converge, check each node to see if
    // its routing table has all routes. If so, remove it from the remaining nodes.
    while !remaining.is_empty() {
@@ -595,6 +617,11 @@ pub fn converge<'a, G: Signer<Signature> + cyphernet::Ecdh + 'static>(
            true
        });
        thread::sleep(Duration::from_millis(100));
+

+
        if SystemTime::now() > timeout {
+
            log::warn!(target: "test", "Nodes did not converge within {:?}. Remaining nodes: {:?}", TIMEOUT_DURATION, remaining.keys());
+
            break;
+
        }
    }
    all_routes
}
modified crates/radicle-node/src/wire.rs
@@ -19,6 +19,7 @@ use radicle::node::device::Device;

use radicle::collections::{RandomMap, RandomSet};
use radicle::crypto;
+
#[cfg(any(feature = "tor", feature = "i2p"))]
use radicle::node::config::AddressConfig;
use radicle::node::Link;
use radicle::node::NodeId;
@@ -1083,6 +1084,30 @@ pub fn dial<G: Ecdh<Pk = NodeId>>(
    signer: G,
    config: &radicle::node::Config,
) -> io::Result<WireSession<G>> {
+
    #[cfg(any(feature = "tor", feature = "i2p"))]
+
    fn proxy_or_forward<H: std::fmt::Display>(
+
        config: &AddressConfig,
+
        global_proxy: Option<net::SocketAddr>,
+
        host: H,
+
        port: u16,
+
    ) -> io::Result<NetAddr<InetHost>> {
+
        match config {
+
            // In proxy mode, simply use the configured proxy address.
+
            // This takes precedence over any global proxy.
+
            AddressConfig::Proxy { address } => Ok((*address).into()),
+
            // In "forward" mode, if a global proxy is set, we use that, otherwise
+
            // we treat the address as a regular DNS name.
+
            AddressConfig::Forward => Ok(global_proxy
+
                .map(Into::into)
+
                .unwrap_or_else(|| NetAddr::new(InetHost::Dns(host.to_string()), port))),
+
            // If address type support isn't configured, refuse to connect.
+
            AddressConfig::Drop => Err(io::Error::new(
+
                io::ErrorKind::Unsupported,
+
                "no configuration found for address type",
+
            )),
+
        }
+
    }
+

    // Determine what address to establish a TCP connection with, given the remote peer
    // address and our node configuration.
    let inet_addr: NetAddr<InetHost> = match (&remote_addr.host, config.proxy) {
@@ -1092,27 +1117,12 @@ pub fn dial<G: Ecdh<Pk = NodeId>>(
        (HostName::Dns(_), Some(proxy)) => proxy.into(),
        (HostName::Dns(dns), None) => NetAddr::new(InetHost::Dns(dns.clone()), remote_addr.port),
        // For onion addresses, handle with care.
-
        (HostName::Tor(onion), proxy) => match config.onion {
-
            // In onion proxy mode, simply use the configured proxy address.
-
            // This takes precedence over any global proxy.
-
            Some(AddressConfig::Proxy { address }) => address.into(),
-
            // In "forward" mode, if a global proxy is set, we use that, otherwise
-
            // we treat `.onion` addresses as regular DNS names.
-
            Some(AddressConfig::Forward) => {
-
                if let Some(proxy) = proxy {
-
                    proxy.into()
-
                } else {
-
                    NetAddr::new(InetHost::Dns(onion.to_string()), remote_addr.port)
-
                }
-
            }
-
            // If onion address support isn't configured, refuse to connect.
-
            None => {
-
                return Err(io::Error::new(
-
                    io::ErrorKind::Unsupported,
-
                    "no configuration found for .onion addresses",
-
                ));
-
            }
-
        },
+
        #[cfg(feature = "tor")]
+
        (HostName::Tor(onion), proxy) => {
+
            proxy_or_forward(&config.onion, proxy, onion, remote_addr.port)?
+
        }
+
        #[cfg(feature = "i2p")]
+
        (HostName::I2p(i2p), proxy) => proxy_or_forward(&config.i2p, proxy, i2p, remote_addr.port)?,
        _ => {
            return Err(io::Error::new(
                io::ErrorKind::Unsupported,
modified crates/radicle-node/src/worker.rs
@@ -141,15 +141,51 @@ impl Worker {

                let timeout = channels.timeout();
                let (mut stream_r, stream_w) = channels.split();
-
                let header = match upload_pack::pktline::git_request(&mut stream_r) {
-
                    Ok(header) => header,
-
                    Err(e) => {
+

+
                let mut iter = gix_packetline::blocking_io::StreamingPeekableIter::new(
+
                    &mut stream_r,
+
                    &[gix_packetline::PacketLineRef::Flush],
+
                    false, /* packet tracing */
+
                );
+

+
                let header = match iter.read_line() {
+
                    None => {
+
                        return FetchResult::Responder {
+
                            rid: None,
+
                            result: Err(UploadError::PacketLine(std::io::Error::new(
+
                                std::io::ErrorKind::UnexpectedEof,
+
                                "unexpected end of stream while reading upload-pack header",
+
                            ))),
+
                        }
+
                    }
+
                    Some(Err(e)) => {
                        return FetchResult::Responder {
                            rid: None,
                            result: Err(UploadError::PacketLine(e)),
                        }
                    }
+
                    Some(Ok(Err(e))) => {
+
                        return FetchResult::Responder {
+
                            rid: None,
+
                            result: Err(UploadError::PacketLine(std::io::Error::new(
+
                                std::io::ErrorKind::InvalidData,
+
                                format!("invalid upload-pack header: {e}"),
+
                            ))),
+
                        }
+
                    }
+
                    Some(Ok(Ok(header))) => header,
                };
+

+
                let Some(header) = upload_pack::GitRequest::from_packetline(header) else {
+
                    return FetchResult::Responder {
+
                        rid: None,
+
                        result: Err(UploadError::PacketLine(std::io::Error::new(
+
                            std::io::ErrorKind::InvalidData,
+
                            "failed to parse upload-pack header",
+
                        ))),
+
                    };
+
                };
+

                log::debug!(target: "worker", "Spawning upload-pack process for {} on stream {stream}..", header.repo);

                if let Err(e) = self.is_authorized(remote, header.repo) {
modified crates/radicle-node/src/worker/upload_pack.rs
@@ -25,7 +25,7 @@ pub fn upload_pack<R, W>(
    remote: NodeId,
    storage: &Storage,
    emitter: &Emitter<Event>,
-
    header: &pktline::GitRequest,
+
    header: &GitRequest,
    mut recv: R,
    send: W,
    timeout: Duration,
@@ -239,119 +239,63 @@ where
    }
}

-
pub(super) mod pktline {
-
    use std::io;
-
    use std::io::Read;
-
    use std::str;
-

-
    use radicle::prelude::RepoId;
-

-
    pub const HEADER_LEN: usize = 4;
-

-
    /// Read and parse the `GitRequest` data from the client side.
-
    pub fn git_request<R>(reader: &mut R) -> io::Result<GitRequest>
-
    where
-
        R: io::Read,
-
    {
-
        let mut reader = Reader::new(reader);
-
        let (header, _) = reader.read_request_pktline()?;
-
        Ok(header)
-
    }
-

-
    struct Reader<'a, R> {
-
        stream: &'a mut R,
-
    }
-

-
    impl<'a, R: io::Read> Reader<'a, R> {
-
        /// Create a new packet-line reader.
-
        pub fn new(stream: &'a mut R) -> Self {
-
            Self { stream }
-
        }
-

-
        /// Parse a Git request packet-line.
-
        ///
-
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
-
        ///
-
        fn read_request_pktline(&mut self) -> io::Result<(GitRequest, Vec<u8>)> {
-
            let mut pktline = [0u8; 1024];
-
            let length = self.read_pktline(&mut pktline)?;
-
            let Some(cmd) = GitRequest::parse(&pktline[4..length]) else {
-
                return Err(io::ErrorKind::InvalidInput.into());
-
            };
-
            Ok((cmd, Vec::from(&pktline[..length])))
-
        }
-

-
        /// Parse a Git packet-line.
-
        fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
            self.read_exact(&mut buf[..HEADER_LEN])?;
-

-
            let length = str::from_utf8(&buf[..HEADER_LEN])
-
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
-
            let length = usize::from_str_radix(length, 16)
-
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
-

-
            self.read_exact(&mut buf[HEADER_LEN..length])?;
-

-
            Ok(length)
-
        }
-
    }
-

-
    impl<R: io::Read> io::Read for Reader<'_, R> {
-
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
            self.stream.read(buf)
-
        }
-
    }
+
/// The Git request packet-line for a repository.
+
///
+
/// See <https://git-scm.com/docs/pack-protocol.html#_git_transport>.
+
///
+
/// Example: `0032git-upload-pack /rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5.git\0host=myserver.com\0`
+
#[derive(Debug)]
+
pub struct GitRequest {
+
    pub repo: RepoId,
+
    #[allow(dead_code)]
+
    pub path: String,
+
    #[allow(dead_code)]
+
    pub host: Option<(String, Option<u16>)>,
+
    pub extra: Vec<(String, Option<String>)>,
+
}

-
    /// The Git request packet-line for a Heartwood repository.
-
    ///
-
    /// Example: `0032git-upload-pack /rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5.git\0host=myserver.com\0`
-
    #[derive(Debug)]
-
    pub struct GitRequest {
-
        pub repo: RepoId,
-
        #[allow(dead_code)]
-
        pub path: String,
-
        #[allow(dead_code)]
-
        pub host: Option<(String, Option<u16>)>,
-
        pub extra: Vec<(String, Option<String>)>,
+
impl GitRequest {
+
    pub(super) fn from_packetline(
+
        packet_line: gix_packetline::PacketLineRef<'_>,
+
    ) -> Option<GitRequest> {
+
        packet_line.as_slice().and_then(Self::parse)
    }

-
    impl GitRequest {
-
        /// Parse a Git command from a packet-line.
-
        fn parse(input: &[u8]) -> Option<Self> {
-
            let input = str::from_utf8(input).ok()?;
-
            let mut parts = input
-
                .strip_prefix("git-upload-pack ")?
-
                .split_terminator('\0');
-

-
            let path = parts.next()?.to_owned();
-
            let repo = path.strip_prefix('/')?.parse().ok()?;
-
            let host = match parts.next() {
-
                None | Some("") => None,
-
                Some(host) => {
-
                    let host = host.strip_prefix("host=")?;
-
                    match host.split_once(':') {
-
                        None => Some((host.to_owned(), None)),
-
                        Some((host, port)) => {
-
                            let port = port.parse::<u16>().ok()?;
-
                            Some((host.to_owned(), Some(port)))
-
                        }
+
    /// Parse a Git command from a packet-line.
+
    fn parse(input: &[u8]) -> Option<Self> {
+
        let input = std::str::from_utf8(input).ok()?;
+
        let mut parts = input
+
            .strip_prefix("git-upload-pack ")?
+
            .split_terminator('\0');
+

+
        let path = parts.next()?.to_owned();
+
        let repo = path.strip_prefix('/')?.parse().ok()?;
+
        let host = match parts.next() {
+
            None | Some("") => None,
+
            Some(host) => {
+
                let host = host.strip_prefix("host=")?;
+
                match host.split_once(':') {
+
                    None => Some((host.to_owned(), None)),
+
                    Some((host, port)) => {
+
                        let port = port.parse::<u16>().ok()?;
+
                        Some((host.to_owned(), Some(port)))
                    }
                }
-
            };
-
            let extra = parts
-
                .skip_while(|part| part.is_empty())
-
                .map(|part| match part.split_once('=') {
-
                    None => (part.to_owned(), None),
-
                    Some((k, v)) => (k.to_owned(), Some(v.to_owned())),
-
                })
-
                .collect();
-

-
            Some(Self {
-
                repo,
-
                path,
-
                host,
-
                extra,
+
            }
+
        };
+
        let extra = parts
+
            .skip_while(|part| part.is_empty())
+
            .map(|part| match part.split_once('=') {
+
                None => (part.to_owned(), None),
+
                Some((k, v)) => (k.to_owned(), Some(v.to_owned())),
            })
-
        }
+
            .collect();
+

+
        Some(Self {
+
            repo,
+
            path,
+
            host,
+
            extra,
+
        })
    }
}
modified crates/radicle-protocol/Cargo.toml
@@ -9,13 +9,15 @@ edition.workspace = true
rust-version.workspace = true

[features]
+
i2p = ["cypheraddr/i2p"]
test = ["radicle/test", "radicle-crypto/test", "radicle-crypto/cyphernet", "qcheck"]
+
tor = ["cypheraddr/tor"]

[dependencies]
bloomy = "1.2"
bytes = { workspace = true }
crossbeam-channel = { workspace = true }
-
cypheraddr = { workspace = true, features = ["serde", "tor"] }
+
cypheraddr = { workspace = true, features = ["serde"] }
fastrand = { workspace = true }
log = { workspace = true, features = ["std"] }
nonempty = { workspace = true, features = ["serialize"] }
modified crates/radicle-protocol/src/service.rs
@@ -30,6 +30,8 @@ use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
use radicle::node::address::{AddressBook, AddressType, KnownAddress};
+
#[cfg(feature = "tor")]
+
use radicle::node::config::AddressConfig;
use radicle::node::config::{PeerConfig, RateLimit};
use radicle::node::device::Device;
use radicle::node::refs::Store as _;
@@ -404,7 +406,7 @@ where
    G: crypto::signature::Signer<crypto::Signature>,
{
    pub fn new(
-
        config: Config,
+
        mut config: Config,
        db: Stores<D>,
        storage: S,
        policies: policy::Config<Write>,
@@ -427,6 +429,10 @@ where
                .with_max_capacity(fetcher::MaxQueueSize::default());
            FetcherService::new(config)
        };
+

+
        // For backwards compatibility, ensure that we are announcer of our own Node ID.
+
        config.announcers.insert(*signer.public_key());
+

        Self {
            config,
            storage,
@@ -1548,12 +1554,15 @@ where
                // from a new repository being initialized.
                self.seed_discovered(message.rid, *announcer, message.timestamp);

-
                // Update sync status of announcer for this repo.
-
                if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
+
                // Update sync status of announcers for this repo.
+
                for refs in refs
+
                    .iter()
+
                    .filter(|r| self.config.announcers.contains(&r.remote))
+
                {
                    debug!(
                        target: "service",
-
                        "Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
-
                        message.rid, refs.at, message.timestamp
+
                        "Refs announcement of {announcer} for {} contains announcer {} (t={})",
+
                        message.rid, refs, message.timestamp
                    );
                    match self.db.seeds_mut().synced(
                        &message.rid,
@@ -2572,10 +2581,18 @@ where
    ///
    /// If the [`Address`] is an `.onion` address and the service supports onion
    /// routing then this will return `true`.
+
    ///
+
    /// # I2P
+
    ///
+
    /// If the [`Address`] is an I2P address and the service supports I2P
+
    /// connections then this will return `true`.
    fn is_supported_address(&self, address: &Address) -> bool {
        match AddressType::from(address) {
            // Only consider onion addresses if configured.
-
            AddressType::Onion => self.config.onion.is_some(),
+
            #[cfg(feature = "tor")]
+
            AddressType::Onion => self.config.onion != AddressConfig::Drop,
+
            #[cfg(feature = "i2p")]
+
            AddressType::I2p => self.config.i2p != AddressConfig::Drop,
            AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
        }
    }
modified crates/radicle-protocol/src/service/limiter.rs
@@ -156,7 +156,7 @@ mod test {
    }

    #[test]
-
    fn test_limitter_refill() {
+
    fn test_limiter_refill() {
        let mut r = RateLimiter::default();
        let t = (3, 0.2); // Three tokens burst. One token every 5 seconds.
        let a = HostName::Dns(String::from("seed.radicle.example.com"));
@@ -188,7 +188,7 @@ mod test {

    #[test]
    #[rustfmt::skip]
-
    fn test_limitter_multi() {
+
    fn test_limiter_multi() {
        let t = (1, 1.0); // One token per second. One token burst.
        let n = arbitrary::gen::<NodeId>(1);
        let n = Some(&n);
@@ -208,7 +208,7 @@ mod test {

    #[test]
    #[rustfmt::skip]
-
    fn test_limitter_different_rates() {
+
    fn test_limiter_different_rates() {
        let t1 = (1, 1.0); // One token per second. One token burst.
        let t2 = (2, 2.0); // Two tokens per second. Two token burst.
        let n = arbitrary::gen::<NodeId>(1);
modified crates/radicle-protocol/src/wire.rs
@@ -15,6 +15,9 @@ use std::string::FromUtf8Error;

use bytes::{Buf, BufMut};

+
#[cfg(feature = "i2p")]
+
use cypheraddr::i2p;
+
#[cfg(feature = "tor")]
use cypheraddr::tor;

use radicle::crypto::{PublicKey, Signature, Unverified};
@@ -58,8 +61,12 @@ pub enum Invalid {
    Alias(#[from] node::AliasError),
    #[error("invalid user agent string: {err}")]
    InvalidUserAgent { err: String },
+
    #[cfg(feature = "tor")]
    #[error("invalid onion address: {0}")]
    OnionAddr(#[from] tor::OnionAddrDecodeError),
+
    #[cfg(feature = "i2p")]
+
    #[error("invalid i2p address: {0}")]
+
    I2pAddr(#[from] i2p::I2pAddrParseError),
    #[error("invalid timestamp: {actual_millis} millis")]
    Timestamp { actual_millis: u64 },

@@ -173,7 +180,7 @@ impl Encode for u64 {

impl Encode for PublicKey {
    fn encode(&self, buf: &mut impl BufMut) {
-
        self.deref().encode(buf)
+
        self.deref().to_byte_array().encode(buf)
    }
}

@@ -257,12 +264,20 @@ impl Encode for Refs {
    }
}

+
#[cfg(feature = "tor")]
impl Encode for cypheraddr::tor::OnionAddrV3 {
    fn encode(&self, buf: &mut impl BufMut) {
        self.into_raw_bytes().encode(buf)
    }
}

+
#[cfg(feature = "i2p")]
+
impl Encode for i2p::I2pAddr {
+
    fn encode(&self, buf: &mut impl BufMut) {
+
        self.to_string().encode(buf)
+
    }
+
}
+

impl Encode for UserAgent {
    fn encode(&self, buf: &mut impl BufMut) {
        self.as_ref().encode(buf)
@@ -536,6 +551,7 @@ impl Decode for node::Features {
    }
}

+
#[cfg(feature = "tor")]
impl Decode for tor::OnionAddrV3 {
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
        let bytes: [u8; tor::ONION_V3_RAW_LEN] = Decode::decode(buf)?;
@@ -545,6 +561,16 @@ impl Decode for tor::OnionAddrV3 {
    }
}

+
#[cfg(feature = "i2p")]
+
impl Decode for i2p::I2pAddr {
+
    fn decode(buf: &mut impl Buf) -> Result<Self, Error> {
+
        let s = String::decode(buf)?;
+
        let addr = i2p::I2pAddr::from_str(&s).map_err(Invalid::from)?;
+

+
        Ok(addr)
+
    }
+
}
+

impl Encode for Timestamp {
    fn encode(&self, buf: &mut impl BufMut) {
        self.deref().encode(buf)
modified crates/radicle-protocol/src/wire/message.rs
@@ -2,7 +2,11 @@ use std::{mem, net};

use bytes::Buf;
use bytes::BufMut;
-
use cypheraddr::{tor, HostName, NetAddr};
+
#[cfg(feature = "i2p")]
+
use cypheraddr::i2p;
+
#[cfg(feature = "tor")]
+
use cypheraddr::tor;
+
use cypheraddr::{HostName, NetAddr};
use radicle::crypto::Signature;
use radicle::git::Oid;
use radicle::identity::RepoId;
@@ -79,7 +83,10 @@ pub enum AddressType {
    Ipv4 = 1,
    Ipv6 = 2,
    Dns = 3,
+
    #[cfg(feature = "tor")]
    Onion = 4,
+
    #[cfg(feature = "i2p")]
+
    I2p = 5,
}

impl From<AddressType> for u8 {
@@ -94,7 +101,10 @@ impl From<&Address> for AddressType {
            HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
            HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
            HostName::Dns(_) => AddressType::Dns,
+
            #[cfg(feature = "tor")]
            HostName::Tor(_) => AddressType::Onion,
+
            #[cfg(feature = "i2p")]
+
            HostName::I2p(_) => AddressType::I2p,
            _ => todo!(), // FIXME(cloudhead): Maxim will remove `non-exhaustive`
        }
    }
@@ -108,7 +118,10 @@ impl TryFrom<u8> for AddressType {
            1 => Ok(AddressType::Ipv4),
            2 => Ok(AddressType::Ipv6),
            3 => Ok(AddressType::Dns),
+
            #[cfg(feature = "tor")]
            4 => Ok(AddressType::Onion),
+
            #[cfg(feature = "i2p")]
+
            5 => Ok(AddressType::I2p),
            _ => Err(other),
        }
    }
@@ -343,7 +356,7 @@ impl wire::Decode for Message {

impl wire::Encode for Address {
    fn encode(&self, buf: &mut impl BufMut) {
-
        match self.host {
+
        match &self.host {
            HostName::Ip(net::IpAddr::V4(ip)) => {
                u8::from(AddressType::Ipv4).encode(buf);
                ip.octets().encode(buf);
@@ -356,10 +369,16 @@ impl wire::Encode for Address {
                u8::from(AddressType::Dns).encode(buf);
                dns.encode(buf);
            }
+
            #[cfg(feature = "tor")]
            HostName::Tor(addr) => {
                u8::from(AddressType::Onion).encode(buf);
                addr.encode(buf);
            }
+
            #[cfg(feature = "i2p")]
+
            HostName::I2p(addr) => {
+
                u8::from(AddressType::I2p).encode(buf);
+
                addr.encode(buf);
+
            }
            _ => {
                unimplemented!(
                    "Encoding not defined for addresses of the same type as the following: {:?}",
@@ -393,11 +412,18 @@ impl wire::Decode for Address {

                HostName::Dns(dns)
            }
+
            #[cfg(feature = "tor")]
            Ok(AddressType::Onion) => {
                let onion: tor::OnionAddrV3 = wire::Decode::decode(buf)?;

                HostName::Tor(onion)
            }
+
            #[cfg(feature = "i2p")]
+
            Ok(AddressType::I2p) => {
+
                let i2p: i2p::I2pAddr = wire::Decode::decode(buf)?;
+

+
                HostName::I2p(i2p)
+
            }
            Err(other) => return Err(wire::Invalid::AddressType { actual: other }.into()),
        };
        let port = u16::decode(buf)?;
modified crates/radicle-schemars/src/main.rs
@@ -81,7 +81,7 @@ fn print_schema() -> io::Result<()> {

            #[derive(JsonSchema)]
            #[schemars(untagged)]
-
            #[allow(dead_code)]
+
            #[allow(dead_code, clippy::large_enum_variant)]
            enum CommandResult {
                Nid(radicle::node::NodeId),
                Config(Box<radicle::node::Config>),
modified crates/radicle-ssh/src/agent/client.rs
@@ -52,20 +52,7 @@ pub enum Error {

impl Error {
    pub fn is_not_running(&self) -> bool {
-
        match self {
-
            Self::EnvVar { .. } | Self::BadAuthSock { .. } => true,
-
            #[cfg(windows)]
-
            Self::Connect { source, .. }
-
                if source.kind() == std::io::ErrorKind::ConnectionRefused =>
-
            {
-
                // On Windows, a named pipe might be used, and if no
-
                // agent is running, we might get a "connection refused"
-
                // error, even though the `SSH_AUTH_SOCK` environment
-
                // variable is set and the named pipe exists.
-
                true
-
            }
-
            _ => false,
-
        }
+
        matches!(self, Self::EnvVar { .. } | Self::BadAuthSock { .. })
    }
}

modified crates/radicle-ssh/src/encoding.rs
@@ -56,7 +56,7 @@ pub trait Encoding {
    /// May panic if the argument is greater than [`u32::MAX`].
    /// This is a convenience method, to spare callers casting or converting
    /// [`usize`] to [`u32`]. If callers end up in a situation where they
-
    /// need to push a 32-bit unisgned integer, but the value they would
+
    /// need to push a 32-bit unsigned integer, but the value they would
    /// like to push does not fit 32 bits, then the implementation will not
    /// comply with the SSH format anyway.
    fn extend_usize(&mut self, u: usize) {
modified crates/radicle/Cargo.toml
@@ -11,6 +11,7 @@ rust-version.workspace = true

[features]
default = []
+
i2p = ["cyphernet/i2p"]
test = ["tempfile", "qcheck", "radicle-crypto/test", "radicle-cob/test"]
logger = ["colored", "chrono"]
qcheck = [
@@ -24,6 +25,7 @@ schemars = [
  "radicle-localtime/schemars",
  "dep:schemars"
]
+
tor = ["cyphernet/tor"]

[dependencies]
amplify = { workspace = true, features = ["std"] }
@@ -32,7 +34,7 @@ bytesize = { version = "2", features = ["serde"] }
chrono = { workspace = true, features = ["clock"], optional = true }
colored = { workspace = true, optional = true }
crossbeam-channel = { workspace = true }
-
cyphernet = { workspace = true, features = ["tor", "dns", "p2p-ed25519"] }
+
cyphernet = { workspace = true, features = ["dns", "p2p-ed25519"] }
dunce = { workspace = true }
fast-glob = { version = "0.3.2" }
fastrand = { workspace = true, features = ["std"] }
modified crates/radicle/src/node.rs
@@ -428,8 +428,8 @@ impl TryFrom<&sqlite::Value> for Alias {
    feature = "schemars",
    derive(schemars::JsonSchema),
    schemars(description = "\
-
    An IP address, or a DNS name, or a Tor onion name, followed by the symbol ':', \
-
    followed by a TCP port number.\
+
    An IP address, or a DNS name, or a Tor onion name, or I2P address,
+
    followed by the symbol ':', followed by a TCP port number.\
")
)]
pub struct Address(
@@ -439,6 +439,7 @@ pub struct Address(
        regex(pattern = r"^.+:((6553[0-5])|(655[0-2][0-9])|(65[0-4][0-9]{2})|(6[0-4][0-9]{3})|([1-5][0-9]{4})|([0-5]{0,5})|([0-9]{1,4}))$"),
        extend("examples" = [
            "xmrhfasfg5suueegrnc4gsgyi2tyclcy5oz7f5drnrodmdtob6t2ioyd.onion:8776",
+
            "f2atcc7udeub5kh4nkljtjwyk7ikjviorufzgwnfwhkphljl3vhq.b32.i2p:8776",
            "seed.example.com:8776",
            "192.0.2.0:31337",
        ]),
@@ -477,6 +478,7 @@ impl Address {
    }

    /// Returns `true` if the [`HostName`] is a Tor onion address.
+
    #[cfg(feature = "tor")]
    pub fn is_onion(&self) -> bool {
        match self.0.host {
            HostName::Tor(_) => true,
@@ -484,6 +486,15 @@ impl Address {
        }
    }

+
    /// Returns `true` if the [`HostName`] is an I2P address.
+
    #[cfg(feature = "i2p")]
+
    pub fn is_i2p(&self) -> bool {
+
        match self.0.host {
+
            HostName::I2p(_) => true,
+
            _ => false,
+
        }
+
    }
+

    /// Return the port number of the [`Address`].
    pub fn port(&self) -> u16 {
        self.0.port
modified crates/radicle/src/node/address.rs
@@ -201,7 +201,10 @@ pub enum AddressType {
    Ipv4 = 1,
    Ipv6 = 2,
    Dns = 3,
+
    #[cfg(feature = "tor")]
    Onion = 4,
+
    #[cfg(feature = "i2p")]
+
    I2p = 5,
}

impl From<AddressType> for u8 {
@@ -216,7 +219,10 @@ impl From<&Address> for AddressType {
            HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
            HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
            HostName::Dns(_) => AddressType::Dns,
+
            #[cfg(feature = "tor")]
            HostName::Tor(_) => AddressType::Onion,
+
            #[cfg(feature = "i2p")]
+
            HostName::I2p(_) => AddressType::I2p,
            _ => todo!(), // FIXME(cloudhead): Maxim will remove `non-exhaustive`
        }
    }
@@ -230,7 +236,10 @@ impl TryFrom<u8> for AddressType {
            1 => Ok(AddressType::Ipv4),
            2 => Ok(AddressType::Ipv6),
            3 => Ok(AddressType::Dns),
+
            #[cfg(feature = "tor")]
            4 => Ok(AddressType::Onion),
+
            #[cfg(feature = "i2p")]
+
            5 => Ok(AddressType::I2p),
            _ => Err(other),
        }
    }
modified crates/radicle/src/node/address/store.rs
@@ -535,7 +535,10 @@ impl TryFrom<&sql::Value> for AddressType {
                "ipv4" => Ok(AddressType::Ipv4),
                "ipv6" => Ok(AddressType::Ipv6),
                "dns" => Ok(AddressType::Dns),
+
                #[cfg(feature = "tor")]
                "onion" => Ok(AddressType::Onion),
+
                #[cfg(feature = "i2p")]
+
                "i2p" => Ok(AddressType::I2p),
                _ => Err(err),
            },
            _ => Err(err),
@@ -549,7 +552,10 @@ impl sql::BindableWithIndex for AddressType {
            Self::Ipv4 => "ipv4".bind(stmt, i),
            Self::Ipv6 => "ipv6".bind(stmt, i),
            Self::Dns => "dns".bind(stmt, i),
+
            #[cfg(feature = "tor")]
            Self::Onion => "onion".bind(stmt, i),
+
            #[cfg(feature = "i2p")]
+
            Self::I2p => "i2p".bind(stmt, i),
        }
    }
}
modified crates/radicle/src/node/config.rs
@@ -19,7 +19,9 @@ pub type ProtocolVersion = u8;
pub mod seeds {
    use std::{str::FromStr, sync::LazyLock};

-
    use cyphernet::addr::{tor::OnionAddrV3, HostName, NetAddr};
+
    #[cfg(feature = "tor")]
+
    use cyphernet::addr::tor::OnionAddrV3;
+
    use cyphernet::addr::{HostName, NetAddr};

    use super::{ConnectAddress, NodeId, PeerAddr};

@@ -38,6 +40,7 @@ pub mod seeds {
            NodeId::from_str("z6MkrLMMsiPWUcNPHcRajuMi9mDfYckSoJyPwwnknocNYPm7").unwrap(),
            vec![
                HostName::Dns("iris.radicle.xyz".to_owned()),
+
                #[cfg(feature = "tor")]
                #[allow(clippy::unwrap_used)] // Value is manually verified.
                OnionAddrV3::from_str(
                    "irisradizskwweumpydlj4oammoshkxxjur3ztcmo7cou5emc6s5lfid.onion",
@@ -55,6 +58,7 @@ pub mod seeds {
            NodeId::from_str("z6Mkmqogy2qEM2ummccUthFEaaHvyYmYBYh3dbe9W4ebScxo").unwrap(),
            vec![
                HostName::Dns("rosa.radicle.xyz".to_owned()),
+
                #[cfg(feature = "tor")]
                #[allow(clippy::unwrap_used)] // Value is manually verified.
                OnionAddrV3::from_str(
                    "rosarad5bxgdlgjnzzjygnsxrwxmoaj4vn7xinlstwglxvyt64jlnhyd.onion",
@@ -271,7 +275,7 @@ pub struct RateLimits {
    schemars(description = "\
    A node address to connect to. Format: An Ed25519 public key in multibase encoding, \
    followed by the symbol '@', followed by an IP address, or a DNS name, or a Tor onion \
-
    name, followed by the symbol ':', followed by a TCP port number.\
+
    name, or an I2P address, followed by the symbol ':', followed by a TCP port number.\
")
)]
pub struct ConnectAddress(
@@ -282,6 +286,7 @@ pub struct ConnectAddress(
        extend("examples" = [
            "z6MkrLMMsiPWUcNPHcRajuMi9mDfYckSoJyPwwnknocNYPm7@rosa.radicle.xyz:8776",
            "z6MkvUJtYD9dHDJfpevWRT98mzDDpdAtmUjwyDSkyqksUr7C@xmrhfasfg5suueegrnc4gsgyi2tyclcy5oz7f5drnrodmdtob6t2ioyd.onion:8776",
+
            "z6Mkvky2mnSYCTUMKRdAUoZXBXLLKtnWEkWeYQcGjjnmobAU@f2atcc7udeub5kh4nkljtjwyk7ikjviorufzgwnfwhkphljl3vhq.b32.i2p:8776",
            "z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi@seed.example.com:8776",
            "z6MkkfM3tPXNPrPevKr3uSiQtHPuwnNhu2yUVjgd2jXVsVz5@192.0.2.0:31337",
        ]),
@@ -348,9 +353,10 @@ pub enum Relay {
}

/// Proxy configuration.
-
#[derive(Debug, Clone, Serialize, Deserialize)]
+
#[derive(Debug, Copy, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "mode")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
+
#[cfg(any(feature = "tor", feature = "i2p"))]
pub enum AddressConfig {
    /// Proxy connections to this address type.
    Proxy {
@@ -360,6 +366,9 @@ pub enum AddressConfig {
    /// Forward address to the next layer. Either this is the global proxy,
    /// or the operating system, via DNS.
    Forward,
+
    /// Drop connections to this address type.
+
    #[default]
+
    Drop,
}

/// Default seeding policy. Applies when no repository policies for the given repo are found.
@@ -428,8 +437,13 @@ pub struct Config {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub proxy: Option<net::SocketAddr>,
    /// Onion address config.
-
    #[serde(default, skip_serializing_if = "Option::is_none")]
-
    pub onion: Option<AddressConfig>,
+
    #[cfg(feature = "tor")]
+
    #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")]
+
    pub onion: AddressConfig,
+
    /// I2P address config.
+
    #[cfg(feature = "i2p")]
+
    #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")]
+
    pub i2p: AddressConfig,
    /// Peer-to-peer network.
    #[serde(default)]
    pub network: Network,
@@ -459,6 +473,8 @@ pub struct Config {
    /// the environment variable `RAD_PASSPHRASE`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub secret: Option<std::path::PathBuf>,
+
    #[serde(default, skip_serializing_if = "HashSet::is_empty")]
+
    pub announcers: HashSet<super::NodeId>,
}

impl Config {
@@ -478,7 +494,10 @@ impl Config {
            external_addresses: vec![],
            network: Network::default(),
            proxy: None,
-
            onion: None,
+
            #[cfg(feature = "tor")]
+
            onion: AddressConfig::Drop,
+
            #[cfg(feature = "i2p")]
+
            i2p: AddressConfig::Drop,
            relay: Relay::default(),
            limits: Limits::default(),
            workers: Workers::default(),
@@ -486,6 +505,7 @@ impl Config {
            seeding_policy: DefaultSeedingPolicy::default(),
            extra: json::Map::default(),
            secret: None,
+
            announcers: HashSet::new(),
        }
    }

modified crates/radicle/src/test/arbitrary.rs
@@ -6,8 +6,10 @@ use std::{iter, net};

use crypto::test::signer::MockSigner;
use crypto::{PublicKey, Unverified};
-
use cyphernet::addr::tor::OnionAddrV3;
-
use cyphernet::EcPk;
+
#[cfg(feature = "i2p")]
+
use cyphernet::addr::i2p::I2pAddr;
+
#[cfg(feature = "tor")]
+
use cyphernet::{addr::tor::OnionAddrV3, EcPk};
use qcheck::Arbitrary;

use crate::collections::RandomMap;
@@ -293,13 +295,38 @@ impl Arbitrary for Address {
                    .unwrap()
                    .to_string(),
            ),
+
            #[cfg(feature = "tor")]
            AddressType::Onion => {
                let pk = PublicKey::arbitrary(g);
                let addr = OnionAddrV3::from(
-
                    cyphernet::ed25519::PublicKey::from_pk_compressed(**pk).unwrap(),
+
                    cyphernet::ed25519::PublicKey::from_pk_compressed(*pk).unwrap(),
                );
                cyphernet::addr::HostName::Tor(addr)
            }
+
            #[cfg(feature = "i2p")]
+
            AddressType::I2p => {
+
                let address = if bool::arbitrary(g) {
+
                    let mut rng = fastrand::Rng::with_seed(u64::arbitrary(g));
+

+
                    let name: String = iter::repeat_with(|| rng.alphanumeric()).take(56).collect();
+

+
                    name + ".b32"
+
                } else {
+
                    g.choose(&["iris.radicle.example", "rosa.radicle.example"])
+
                        .unwrap()
+
                        .to_string()
+
                };
+

+
                let suffix = if bool::arbitrary(g) {
+
                    ".i2p"
+
                } else {
+
                    ".i2p.alt"
+
                };
+

+
                let address = address + suffix;
+

+
                cyphernet::addr::HostName::I2p(I2pAddr::from_str(&address).unwrap())
+
            }
        };

        Address::from(cyphernet::addr::NetAddr {
modified flake.nix
@@ -235,6 +235,13 @@
              hooks =
                {
                  alejandra.enable = true;
+
                  typos = {
+
                    enable = true;
+
                    settings = {
+
                      verbose = true;
+
                      write = true;
+
                    };
+
                  };
                  codespell = {
                    enable = true;
                    entry = "${lib.getExe pkgs.codespell} -w";