Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement worker thread handover
Dr. Maxim Orlovsky committed 3 years ago
commit a7b1c19746d280551f6bd8fce9ca8fa2872d8d1d
parent e49226c50023e012657c022009d58489054d6f99
18 files changed +453 -203
modified Cargo.lock
@@ -1368,6 +1368,18 @@ dependencies = [
]

[[package]]
+
name = "io-reactor"
+
version = "0.1.0"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices#2a09c1714bea9baf8a96b9bd09d2dac4181a5019"
+
dependencies = [
+
 "amplify",
+
 "crossbeam-channel",
+
 "libc",
+
 "popol",
+
 "socket2",
+
]
+

+
[[package]]
name = "iri-string"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1606,12 +1618,12 @@ dependencies = [
[[package]]
name = "netservices"
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#045ab140fd33ab3008a0ce8c86d8b9d01b08662c"
+
source = "git+https://github.com/cyphernet-wg/rust-netservices#2a09c1714bea9baf8a96b9bd09d2dac4181a5019"
dependencies = [
 "amplify",
 "cyphernet",
+
 "io-reactor",
 "libc",
-
 "poll-reactor",
 "socket2",
]

@@ -1916,18 +1928,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"

[[package]]
-
name = "poll-reactor"
-
version = "0.1.0"
-
source = "git+https://github.com/cyphernet-wg/rust-netservices#045ab140fd33ab3008a0ce8c86d8b9d01b08662c"
-
dependencies = [
-
 "amplify",
-
 "crossbeam-channel",
-
 "libc",
-
 "popol",
-
 "socket2",
-
]
-

-
[[package]]
name = "popol"
version = "1.0.0"
source = "git+https://github.com/Internet2-WG/popol?branch=api#02b8d4089bd234f3d9d46ef27f6e64cfbc45118a"
@@ -2222,12 +2222,12 @@ dependencies = [
 "cyphernet",
 "fastrand",
 "git-ref-format",
+
 "io-reactor",
 "lexopt",
 "log",
 "nakamoto-net",
 "netservices",
 "nonempty 0.8.1",
-
 "poll-reactor",
 "qcheck",
 "qcheck-macros",
 "radicle",
modified Cargo.toml
@@ -37,7 +37,7 @@ version = "0.3.0"
git = "https://github.com/cyphernet-wg/rust-cyphernet"
version = "0.1.0"

-
[patch.crates-io.poll-reactor]
+
[patch.crates-io.io-reactor]
git = "https://github.com/cyphernet-wg/rust-netservices"
version = "0.1.0"

modified radicle-crypto/src/lib.rs
@@ -1,4 +1,5 @@
use std::cmp::Ordering;
+
use std::sync::Arc;
use std::{fmt, ops::Deref, str::FromStr};

use ed25519_compact as ed25519;
@@ -26,28 +27,28 @@ pub type SharedSecret = [u8; 32];
/// Trait alias used for Diffie–Hellman key exchange.
#[cfg(feature = "cyphernet")]
pub trait Negotiator:
-
    cyphernet::crypto::Ecdh<Pk = PublicKey, Secret = SharedSecret, Err = Error> + Clone
+
    cyphernet::crypto::Ecdh<Pk = PublicKey, Secret = SharedSecret, Err = Error> + Clone + Send
{
}

#[cfg(feature = "cyphernet")]
impl<T> Negotiator for T where
-
    T: cyphernet::crypto::Ecdh<Pk = PublicKey, Secret = SharedSecret, Err = Error> + Clone
+
    T: cyphernet::crypto::Ecdh<Pk = PublicKey, Secret = SharedSecret, Err = Error> + Clone + Send
{
}

/// Error returned if signing fails, eg. due to an HSM or KMS.
-
#[derive(Debug, Error)]
+
#[derive(Debug, Clone, Error)]
#[error(transparent)]
pub struct SignerError {
    #[from]
-
    source: Box<dyn std::error::Error + Send + Sync>,
+
    source: Arc<dyn std::error::Error + Send + Sync>,
}

impl SignerError {
    pub fn new(source: impl std::error::Error + Send + Sync + 'static) -> Self {
        Self {
-
            source: Box::new(source),
+
            source: Arc::new(source),
        }
    }
}
modified radicle-crypto/src/test/signer.rs
@@ -71,3 +71,14 @@ impl Signer for MockSigner {
        Ok(self.sign(msg))
    }
}
+

+
#[cfg(feature = "cyphernet")]
+
impl cyphernet::crypto::Ecdh for MockSigner {
+
    type Pk = PublicKey;
+
    type Secret = crate::SharedSecret;
+
    type Err = crate::Error;
+

+
    fn ecdh(&self, _pk: &Self::Pk) -> Result<Self::Secret, Self::Err> {
+
        Ok([0; 32])
+
    }
+
}
modified radicle-node/Cargo.toml
@@ -6,7 +6,7 @@ authors = ["Alexis Sellier <alexis@radicle.xyz>"]
edition = "2021"

[features]
-
test = ["radicle/test", "radicle-crypto/test", "qcheck"]
+
test = ["radicle/test", "radicle-crypto/test", "radicle-crypto/cyphernet", "qcheck"]

[dependencies]
amplify = { version = "4.0.0-beta.1", default-features = false, features = ["std"] }
@@ -22,9 +22,9 @@ git-ref-format = { version = "0", features = ["serde", "macro"] }
lexopt = { version = "0.2.1" }
log = { version = "0.4.17", features = ["std"] }
nakamoto-net = { version = "0.3.0" }
-
netservices = { version = "0", features = ["poll-reactor", "socket2"] }
+
netservices = { version = "0", features = ["io-reactor", "socket2"] }
nonempty = { version = "0.8.1", features = ["serialize"] }
-
poll-reactor = { version = "0", features = ["popol", "socket2"] }
+
io-reactor = { version = "0", features = ["popol", "socket2"] }
qcheck = { version = "1", default-features = false, optional = true }
sqlite = { version = "0.30.3" }
sqlite3-src = { version = "0.4.0", features = ["bundled"] } # Ensures static linking
@@ -41,6 +41,6 @@ features = ["sql"]

[dev-dependencies]
radicle = { path = "../radicle", version = "*", features = ["test"] }
-
radicle-crypto = { path = "../radicle-crypto", version = "*", features = ["test"] }
+
radicle-crypto = { path = "../radicle-crypto", version = "*", features = ["test", "cyphernet"] }
qcheck = { version = "1", default-features = false }
qcheck-macros = { version = "1", default-features = false }
modified radicle-node/src/client/handle.rs
@@ -46,25 +46,24 @@ impl<T> From<chan::SendError<T>> for Error {
    }
}

