Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Start control socket server
Alexis Sellier committed 3 years ago
commit 12dd6d4a5d04526e6a2fb7559c7d0becd93226ba
parent fc30dc26687862458a378901d3273e38a1b28c83
7 files changed +98 -103
modified node/src/client.rs
@@ -11,7 +11,6 @@ use crate::protocol;
use crate::storage::git::Storage;

pub mod handle;
-
pub mod socket;

/// Client configuration.
#[derive(Debug, Clone)]
deleted node/src/client/socket.rs
@@ -1,80 +0,0 @@
-
use std::io::prelude::*;
-
use std::io::BufReader;
-
use std::os::unix::net::UnixListener;
-
use std::os::unix::net::UnixStream;
-
use std::path::Path;
-
use std::str::FromStr;
-
use std::{fs, io, net};
-

-
use nakamoto_net::Reactor;
-

-
use crate::client;
-
use crate::client::handle::Handle;
-
use crate::identity::ProjId;
-

-
/// Default name for control socket file.
-
pub const DEFAULT_NAME: &str = "radicle.sock";
-

-
#[derive(thiserror::Error, Debug)]
-
pub enum Error {
-
    #[error("failed to bind control socket listener: {0}")]
-
    Bind(io::Error),
-
}
-

-
/// Listen for commands on the control socket, and process them.
-
pub fn listen<P: AsRef<Path>, R: Reactor>(path: P, handle: Handle<R>) -> Result<(), Error> {
-
    // Remove the socket file on startup before rebinding.
-
    fs::remove_file(&path).ok();
-

-
    let listener = UnixListener::bind(path).map_err(Error::Bind)?;
-
    for incoming in listener.incoming() {
-
        match incoming {
-
            Ok(mut stream) => {
-
                if let Err(e) = drain(&stream, &handle) {
-
                    log::error!("Received {} on control socket", e);
-

-
                    write!(stream, "error: {}", e).ok();
-

-
                    stream.flush().ok();
-
                    stream.shutdown(net::Shutdown::Both).ok();
-
                }
-
            }
-
            Err(e) => log::error!("Failed to open control socket stream: {}", e),
-
        }
-
    }
-

-
    Ok(())
-
}
-

-
#[derive(thiserror::Error, Debug)]
-
enum DrainError {
-
    #[error("invalid command argument `{0}`")]
-
    InvalidCommandArg(String),
-
    #[error("unknown command `{0}`")]
-
    UnknownCommand(String),
-
    #[error("invalid command")]
-
    InvalidCommand,
-
    #[error("client error: {0}")]
-
    Client(#[from] client::handle::Error),
-
}
-

-
fn drain<R: Reactor>(stream: &UnixStream, handle: &Handle<R>) -> Result<(), DrainError> {
-
    let mut reader = BufReader::new(stream);
-

-
    for line in reader.by_ref().lines().flatten() {
-
        match line.split_once(' ') {
-
            Some(("update", arg)) => {
-
                if let Ok(id) = ProjId::from_str(arg) {
-
                    if let Err(e) = handle.updated(id) {
-
                        return Err(DrainError::Client(e));
-
                    }
-
                } else {
-
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
-
                }
-
            }
-
            Some((cmd, _)) => return Err(DrainError::UnknownCommand(cmd.to_owned())),
-
            None => return Err(DrainError::InvalidCommand),
-
        }
-
    }
-
    Ok(())
-
}
added node/src/control.rs
@@ -0,0 +1,81 @@
+
//! Client control socket implementation.
+
use std::io::prelude::*;
+
use std::io::BufReader;
+
use std::os::unix::net::UnixListener;
+
use std::os::unix::net::UnixStream;
+
use std::path::Path;
+
use std::str::FromStr;
+
use std::{fs, io, net};
+

+
use nakamoto_net::Reactor;
+

+
use crate::client;
+
use crate::client::handle::Handle;
+
use crate::identity::ProjId;
+

+
/// Default name for control socket file.
+
pub const DEFAULT_SOCKET_NAME: &str = "radicle.sock";
+

+
#[derive(thiserror::Error, Debug)]
+
pub enum Error {
+
    #[error("failed to bind control socket listener: {0}")]
+
    Bind(io::Error),
+
}
+

+
/// Listen for commands on the control socket, and process them.
+
pub fn listen<P: AsRef<Path>, R: Reactor>(path: P, handle: Handle<R>) -> Result<(), Error> {
+
    // Remove the socket file on startup before rebinding.
+
    fs::remove_file(&path).ok();
+

+
    let listener = UnixListener::bind(path).map_err(Error::Bind)?;
+
    for incoming in listener.incoming() {
+
        match incoming {
+
            Ok(mut stream) => {
+
                if let Err(e) = drain(&stream, &handle) {
+
                    log::error!("Received {} on control socket", e);
+

+
                    write!(stream, "error: {}", e).ok();
+

+
                    stream.flush().ok();
+
                    stream.shutdown(net::Shutdown::Both).ok();
+
                }
+
            }
+
            Err(e) => log::error!("Failed to open control socket stream: {}", e),
+
        }
+
    }
+

+
    Ok(())
+
}
+

+
#[derive(thiserror::Error, Debug)]
+
enum DrainError {
+
    #[error("invalid command argument `{0}`")]
+
    InvalidCommandArg(String),
+
    #[error("unknown command `{0}`")]
+
    UnknownCommand(String),
+
    #[error("invalid command")]
+
    InvalidCommand,
+
    #[error("client error: {0}")]
+
    Client(#[from] client::handle::Error),
+
}
+

+
fn drain<R: Reactor>(stream: &UnixStream, handle: &Handle<R>) -> Result<(), DrainError> {
+
    let mut reader = BufReader::new(stream);
+

+
    for line in reader.by_ref().lines().flatten() {
+
        match line.split_once(' ') {
+
            Some(("update", arg)) => {
+
                if let Ok(id) = ProjId::from_str(arg) {
+
                    if let Err(e) = handle.updated(id) {
+
                        return Err(DrainError::Client(e));
+
                    }
+
                } else {
+
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
+
                }
+
            }
+
            Some((cmd, _)) => return Err(DrainError::UnknownCommand(cmd.to_owned())),
+
            None => return Err(DrainError::InvalidCommand),
+
        }
+
    }
+
    Ok(())
+
}
modified node/src/crypto.rs
@@ -1,4 +1,3 @@
-
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, ops::Deref, str::FromStr};

@@ -8,26 +7,13 @@ use thiserror::Error;

pub use ed25519::Signature;

-
pub trait Signer: 'static {
+
pub trait Signer: Send + Sync + 'static {
    /// Return this signer's public/verification key.
    fn public_key(&self) -> &PublicKey;
    /// Sign a message and return the signature.
    fn sign(&self, msg: &[u8]) -> Signature;
}

