Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Create `Transport` layer
Alexis Sellier committed 3 years ago
commit a925fb0e02672a754a3e3cb3dbd15b010210bb55
parent d3dd83447e56c0eeaa25eb1607a26f1e1ee2dd6a
5 files changed +171 -65
modified node/src/client.rs
@@ -9,6 +9,7 @@ use crate::collections::HashMap;
use crate::crypto::Signer;
use crate::protocol;
use crate::storage::git::Storage;
+
use crate::transport::Transport;

pub mod handle;

@@ -86,16 +87,17 @@ impl<R: Reactor, G: Signer> Client<R, G> {

        log::info!("Initializing client ({:?})..", network);

+
        let protocol = protocol::Protocol::new(
+
            config.protocol,
+
            RefClock::from(time),
+
            storage,
+
            addresses,
+
            signer,
+
            rng,
+
        );
        self.reactor.run(
            &config.listen,
-
            protocol::Protocol::new(
-
                config.protocol,
-
                RefClock::from(time),
-
                storage,
-
                addresses,
-
                signer,
-
                rng,
-
            ),
+
            Transport::new(protocol),
            self.events,
            self.commands,
        )?;
modified node/src/lib.rs
@@ -20,3 +20,4 @@ mod serde_ext;
mod storage;
#[cfg(test)]
mod test;
+
mod transport;
modified node/src/protocol.rs
@@ -280,47 +280,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Protoco
        }
    }

-
    ////////////////////////////////////////////////////////////////////////////
-
    // Periodic tasks
-
    ////////////////////////////////////////////////////////////////////////////
-

-
    /// Announce our inventory to all connected peers.
-
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
-
        let inv = Message::inventory(self.context.inventory_announcement()?, &self.context.signer);
-

-
        for addr in self.peers.negotiated().map(|(_, p)| p.addr) {
-
            self.context.write(addr, inv.clone());
-
        }
-
        Ok(())
-
    }
-

-
    fn prune_routing_entries(&mut self) {
-
        // TODO
-
    }
-

-
    fn maintain_connections(&mut self) {
-
        // TODO: Connect to all potential seeds.
-
        if self.peers.len() < TARGET_OUTBOUND_PEERS {
-
            let delta = TARGET_OUTBOUND_PEERS - self.peers.len();
-

-
            for _ in 0..delta {
-
                // TODO: Connect to random peer.
-
            }
-
        }
-
    }
-
}
-

-
impl<'r, S, T, G> nakamoto::Protocol for Protocol<S, T, G>
-
where
-
    T: WriteStorage<'r> + 'static,
-
    S: address_book::Store,
-
    G: crypto::Signer,