-
pub struct Handle {
-
    pub(crate) controller: reactor::Controller<service::Command>,
+
pub struct Handle<T: reactor::Handler> {
+
    pub(crate) controller: reactor::Controller<T>,
}

-
impl From<reactor::Controller<service::Command>> for Handle {
-
    fn from(controller: reactor::Controller<service::Command>) -> Handle {
+
impl<T: reactor::Handler> From<reactor::Controller<T>> for Handle<T> {
+
    fn from(controller: reactor::Controller<T>) -> Handle<T> {
        Handle { controller }
    }
}

-
impl Handle {
+
impl<T: reactor::Handler<Command = service::Command>> Handle<T> {
    fn command(&self, cmd: service::Command) -> Result<(), Error> {
        self.controller.send(cmd)?;
-

        Ok(())
    }
}

-
impl radicle::node::Handle for Handle {
+
impl<T: reactor::Handler<Command = service::Command>> radicle::node::Handle for Handle<T> {
    type Session = Session;
    type FetchLookup = FetchLookup;
    type Error = Error;
modified radicle-node/src/lib.rs
@@ -15,6 +15,7 @@ pub mod test;
#[cfg(test)]
pub mod tests;
pub mod wire;
+
pub mod worker;

pub use nakamoto_net::{Io, Link, LocalDuration, LocalTime};
pub use radicle::{collections, crypto, git, identity, node, profile, rad, storage};
modified radicle-node/src/main.rs
@@ -3,16 +3,19 @@ use std::{env, net, process, thread};
use anyhow::Context as _;
use cyphernet::addr::PeerAddr;
use nakamoto_net::{LocalDuration, LocalTime};
+
use netservices::wire::NetAccept;
use reactor::poller::popol;
use reactor::Reactor;

use radicle::profile;
+
use radicle::storage::WriteStorage;
use radicle_node::client::handle::Handle;
use radicle_node::client::{ADDRESS_DB_FILE, NODE_DIR, ROUTING_DB_FILE, TRACKING_DB_FILE};
use radicle_node::crypto::ssh::keystore::MemorySigner;
use radicle_node::prelude::{Address, NodeId};
-
use radicle_node::service::{routing, tracking};
+
use radicle_node::service::{routing, tracking, FetchResult};
use radicle_node::wire::Transport;
+
use radicle_node::worker::{WorkerReq, WorkerResp};
use radicle_node::{address, control, logger, service};

#[derive(Debug)]
@@ -117,17 +120,51 @@ fn main() -> anyhow::Result<()> {
    let tracking = tracking::Config::open(tracking_db)?;

    log::info!("Initializing service ({:?})..", network);
+
    let worker_storage = storage.clone();
    let service = service::Service::new(
        config, clock, routing, storage, addresses, tracking, signer, rng,
    );

-
    let wire = Transport::new(service, negotiator, proxy_addr, clock);
-
    let reactor = Reactor::new(wire, popol::Poller::new());
-
    let handle = Handle::from(reactor.controller());
+
    let (worker_send, worker_recv) = crossbeam_channel::unbounded::<WorkerReq<MemorySigner>>();
+
    let workers = thread::spawn(move || {
+
        while let Ok(WorkerReq {
+
            fetch,
+
            session,
+
            channel,
+
            ..
+
        }) = worker_recv.recv()
+
        {
+
            let result = match worker_storage.repository(fetch.repo) {
+
                Ok(_) => todo!(),
+
                Err(err) => FetchResult::Error {
+
                    from: fetch.remote,
+
                    error: err.into(),
+
                },
+
            };
+
            if channel.send(WorkerResp { result, session }).is_err() {
+
                log::error!("Unable to report fetch result: worker channel disconnected");
+
            }
+
        }
+
    });
+

+
    let wire = Transport::new(service, worker_send, negotiator.clone(), proxy_addr, clock);
+
    let reactor =
+
        Reactor::new(wire, popol::Poller::new()).expect("unable to instantiate P2P reactor");
+
    let controller = reactor.controller();
+

+
    for socket in options.listen {
+
        let listener = NetAccept::bind(socket, negotiator.clone())?;
+
        controller.register_listener(listener)?;
+

+
        log::info!("Listening on {socket}..");
+
    }
+

+
    let handle = Handle::from(controller);
    let control = thread::spawn(move || control::listen(node, handle));

    control.join().unwrap()?;
    reactor.join().unwrap();
+
    workers.join().unwrap();

    Ok(())
}
modified radicle-node/src/service.rs
@@ -27,7 +27,7 @@ use crate::address;
use crate::address::AddressBook;
use crate::clock::Timestamp;
use crate::crypto;
-
use crate::crypto::{Signer, Verified};
+
use crate::crypto::{Negotiator, Signer, Verified};
use crate::git;
use crate::identity::{Doc, Id};
use crate::node;
@@ -107,6 +107,7 @@ pub enum FetchError {
}

/// Result of looking up seeds in our routing table.
+
/// This object is sent back to the caller who initiated the fetch.
#[derive(Debug)]
pub enum FetchLookup {
    /// Found seeds for the given project.
@@ -244,7 +245,7 @@ where
    R: routing::Store,
    A: address::Store,
    S: WriteStorage + 'static,
-
    G: crypto::Signer,
+
    G: Signer + Negotiator,
{
    pub fn new(
        config: Config,
@@ -282,17 +283,6 @@ where
        }
    }

-
    pub fn seeds(&self, id: &Id) -> Vec<(NodeId, &Session)> {
-
        if let Ok(seeds) = self.routing.get(id) {
-
            seeds
-
                .into_iter()
-
                .filter_map(|id| self.sessions.by_id(&id).map(|p| (id, p)))
-
                .collect()
-
        } else {
-
            vec![]
-
        }
-
    }
-

    /// Track a repository.
    /// Returns whether or not the tracking policy was updated.
    pub fn track_repo(&mut self, id: &Id, scope: tracking::Scope) -> Result<bool, tracking::Error> {
@@ -446,51 +436,36 @@ where
                    return;
                }

-
                let seeds = self.seeds(&id);
-
                let Some(seeds) = NonEmpty::from_vec(seeds) else {
-
                    log::error!("No seeds found for {}", id);
+
                let Ok(seeds) = self.routing.get(&id) else {
+
                    todo!();
+
                };
+
                let Some(seeds) = NonEmpty::from_vec(seeds.into_iter().collect()) else {
+
                    log::warn!("No seeds found for {}", id);
                    resp.send(FetchLookup::NotFound).ok();

                    return;
                };
                log::debug!("Found {} seeds for {}", seeds.len(), id);

-
                let mut repo = match self.storage.repository(id) {
-
                    Ok(repo) => repo,
-
                    Err(err) => {
-
                        log::error!("Error opening repo for {}: {}", id, err);
-
                        resp.send(FetchLookup::Error(err.into())).ok();
-

-
                        return;
-
                    }
-
                };
-

-
                let (results_, results) = chan::bounded(seeds.len());
+
                // FIXME: Get results back to user.
+
                let (_, results) = chan::bounded(seeds.len());
                resp.send(FetchLookup::Found {
-
                    seeds: seeds.clone().map(|(_, peer)| peer.id),
+
                    seeds: seeds.clone(),
                    results,
                })
                .ok();

                // TODO: Limit the number of seeds we fetch from? Randomize?
-
                for (peer_id, peer) in seeds {
-
                    match repo.fetch(&peer_id, Namespaces::default()) {
-
                        Ok(updated) => {
-
                            results_
-
                                .send(FetchResult::Fetched {
-
                                    from: peer.id,
-
                                    updated,
-
                                })
-
                                .ok();
-
                        }
-
                        Err(err) => {
-
                            results_
-
                                .send(FetchResult::Error {
-
                                    from: peer.id,
-
                                    error: err.into(),
-
                                })
-
                                .ok();
-
                        }
+
                for seed in seeds {
+
                    let session = self.sessions.get_mut(&seed).unwrap();
+
                    if let Some(upgrade) = session.upgrade(id) {
+
                        self.reactor.write(session.id, upgrade);
+
                        self.reactor
+
                            .fetch(session.id, id, Namespaces::default(), true);
+
                    } else {
+
                        // TODO: If we can't upgrade, it's because we're already fetching from
+
                        // this peer. So we need to queue the request, or find another peer.
+
                        todo!();
                    }
                }
            }
@@ -531,17 +506,19 @@ where
        }
    }

+
    pub fn fetch_complete(&mut self, _result: FetchResult) {
+
        // TODO(cloudhead): handle completed job with service business logic
+
    }
+

    pub fn accepted(&mut self, _addr: net::SocketAddr) {
        // Inbound connection attempt.
    }

    pub fn attempted(&mut self, id: NodeId, _addr: &Address) {
        let persistent = self.config.is_persistent(&id);
-
        let peer = self
-
            .sessions
-
            .entry(id)
-
            .or_insert_with(|| Session::new(id, Link::Outbound, persistent, self.rng.clone()));
-

+
        let peer = self.sessions.entry(id).or_insert_with(|| {
+
            Session::new(id, Link::Outbound, persistent, self.rng.clone(), self.clock)
+
        });
        peer.attempted();
    }

@@ -575,6 +552,7 @@ where
                    Link::Inbound,
                    self.config.is_persistent(&remote),
                    self.rng.clone(),
+
                    self.clock,
                ),
            );
        }
@@ -823,9 +801,15 @@ where
        debug!("Received {:?} from {}", &message, peer.id);

        match (&mut peer.state, message) {
-
            (session::State::Initial, Message::Initialize {}) => {
-
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
-
                // extra "acknowledgment" message sent when the `Initialize` is well received.
+
            (session::State::Connected { initialized, .. }, Message::Initialize {}) => {
+
                // Already initialized!
+
                if *initialized {
+
                    debug!(
+
                        "Disconnecting peer {} for sending us a message before initializing",
+
                        peer.id
+
                    );
+
                    return Err(session::Error::Misbehavior);
+
                }
                if peer.link.is_inbound() {
                    self.reactor.write_all(
                        peer.id,
@@ -838,25 +822,14 @@ where
                        ),
                    );
                }
+
                *initialized = true;
                // Nb. we don't set the peer timestamp here, since it is going to be
                // set after the first message is received only. Setting it here would
                // mean that messages received right after the handshake could be ignored.
-
                peer.state = session::State::Negotiated {
-
                    id: *remote,
-
                    since: self.clock,
-
                    ping: Default::default(),
-
                };
-
            }
-
            (session::State::Initial, _) => {
-
                debug!(
-
                    "Disconnecting peer {} for sending us a message before handshake",
-
                    peer.id
-
                );
-
                return Err(session::Error::Misbehavior);
            }
            // Process a peer announcement.
-
            (session::State::Negotiated { id: relayer, .. }, Message::Announcement(ann)) => {
-
                let relayer = *relayer;
+
            (session::State::Connected { .. }, Message::Announcement(ann)) => {
+
                let relayer = peer.id;

                // Returning true here means that the message should be relayed.
                if self.handle_announcement(&relayer, &ann)? {
@@ -875,7 +848,7 @@ where
                    return Ok(());
                }
            }
-
            (session::State::Negotiated { .. }, Message::Subscribe(subscribe)) => {
+
            (session::State::Connected { .. }, Message::Subscribe(subscribe)) => {
                for msg in self
                    .gossip
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
@@ -884,14 +857,7 @@ where
                }
                peer.subscribe = Some(subscribe);
            }
-
            (session::State::Negotiated { .. }, Message::Initialize { .. }) => {
-
                debug!(
-
                    "Disconnecting peer {} for sending us a redundant handshake message",
-
                    peer.id
-
                );
-
                return Err(session::Error::Misbehavior);
-
            }
-
            (session::State::Negotiated { .. }, Message::Ping(Ping { ponglen, .. })) => {
+
            (session::State::Connected { .. }, Message::Ping(Ping { ponglen, .. })) => {
                // Ignore pings which ask for too much data.
                if ponglen > Ping::MAX_PONG_ZEROES {
                    return Ok(());
@@ -903,13 +869,18 @@ where
                    },
                );
            }
-
            (session::State::Negotiated { ping, .. }, Message::Pong { zeroes }) => {
+
            (session::State::Connected { ping, .. }, Message::Pong { zeroes }) => {
                if let session::PingState::AwaitingResponse(ponglen) = *ping {
                    if (ponglen as usize) == zeroes.len() {
                        *ping = session::PingState::Ok;
                    }
                }
            }
+
            (session::State::Connected { .. }, Message::Upgrade { repo }) => {
+
                // All we need is to instruct the transport to handover to the worker
+
                self.reactor
+
                    .fetch(*remote, repo, Namespaces::default(), false);
+
            }
            (session::State::Disconnected { .. }, msg) => {
                debug!("Ignoring {:?} from disconnected peer {}", msg, peer.id);
            }
@@ -1030,28 +1001,14 @@ where
    }

    fn choose_addresses(&mut self) -> Vec<(NodeId, Address)> {
-
        // TODO(cloudhead): Remove once noise is implemented.
-
        let mut initializing: Vec<NodeId> = Vec::new();
-
        let mut negotiated: HashMap<NodeId, &Session> = HashMap::new();
-
        for s in self.sessions.values() {
-
            if !s.link.is_outbound() {
-
                continue;
-
            }
-
            match s.state {
-
                // TODO(cloudhead): Remove when we have noise handshake.
-
                session::State::Initial => {
-
                    initializing.push(s.id);
-
                }
-
                session::State::Negotiated { id, .. } => {
-
                    negotiated.insert(id, s);
-
                }
-
                session::State::Disconnected { .. } => {}
-
            }
-
        }
+
        let sessions = self
+
            .sessions
+
            .values()
+
            .filter(|s| s.is_connected() && s.link.is_outbound())
+
            .map(|s| (s.id, s))
+
            .collect::<HashMap<_, _>>();

-
        let wanted = TARGET_OUTBOUND_PEERS
-
            .saturating_sub(initializing.len())
-
            .saturating_sub(negotiated.len());
+
        let wanted = TARGET_OUTBOUND_PEERS.saturating_sub(sessions.len());
        if wanted == 0 {
            return Vec::new();
        }
@@ -1059,9 +1016,7 @@ where
        self.addresses
            .entries()
            .unwrap()
-
            .filter(|(node_id, _)| {
-
                !initializing.contains(node_id) && !negotiated.contains_key(node_id)
-
            })
+
            .filter(|(node_id, _)| !sessions.contains_key(node_id))
            .take(wanted)
            .map(|(n, s)| (n, s.addr))
            .collect()
@@ -1253,23 +1208,19 @@ impl Sessions {
        Self(AddressBook::new(rng))
    }

-
    pub fn by_id(&self, id: &NodeId) -> Option<&Session> {
-
        self.0.values().find(|p| p.node_id() == Some(*id))
-
    }
-

    /// Iterator over fully negotiated peers.
    pub fn negotiated(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
        self.0
            .iter()
-
            .filter_map(move |(_, sess)| match &sess.state {
-
                session::State::Negotiated { id, .. } => Some((id, sess)),
+
            .filter_map(move |(id, sess)| match &sess.state {
+
                session::State::Connected { .. } => Some((id, sess)),
                _ => None,
            })
    }

    /// Iterator over mutable fully negotiated peers.
    pub fn negotiated_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
-
        self.0.iter_mut().filter(move |(_, p)| p.is_negotiated())
+
        self.0.iter_mut().filter(move |(_, p)| p.is_connected())
    }
}

modified radicle-node/src/service/message.rs
@@ -331,6 +331,9 @@ pub enum Message {
        /// The pong payload.
        zeroes: ZeroBytes,
    },
+

+
    /// Upgrade session to Git protocol for the given repository.
+
    Upgrade { repo: Id },
}

impl Message {
@@ -416,6 +419,7 @@ impl fmt::Debug for Message {
            }
            Self::Ping(Ping { ponglen, zeroes }) => write!(f, "Ping({ponglen}, {:?})", zeroes),
            Self::Pong { zeroes } => write!(f, "Pong({:?})", zeroes),
+
            Self::Upgrade { repo } => write!(f, "Upgrade({repo})"),
        }
    }
}
modified radicle-node/src/service/reactor.rs
@@ -4,6 +4,7 @@ use log::*;

use crate::prelude::*;
use crate::service::session::Session;
+
use crate::storage::{FetchError, Namespaces, RefUpdate};

use super::message::{Announcement, AnnouncementMessage};

@@ -16,12 +17,37 @@ pub enum Io {
    Connect(NodeId, Address),
    /// Disconnect from a peer.
    Disconnect(NodeId, DisconnectReason),
+
    /// Fetch repository data from a peer.
+
    Fetch(Fetch),
    /// Ask for a wakeup in a specified amount of time.
    Wakeup(LocalDuration),
    /// Emit an event.
    Event(Event),
}

+
/// Fetch job sent to worker thread.
+
#[derive(Debug, Clone)]
+
pub struct Fetch {
+
    /// Repo to fetch.
+
    pub repo: Id,
+
    /// Namespaces to fetch.
+
    pub namespaces: Namespaces,
+
    /// Connection we are fetching from
+
    pub remote: NodeId,
+
    /// Indicates whether the fetch request is initiated by a local party
+
    pub initiate: bool,
+
}
+

+
/// Result of a fetch request from a specific seed.
+
#[derive(Debug)]
+
#[allow(clippy::large_enum_variant)]
+
pub enum FetchResult {
+
    /// Successful fetch from a seed.
+
    Fetched { updated: Vec<RefUpdate> },
+
    /// Error fetching the resource from a seed.
+
    Error { from: NodeId, error: FetchError },
+
}
+

/// Interface to the network reactor.
#[derive(Debug, Default)]
pub struct Reactor {
@@ -61,6 +87,17 @@ impl Reactor {
        self.io.push_back(Io::Wakeup(after));
    }

+
    pub fn fetch(&mut self, remote: NodeId, repo: Id, namespaces: Namespaces, initiate: bool) {
+
        debug!("Fetch {} from {}..", repo, remote);
+

+
        self.io.push_back(Io::Fetch(Fetch {
+
            repo,
+
            namespaces,
+
            remote,
+
            initiate,
+
        }));
+
    }
+

    /// Broadcast a message to a list of peers.
    pub fn broadcast<'a>(
        &mut self,
modified radicle-node/src/service/session.rs
@@ -1,7 +1,7 @@
use crate::service::message;
use crate::service::message::Message;
use crate::service::storage;
-
use crate::service::{Link, LocalTime, NodeId, Reactor, Rng};
+
use crate::service::{Id, Link, LocalTime, NodeId, Reactor, Rng};

#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub enum PingState {
@@ -14,19 +14,31 @@ pub enum PingState {
    Ok,
}

-
#[derive(Debug, Default, Clone)]
+
/// Session protocol.
+
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+
pub enum Protocol {
+
    /// The default message-based gossip protocol.
+
    #[default]
+
    Gossip,
+
    /// Git smart protocol. Used for fetching repository data.
+
    /// This protocol is used after a connection upgrade via the
+
    /// [`Message::Upgrade`] message.
+
    Git,
+
}
+

+
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum State {
-
    /// Pre-handshake state.
-
    /// TODO(cloudhead): Remove once noise handshake is implemented.
-
    #[default]
-
    Initial,
-
    /// State after successful handshake.
-
    Negotiated {
-
        /// The peer's unique identifier.
-
        id: NodeId,
+
    /// Initial state after handshake protocol hand-off.
+
    Connected {
+
        /// Whether this session was initialized with a [`Message::Initialize`].
+
        initialized: bool,
+
        /// Connected since this time.
        since: LocalTime,
+
        /// Ping state.
        ping: PingState,
+
        /// Session protocol.
+
        protocol: Protocol,
    },
    /// When a peer is disconnected.
    Disconnected { since: LocalTime },
@@ -77,10 +89,15 @@ pub struct Session {
}

impl Session {
-
    pub fn new(id: NodeId, link: Link, persistent: bool, rng: Rng) -> Self {
+
    pub fn new(id: NodeId, link: Link, persistent: bool, rng: Rng, time: LocalTime) -> Self {
        Self {
            id,
-
            state: State::Initial,
+
            state: State::Connected {
+
                initialized: false,
+
                since: time,
+
                ping: PingState::default(),
+
                protocol: Protocol::default(),
+
            },
            link,
            subscribe: None,
            persistent,
@@ -90,8 +107,8 @@ impl Session {
        }
    }

-
    pub fn is_negotiated(&self) -> bool {
-
        matches!(self.state, State::Negotiated { .. })
+
    pub fn is_connected(&self) -> bool {
+
        matches!(self.state, State::Connected { .. })
    }

    pub fn attempts(&self) -> usize {
@@ -102,12 +119,27 @@ impl Session {
        self.attempts += 1;
    }

+
    pub fn upgrade(&mut self, repo: Id) -> Option<Message> {
+
        if let State::Connected { protocol, .. } = &mut self.state {
+
            if *protocol == Protocol::Gossip {
+
                *protocol = Protocol::Git;
+
                return Some(Message::Upgrade { repo });
+
            } else {
+
                log::error!(
+
                    "Attempted to upgrade protocol for {} which was already upgraded",
+
                    self.id
+
                );
+
            }
+
        }
+
        None
+
    }
+

    pub fn connected(&mut self, _link: Link) {
        self.attempts = 0;
    }

    pub fn ping(&mut self, reactor: &mut Reactor) -> Result<(), Error> {
-
        if let State::Negotiated { ping, .. } = &mut self.state {
+
        if let State::Connected { ping, .. } = &mut self.state {
            let msg = message::Ping::new(&mut self.rng);
            *ping = PingState::AwaitingResponse(msg.ponglen);

@@ -115,11 +147,4 @@ impl Session {
        }
        Ok(())
    }
-

-
    pub fn node_id(&self) -> Option<NodeId> {
-
        if let State::Negotiated { id, .. } = &self.state {
-
            return Some(*id);
-
        }
-
        None
-
    }
}
modified radicle-node/src/test/peer.rs
@@ -9,7 +9,7 @@ use crate::address;
use crate::address::Store;
use crate::clock::Timestamp;
use crate::crypto::test::signer::MockSigner;
-
use crate::crypto::Signer;
+
use crate::crypto::{Negotiator, Signer};
use crate::identity::Id;
use crate::node;
use crate::prelude::*;
@@ -42,7 +42,7 @@ pub struct Peer<S, G> {
impl<S, G> simulator::Peer<S, G> for Peer<S, G>
where
    S: WriteStorage + 'static,
-
    G: Signer + 'static,
+
    G: Signer + Negotiator + 'static,
{
    fn init(&mut self) {
        self.initialize()
@@ -103,7 +103,7 @@ impl Default for Config<MockSigner> {
impl<S, G> Peer<S, G>
where
    S: WriteStorage + 'static,
-
    G: Signer + 'static,
+
    G: Signer + Negotiator + 'static,
{
    pub fn config(
        name: &'static str,
modified radicle-node/src/test/simulator.rs
@@ -12,7 +12,7 @@ use log::*;
use nakamoto_net as nakamoto;
use nakamoto_net::{Link, LocalDuration, LocalTime};

-
use crate::crypto::Signer;
+
use crate::crypto::{Negotiator, Signer};
use crate::prelude::Address;
use crate::service::reactor::Io;
use crate::service::{DisconnectReason, Event, Message, NodeId};
@@ -190,7 +190,7 @@ pub struct Simulation<S, G> {
    signer: PhantomData<G>,
}

-
impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
+
impl<S: WriteStorage + 'static, G: Signer + Negotiator> Simulation<S, G> {
    /// Create a new simulation.
    pub fn new(time: LocalTime, rng: fastrand::Rng, opts: Options) -> Self {
        Self {
@@ -592,6 +592,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    events.push_back(event);
                }
            }
+
            Io::Fetch(..) => todo!("I have no idea what to do here"),
        }
    }

modified radicle-node/src/wire/message.rs
@@ -19,6 +19,7 @@ pub enum MessageType {
    Subscribe = 8,
    Ping = 10,
    Pong = 12,
+
    Upgrade = 14,
}

impl From<MessageType> for u16 {
@@ -39,6 +40,7 @@ impl TryFrom<u16> for MessageType {
            8 => Ok(MessageType::Subscribe),
            10 => Ok(MessageType::Ping),
            12 => Ok(MessageType::Pong),
+
            14 => Ok(MessageType::Upgrade),
            _ => Err(other),
        }
    }
@@ -60,6 +62,7 @@ impl Message {
            },
            Self::Ping { .. } => MessageType::Ping,
            Self::Pong { .. } => MessageType::Pong,
+
            Self::Upgrade { .. } => MessageType::Upgrade,
        }
        .into()
    }
@@ -213,6 +216,9 @@ impl wire::Encode for Message {
            Self::Pong { zeroes } => {
                n += zeroes.encode(writer)?;
            }
+
            Self::Upgrade { repo } => {
+
                n += repo.encode(writer)?;
+
            }
        }

        if n > wire::Size::MAX as usize {
@@ -287,6 +293,10 @@ impl wire::Decode for Message {
                let zeroes = ZeroBytes::decode(reader)?;
                Ok(Self::Pong { zeroes })
            }
+
            Ok(MessageType::Upgrade) => {
+
                let repo = Id::decode(reader)?;
+
                Ok(Self::Upgrade { repo })
+
            }
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
        }
    }
modified radicle-node/src/wire/transport.rs
@@ -6,9 +6,10 @@ use std::collections::VecDeque;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::sync::Arc;
-
use std::time::Duration;
+
use std::time::Instant;
use std::{io, net};

+
use crossbeam_channel as chan;
use cyphernet::addr::{Addr as _, HostAddr, PeerAddr};
use nakamoto_net::{DisconnectReason, Link, LocalTime};
use netservices::noise::NoiseXk;
@@ -21,19 +22,17 @@ use radicle::node::NodeId;
use radicle::storage::WriteStorage;

use crate::crypto::Signer;
-
use crate::service::reactor::Io;
+
use crate::service::reactor::{Fetch, Io};
use crate::service::{routing, session, Message, Service};
+
use crate::worker::{WorkerReq, WorkerResp};
use crate::{address, service};

-
/// The peer session type.
-
pub type Session<N> = NoiseXk<N>;
-

/// Reactor action.
-
type Action<G> = reactor::Action<NetAccept<Session<G>>, NetTransport<Session<G>, Message>>;
+
type Action<G> = reactor::Action<NetAccept<NoiseXk<G>>, NetTransport<NoiseXk<G>, Message>>;

/// Peer connection state machine.
#[derive(Debug)]
-
enum Peer {
+
enum Peer<G: Negotiator> {
    /// The initial state before handshake is completed.
    Connecting { link: Link },
    /// The state after handshake is completed.
@@ -45,9 +44,22 @@ enum Peer {
        id: NodeId,
        reason: DisconnectReason<service::DisconnectReason>,
    },
+
    /// The state after we've started the process of upgraded the peer for a fetch.
+
    /// The request to handover the socket was made to the reactor.
+
    Upgrading {
+
        fetch: Fetch,
+
        link: Link,
+
        id: NodeId,
+
    },
+
    /// The peer is now upgraded and we are in control of the socket.
+
    Upgraded {
+
        link: Link,
+
        id: NodeId,
+
        response: chan::Receiver<WorkerResp<G>>,
+
    },
}

-
impl Peer {
+
impl<G: Negotiator> Peer<G> {
    /// Return a new connecting peer.
    fn connecting(link: Link) -> Self {
        Self::Connecting { link }
@@ -70,18 +82,61 @@ impl Peer {
            panic!("Peer::disconnected: session is not connected");
        }
    }
+

+
    /// Switch to upgrading state.
+
    fn upgrading(&mut self, fetch: Fetch) {
+
        if let Self::Connected { id, link } = self {
+
            *self = Self::Upgrading {
+
                fetch,
+
                id: *id,
+
                link: *link,
+
            };
+
        } else {
+
            panic!("Peer::upgrading: session is not connected");
+
        }
+
    }
+

+
    /// Switch to upgraded state.
+
    fn upgraded(&mut self, listener: chan::Receiver<WorkerResp<G>>) -> Fetch {
+
        if let Self::Upgrading { fetch, id, link } = self {
+
            let fetch = fetch.clone();
+

+
            *self = Self::Upgraded {
+
                id: *id,
+
                link: *link,
+
                response: listener,
+
            };
+
            fetch
+
        } else {
+
            panic!("Peer::upgraded: can't upgrade before handover");
+
        }
+
    }
+

+
    /// Switch back from upgraded to connected state.
+
    fn downgrade(&mut self) -> Link {
+
        if let Self::Upgraded { id, link, .. } = self {
+
            let link = *link;
+
            *self = Self::Connected { id: *id, link };
+

+
            link
+
        } else {
+
            panic!("Peer::downgrade: can't downgrade if not in upgraded state");
+
        }
+
    }
}

/// Transport protocol implementation for a set of peers.
pub struct Transport<R, S, W, G: Negotiator> {
    /// Backing service instance.
    service: Service<R, S, W, G>,
+
    /// Worker pool interface.
+
    worker: chan::Sender<WorkerReq<G>>,
    /// Used to performs X25519 key exchange.
    keypair: G,
    /// Internal queue of actions to send to the reactor.
    actions: VecDeque<Action<G>>,
    /// Peer sessions.
-
    peers: HashMap<RawFd, Peer>,
+
    peers: HashMap<RawFd, Peer<G>>,
    /// SOCKS5 proxy address.
    proxy: net::SocketAddr,
}
@@ -95,6 +150,7 @@ where
{
    pub fn new(
        mut service: Service<R, S, W, G>,
+
        worker: chan::Sender<WorkerReq<G>>,
        keypair: G,
        proxy: net::SocketAddr,
        clock: LocalTime,
@@ -103,6 +159,7 @@ where

        Self {
            service,
+
            worker,
            keypair,
            proxy,
            actions: VecDeque::new(),
@@ -141,6 +198,70 @@ where

        self.actions.push_back(Action::UnregisterTransport(fd));
    }
+

+
    fn upgrade(&mut self, fd: RawFd, fetch: Fetch) {
+
        let Some(peer) = self.peers.get_mut(&fd) else {
+
            log::error!(target: "transport", "Peer with fd {fd} was not found");
+
            return;
+
        };
+
        if let Peer::Disconnected { .. } = peer {
+
            log::error!(target: "transport", "Peer with fd {fd} is already disconnected");
+
            return;
+
        };
+
        log::debug!(target: "transport", "Requesting transport handover from reactor for fd {fd}");
+
        peer.upgrading(fetch);
+

+
        self.actions.push_back(Action::UnregisterTransport(fd));
+
    }
+

+
    fn upgraded(&mut self, transport: NetTransport<NoiseXk<G>, Message>) {
+
        let fd = transport.as_raw_fd();
+
        let Some(peer) = self.peers.get_mut(&fd) else {
+
            log::error!(target: "transport", "Peer with fd {fd} was not found");
+
            return;
+
        };
+
        let (send, recv) = chan::bounded::<WorkerResp<G>>(1);
+
        let fetch = peer.upgraded(recv);
+
        // Downgrade the transport to a simple session and the buffer of incoming data that is
+
        // unprocessed. This buffer is provided as initial input to the worker.
+
        let Ok((session, drain)) = transport.downgrade() else {
+
            // This can happen in case the service attempts to send data to a peer after it has
+
            // initiated an upgrade protocol.
+
            panic!("Transport::upgraded: outgoing messages buffer is not empty");
+
        };
+

+
        if self
+
            .worker
+
            .send(WorkerReq {
+
                fetch,
+
                session,
+
                drain,
+
                channel: send,
+
            })
+
            .is_err()
+
        {
+
            log::error!(target: "transport", "Worker pool is disconnected; cannot send fetch request");
+
        }
+
    }
+

+
    fn fetch_complete(&mut self, resp: WorkerResp<G>) {
+
        let session = resp.session;
+
        let fd = session.as_raw_fd();
+
        let Some(peer) = self.peers.get_mut(&fd) else {
+
            log::error!(target: "transport", "Peer with fd {fd} was not found");
+
            return;
+
        };
+
        if let Peer::Disconnected { .. } = peer {
+
            log::error!(target: "transport", "Peer with fd {fd} is already disconnected");
+
            return;
+
        };
+
        let link = peer.downgrade();
+
        let transport = NetTransport::upgrade(session, link == Link::Inbound)
+
            .expect("unable to set socket into non-blocking mode");
+

+
        self.actions.push_back(Action::RegisterTransport(transport));
+
        self.service.fetch_complete(resp.result);
+
    }
}

impl<R, S, W, G> reactor::Handler for Transport<R, S, W, G>
@@ -150,10 +271,28 @@ where
    W: WriteStorage + Send + 'static,
    G: Signer + Negotiator + Send,
{
-
    type Listener = NetAccept<Session<G>>;
-
    type Transport = NetTransport<Session<G>, Message>;
+
    type Listener = NetAccept<NoiseXk<G>>;
+
    type Transport = NetTransport<NoiseXk<G>, Message>;
    type Command = service::Command;

+
    fn tick(&mut self, time: Instant) {
+
        // FIXME: Ensure that the time correctly converted.
+
        self.service
+
            .tick(LocalTime::from_secs(time.elapsed().as_secs()));
+

+
        let mut completed = Vec::new();
+
        for peer in self.peers.values() {
+
            if let Peer::Upgraded { response, .. } = peer {
+
                if let Ok(resp) = response.try_recv() {
+
                    completed.push(resp);
+
                }
+
            }
+
        }
+
        for resp in completed {
+
            self.fetch_complete(resp);
+
        }
+
    }
+

    fn handle_wakeup(&mut self) {
        self.service.wake()
    }
@@ -161,11 +300,9 @@ where
    fn handle_listener_event(
        &mut self,
        socket_addr: net::SocketAddr,
-
        event: ListenerEvent<Session<G>>,
-
        duration: Duration,
+
        event: ListenerEvent<NoiseXk<G>>,
+
        _: Instant,
    ) {
-
        self.service.tick(LocalTime::from_secs(duration.as_secs()));
-

        match event {
            ListenerEvent::Accepted(session) => {
                log::debug!(
@@ -176,7 +313,7 @@ where
                self.peers
                    .insert(session.as_raw_fd(), Peer::connecting(Link::Inbound));

-
                let transport = match NetTransport::<Session<G>, Message>::upgrade(session) {
+
                let transport = match NetTransport::<NoiseXk<G>, Message>::upgrade(session, true) {
                    Ok(transport) => transport,
                    Err(err) => {
                        log::error!(target: "transport", "Failed to upgrade accepted peer socket: {err}");
@@ -196,11 +333,9 @@ where
    fn handle_transport_event(
        &mut self,
        fd: RawFd,
-
        event: SessionEvent<Session<G>, Message>,
-
        duration: Duration,
+
        event: SessionEvent<NoiseXk<G>, Message>,
+
        _: Instant,
    ) {
-
        self.service.tick(LocalTime::from_secs(duration.as_secs()));
-

        match event {
            SessionEvent::SessionEstablished(node_id) => {
                log::debug!(target: "transport", "Session established with {node_id}");
@@ -271,7 +406,7 @@ where
    }

    fn handle_command(&mut self, cmd: Self::Command) {
-
        self.service.command(cmd)
+
        self.service.command(cmd);
    }

    fn handle_error(&mut self, err: reactor::Error<net::SocketAddr, RawFd>) {
@@ -287,6 +422,9 @@ where

                self.actions.push_back(Action::UnregisterTransport(fd));
            }
+
            reactor::Error::Poll(err) => {
+
                log::error!(target: "transport", "Can't poll connections: {}", err);
+
            }
        }
    }

@@ -297,13 +435,22 @@ where
    fn handover_transport(&mut self, transport: Self::Transport) {
        let fd = transport.as_raw_fd();

-
        if let Some(Peer::Disconnected { id, reason }) = self.peers.get(&fd) {
-
            // Disconnect TCP stream.
-
            drop(transport);
+
        match self.peers.get(&fd) {
+
            Some(Peer::Disconnected { id, reason }) => {
+
                // Disconnect TCP stream.
+
                drop(transport);

-
            self.service.disconnected(*id, reason);
-
        } else {
-
            todo!("The handover protocol is not implemented");
+
                self.service.disconnected(*id, reason);
+
            }
+
            Some(Peer::Upgrading { .. }) => {
+
                self.upgraded(transport);
+
            }
+
            Some(_) => {
+
                panic!("Transport::handover_transport: Unexpected peer with fd {fd} handed over from the reactor");
+
            }
+
            None => {
+
                panic!("Transport::handover_transport: Unknown peer with fd {fd} handed over");
+
            }
        }
    }
}
@@ -315,7 +462,7 @@ where
    W: WriteStorage + 'static,
    G: Signer + Negotiator,
{
-
    type Item = reactor::Action<NetAccept<Session<G>>, NetTransport<Session<G>, Message>>;
+
    type Item = reactor::Action<NetAccept<NoiseXk<G>>, NetTransport<NoiseXk<G>, Message>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(event) = self.actions.pop_front() {
@@ -352,7 +499,7 @@ where
                        break;
                    }

-
                    match NetTransport::<Session<G>, Message>::connect(
+
                    match NetTransport::<NoiseXk<G>, Message>::connect(
                        PeerAddr::new(node_id, socket_addr),
                        &self.keypair,
                    ) {
@@ -376,7 +523,12 @@ where

                    return self.actions.pop_back();
                }
-
                Io::Wakeup(d) => return Some(reactor::Action::Wakeup(d.into())),
+
                Io::Wakeup(d) => return Some(reactor::Action::SetTimer(d.into())),
+
                Io::Fetch(fetch) => {
+
                    // TODO: Check that the node_id is connected, queue request otherwise.
+
                    let fd = self.by_id(&fetch.remote);
+
                    self.upgrade(fd, fetch);
+
                }
            }
        }
        None
added radicle-node/src/worker.rs
@@ -0,0 +1,21 @@
+
use crossbeam_channel as chan;
+
use netservices::noise::NoiseXk;
+

+
use radicle::crypto::Negotiator;
+

+
use crate::service::reactor::Fetch;
+
use crate::service::FetchResult;
+

+
/// Worker request.
+
pub struct WorkerReq<G: Negotiator> {
+
    pub fetch: Fetch,
+
    pub session: NoiseXk<G>,
+
    pub drain: Vec<u8>,
+
    pub channel: chan::Sender<WorkerResp<G>>,
+
}
+

+
/// Worker response.
+
pub struct WorkerResp<G: Negotiator> {
+
    pub result: FetchResult,
+
    pub session: NoiseXk<G>,
+
}
modified radicle/src/storage.rs
@@ -27,7 +27,7 @@ pub type BranchName = git::RefString;
pub type Inventory = Vec<Id>;

/// Describes one or more namespaces.
-
#[derive(Default, Debug)]
+
#[derive(Default, Debug, Clone)]
pub enum Namespaces {
    /// All namespaces.
    #[default]