-
impl<T> Signer for Rc<T>
-
where
-
    T: Signer + ?Sized,
-
{
-
    fn sign(&self, msg: &[u8]) -> Signature {
-
        self.deref().sign(msg)
-
    }
-

-
    fn public_key(&self) -> &PublicKey {
-
        self.deref().public_key()
-
    }
-
}
-

impl<T> Signer for Arc<T>
where
    T: Signer + ?Sized,
modified node/src/lib.rs
@@ -2,6 +2,7 @@
pub use nakamoto_net::{Io, Link, LocalDuration, LocalTime};

pub mod client;
+
pub mod control;
pub mod crypto;

mod address_book;
modified node/src/main.rs
@@ -1,8 +1,9 @@
-
use std::net;
use std::path::Path;
+
use std::thread;
+
use std::{env, net};

-
use radicle_node::client;
use radicle_node::crypto::{PublicKey, Signature, Signer};
+
use radicle_node::{client, control};

type Reactor = nakamoto_net_poll::Reactor<net::TcpStream>;

@@ -21,8 +22,15 @@ impl Signer for FailingSigner {
fn main() -> anyhow::Result<()> {
    let signer = FailingSigner {};
    let client = client::Client::<Reactor>::new(Path::new("."), signer)?;
+
    let handle = client.handle();
+
    let config = client::Config::default();
+
    let socket = env::var("RAD_SOCKET").unwrap_or_else(|_| control::DEFAULT_SOCKET_NAME.to_owned());

-
    client.run(client::Config::default())?;
+
    let t1 = thread::spawn(move || control::listen(socket, handle));
+
    let t2 = thread::spawn(move || client.run(config));
+

+
    t1.join().unwrap()?;
+
    t2.join().unwrap()?;

    Ok(())
}
modified node/src/storage/git.rs
@@ -1,5 +1,5 @@
use std::path::{Path, PathBuf};
-
use std::rc::Rc;
+
use std::sync::Arc;
use std::{fmt, fs, io};

use git_ref_format::refspec;
@@ -26,7 +26,7 @@ pub static NAMESPACES_GLOB: Lazy<refspec::PatternString> =

pub struct Storage {
    path: PathBuf,
-
    signer: Rc<dyn Signer>,
+
    signer: Arc<dyn Signer>,
}

impl fmt::Debug for Storage {
@@ -77,7 +77,7 @@ impl Storage {

        Ok(Self {
            path,
-
            signer: Rc::new(signer),
+
            signer: Arc::new(signer),
        })
    }

@@ -85,7 +85,7 @@ impl Storage {
        self.path.as_path()
    }

-
    pub fn signer(&self) -> Rc<dyn Signer> {
+
    pub fn signer(&self) -> Arc<dyn Signer> {
        self.signer.clone()
    }