Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement `Client`
Alexis Sellier committed 3 years ago
commit e663e8c516889c8da95f92c811eaf0f19b230d69
parent 35b0af9a6833dbaf9089bd97ed98d386a8fd1256
11 files changed +228 -38
modified node/Cargo.toml
@@ -11,6 +11,7 @@ bs58 = { version = "0.4.0" }
ed25519-consensus = { version = "2.0.1" }
chrono = { version = "0.4.0" }
colored = { version = "1.9.0" }
+
crossbeam-channel = { version = "0.5.6" }
fastrand = { version = "1.8.0" }
git-ref-format = { version = "0", features = ["serde", "macro"] }
git2 = { version = "0.13" }
added node/src/client.rs
@@ -0,0 +1,112 @@
+
use std::net;
+
use std::path::Path;
+

+
use crossbeam_channel as chan;
+
use nakamoto_net::{LocalTime, Reactor};
+

+
use crate::clock::RefClock;
+
use crate::collections::HashMap;
+
use crate::crypto::Signer;
+
use crate::protocol;
+
use crate::storage::git::Storage;
+

+
/// Client configuration.
+
#[derive(Debug, Clone)]
+
pub struct Config {
+
    /// Client protocol configuration.
+
    pub protocol: protocol::Config,
+
    /// Client listen addresses.
+
    pub listen: Vec<net::SocketAddr>,
+
}
+

+
impl Config {
+
    /// Create a new configuration for the given network.
+
    pub fn new(network: protocol::Network) -> Self {
+
        Self {
+
            protocol: protocol::Config {
+
                network,
+
                ..protocol::Config::default()
+
            },
+
            ..Self::default()
+
        }
+
    }
+
}
+

+
impl Default for Config {
+
    fn default() -> Self {
+
        Self {
+
            protocol: protocol::Config::default(),
+
            listen: vec![([0, 0, 0, 0], 0).into()],
+
        }
+
    }
+
}
+

+
pub struct Client<R: Reactor> {
+
    reactor: R,
+
    storage: Storage,
+

+
    handle: chan::Sender<protocol::Command>,
+
    commands: chan::Receiver<protocol::Command>,
+
    shutdown: chan::Sender<()>,
+
    listening: chan::Receiver<net::SocketAddr>,
+
    events: Events,
+
}
+

+
impl<R: Reactor> Client<R> {
+
    pub fn new<P: AsRef<Path>, S: Signer>(
+
        path: P,
+
        signer: S,
+
    ) -> Result<Self, nakamoto_net::error::Error> {
+
        let (handle, commands) = chan::unbounded::<protocol::Command>();
+
        let (shutdown, shutdown_recv) = chan::bounded(1);
+
        let (listening_send, listening) = chan::bounded(1);
+
        let reactor = R::new(shutdown_recv, listening_send)?;
+
        let storage = Storage::open(path, signer)?;
+
        let events = Events {};
+

+
        Ok(Self {
+
            storage,
+
            reactor,
+
            handle,
+
            commands,
+
            listening,
+
            shutdown,
+
            events,
+
        })
+
    }
+

+
    pub fn run(mut self, config: Config) -> Result<(), nakamoto_net::error::Error> {
+
        let network = config.protocol.network;
+
        let rng = fastrand::Rng::new();
+
        let time = LocalTime::now();
+
        let storage = self.storage;
+
        let signer = storage.signer();
+
        let addresses = HashMap::with_hasher(rng.clone().into());
+

+
        log::info!("Initializing client ({:?})..", network);
+

+
        self.reactor.run(
+
            &config.listen,
+
            protocol::Protocol::new(
+
                config.protocol,
+
                RefClock::from(time),
+
                storage,
+
                addresses,
+
                signer,
+
                rng,
+
            ),
+
            self.events,
+
            self.commands,
+
        )?;
+

+
        Ok(())
+
    }
+
}
+

+
pub struct Events {}
+

