Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Complete transcoder
Dr. Maxim Orlovsky committed 3 years ago
commit e9ae5897f77097ba6214865084a9beca8b92de67
parent 56fb4d67a925dbf4804c0cf11f201d8215133097
9 files changed +272 -268
modified radicle-node/src/client.rs
@@ -9,8 +9,7 @@ use radicle::crypto::Signer;
use crate::clock::RefClock;
use crate::profile::Profile;
use crate::service::routing;
-
use crate::transport::Transport;
-
use crate::wire::transcoder::PlainTranscoder;
+
use crate::wire::transcoder::NoHandshake;
use crate::wire::Wire;
use crate::{address, service};

@@ -131,11 +130,9 @@ impl<R: Reactor> Client<R> {
            rng,
        );

-
        let transcode = PlainTranscoder::default();
-

        self.reactor.run(
            &config.listen,
-
            Transport::new(Wire::new(service, transcode)),
+
            Wire::<_, _, _, _, NoHandshake>::new(service),
            self.events,
            self.commands,
        )?;
deleted radicle-node/src/decoder.rs
@@ -1,108 +0,0 @@
-
use std::io;
-
use std::marker::PhantomData;
-

-
use crate::service::message::Message;
-
use crate::wire;
-

-
/// Message stream decoder.
-
///
-
/// Used to for example turn a byte stream into network messages.
-
#[derive(Debug)]
-
pub struct Decoder<D = Message> {
-
    unparsed: Vec<u8>,
-
    item: PhantomData<D>,
-
}
-

-
impl<D> From<Vec<u8>> for Decoder<D> {
-
    fn from(unparsed: Vec<u8>) -> Self {
-
        Self {
-
            unparsed,
-
            item: PhantomData,
-
        }
-
    }
-
}
-

-
impl<D: wire::Decode> Decoder<D> {
-
    /// Create a new stream decoder.
-
    pub fn new(capacity: usize) -> Self {
-
        Self {
-
            unparsed: Vec::with_capacity(capacity),
-
            item: PhantomData,
-
        }
-
    }
-

-
    /// Input bytes into the decoder.
-
    pub fn input(&mut self, bytes: &[u8]) {
-
        self.unparsed.extend_from_slice(bytes);
-
    }
-

-
    /// Decode and return the next message. Returns [`None`] if nothing was decoded.
-
    pub fn decode_next(&mut self) -> Result<Option<D>, wire::Error> {
-
        let mut reader = io::Cursor::new(self.unparsed.as_mut_slice());
-

-
        match D::decode(&mut reader) {
-
            Ok(msg) => {
-
                let pos = reader.position() as usize;
-
                self.unparsed.drain(..pos);
-

-
                Ok(Some(msg))
-
            }
-
            Err(err) if err.is_eof() => Ok(None),
-
            Err(err) => Err(err),
-
        }
-
    }
-
}
-

-
impl<D: wire::Decode> io::Write for Decoder<D> {
-
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-
        self.input(buf);
-

-
        Ok(buf.len())
-
    }
-

-
    fn flush(&mut self) -> io::Result<()> {
-
        Ok(())
-
    }
-
}
-

-
impl<D: wire::Decode> Iterator for Decoder<D> {
-
    type Item = Result<D, wire::Error>;
-

-
    fn next(&mut self) -> Option<Self::Item> {
-
        self.decode_next().transpose()
-
    }
-
}
-

