Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Implement framer and multiplexer
Dr. Maxim Orlovsky committed 3 years ago
commit 13c9491e49481f9800b297c4077653f1d42efd84
parent 983b2dc534a90f54bfc8f83c665c760deccd9826
2 files changed +127 -9
modified radicle-node/src/wire.rs
@@ -542,7 +542,7 @@ where
            deserializer,
        }) = self.inboxes.get_mut(addr)
        {
-
            let bytes = transcoder.decrypt(raw_bytes);
+
            let bytes = transcoder.decode(raw_bytes);
            deserializer.input(&bytes);

            loop {
@@ -584,7 +584,7 @@ impl<R, S, W, G, H: Handshake> Iterator for Wire<R, S, W, G, H> {
                let Inbox { transcoder, .. } = self.inboxes.get_mut(&addr).expect(
                    "broken handshake implementation: data sent before handshake was complete",
                );
-
                let data = transcoder.encrypt(buf);
+
                let data = transcoder.encode(buf);
                Some(nakamoto::Io::Write(addr, data))
            }
            Some(Io::Event(e)) => Some(nakamoto::Io::Event(e)),
modified radicle-node/src/wire/transcoder.rs
@@ -1,5 +1,9 @@
-
use nakamoto_net::Link;
+
use std::collections::VecDeque;
use std::convert::Infallible;
+
use std::io;
+
use std::io::Read;
+

+
use nakamoto_net::Link;

// TODO: Implement Try trait once stabilized
/// Result of a state-machine transition.
@@ -50,15 +54,14 @@ impl Handshake for NoHandshake {
    }
}

-
/// Trait allowing transcoding a stream using some form of stream encryption
-
/// and/or encoding.
+
/// Trait allowing transcoding a stream using some form of stream encryption and/or encoding.
pub trait Transcode {
    /// Decodes data received from the remote peer and update the internal state
    /// of the transcoder, if necessary.
-
    fn decrypt(&mut self, data: &[u8]) -> Vec<u8>;
+
    fn decode(&mut self, data: &[u8]) -> Vec<u8>;

    /// Encodes data before sending it to the remote peer.
-
    fn encrypt(&mut self, data: Vec<u8>) -> Vec<u8>;
+
    fn encode(&mut self, data: Vec<u8>) -> Vec<u8>;
}

/// Transcoder which does nothing.
@@ -66,11 +69,126 @@ pub trait Transcode {
pub struct PlainTranscoder;

impl Transcode for PlainTranscoder {
-
    fn decrypt(&mut self, data: &[u8]) -> Vec<u8> {
+
    fn decode(&mut self, data: &[u8]) -> Vec<u8> {
        data.to_vec()
    }

-
    fn encrypt(&mut self, data: Vec<u8>) -> Vec<u8> {
+
    fn encode(&mut self, data: Vec<u8>) -> Vec<u8> {
        data
    }
}
+

+
pub type Frame = Vec<u8>;
+

+
#[derive(Copy, Clone, Debug)]
+
pub struct OversizedData(usize);
+

+
#[derive(Debug, Default)]
+
pub struct Framer<T: Transcode> {
+
    input: VecDeque<u8>,
+
    inner: T,
+
}
+

+
impl<T: Transcode> Framer<T> {
+
    pub fn new(inner: T) -> Self {
+
        Framer {
+
            input: Default::default(),
+
            inner,
+
        }
+
    }
+

+
    pub fn input(&mut self, encoded: &[u8]) {
+
        self.input.extend(self.inner.decode(encoded));
+
    }
+

+
    pub fn frame(&mut self, decoded: Vec<u8>) -> Result<Frame, OversizedData> {
+
        let mut data = self.inner.encode(decoded);
+
        let len = data.len();
+
        let len = u8::try_from(len).map_err(|_| OversizedData(len))?;
+
        let len = len.to_be_bytes();
+
        let mut buf = Vec::with_capacity(data.len() + 2);
+
        buf.extend(len);
+
        buf.append(&mut data);
+
        Ok(buf)
+
    }
+
}
+

+
impl<T: Transcode> Iterator for Framer<T> {
+
    type Item = Frame;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        if self.input.len() < 2 {
+
            return None;
+
        }
+
        let mut len = [0u8; 2];
+
        self.input
+
            .read_exact(&mut len)
+
            .expect("the length is checked");
+
        let len = u16::from_be_bytes(len) as usize;
+
        if self.input.len() < 2 + len {
+
            return None;
+
        }
+
        self.input.pop_front();
+
        self.input.pop_front();
+
        let reminder = self.input.split_off(len);
+
        let mut data = vec![0u8; len];
+
        self.input.read_exact(&mut data).expect("checked length");
+
        self.input = reminder;
+
        Some(data)
+
    }
+
}
+

+
#[derive(Copy, Clone, Debug)]
+
pub struct ChannelError;
+

+
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
+
pub struct MuxMsg {
+
    pub channel: u16,
+
    pub data: Vec<u8>,
+
}
+

+
impl TryFrom<Frame> for MuxMsg {
+
    type Error = ChannelError;
+

+
    fn try_from(frame: Frame) -> Result<Self, Self::Error> {
+
        if frame.len() < 2 {
+
            return Err(ChannelError);
+
        }
+
        let mut channel = [0u8; 2];
+
        let mut cursor = io::Cursor::new(frame);
+
        cursor
+
            .read_exact(&mut channel)
+
            .expect("the length is checked");
+
        let channel = u16::from_be_bytes(channel);
+
        Ok(MuxMsg {
+
            channel,
+
            data: cursor.into_inner(),
+
        })
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+

+
    #[test]
+
    fn transcode() {
+
        let mut pipeline = Framer::new(PlainTranscoder);
+

+
        let data = [
+
            0x00, 0x04, 0x00, 0x00, b'a', b'b', 0x00, 0x07, 0x00, 0x01, b'M', b'a', b'x', b'i',
+
            b'm',
+
        ];
+
        let mut expected_payloads = [(0u16, b"ab".to_vec()), (1, b"Maxim".to_vec())].into_iter();
+

+
        for byte in data {
+
            // Writing data byte by byte, ensuring that the reading is not broken
+
            pipeline.input(&[byte]);
+
            for frame in &mut pipeline {
+
                let msg = MuxMsg::try_from(frame).unwrap();
+
                let (channel, data) = expected_payloads.next().unwrap();
+
                assert_eq!(msg, MuxMsg { channel, data });
+
            }
+
        }
+
    }
+
}