+
impl nakamoto_net::Publisher<protocol::Event> for Events {
+
    fn publish(&mut self, e: protocol::Event) {
+
        log::info!("Received event {:?}", e);
+
    }
+
}
modified node/src/crypto.rs
@@ -1,3 +1,5 @@
+
use std::rc::Rc;
+
use std::sync::Arc;
use std::{fmt, ops::Deref, str::FromStr};

use ed25519_consensus as ed25519;
@@ -13,6 +15,32 @@ pub trait Signer: 'static {
    fn sign(&self, msg: &[u8]) -> Signature;
}

+
impl<T> Signer for Rc<T>
+
where
+
    T: Signer + ?Sized,
+
{
+
    fn sign(&self, msg: &[u8]) -> Signature {
+
        self.deref().sign(msg)
+
    }
+

+
    fn public_key(&self) -> &PublicKey {
+
        self.deref().public_key()
+
    }
+
}
+

+
impl<T> Signer for Arc<T>
+
where
+
    T: Signer + ?Sized,
+
{
+
    fn sign(&self, msg: &[u8]) -> Signature {
+
        self.deref().sign(msg)
+
    }
+

+
    fn public_key(&self) -> &PublicKey {
+
        self.deref().public_key()
+
    }
+
}
+

/// The public/verification key.
#[derive(Serialize, Deserialize, Eq, Debug, Copy, Clone)]
#[serde(transparent)]
modified node/src/lib.rs
@@ -1,11 +1,13 @@
#![allow(dead_code)]
pub use nakamoto_net::{Io, Link, LocalDuration, LocalTime};

+
pub mod client;
+
pub mod crypto;
+

mod address_book;
mod address_manager;
mod clock;
mod collections;
-
mod crypto;
mod decoder;
mod git;
mod hash;
@@ -16,7 +18,3 @@ mod rad;
mod storage;
#[cfg(test)]
mod test;
-

-
pub fn run() -> anyhow::Result<()> {
-
    Ok(())
-
}
modified node/src/main.rs
@@ -1,3 +1,28 @@
+
use std::net;
+
use std::path::Path;
+

+
use radicle_node::client;
+
use radicle_node::crypto::{PublicKey, Signature, Signer};
+

+
type Reactor = nakamoto_net_poll::Reactor<net::TcpStream>;
+

+
struct FailingSigner {}
+

+
impl Signer for FailingSigner {
+
    fn public_key(&self) -> &PublicKey {
+
        panic!("Failing signer always fails!");
+
    }
+

+
    fn sign(&self, _msg: &[u8]) -> Signature {
+
        panic!("Failing signer always fails!");
+
    }
+
}
+

fn main() -> anyhow::Result<()> {
-
    radicle_node::run()
+
    let signer = FailingSigner {};
+
    let client = client::Client::<Reactor>::new(Path::new("."), signer)?;
+

+
    client.run(client::Config::default())?;
+

+
    Ok(())
}
modified node/src/protocol.rs
@@ -20,13 +20,14 @@ use crate::clock::RefClock;
use crate::collections::{HashMap, HashSet};
use crate::crypto;
use crate::identity::ProjId;
-
use crate::protocol::config::{Config, ProjectTracking};
-
use crate::protocol::message::{Envelope, Message};
+
use crate::protocol::config::ProjectTracking;
+
use crate::protocol::message::Message;
use crate::protocol::peer::{Peer, PeerError, PeerState};
use crate::storage::{self, WriteRepository};
use crate::storage::{Inventory, ReadStorage, Remotes, Unverified, WriteStorage};

-
pub const NETWORK_MAGIC: u32 = 0x819b43d9;
+
pub use crate::protocol::config::{Config, Network};
+

pub const DEFAULT_PORT: u16 = 8776;
pub const PROTOCOL_VERSION: u32 = 1;
pub const TARGET_OUTBOUND_PEERS: usize = 8;
@@ -44,6 +45,10 @@ pub type Routing = HashMap<ProjId, HashSet<NodeId>>;
/// Seconds since epoch.
pub type Timestamp = u64;