-
{
-
    type Event = Event;
-
    type Command = Command;
-
    type DisconnectReason = DisconnectReason;
-

-
    fn initialize(&mut self, time: LocalTime) {
+
    pub fn initialize(&mut self, time: LocalTime) {
        trace!("Init {}", time.as_secs());

        self.start_time = time;
@@ -332,13 +292,13 @@ where
        }
    }

-
    fn tick(&mut self, now: nakamoto::LocalTime) {
+
    pub fn tick(&mut self, now: nakamoto::LocalTime) {
        trace!("Tick +{}", now - self.start_time);

        self.context.clock.set(now);
    }

-
    fn wake(&mut self) {
+
    pub fn wake(&mut self) {
        let now = self.context.clock.local_time();

        trace!("Wake +{}", now - self.start_time);
@@ -373,7 +333,7 @@ where
        }
    }

-
    fn command(&mut self, cmd: Self::Command) {
+
    pub fn command(&mut self, cmd: Command) {
        debug!("Command {:?}", cmd);

        match cmd {
@@ -468,7 +428,7 @@ where
        }
    }

-
    fn attempted(&mut self, addr: &std::net::SocketAddr) {
+
    pub fn attempted(&mut self, addr: &std::net::SocketAddr) {
        let ip = addr.ip();
        let persistent = self.context.config.is_persistent(addr);
        let peer = self
@@ -479,7 +439,7 @@ where
        peer.attempted();
    }

-
    fn connected(
+
    pub fn connected(
        &mut self,
        addr: std::net::SocketAddr,
        _local_addr: &std::net::SocketAddr,
@@ -512,10 +472,10 @@ where
        }
    }

-
    fn disconnected(
+
    pub fn disconnected(
        &mut self,
        addr: &std::net::SocketAddr,
-
        reason: nakamoto::DisconnectReason<Self::DisconnectReason>,
+
        reason: nakamoto::DisconnectReason<DisconnectReason>,
    ) {
        let since = self.local_time();
        let ip = addr.ip();
@@ -551,7 +511,7 @@ where
        }
    }

-
    fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
+
    pub fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
        let peer_ip = addr.ip();
        let (peer, msgs) = if let Some(peer) = self.peers.get_mut(&peer_ip) {
            let decoder = peer.inbox();
@@ -602,6 +562,35 @@ where
            self.context.relay(msg, negotiated.clone());
        }
    }
+

+
    ////////////////////////////////////////////////////////////////////////////
+
    // Periodic tasks
+
    ////////////////////////////////////////////////////////////////////////////
+

+
    /// Announce our inventory to all connected peers.
+
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
+
        let inv = Message::inventory(self.context.inventory_announcement()?, &self.context.signer);
+

+
        for addr in self.peers.negotiated().map(|(_, p)| p.addr) {
+
            self.context.write(addr, inv.clone());
+
        }
+
        Ok(())
+
    }
+

+
    fn prune_routing_entries(&mut self) {
+
        // TODO
+
    }
+

+
    fn maintain_connections(&mut self) {
+
        // TODO: Connect to all potential seeds.
+
        if self.peers.len() < TARGET_OUTBOUND_PEERS {
+
            let delta = TARGET_OUTBOUND_PEERS - self.peers.len();
+

+
            for _ in 0..delta {
+
                // TODO: Connect to random peer.
+
            }
+
        }
+
    }
}

impl<S, T, G> Deref for Protocol<S, T, G> {
modified node/src/test/peer.rs
@@ -15,15 +15,16 @@ use crate::protocol::message::*;
use crate::protocol::*;
use crate::storage::WriteStorage;
use crate::test::crypto::MockSigner;
+
use crate::transport;
use crate::*;

-
/// Protocol instantiation used for testing.
-
pub type Protocol<S> = crate::protocol::Protocol<HashMap<net::IpAddr, KnownAddress>, S, MockSigner>;
+
/// Transport instantiation used for testing.
+
pub type Transport<S> = transport::Transport<HashMap<net::IpAddr, KnownAddress>, S, MockSigner>;

#[derive(Debug)]
pub struct Peer<S> {
    pub name: &'static str,
-
    pub protocol: Protocol<S>,
+
    pub protocol: Transport<S>,
    pub ip: net::IpAddr,
    pub rng: fastrand::Rng,
    pub local_time: LocalTime,
@@ -32,7 +33,7 @@ pub struct Peer<S> {
    initialized: bool,
}

-
impl<'r, S> simulator::Peer<Protocol<S>> for Peer<S>
+
impl<'r, S> simulator::Peer<Transport<S>> for Peer<S>
where
    S: WriteStorage<'r> + 'static,
{
@@ -46,7 +47,7 @@ where
}

impl<S> Deref for Peer<S> {
-
    type Target = Protocol<S>;
+
    type Target = Transport<S>;

    fn deref(&self) -> &Self::Target {
        &self.protocol
@@ -92,7 +93,14 @@ where
        let local_time = LocalTime::now();
        let clock = RefClock::from(local_time);
        let signer = MockSigner::new(&mut rng);
-
        let protocol = Protocol::new(config, clock, storage, addrs, signer, rng.clone());
+
        let protocol = Transport::new(Protocol::new(
+
            config,
+
            clock,
+
            storage,
+
            addrs,
+
            signer,
+
            rng.clone(),
+
        ));
        let ip = ip.into();
        let local_addr = net::SocketAddr::new(ip, rng.u16(..));

@@ -135,7 +143,7 @@ where
    }

    pub fn connect_from(&mut self, peer: &Self) {
-
        let remote = simulator::Peer::<Protocol<S>>::addr(peer);
+
        let remote = simulator::Peer::<Transport<S>>::addr(peer);
        let local = net::SocketAddr::new(self.ip, self.rng.u16(..));
        let git = format!("file:///{}.git", remote.ip());
        let git = Url::from_bytes(git.as_bytes()).unwrap();
@@ -160,7 +168,7 @@ where
    }

    pub fn connect_to(&mut self, peer: &Self) {
-
        let remote = simulator::Peer::<Protocol<S>>::addr(peer);
+
        let remote = simulator::Peer::<Transport<S>>::addr(peer);

        self.initialize();
        self.protocol.attempted(&remote);
added node/src/transport.rs
@@ -0,0 +1,106 @@
+
use std::net;
+
use std::ops::{Deref, DerefMut};
+

+
use nakamoto::LocalTime;
+
use nakamoto_net as nakamoto;
+
use nakamoto_net::{Io, Link};
+

+
use crate::address_book;
+
use crate::collections::HashMap;
+
use crate::crypto;
+
use crate::protocol::{Command, DisconnectReason, Event, Protocol};
+
use crate::storage::WriteStorage;
+

+
#[derive(Debug)]
+
struct Peer {
+
    addr: net::SocketAddr,
+
}
+

+
#[derive(Debug)]
+
pub struct Transport<S, T, G> {
+
    peers: HashMap<net::IpAddr, Peer>,
+
    protocol: Protocol<S, T, G>,
+
}
+

+
impl<S, T, G> Transport<S, T, G> {
+
    pub fn new(protocol: Protocol<S, T, G>) -> Self {
+
        Self {
+
            peers: HashMap::default(),
+
            protocol,
+
        }
+
    }
+
}
+

+
impl<'r, S, T, G> nakamoto::Protocol for Transport<S, T, G>
+
where
+
    T: WriteStorage<'r> + 'static,
+
    S: address_book::Store,
+
    G: crypto::Signer,
+
{
+
    type Event = Event;
+
    type Command = Command;
+
    type DisconnectReason = DisconnectReason;
+

+
    fn initialize(&mut self, time: LocalTime) {
+
        self.protocol.initialize(time)
+
    }
+

+
    fn tick(&mut self, now: nakamoto::LocalTime) {
+
        self.protocol.tick(now)
+
    }
+

+
    fn wake(&mut self) {
+
        self.protocol.wake()
+
    }
+

+
    fn command(&mut self, cmd: Self::Command) {
+
        self.protocol.command(cmd)
+
    }
+

+
    fn attempted(&mut self, addr: &std::net::SocketAddr) {
+
        self.protocol.attempted(addr)
+
    }
+

+
    fn connected(
+
        &mut self,
+
        addr: std::net::SocketAddr,
+
        local_addr: &std::net::SocketAddr,
+
        link: Link,
+
    ) {
+
        self.protocol.connected(addr, local_addr, link)
+
    }
+

+
    fn disconnected(
+
        &mut self,
+
        addr: &std::net::SocketAddr,
+
        reason: nakamoto::DisconnectReason<Self::DisconnectReason>,
+
    ) {
+
        self.protocol.disconnected(addr, reason)
+
    }
+

+
    fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
+
        self.protocol.received_bytes(addr, bytes)
+
    }
+
}
+

+
impl<S, T, G> Iterator for Transport<S, T, G> {
+
    type Item = Io<Event, DisconnectReason>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        self.protocol.next()
+
    }
+
}
+

+
impl<S, T, G> Deref for Transport<S, T, G> {
+
    type Target = Protocol<S, T, G>;
+

+
    fn deref(&self) -> &Self::Target {
+
        &self.protocol
+
    }
+
}
+

+
impl<S, T, G> DerefMut for Transport<S, T, G> {
+
    fn deref_mut(&mut self) -> &mut Self::Target {
+
        &mut self.protocol
+
    }
+
}