-
#[cfg(test)]
-
mod test {
-
    use super::*;
-
    use quickcheck_macros::quickcheck;
-

-
    const MSG_HELLO: &[u8] = &[5, b'h', b'e', b'l', b'l', b'o'];
-
    const MSG_BYE: &[u8] = &[3, b'b', b'y', b'e'];
-

-
    #[quickcheck]
-
    fn prop_decode_next(chunk_size: usize) {
-
        let mut bytes = vec![];
-
        let mut msgs = vec![];
-
        let mut decoder = Decoder::<String>::new(8);
-

-
        let chunk_size = 1 + chunk_size % MSG_HELLO.len() + MSG_BYE.len();
-

-
        bytes.extend_from_slice(MSG_HELLO);
-
        bytes.extend_from_slice(MSG_BYE);
-

-
        for chunk in bytes.as_slice().chunks(chunk_size) {
-
            decoder.input(chunk);
-

-
            while let Some(msg) = decoder.decode_next().unwrap() {
-
                msgs.push(msg);
-
            }
-
        }
-

-
        assert_eq!(decoder.unparsed.len(), 0);
-
        assert_eq!(msgs.len(), 2);
-
        assert_eq!(msgs[0], String::from("hello"));
-
        assert_eq!(msgs[1], String::from("bye"));
-
    }
-
}
added radicle-node/src/deserializer.rs
@@ -0,0 +1,108 @@
+
use std::io;
+
use std::marker::PhantomData;
+

+
use crate::service::message::Message;
+
use crate::wire;
+

+
/// Message stream deserializer.
+
///
+
/// Used to for example turn a byte stream into network messages.
+
#[derive(Debug)]
+
pub struct Deserializer<D = Message> {
+
    unparsed: Vec<u8>,
+
    item: PhantomData<D>,
+
}
+

+
impl<D> From<Vec<u8>> for Deserializer<D> {
+
    fn from(unparsed: Vec<u8>) -> Self {
+
        Self {
+
            unparsed,
+
            item: PhantomData,
+
        }
+
    }
+
}
+

+
impl<D: wire::Decode> Deserializer<D> {
+
    /// Create a new stream decoder.
+
    pub fn new(capacity: usize) -> Self {
+
        Self {
+
            unparsed: Vec::with_capacity(capacity),
+
            item: PhantomData,
+
        }
+
    }
+

+
    /// Input bytes into the decoder.
+
    pub fn input(&mut self, bytes: &[u8]) {
+
        self.unparsed.extend_from_slice(bytes);
+
    }
+

+
    /// Decode and return the next message. Returns [`None`] if nothing was decoded.
+
    pub fn deserialize_next(&mut self) -> Result<Option<D>, wire::Error> {
+
        let mut reader = io::Cursor::new(self.unparsed.as_mut_slice());
+

+
        match D::decode(&mut reader) {
+
            Ok(msg) => {
+
                let pos = reader.position() as usize;
+
                self.unparsed.drain(..pos);
+

+
                Ok(Some(msg))
+
            }
+
            Err(err) if err.is_eof() => Ok(None),
+
            Err(err) => Err(err),
+
        }
+
    }
+
}
+

+
impl<D: wire::Decode> io::Write for Deserializer<D> {
+
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+
        self.input(buf);
+

+
        Ok(buf.len())
+
    }
+

+
    fn flush(&mut self) -> io::Result<()> {
+
        Ok(())
+
    }
+
}
+

+
impl<D: wire::Decode> Iterator for Deserializer<D> {
+
    type Item = Result<D, wire::Error>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        self.deserialize_next().transpose()
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+
    use quickcheck_macros::quickcheck;
+

+
    const MSG_HELLO: &[u8] = &[5, b'h', b'e', b'l', b'l', b'o'];
+
    const MSG_BYE: &[u8] = &[3, b'b', b'y', b'e'];
+

+
    #[quickcheck]
+
    fn prop_decode_next(chunk_size: usize) {
+
        let mut bytes = vec![];
+
        let mut msgs = vec![];
+
        let mut decoder = Deserializer::<String>::new(8);
+

+
        let chunk_size = 1 + chunk_size % MSG_HELLO.len() + MSG_BYE.len();
+

+
        bytes.extend_from_slice(MSG_HELLO);
+
        bytes.extend_from_slice(MSG_BYE);
+

+
        for chunk in bytes.as_slice().chunks(chunk_size) {
+
            decoder.input(chunk);
+

+
            while let Some(msg) = decoder.deserialize_next().unwrap() {
+
                msgs.push(msg);
+
            }
+
        }
+

+
        assert_eq!(decoder.unparsed.len(), 0);
+
        assert_eq!(msgs.len(), 2);
+
        assert_eq!(msgs[0], String::from("hello"));
+
        assert_eq!(msgs[1], String::from("bye"));
+
    }
+
}
modified radicle-node/src/lib.rs
@@ -2,7 +2,7 @@ pub mod address;
pub mod client;
pub mod clock;
pub mod control;
-
pub mod decoder;
+
pub mod deserializer;
pub mod logger;
pub mod service;
pub mod sql;
@@ -10,7 +10,6 @@ pub mod sql;
pub mod test;
#[cfg(test)]
pub mod tests;
-
pub mod transport;
pub mod wire;