+
/// A protocol event.
+
#[derive(Debug, Clone)]
+
pub enum Event {}
+

/// Commands sent to the protocol by the operator.
#[derive(Debug)]
pub enum Command {
@@ -177,7 +182,7 @@ impl<T: ReadStorage + WriteStorage, S: address_book::Store, G: crypto::Signer> P
    }

    /// Get I/O outbox.
-
    pub fn outbox(&mut self) -> &mut VecDeque<Io<(), DisconnectReason>> {
+
    pub fn outbox(&mut self) -> &mut VecDeque<Io<Event, DisconnectReason>> {
        &mut self.context.io
    }

@@ -249,7 +254,7 @@ where
    S: address_book::Store,
    G: crypto::Signer,
{
-
    type Event = ();
+
    type Event = Event;
    type Command = Command;
    type DisconnectReason = DisconnectReason;

@@ -516,7 +521,7 @@ impl fmt::Display for DisconnectReason {
}

impl<S, T, G> Iterator for Protocol<S, T, G> {
-
    type Item = Io<(), DisconnectReason>;
+
    type Item = Io<Event, DisconnectReason>;

    fn next(&mut self) -> Option<Self::Item> {
        self.context.io.pop_front()
@@ -542,7 +547,7 @@ pub struct Context<S, T, G> {
    /// Tracks the location of projects.
    routing: Routing,
    /// Outgoing I/O queue.
-
    io: VecDeque<Io<(), DisconnectReason>>,
+
    io: VecDeque<Io<Event, DisconnectReason>>,
    /// Clock. Tells the time.
    clock: RefClock,
    /// Project storage.
@@ -629,7 +634,8 @@ impl<S, T, G> Context<S, T, G> {
        for msg in msgs {
            debug!("Write {:?} to {}", &msg, remote.ip());

-
            serde_json::to_writer(&mut buf, &Envelope::from(msg)).unwrap();
+
            let envelope = self.config.network.envelope(msg);
+
            serde_json::to_writer(&mut buf, &envelope).unwrap();
        }
        self.io.push_back(Io::Write(remote, buf.into_inner()));
    }
@@ -637,7 +643,8 @@ impl<S, T, G> Context<S, T, G> {
    fn write(&mut self, remote: net::SocketAddr, msg: Message) {
        debug!("Write {:?} to {}", &msg, remote.ip());

-
        let bytes = serde_json::to_vec(&Envelope::from(msg)).unwrap();
+
        let envelope = self.config.network.envelope(msg);
+
        let bytes = serde_json::to_vec(&envelope).unwrap();

        self.io.push_back(Io::Write(remote, bytes));
    }
modified node/src/protocol/config.rs
@@ -4,10 +4,34 @@ use git_url::Url;

use crate::collections::HashSet;
use crate::identity::{ProjId, UserId};
-
use crate::protocol::message::Address;
+
use crate::protocol::message::{Address, Envelope, Message};
+

+
/// Peer-to-peer network.
+
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
+
pub enum Network {
+
    #[default]
+
    Main,
+
    Test,
+
}
+

+
impl Network {
+
    pub fn magic(&self) -> u32 {
+
        match self {
+
            Self::Main => 0x819b43d9,
+
            Self::Test => 0x717ebaf8,
+
        }
+
    }
+

+
    pub fn envelope(&self, msg: Message) -> Envelope {
+
        Envelope {
+
            magic: self.magic(),
+
            msg,
+
        }
+
    }
+
}

/// Project tracking policy.
-
#[derive(Debug)]
+
#[derive(Debug, Clone)]
pub enum ProjectTracking {
    /// Track all projects we come across.
    All { blocked: HashSet<ProjId> },
@@ -24,7 +48,7 @@ impl Default for ProjectTracking {
}

/// Project remote tracking policy.
-
#[derive(Debug, Default)]
+
#[derive(Debug, Default, Clone)]
pub enum RemoteTracking {
    /// Only track remotes of project delegates.
    #[default]
@@ -36,11 +60,13 @@ pub enum RemoteTracking {
}

/// Protocol configuration.
-
#[derive(Debug)]
+
#[derive(Debug, Clone)]
pub struct Config {
    /// Peers to connect to on startup.
    /// Connections to these peers will be maintained.
    pub connect: Vec<net::SocketAddr>,
+
    /// Peer-to-peer network.
+
    pub network: Network,
    /// Project tracking policy.
    pub project_tracking: ProjectTracking,
    /// Project remote tracking policy.
@@ -57,6 +83,7 @@ impl Default for Config {
    fn default() -> Self {
        Self {
            connect: Vec::default(),
+
            network: Network::default(),
            project_tracking: ProjectTracking::default(),
            remote_tracking: RemoteTracking::default(),
            relay: true,
modified node/src/protocol/message.rs
@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};

use crate::crypto;
use crate::identity::ProjId;
-
use crate::protocol::{Context, NodeId, Timestamp, NETWORK_MAGIC, PROTOCOL_VERSION};
+
use crate::protocol::{Context, NodeId, Timestamp, PROTOCOL_VERSION};
use crate::storage;

/// Message envelope. All messages sent over the network are wrapped in this type.
@@ -98,15 +98,6 @@ pub enum Message {
    },
}

-
impl From<Message> for Envelope {
-
    fn from(msg: Message) -> Self {
-
        Self {
-
            magic: NETWORK_MAGIC,
-
            msg,
-
        }
-
    }
-
}
-

impl Message {
    pub fn hello(id: NodeId, timestamp: Timestamp, addrs: Vec<Address>, git: Url) -> Self {
        Self::Hello {
modified node/src/protocol/peer.rs
@@ -104,7 +104,7 @@ impl Peer {
        T: storage::ReadStorage + storage::WriteStorage,
        G: crypto::Signer,
    {
-
        if envelope.magic != NETWORK_MAGIC {
+
        if envelope.magic != ctx.config.network.magic() {
            return Err(PeerError::WrongMagic(envelope.magic));
        }
        debug!("Received {:?} from {}", &envelope.msg, self.ip());
modified node/src/storage/git.rs
@@ -1,4 +1,5 @@
use std::path::{Path, PathBuf};
+
use std::rc::Rc;
use std::{fmt, fs, io};

use git_ref_format::refspec;
@@ -25,7 +26,7 @@ pub static NAMESPACES_GLOB: Lazy<refspec::PatternString> =

pub struct Storage {
    path: PathBuf,
-
    signer: Box<dyn Signer>,
+
    signer: Rc<dyn Signer>,
}

impl fmt::Debug for Storage {
@@ -76,7 +77,7 @@ impl Storage {

        Ok(Self {
            path,
-
            signer: Box::new(signer),
+
            signer: Rc::new(signer),
        })
    }

@@ -84,6 +85,10 @@ impl Storage {
        self.path.as_path()
    }

+
    pub fn signer(&self) -> Rc<dyn Signer> {
+
        self.signer.clone()
+
    }
+

    pub fn projects(&self) -> Result<Vec<ProjId>, Error> {
        let mut projects = Vec::new();

modified node/src/test/peer.rs
@@ -129,11 +129,7 @@ where
    }

    pub fn receive(&mut self, peer: &net::SocketAddr, msg: Message) {
-
        let bytes = serde_json::to_vec(&Envelope {
-
            magic: NETWORK_MAGIC,
-
            msg,
-
        })
-
        .unwrap();
+
        let bytes = serde_json::to_vec(&self.config().network.envelope(msg)).unwrap();

        self.protocol.received_bytes(peer, &bytes);
    }
@@ -212,7 +208,7 @@ where
    }

    /// Get a draining iterator over the peers's I/O outbox.
-
    pub fn outbox(&mut self) -> impl Iterator<Item = Io<(), DisconnectReason>> + '_ {
+
    pub fn outbox(&mut self) -> impl Iterator<Item = Io<Event, DisconnectReason>> + '_ {
        self.protocol.outbox().drain(..)
    }
}