Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: stream timeout semantics for `Node::call`
Fintan Halpenny committed 2 years ago
commit 027bfbef67fd2af6ec1ae41a07b1b245c525d943
parent c8a24d558416661fc3b39e9dffd323b71a23f224
5 files changed +119 -66
modified radicle-cli/src/commands/node/events.rs
@@ -1,10 +1,13 @@
use std::time;

-
use radicle::node::Handle;
+
use radicle::node::{Event, Handle};

-
pub fn run(node: impl Handle, count: usize, timeout: time::Duration) -> anyhow::Result<()> {
+
pub fn run<H>(node: H, count: usize, timeout: time::Duration) -> anyhow::Result<()>
+
where
+
    H: Handle<Event = Result<Event, <H as Handle>::Error>>,
+
{
    let events = node.subscribe(timeout)?;
-
    for (i, event) in events.enumerate() {
+
    for (i, event) in events.into_iter().enumerate() {
        let event = event?;
        let obj = serde_json::to_string(&event)?;

modified radicle-node/src/control.rs
@@ -30,12 +30,12 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<H: Handle<Error = runtime::HandleError> + 'static>(
-
    listener: UnixListener,
-
    handle: H,
-
) -> Result<(), Error>
+
pub fn listen<E, H>(listener: UnixListener, handle: H) -> Result<(), Error>
where
+
    H: Handle<Error = runtime::HandleError> + 'static,
    H::Sessions: serde::Serialize,
+
    CommandResult<E>: From<H::Event>,
+
    E: serde::Serialize,
{
    log::debug!(target: "control", "Control thread listening on socket..");
    let nid = handle.nid()?;
@@ -74,12 +74,12 @@ enum CommandError {
    Io(#[from] io::Error),
}

-
fn command<H: Handle<Error = runtime::HandleError> + 'static>(
-
    stream: &UnixStream,
-
    mut handle: H,
-
) -> Result<(), CommandError>
+
fn command<E, H>(stream: &UnixStream, mut handle: H) -> Result<(), CommandError>
where
+
    H: Handle<Error = runtime::HandleError> + 'static,
    H::Sessions: serde::Serialize,
+
    CommandResult<E>: From<H::Event>,
+
    E: serde::Serialize,
{
    let mut reader = BufReader::new(stream);
    let mut writer = LineWriter::new(stream);
@@ -185,8 +185,7 @@ where
        Command::Subscribe => match handle.subscribe(MAX_TIMEOUT) {
            Ok(events) => {
                for e in events {
-
                    let event = e?;
-
                    CommandResult::Okay(event).to_writer(&mut writer)?;
+
                    CommandResult::from(e).to_writer(&mut writer)?;
                }
            }
            Err(e) => return Err(CommandError::Runtime(e)),
modified radicle-node/src/runtime/handle.rs
@@ -124,6 +124,8 @@ impl Handle {

impl radicle::node::Handle for Handle {
    type Sessions = Vec<radicle::node::Session>;
+
    type Events = Events;
+
    type Event = Event;
    type Error = Error;

    fn nid(&self) -> Result<NodeId, Self::Error> {
@@ -263,11 +265,8 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

-
    fn subscribe(
-
        &self,
-
        _timeout: time::Duration,
-
    ) -> Result<Box<dyn Iterator<Item = Result<Event, Error>>>, Error> {
-
        Ok(Box::new(self.events().into_iter().map(Ok)))
+
    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
+
        Ok(self.events())
    }

    fn sessions(&self) -> Result<Self::Sessions, Error> {
modified radicle-node/src/test/handle.rs
@@ -22,6 +22,8 @@ pub struct Handle {
impl radicle::node::Handle for Handle {
    type Error = HandleError;
    type Sessions = Vec<radicle::node::Session>;
+
    type Events = Vec<Self::Event>;
+
    type Event = Result<Event, Self::Error>;

    fn nid(&self) -> Result<NodeId, Self::Error> {
        Ok(NodeId::from_str("z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK").unwrap())
@@ -81,11 +83,8 @@ impl radicle::node::Handle for Handle {
        Ok(self.following.lock().unwrap().insert(id))
    }

-
    fn subscribe(
-
        &self,
-
        _timeout: time::Duration,
-
    ) -> Result<Box<dyn Iterator<Item = Result<Event, Self::Error>>>, Self::Error> {
-
        Ok(Box::new(std::iter::empty()))
+
    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
+
        Ok(vec![])
    }

    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error> {
modified radicle/src/node.rs
@@ -15,6 +15,7 @@ pub mod timestamp;

use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
use std::io::{BufRead, BufReader};
+
use std::marker::PhantomData;
use std::ops::{ControlFlow, Deref};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
@@ -50,6 +51,9 @@ pub const DEFAULT_SOCKET_NAME: &str = "control.sock";
pub const DEFAULT_PORT: u16 = 8776;
/// Default timeout when waiting for the node to respond with data.
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(30);
+
/// Default timeout when waiting for an event to be received on the
+
/// [`Handle::subscribe`] channel.
+
pub const DEFAULT_SUBSCRIBE_TIMEOUT: time::Duration = time::Duration::from_secs(5);
/// Maximum length in bytes of a node alias.
pub const MAX_ALIAS_LENGTH: usize = 32;
/// Penalty threshold at which point we avoid connecting to this node.
@@ -338,6 +342,26 @@ pub enum CommandResult<T> {
    },
}

+
impl<T, E> From<Result<T, E>> for CommandResult<T>
+
where
+
    E: std::error::Error,
+
{
+
    fn from(result: Result<T, E>) -> Self {
+
        match result {
+
            Ok(t) => Self::Okay(t),
+
            Err(e) => Self::Error {
+
                reason: e.to_string(),
+
            },
+
        }
+
    }
+
}
+

+
impl From<Event> for CommandResult<Event> {
+
    fn from(event: Event) -> Self {
+
        Self::Okay(event)
+
    }
+
}
+

/// A success response.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Success {
@@ -856,6 +880,8 @@ pub enum ConnectResult {
pub trait Handle: Clone + Sync + Send {
    /// The peer sessions type.
    type Sessions;
+
    type Events: IntoIterator<Item = Self::Event>;
+
    type Event;
    /// The error returned by all methods.
    type Error: std::error::Error + Send + Sync + 'static;

@@ -905,10 +931,53 @@ pub trait Handle: Clone + Sync + Send {
    /// Query the peer session state.
    fn sessions(&self) -> Result<Self::Sessions, Self::Error>;
    /// Subscribe to node events.
-
    fn subscribe(
-
        &self,
-
        timeout: time::Duration,
-
    ) -> Result<Box<dyn Iterator<Item = Result<Event, Self::Error>>>, Self::Error>;
+
    fn subscribe(&self, timeout: time::Duration) -> Result<Self::Events, Self::Error>;
+
}
+

+
/// Iterator of results `T` when passing a [`Command`] to [`Node::call`].
+
///
+
/// The iterator blocks for a `timeout` duration, returning [`Error::TimedOut`]
+
/// if the duration is reached.
+
pub struct LineIter<T> {
+
    stream: BufReader<UnixStream>,
+
    timeout: time::Duration,
+
    witness: PhantomData<T>,
+
}
+

+
impl<T: DeserializeOwned> Iterator for LineIter<T> {
+
    type Item = Result<T, Error>;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        let mut l = String::new();
+

+
        self.stream
+
            .get_ref()
+
            .set_read_timeout(Some(self.timeout))
+
            .ok();
+

+
        match self.stream.read_line(&mut l) {
+
            Ok(0) => None,
+
            Ok(_) => {
+
                let result: CommandResult<T> = match json::from_str(&l) {
+
                    Err(e) => {
+
                        return Some(Err(Error::InvalidJson {
+
                            response: l.clone(),
+
                            error: e,
+
                        }))
+
                    }
+
                    Ok(result) => result,
+
                };
+
                match result {
+
                    CommandResult::Okay(result) => Some(Ok(result)),
+
                    CommandResult::Error { reason } => Some(Err(Error::Command { reason })),
+
                }
+
            }
+
            Err(e) => match e.kind() {
+
                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Some(Err(Error::TimedOut)),
+
                _ => Some(Err(Error::Io(e))),
+
            },
+
        }
+
    }
}

/// Public node & device identifier.
@@ -929,33 +998,19 @@ impl Node {
    }

    /// Call a command on the node.
-
    pub fn call<T: DeserializeOwned>(
+
    pub fn call<T: DeserializeOwned + Send + 'static>(
        &self,
        cmd: Command,
        timeout: time::Duration,
-
    ) -> Result<impl Iterator<Item = Result<T, Error>>, Error> {
+
    ) -> Result<LineIter<T>, Error> {
        let stream = UnixStream::connect(&self.socket)
            .map_err(|e| Error::Connect(self.socket.clone(), e.kind()))?;
        cmd.to_writer(&stream)?;
-

-
        stream.set_read_timeout(Some(timeout))?;
-

-
        Ok(BufReader::new(stream).lines().map(move |l| {
-
            let l = l.map_err(|e| match e.kind() {
-
                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Error::TimedOut,
-
                _ => Error::Io(e),
-
            })?;
-

-
            let result: CommandResult<T> = json::from_str(&l).map_err(|e| Error::InvalidJson {
-
                response: l.clone(),
-
                error: e,
-
            })?;
-

-
            match result {
-
                CommandResult::Okay(result) => Ok(result),
-
                CommandResult::Error { reason } => Err(Error::Command { reason }),
-
            }
-
        }))
+
        Ok(LineIter {
+
            stream: BufReader::new(stream),
+
            timeout,
+
            witness: PhantomData,
+
        })
    }

    /// Announce refs of the given `rid` to the given seeds.
@@ -1029,6 +1084,8 @@ impl Node {
// attempt to return iterators instead of allocating vecs.
impl Handle for Node {
    type Sessions = Vec<Session>;
+
    type Events = LineIter<Event>;
+
    type Event = Result<Event, Error>;
    type Error = Error;

    fn nid(&self) -> Result<NodeId, Error> {
@@ -1049,6 +1106,7 @@ impl Handle for Node {
        let Ok(mut lines) = self.call::<Success>(Command::Status, DEFAULT_TIMEOUT) else {
            return false;
        };
+

        let Some(Ok(_)) = lines.next() else {
            return false;
        };
@@ -1122,29 +1180,29 @@ impl Handle for Node {
    }

    fn follow(&mut self, nid: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::Follow { nid, alias }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse)??;
+
        let mut lines = self.call::<Success>(Command::Follow { nid, alias }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }

    fn seed(&mut self, rid: RepoId, scope: policy::Scope) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse)??;
+
        let mut lines = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }

    fn unfollow(&mut self, nid: NodeId) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::Unfollow { nid }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse)??;
+
        let mut lines = self.call::<Success>(Command::Unfollow { nid }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }

    fn unseed(&mut self, rid: RepoId) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::Unseed { rid }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse {})??;
+
        let mut lines = self.call::<Success>(Command::Unseed { rid }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }
@@ -1166,26 +1224,21 @@ impl Handle for Node {
    }

    fn update_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
-
        let mut line = self.call::<Success>(Command::UpdateInventory { rid }, DEFAULT_TIMEOUT)?;
-
        let response = line.next().ok_or(Error::EmptyResponse {})??;
+
        let mut lines = self.call::<Success>(Command::UpdateInventory { rid }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;

        Ok(response.updated)
    }

-
    fn subscribe(
-
        &self,
-
        timeout: time::Duration,
-
    ) -> Result<Box<dyn Iterator<Item = Result<Event, Error>>>, Error> {
-
        let events = self.call(Command::Subscribe, timeout)?;
-

-
        Ok(Box::new(events))
+
    fn subscribe(&self, timeout: time::Duration) -> Result<LineIter<Event>, Error> {
+
        self.call(Command::Subscribe, timeout)
    }

    fn sessions(&self) -> Result<Self::Sessions, Error> {
        let sessions = self
            .call::<Vec<Session>>(Command::Sessions, DEFAULT_TIMEOUT)?
            .next()
-
            .ok_or(Error::EmptyResponse {})??;
+
            .ok_or(Error::EmptyResponse)??;

        Ok(sessions)
    }