Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Test control socket
Alexis Sellier committed 3 years ago
commit 400ba8d9352e79a6ab9a557d3758b07caed7075d
parent 12dd6d4a5d04526e6a2fb7559c7d0becd93226ba
4 files changed +90 -10
modified node/src/client/handle.rs
@@ -53,14 +53,14 @@ pub struct Handle<R: Reactor> {
    pub(crate) listening: chan::Receiver<net::SocketAddr>,
}

-
impl<R: Reactor> Handle<R> {
+
impl<R: Reactor> traits::Handle for Handle<R> {
    /// Notify the client that a project has been updated.
-
    pub fn updated(&self, id: ProjId) -> Result<(), Error> {
+
    fn updated(&self, id: ProjId) -> Result<(), Error> {
        self.command(protocol::Command::AnnounceInventory(id))
    }

    /// Send a command to the command channel, and wake up the event loop.
-
    pub fn command(&self, cmd: protocol::Command) -> Result<(), Error> {
+
    fn command(&self, cmd: protocol::Command) -> Result<(), Error> {
        self.commands.send(cmd)?;
        R::wake(&self.waker)?;

@@ -68,10 +68,23 @@ impl<R: Reactor> Handle<R> {
    }

    /// Ask the client to shutdown.
-
    pub fn shutdown(self) -> Result<(), Error> {
+
    fn shutdown(self) -> Result<(), Error> {
        self.shutdown.send(())?;
        R::wake(&self.waker)?;

        Ok(())
    }
}
+

+
pub mod traits {
+
    use super::*;
+

+
    pub trait Handle {
+
        /// Notify the client that a project has been updated.
+
        fn updated(&self, id: ProjId) -> Result<(), Error>;
+
        /// Send a command to the command channel, and wake up the event loop.
+
        fn command(&self, cmd: protocol::Command) -> Result<(), Error>;
+
        /// Ask the client to shutdown.
+
        fn shutdown(self) -> Result<(), Error>;
+
    }
+
}
modified node/src/control.rs
@@ -7,10 +7,8 @@ use std::path::Path;
use std::str::FromStr;
use std::{fs, io, net};

-
use nakamoto_net::Reactor;
-

use crate::client;
-
use crate::client::handle::Handle;
+
use crate::client::handle::traits::Handle;
use crate::identity::ProjId;

/// Default name for control socket file.
@@ -23,7 +21,7 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<P: AsRef<Path>, R: Reactor>(path: P, handle: Handle<R>) -> Result<(), Error> {
+
pub fn listen<P: AsRef<Path>, H: Handle>(path: P, handle: H) -> Result<(), Error> {
    // Remove the socket file on startup before rebinding.
    fs::remove_file(&path).ok();

@@ -34,10 +32,12 @@ pub fn listen<P: AsRef<Path>, R: Reactor>(path: P, handle: Handle<R>) -> Result<
                if let Err(e) = drain(&stream, &handle) {
                    log::error!("Received {} on control socket", e);

-
                    write!(stream, "error: {}", e).ok();
+
                    writeln!(stream, "error: {}", e).ok();

                    stream.flush().ok();
                    stream.shutdown(net::Shutdown::Both).ok();
+
                } else {
+
                    writeln!(stream, "ok").ok();
                }
            }
            Err(e) => log::error!("Failed to open control socket stream: {}", e),
@@ -59,7 +59,7 @@ enum DrainError {
    Client(#[from] client::handle::Error),
}

-
fn drain<R: Reactor>(stream: &UnixStream, handle: &Handle<R>) -> Result<(), DrainError> {
+
fn drain<H: Handle>(stream: &UnixStream, handle: &H) -> Result<(), DrainError> {
    let mut reader = BufReader::new(stream);

    for line in reader.by_ref().lines().flatten() {
@@ -79,3 +79,43 @@ fn drain<R: Reactor>(stream: &UnixStream, handle: &Handle<R>) -> Result<(), Drai
    }
    Ok(())
}
+

+
#[cfg(test)]
+
mod tests {
+
    use std::io::prelude::*;
+
    use std::os::unix::net::UnixStream;
+
    use std::{net, thread};
+

+
    use super::*;
+
    use crate::identity::ProjId;
+
    use crate::test;
+

+
    #[test]
+
    fn test_control_socket() {
+
        let tmp = tempfile::tempdir().unwrap();
+
        let handle = test::handle::Handle::default();
+
        let socket = tmp.path().join("alice.sock");
+
        let proj = test::arbitrary::gen::<ProjId>(1);
+

+
        thread::spawn({
+
            let socket = socket.clone();
+
            let handle = handle.clone();
+

+
            move || listen(socket, handle)
+
        });
+

+
        let mut stream = loop {
+
            if let Ok(stream) = UnixStream::connect(&socket) {
+
                break stream;
+
            }
+
        };
+
        writeln!(&stream, "update {}", proj).unwrap();
+

+
        let mut buf = [0; 2];
+
        stream.shutdown(net::Shutdown::Write).unwrap();
+
        stream.read_exact(&mut buf).unwrap();
+

+
        assert_eq!(&buf, &[b'o', b'k']);
+
        assert!(handle.updates.lock().unwrap().contains(&proj));
+
    }
+
}
modified node/src/test.rs
@@ -2,6 +2,7 @@ pub(crate) mod arbitrary;
pub(crate) mod assert;
pub(crate) mod crypto;
pub(crate) mod fixtures;
+
pub(crate) mod handle;
pub(crate) mod logger;
pub(crate) mod peer;
pub(crate) mod storage;
added node/src/test/handle.rs
@@ -0,0 +1,26 @@
+
use std::sync::{Arc, Mutex};
+

+
use crate::client::handle;
+
use crate::client::handle::traits;
+
use crate::{client, identity, protocol};
+

+
#[derive(Default, Clone)]
+
pub struct Handle {
+
    pub updates: Arc<Mutex<Vec<identity::ProjId>>>,
+
}
+

+
impl traits::Handle for Handle {
+
    fn updated(&self, id: identity::ProjId) -> Result<(), handle::Error> {
+
        self.updates.lock().unwrap().push(id);
+

+
        Ok(())
+
    }
+

+
    fn command(&self, _cmd: protocol::Command) -> Result<(), handle::Error> {
+
        Ok(())
+
    }
+

+
    fn shutdown(self) -> Result<(), client::handle::Error> {
+
        Ok(())
+
    }
+
}