Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Add client control socket
Alexis Sellier committed 3 years ago
commit fc30dc26687862458a378901d3273e38a1b28c83
parent e663e8c516889c8da95f92c811eaf0f19b230d69
5 files changed +179 -0
modified node/src/client.rs
@@ -10,6 +10,9 @@ use crate::crypto::Signer;
use crate::protocol;
use crate::storage::git::Storage;

+
pub mod handle;
+
pub mod socket;
+

/// Client configuration.
#[derive(Debug, Clone)]
pub struct Config {
@@ -101,6 +104,16 @@ impl<R: Reactor> Client<R> {

        Ok(())
    }
+

+
    /// Create a new handle to communicate with the client.
+
    pub fn handle(&self) -> handle::Handle<R> {
+
        handle::Handle {
+
            waker: self.reactor.waker(),
+
            commands: self.handle.clone(),
+
            shutdown: self.shutdown.clone(),
+
            listening: self.listening.clone(),
+
        }
+
    }
}

pub struct Events {}
added node/src/client/handle.rs
@@ -0,0 +1,77 @@
+
use std::net;
+

+
use crossbeam_channel as chan;
+
use nakamoto_net::Reactor;
+
use thiserror::Error;
+

+
use crate::identity::ProjId;
+
use crate::protocol;
+
use crate::protocol::CommandError;
+

+
/// An error resulting from a handle method.
+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// The command channel is no longer connected.
+
    #[error("command channel is not connected")]
+
    NotConnected,
+
    /// The command returned an error.
+
    #[error("command failed: {0}")]
+
    Command(#[from] CommandError),
+
    /// The operation timed out.
+
    #[error("the operation timed out")]
+
    Timeout,
+
    /// An I/O error occured.
+
    #[error(transparent)]
+
    Io(#[from] std::io::Error),
+
}
+

+
impl From<chan::RecvError> for Error {
+
    fn from(_: chan::RecvError) -> Self {
+
        Self::NotConnected
+
    }
+
}
+

+
impl From<chan::RecvTimeoutError> for Error {
+
    fn from(err: chan::RecvTimeoutError) -> Self {
+
        match err {
+
            chan::RecvTimeoutError::Timeout => Self::Timeout,
+
            chan::RecvTimeoutError::Disconnected => Self::NotConnected,
+
        }
+
    }
+
}
+

+
impl<T> From<chan::SendError<T>> for Error {
+
    fn from(_: chan::SendError<T>) -> Self {
+
        Self::NotConnected
+
    }
+
}
+

+
pub struct Handle<R: Reactor> {
+
    pub(crate) commands: chan::Sender<protocol::Command>,
+
    pub(crate) waker: R::Waker,
+
    pub(crate) shutdown: chan::Sender<()>,
+
    pub(crate) listening: chan::Receiver<net::SocketAddr>,
+
}
+

+
impl<R: Reactor> Handle<R> {
+
    /// Notify the client that a project has been updated.
+
    pub fn updated(&self, id: ProjId) -> Result<(), Error> {
+
        self.command(protocol::Command::AnnounceInventory(id))
+
    }
+

+
    /// Send a command to the command channel, and wake up the event loop.
+
    pub fn command(&self, cmd: protocol::Command) -> Result<(), Error> {
+
        self.commands.send(cmd)?;
+
        R::wake(&self.waker)?;
+

+
        Ok(())
+
    }
+

+
    /// Ask the client to shutdown.
+
    pub fn shutdown(self) -> Result<(), Error> {
+
        self.shutdown.send(())?;
+
        R::wake(&self.waker)?;
+

+
        Ok(())
+
    }
+
}
added node/src/client/socket.rs
@@ -0,0 +1,80 @@
+
use std::io::prelude::*;
+
use std::io::BufReader;
+
use std::os::unix::net::UnixListener;
+
use std::os::unix::net::UnixStream;
+
use std::path::Path;
+
use std::str::FromStr;
+
use std::{fs, io, net};
+

+
use nakamoto_net::Reactor;
+

+
use crate::client;
+
use crate::client::handle::Handle;
+
use crate::identity::ProjId;
+

+
/// Default name for control socket file.
+
pub const DEFAULT_NAME: &str = "radicle.sock";
+

+
#[derive(thiserror::Error, Debug)]
+
pub enum Error {
+
    #[error("failed to bind control socket listener: {0}")]
+
    Bind(io::Error),
+
}
+

+
/// Listen for commands on the control socket, and process them.
+
pub fn listen<P: AsRef<Path>, R: Reactor>(path: P, handle: Handle<R>) -> Result<(), Error> {
+
    // Remove the socket file on startup before rebinding.
+
    fs::remove_file(&path).ok();
+

+
    let listener = UnixListener::bind(path).map_err(Error::Bind)?;
+
    for incoming in listener.incoming() {
+
        match incoming {
+
            Ok(mut stream) => {
+
                if let Err(e) = drain(&stream, &handle) {
+
                    log::error!("Received {} on control socket", e);
+

+
                    write!(stream, "error: {}", e).ok();
+

+
                    stream.flush().ok();
+
                    stream.shutdown(net::Shutdown::Both).ok();
+
                }
+
            }
+
            Err(e) => log::error!("Failed to open control socket stream: {}", e),
+
        }
+
    }
+

+
    Ok(())
+
}
+

+
#[derive(thiserror::Error, Debug)]
+
enum DrainError {
+
    #[error("invalid command argument `{0}`")]
+
    InvalidCommandArg(String),
+
    #[error("unknown command `{0}`")]
+
    UnknownCommand(String),
+
    #[error("invalid command")]
+
    InvalidCommand,
+
    #[error("client error: {0}")]
+
    Client(#[from] client::handle::Error),
+
}
+

+
fn drain<R: Reactor>(stream: &UnixStream, handle: &Handle<R>) -> Result<(), DrainError> {
+
    let mut reader = BufReader::new(stream);
+

+
    for line in reader.by_ref().lines().flatten() {
+
        match line.split_once(' ') {
+
            Some(("update", arg)) => {
+
                if let Ok(id) = ProjId::from_str(arg) {
+
                    if let Err(e) = handle.updated(id) {
+
                        return Err(DrainError::Client(e));
+
                    }
+
                } else {
+
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
+
                }
+
            }
+
            Some((cmd, _)) => return Err(DrainError::UnknownCommand(cmd.to_owned())),
+
            None => return Err(DrainError::InvalidCommand),
+
        }
+
    }
+
    Ok(())
+
}
modified node/src/protocol.rs
@@ -54,8 +54,13 @@ pub enum Event {}
pub enum Command {
    Connect(net::SocketAddr),
    Fetch(ProjId, net::SocketAddr),
+
    AnnounceInventory(ProjId),
}

+
/// Command-related errors.
+
#[derive(thiserror::Error, Debug)]
+
pub enum CommandError {}
+

#[derive(Debug)]
pub struct Protocol<S, T, G> {
    /// Peers currently or recently connected.
@@ -330,6 +335,9 @@ where
                    })
                    .unwrap();
            }
+
            Command::AnnounceInventory(_proj) => {
+
                todo!()
+
            }
        }
    }

modified node/src/rad.rs
@@ -107,6 +107,7 @@ pub fn init<S: storage::WriteStorage>(
    git::set_upstream(
        repo,
        REMOTE_NAME,
+
        // FIXME: Should use default branch here.
        "master",
        &format!("refs/remotes/{user_id}/heads/master"),
    )?;