pub use nakamoto_net::{Io, Link, LocalDuration, LocalTime};
@@ -19,7 +18,7 @@ pub use radicle::{collections, crypto, git, hash, identity, node, profile, rad,
pub mod prelude {
    pub use crate::clock::Timestamp;
    pub use crate::crypto::{PublicKey, Signature, Signer};
-
    pub use crate::decoder::Decoder;
+
    pub use crate::deserializer::Deserializer;
    pub use crate::hash::Digest;
    pub use crate::identity::{Did, Id};
    pub use crate::service::filter::Filter;
modified radicle-node/src/service.rs
@@ -501,6 +501,7 @@ where
        peer.attempted();
    }

+
    // TODO: Split into two functions: `connected` and `negotiated`
    pub fn connected(
        &mut self,
        addr: std::net::SocketAddr,
deleted radicle-node/src/transport.rs
@@ -1,111 +0,0 @@
-
use std::net;
-
use std::ops::{Deref, DerefMut};
-

-
use nakamoto::LocalTime;
-
use nakamoto_net as nakamoto;
-
use nakamoto_net::{Io, Link};
-

-
use crate::address;
-
use crate::collections::HashMap;
-
use crate::crypto;
-
use crate::service::routing;
-
use crate::service::{Command, DisconnectReason, Event, Service};
-
use crate::storage::WriteStorage;
-
use crate::wire::transcoder::Transcode;
-
use crate::wire::Wire;
-

-
#[derive(Debug)]
-
struct Peer {
-
    _addr: net::SocketAddr,
-
}
-

-
#[derive(Debug)]
-
pub struct Transport<R, S, W, G, T: Transcode> {
-
    _peers: HashMap<net::IpAddr, Peer>,
-
    inner: Wire<R, S, W, G, T>,
-
}
-

-
impl<R, S, W, G, T: Transcode> Transport<R, S, W, G, T> {
-
    pub fn new(inner: Wire<R, S, W, G, T>) -> Self {
-
        Self {
-
            _peers: HashMap::default(),
-
            inner,
-
        }
-
    }
-
}
-

-
impl<R, S, W, G, T> nakamoto::Protocol for Transport<R, S, W, G, T>
-
where
-
    R: routing::Store,
-
    W: WriteStorage + 'static,
-
    S: address::Store,
-
    G: crypto::Signer,
-
    T: Transcode,
-
{
-
    type Event = Event;
-
    type Command = Command;
-
    type DisconnectReason = DisconnectReason;
-

-
    fn initialize(&mut self, time: LocalTime) {
-
        self.inner.initialize(time)
-
    }
-

-
    fn tick(&mut self, now: nakamoto::LocalTime) {
-
        self.inner.tick(now)
-
    }
-

-
    fn wake(&mut self) {
-
        self.inner.wake()
-
    }
-

-
    fn command(&mut self, cmd: Self::Command) {
-
        self.inner.command(cmd)
-
    }
-

-
    fn attempted(&mut self, addr: &std::net::SocketAddr) {
-
        self.inner.attempted(addr)
-
    }
-

-
    fn connected(
-
        &mut self,
-
        addr: std::net::SocketAddr,
-
        local_addr: &std::net::SocketAddr,
-
        link: Link,
-
    ) {
-
        self.inner.connected(addr, local_addr, link)
-
    }
-

-
    fn disconnected(
-
        &mut self,
-
        addr: &std::net::SocketAddr,
-
        reason: nakamoto::DisconnectReason<Self::DisconnectReason>,
-
    ) {
-
        self.inner.disconnected(addr, reason)
-
    }
-

-
    fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
-
        self.inner.received_bytes(addr, bytes)
-
    }
-
}
-

-
impl<R, S, W, G, T: Transcode> Iterator for Transport<R, S, W, G, T> {
-
    type Item = Io<Event, DisconnectReason>;
-

-
    fn next(&mut self) -> Option<Self::Item> {
-
        self.inner.next()
-
    }
-
}
-

-
impl<R, S, W, G, T: Transcode> Deref for Transport<R, S, W, G, T> {
-
    type Target = Service<R, S, W, G>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.inner
-
    }
-
}
-

-
impl<R, S, W, G, T: Transcode> DerefMut for Transport<R, S, W, G, T> {
-
    fn deref_mut(&mut self) -> &mut Self::Target {
-
        &mut self.inner
-
    }
-
}
modified radicle-node/src/wire.rs
@@ -1,20 +1,20 @@
pub mod message;
pub mod transcoder;

-
use std::collections::{BTreeMap, HashMap};
+
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::convert::TryFrom;
use std::net;
-
use std::ops::{Deref, DerefMut};
+
use std::ops::Deref;
use std::string::FromUtf8Error;
use std::{io, mem};

use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
use nakamoto_net as nakamoto;
-
use nakamoto_net::Link;
+
use nakamoto_net::{Link, LocalTime};

use crate::address;
use crate::crypto::{PublicKey, Signature, Signer, Unverified};
-
use crate::decoder::Decoder;
+
use crate::deserializer::Deserializer;
use crate::git;
use crate::git::fmt;
use crate::hash::Digest;
@@ -26,7 +26,7 @@ use crate::service::{filter, routing};
use crate::storage::refs::Refs;
use crate::storage::refs::SignedRefs;
use crate::storage::WriteStorage;
-
use crate::wire::transcoder::Transcode;
+
use crate::wire::transcoder::{Handshake, HandshakeResult, Transcode};

/// The default type we use to represent sizes on the wire.
///
@@ -425,51 +425,119 @@ impl Decode for node::Features {
}

#[derive(Debug)]
-
pub struct Wire<R, S, W, G, T: Transcode> {
-
    inboxes: HashMap<net::SocketAddr, Decoder>,
+
pub struct Inbox<T: Transcode> {
+
    pub transcoder: T,
+
    pub deserializer: Deserializer,
+
}
+

+
#[derive(Debug)]
+
pub struct Wire<R, S, W, G, H: Handshake> {
+
    handshakes: HashMap<net::SocketAddr, H>,
+
    handshake_queue: VecDeque<(net::SocketAddr, Vec<u8>)>,
+
    inboxes: HashMap<net::SocketAddr, Inbox<H::Transcoder>>,
    inner: service::Service<R, S, W, G>,
-
    #[allow(dead_code)]
-
    transcoder: T,
}

-
impl<R, S, W, G, T: Transcode> Wire<R, S, W, G, T> {
-
    pub fn new(inner: service::Service<R, S, W, G>, transcoder: T) -> Self {
+
impl<R, S, W, G, H: Handshake> Wire<R, S, W, G, H> {
+
    pub fn new(inner: service::Service<R, S, W, G>) -> Self {
        Self {
+
            handshakes: HashMap::new(),
+
            handshake_queue: Default::default(),
            inboxes: HashMap::new(),
            inner,
-
            transcoder,
        }
    }
}

-
impl<R, S, W, G, T> Wire<R, S, W, G, T>
+
impl<R, S, W, G, H> nakamoto::Protocol for Wire<R, S, W, G, H>
where
    R: routing::Store,
    S: address::Store,
    W: WriteStorage + 'static,
    G: Signer,
-
    T: Transcode,
+
    H: Handshake,
{
-
    pub fn connected(&mut self, addr: net::SocketAddr, local_addr: &net::SocketAddr, link: Link) {
-
        self.inboxes.insert(addr, Decoder::new(256));
+
    type Event = service::Event;
+
    type Command = service::Command;
+
    type DisconnectReason = service::DisconnectReason;
+

+
    fn initialize(&mut self, time: LocalTime) {
+
        self.inner.initialize(time)
+
    }
+

+
    fn tick(&mut self, now: nakamoto::LocalTime) {
+
        self.inner.tick(now)
+
    }
+

+
    fn wake(&mut self) {
+
        self.inner.wake()
+
    }
+

+
    fn command(&mut self, cmd: Self::Command) {
+
        self.inner.command(cmd)
+
    }
+

+
    fn attempted(&mut self, addr: &std::net::SocketAddr) {
+
        self.inner.attempted(addr)
+
    }
+

+
    fn connected(&mut self, addr: net::SocketAddr, local_addr: &net::SocketAddr, link: Link) {
+
        self.handshakes.insert(addr, H::new());
        self.inner.connected(addr, local_addr, link)
    }

-
    pub fn disconnected(
+
    fn disconnected(
        &mut self,
        addr: &net::SocketAddr,
        reason: nakamoto::DisconnectReason<service::DisconnectReason>,
    ) {
+
        self.handshakes.remove(addr);
        self.inboxes.remove(addr);
        self.inner.disconnected(addr, &reason)
    }

-
    pub fn received_bytes(&mut self, addr: &net::SocketAddr, bytes: &[u8]) {
-
        if let Some(inbox) = self.inboxes.get_mut(addr) {
-
            inbox.input(bytes);
+
    fn received_bytes(&mut self, addr: &net::SocketAddr, raw_bytes: &[u8]) {
+
        if let Some(handshake) = self.handshakes.remove(addr) {
+
            debug_assert!(!self.inboxes.contains_key(addr));
+

+
            match handshake.step(raw_bytes) {
+
                HandshakeResult::Next(handshake, reply) => {
+
                    self.handshakes.insert(*addr, handshake);
+
                    if !reply.is_empty() {
+
                        self.handshake_queue.push_back((*addr, reply));
+
                    }
+
                    return;
+
                }
+
                HandshakeResult::Complete(transcoder, reply) => {
+
                    log::debug!("handshake with peer {} is complete", addr);
+
                    if !reply.is_empty() {
+
                        self.handshake_queue.push_back((*addr, reply));
+
                    }
+
                    self.inboxes.insert(
+
                        *addr,
+
                        Inbox {
+
                            transcoder,
+
                            deserializer: Deserializer::new(256),
+
                        },
+
                    );
+
                }
+
                HandshakeResult::Error(err) => {
+
                    log::error!("invalid handshake input. Details: {}", err);
+
                    return;
+
                }
+
            }
+
        }
+

+
        if let Some(Inbox {
+
            transcoder,
+
            deserializer,
+
        }) = self.inboxes.get_mut(addr)
+
        {
+
            let bytes = transcoder.decrypt(raw_bytes);
+
            deserializer.input(&bytes);

            loop {
-
                match inbox.decode_next() {
+
                match deserializer.deserialize_next() {
                    Ok(Some(msg)) => self.inner.received_message(addr, msg),
                    Ok(None) => break,

@@ -487,10 +555,14 @@ where
    }
}

-
impl<R, S, W, G, T: Transcode> Iterator for Wire<R, S, W, G, T> {
+
impl<R, S, W, G, H: Handshake> Iterator for Wire<R, S, W, G, H> {
    type Item = nakamoto::Io<service::Event, service::DisconnectReason>;

    fn next(&mut self) -> Option<Self::Item> {
+
        if let Some((addr, handshake_data)) = self.handshake_queue.pop_front() {
+
            return Some(nakamoto::Io::Write(addr, handshake_data));
+
        }
+

        match self.inner.next() {
            Some(Io::Write(addr, msgs)) => {
                let mut buf = Vec::new();
@@ -500,7 +572,11 @@ impl<R, S, W, G, T: Transcode> Iterator for Wire<R, S, W, G, T> {
                    msg.encode(&mut buf)
                        .expect("writing to an in-memory buffer doesn't fail");
                }
-
                Some(nakamoto::Io::Write(addr, buf))
+
                let Inbox { transcoder, .. } = self.inboxes.get_mut(&addr).expect(
+
                    "broken handshake implementation: data sent before handshake was complete",
+
                );
+
                let data = transcoder.encrypt(buf);
+
                Some(nakamoto::Io::Write(addr, data))
            }
            Some(Io::Event(e)) => Some(nakamoto::Io::Event(e)),
            Some(Io::Connect(a)) => Some(nakamoto::Io::Connect(a)),
@@ -512,20 +588,6 @@ impl<R, S, W, G, T: Transcode> Iterator for Wire<R, S, W, G, T> {
    }
}

-
impl<R, S, W, G, T: Transcode> Deref for Wire<R, S, W, G, T> {
-
    type Target = service::Service<R, S, W, G>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.inner
-
    }
-
}
-

-
impl<R, S, W, G, T: Transcode> DerefMut for Wire<R, S, W, G, T> {
-
    fn deref_mut(&mut self) -> &mut Self::Target {
-
        &mut self.inner
-
    }
-
}
-

#[cfg(test)]
mod tests {
    use super::*;
modified radicle-node/src/wire/message.rs
@@ -360,7 +360,7 @@ mod tests {
    use super::*;
    use quickcheck_macros::quickcheck;

-
    use crate::decoder::Decoder;
+
    use crate::deserializer::Deserializer;
    use crate::wire::{self, Encode};

    #[test]
@@ -412,7 +412,7 @@ mod tests {
    #[test]
    fn prop_message_decoder() {
        fn property(items: Vec<Message>) {
-
            let mut decoder = Decoder::<Message>::new(8);
+
            let mut decoder = Deserializer::<Message>::new(8);

            for item in &items {
                item.encode(&mut decoder).unwrap();
modified radicle-node/src/wire/transcoder.rs
@@ -1,6 +1,62 @@
-
pub trait Transcode {}
+
use std::convert::Infallible;
+

+
// TODO: Implement Try trait once stabilized
+
/// Result of a state-machine transition.
+
pub enum HandshakeResult<H: Handshake, T: Transcode> {
+
    Next(H, Vec<u8>),
+
    Complete(T, Vec<u8>),
+
    Error(H::Error),
+
}
+

+
pub trait Handshake: Sized {
+
    /// Errors which may happen during the handshake.
+
    type Error: std::error::Error;
+
    /// Underlying transcoder.
+
    type Transcoder: Transcode;
+

+
    /// Create a new handshake state-machine.
+
    fn new() -> Self;
+
    /// Advance the state-machine to the next state.
+
    fn step(self, input: &[u8]) -> HandshakeResult<Self, Self::Transcoder>;
+
}

#[derive(Debug, Default)]
+
pub struct NoHandshake;
+

+
impl Handshake for NoHandshake {
+
    type Error = Infallible;
+
    type Transcoder = PlainTranscoder;
+

+
    fn new() -> Self {
+
        NoHandshake
+
    }
+

+
    fn step(self, _input: &[u8]) -> HandshakeResult<Self, Self::Transcoder> {
+
        HandshakeResult::Complete(PlainTranscoder, vec![])
+
    }
+
}
+

+
/// Trait allowing transcoding a stream using some form of stream encryption
+
/// and/or encoding.
+
pub trait Transcode {
+
    /// Decodes data received from the remote peer and updates the internal state
+
    /// of the transcoder.
+
    fn decrypt(&mut self, data: &[u8]) -> Vec<u8>;
+

+
    /// Encodes data before sending it to the remote peer.
+
    fn encrypt(&mut self, data: Vec<u8>) -> Vec<u8>;
+
}
+

+
/// Transcoder which does nothing.
+
#[derive(Debug, Default)]
pub struct PlainTranscoder;

-
impl Transcode for PlainTranscoder {}
+
impl Transcode for PlainTranscoder {
+
    fn decrypt(&mut self, data: &[u8]) -> Vec<u8> {
+
        data.to_vec()
+
    }
+

+
    fn encrypt(&mut self, data: Vec<u8>) -> Vec<u8> {
+
        data
+
    }
+
}