Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Improve control socket API
cloudhead committed 2 years ago
commit b6751855388454acf691f47e705f5032fa6ab15f
parent d88cce484a4a0dd557a625b7fdbda216d13993d1
3 files changed +159 -73
modified radicle-node/src/control.rs
@@ -108,45 +108,45 @@ where
        Command::Config => {
            let config = handle.config()?;

-
            json::to_writer(writer, &config)?;
+
            CommandResult::Okay(config).to_writer(writer)?;
        }
        Command::Seeds { rid } => {
            let seeds = handle.seeds(rid)?;

-
            json::to_writer(writer, &seeds)?;
+
            CommandResult::Okay(seeds).to_writer(writer)?;
        }
        Command::Sessions => {
            let sessions = handle.sessions()?;

-
            json::to_writer(writer, &sessions)?;
+
            CommandResult::Okay(sessions).to_writer(writer)?;
        }
        Command::TrackRepo { rid, scope } => match handle.track_repo(rid, scope) {
-
            Ok(updated) => {
-
                CommandResult::Okay { updated }.to_writer(writer)?;
+
            Ok(result) => {
+
                CommandResult::updated(result).to_writer(writer)?;
            }
            Err(e) => {
                return Err(CommandError::Runtime(e));
            }
        },
        Command::UntrackRepo { rid } => match handle.untrack_repo(rid) {
-
            Ok(updated) => {
-
                CommandResult::Okay { updated }.to_writer(writer)?;
+
            Ok(result) => {
+
                CommandResult::updated(result).to_writer(writer)?;
            }
            Err(e) => {
                return Err(CommandError::Runtime(e));
            }
        },
        Command::TrackNode { nid, alias } => match handle.track_node(nid, alias) {
-
            Ok(updated) => {
-
                CommandResult::Okay { updated }.to_writer(writer)?;
+
            Ok(result) => {
+
                CommandResult::updated(result).to_writer(writer)?;
            }
            Err(e) => {
                return Err(CommandError::Runtime(e));
            }
        },
        Command::UntrackNode { nid } => match handle.untrack_node(nid) {
-
            Ok(updated) => {
-
                CommandResult::Okay { updated }.to_writer(writer)?;
+
            Ok(result) => {
+
                CommandResult::updated(result).to_writer(writer)?;
            }
            Err(e) => {
                return Err(CommandError::Runtime(e));
@@ -165,8 +165,8 @@ where
            CommandResult::ok().to_writer(writer).ok();
        }
        Command::SyncInventory => match handle.sync_inventory() {
-
            Ok(updated) => {
-
                CommandResult::Okay { updated }.to_writer(writer)?;
+
            Ok(result) => {
+
                CommandResult::updated(result).to_writer(writer)?;
            }
            Err(e) => {
                return Err(CommandError::Runtime(e));
@@ -176,19 +176,17 @@ where
            Ok(events) => {
                for e in events {
                    let event = e?;
-
                    let event = serde_json::to_string(&event)?;
-

-
                    writeln!(&mut writer, "{event}")?;
+
                    CommandResult::Okay(event).to_writer(&mut writer)?;
                }
            }
-
            Err(e) => log::error!(target: "control", "Error subscribing to events: {e}"),
+
            Err(e) => return Err(CommandError::Runtime(e)),
        },
        Command::Status => {
            CommandResult::ok().to_writer(writer).ok();
        }
        Command::NodeId => match handle.nid() {
            Ok(nid) => {
-
                writeln!(writer, "{nid}")?;
+
                CommandResult::Okay(nid).to_writer(writer)?;
            }
            Err(e) => return Err(CommandError::Runtime(e)),
        },
@@ -267,7 +265,7 @@ mod tests {
            let stream = BufReader::new(stream);
            let line = stream.lines().next().unwrap().unwrap();

-
            assert_eq!(line, json::json!({ "status": "ok" }).to_string());
+
            assert_eq!(line, json::json!({}).to_string());
        }

        for rid in &rids {
modified radicle/src/node.rs
@@ -69,14 +69,17 @@ pub enum PingState {

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
+
#[serde(rename_all = "camelCase")]
pub enum State {
    /// Initial state for outgoing connections.
    Initial,
    /// Connection attempted successfully.
    Attempted,
    /// Initial state after handshake protocol hand-off.
+
    #[serde(rename_all = "camelCase")]
    Connected {
        /// Connected since this time.
+
        #[serde(with = "crate::serde_ext::localtime::time")]
        since: LocalTime,
        /// Ping state.
        #[serde(skip)]
@@ -85,10 +88,13 @@ pub enum State {
        fetching: HashSet<Id>,
    },
    /// When a peer is disconnected.
+
    #[serde(rename_all = "camelCase")]
    Disconnected {
        /// Since when has this peer been disconnected.
+
        #[serde(with = "crate::serde_ext::localtime::time")]
        since: LocalTime,
        /// When to retry the connection.
+
        #[serde(with = "crate::serde_ext::localtime::time")]
        retry_at: LocalTime,
    },
}
@@ -214,40 +220,48 @@ pub struct ConnectOptions {

/// Result of a command, on the node control socket.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-
#[serde(tag = "status")]
-
pub enum CommandResult {
+
#[serde(untagged)]
+
pub enum CommandResult<T> {
    /// Response on node socket indicating that a command was carried out successfully.
-
    #[serde(rename = "ok")]
-
    Okay {
-
        /// Whether the command had any effect.
-
        #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")]
-
        updated: bool,
-
    },
+
    Okay(T),
    /// Response on node socket indicating that an error occured.
    Error {
        /// The reason for the error.
+
        #[serde(rename = "error")]
        reason: String,
    },
}

-
impl CommandResult {
+
/// A success response.
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
pub struct Success {
+
    /// Whether something was updated.
+
    #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")]
+
    updated: bool,
+
}
+

+
impl CommandResult<Success> {
    /// Create an "updated" response.
-
    pub fn updated() -> Self {
-
        Self::Okay { updated: true }
+
    pub fn updated(updated: bool) -> Self {
+
        Self::Okay(Success { updated })
    }

    /// Create an "ok" response.
    pub fn ok() -> Self {
-
        Self::Okay { updated: false }
+
        Self::Okay(Success { updated: false })
    }
+
}

+
impl CommandResult<()> {
    /// Create an error result.
    pub fn error(err: impl std::error::Error) -> Self {
        Self::Error {
            reason: err.to_string(),
        }
    }
+
}

+
impl<T: Serialize> CommandResult<T> {
    /// Write this command result to a stream, including a terminating LF character.
    pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
        json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
@@ -255,15 +269,6 @@ impl CommandResult {
    }
}

-
impl From<CommandResult> for Result<bool, Error> {
-
    fn from(value: CommandResult) -> Self {
-
        match value {
-
            CommandResult::Okay { updated } => Ok(updated),
-
            CommandResult::Error { reason } => Err(Error::Node(reason)),
-
        }
-
    }
-
}
-

/// Peer public protocol address.
#[derive(Wrapper, WrapperMut, Clone, Eq, PartialEq, Debug, Hash, From, Serialize, Deserialize)]
#[wrapper(Deref, Display, FromStr)]
@@ -438,6 +443,7 @@ impl Seed {
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
/// Represents a set of seeds with associated metadata. Uses an RNG
/// underneath, so every iteration returns a different ordering.
+
#[serde(into = "Vec<Seed>", from = "Vec<Seed>")]
pub struct Seeds(address::AddressBook<NodeId, Seed>);

impl Seeds {
@@ -480,6 +486,20 @@ impl Seeds {
    }
}

+
impl From<Seeds> for Vec<Seed> {
+
    fn from(seeds: Seeds) -> Vec<Seed> {
+
        seeds.0.into_shuffled().map(|(_, v)| v).collect()
+
    }
+
}
+

+
impl From<Vec<Seed>> for Seeds {
+
    fn from(other: Vec<Seed>) -> Seeds {
+
        Seeds(address::AddressBook::from_iter(
+
            other.into_iter().map(|s| (s.nid, s)),
+
        ))
+
    }
+
}
+

/// Announcement result returned by [`Node::announce`].
#[derive(Debug)]
pub struct AnnounceResult {
@@ -631,7 +651,9 @@ impl Error {
pub enum CallError {
    #[error("i/o: {0}")]
    Io(#[from] io::Error),
-
    #[error("received invalid json in response to command: '{response}': {error}")]
+
    #[error("command error: {reason}")]
+
    Command { reason: String },
+
    #[error("received invalid json `{response}` in response to command: {error}")]
    InvalidJson {
        response: String,
        error: json::Error,
@@ -739,12 +761,16 @@ impl Node {
                    e
                }
            })?;
-
            let v = json::from_str(&l).map_err(|e| CallError::InvalidJson {
-
                response: l,
-
                error: e,
-
            })?;
-

-
            Ok(v)
+
            let result: CommandResult<T> =
+
                json::from_str(&l).map_err(|e| CallError::InvalidJson {
+
                    response: l.clone(),
+
                    error: e,
+
                })?;
+

+
            match result {
+
                CommandResult::Okay(result) => Ok(result),
+
                CommandResult::Error { reason } => Err(CallError::Command { reason }),
+
            }
        }))
    }

@@ -806,13 +832,13 @@ impl Handle for Node {
    }

    fn is_running(&self) -> bool {
-
        let Ok(mut lines) = self.call::<CommandResult>(Command::Status, DEFAULT_TIMEOUT) else {
+
        let Ok(mut lines) = self.call::<Success>(Command::Status, DEFAULT_TIMEOUT) else {
            return false;
        };
-
        let Some(Ok(result)) = lines.next() else {
+
        let Some(Ok(_)) = lines.next() else {
            return false;
        };
-
        matches!(result, CommandResult::Okay { .. })
+
        true
    }

    fn config(&self) -> Result<config::Config, Error> {
@@ -844,8 +870,8 @@ impl Handle for Node {
    }

    fn seeds(&mut self, rid: Id) -> Result<Seeds, Error> {
-
        let seeds: Seeds = self
-
            .call(Command::Seeds { rid }, DEFAULT_TIMEOUT)?
+
        let seeds = self
+
            .call::<Seeds>(Command::Seeds { rid }, DEFAULT_TIMEOUT)?
            .next()
            .ok_or(Error::EmptyResponse)??;

@@ -874,52 +900,52 @@ impl Handle for Node {
    }

    fn track_node(&mut self, nid: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
-
        let mut line = self.call(Command::TrackNode { nid, alias }, DEFAULT_TIMEOUT)?;
-
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse)??;
+
        let mut line = self.call::<Success>(Command::TrackNode { nid, alias }, DEFAULT_TIMEOUT)?;
+
        let response = line.next().ok_or(Error::EmptyResponse)??;

-
        response.into()
+
        Ok(response.updated)
    }

    fn track_repo(&mut self, rid: Id, scope: tracking::Scope) -> Result<bool, Error> {
-
        let mut line = self.call(Command::TrackRepo { rid, scope }, DEFAULT_TIMEOUT)?;
-
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse)??;
+
        let mut line = self.call::<Success>(Command::TrackRepo { rid, scope }, DEFAULT_TIMEOUT)?;
+
        let response = line.next().ok_or(Error::EmptyResponse)??;

-
        response.into()
+
        Ok(response.updated)
    }

    fn untrack_node(&mut self, nid: NodeId) -> Result<bool, Error> {
-
        let mut line = self.call(Command::UntrackNode { nid }, DEFAULT_TIMEOUT)?;
-
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse)??;
+
        let mut line = self.call::<Success>(Command::UntrackNode { nid }, DEFAULT_TIMEOUT)?;
+
        let response = line.next().ok_or(Error::EmptyResponse)??;

-
        response.into()
+
        Ok(response.updated)
    }

    fn untrack_repo(&mut self, rid: Id) -> Result<bool, Error> {
-
        let mut line = self.call(Command::UntrackRepo { rid }, DEFAULT_TIMEOUT)?;
-
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse {})??;
+
        let mut line = self.call::<Success>(Command::UntrackRepo { rid }, DEFAULT_TIMEOUT)?;
+
        let response = line.next().ok_or(Error::EmptyResponse {})??;

-
        response.into()
+
        Ok(response.updated)
    }

    fn announce_refs(&mut self, rid: Id) -> Result<(), Error> {
-
        for line in self.call::<CommandResult>(Command::AnnounceRefs { rid }, DEFAULT_TIMEOUT)? {
+
        for line in self.call::<Success>(Command::AnnounceRefs { rid }, DEFAULT_TIMEOUT)? {
            line?;
        }
        Ok(())
    }

    fn announce_inventory(&mut self) -> Result<(), Error> {
-
        for line in self.call::<CommandResult>(Command::AnnounceInventory, DEFAULT_TIMEOUT)? {
+
        for line in self.call::<Success>(Command::AnnounceInventory, DEFAULT_TIMEOUT)? {
            line?;
        }
        Ok(())
    }

    fn sync_inventory(&mut self) -> Result<bool, Error> {
-
        let mut line = self.call(Command::SyncInventory, DEFAULT_TIMEOUT)?;
-
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse {})??;
+
        let mut line = self.call::<Success>(Command::SyncInventory, DEFAULT_TIMEOUT)?;
+
        let response = line.next().ok_or(Error::EmptyResponse {})??;

-
        response.into()
+
        Ok(response.updated)
    }

    fn subscribe(
@@ -934,6 +960,7 @@ impl Handle for Node {
                CallError::InvalidJson { .. } => {
                    io::Error::new(io::ErrorKind::InvalidInput, err.to_string())
                }
+
                CallError::Command { reason } => io::Error::new(io::ErrorKind::Other, reason),
            })
        })))
    }
@@ -948,7 +975,7 @@ impl Handle for Node {
    }

    fn shutdown(self) -> Result<(), Error> {
-
        for line in self.call::<CommandResult>(Command::Shutdown, DEFAULT_TIMEOUT)? {
+
        for line in self.call::<Success>(Command::Shutdown, DEFAULT_TIMEOUT)? {
            line?;
        }
        // Wait until the shutdown has completed.
@@ -986,6 +1013,7 @@ impl AliasStore for HashMap<NodeId, Alias> {
#[cfg(test)]
mod test {
    use super::*;
+
    use crate::assert_matches;

    #[test]
    fn test_alias() {
@@ -1001,4 +1029,40 @@ mod test {
        assert!(Alias::from_str("cloud head").is_err());
        assert!(Alias::from_str("cloudhead\n").is_err());
    }
+

+
    #[test]
+
    fn test_command_result() {
+
        #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+
        struct Test {
+
            value: u32,
+
        }
+

+
        assert_eq!(json::to_string(&CommandResult::Okay(true)).unwrap(), "true");
+
        assert_eq!(
+
            json::to_string(&CommandResult::Okay(Test { value: 42 })).unwrap(),
+
            "{\"value\":42}"
+
        );
+
        assert_eq!(
+
            json::from_str::<CommandResult<Test>>("{\"value\":42}").unwrap(),
+
            CommandResult::Okay(Test { value: 42 })
+
        );
+
        assert_eq!(json::to_string(&CommandResult::ok()).unwrap(), "{}");
+
        assert_eq!(
+
            json::to_string(&CommandResult::updated(true)).unwrap(),
+
            "{\"updated\":true}"
+
        );
+
        assert_eq!(
+
            json::to_string(&CommandResult::error(io::Error::from(
+
                io::ErrorKind::NotFound
+
            )))
+
            .unwrap(),
+
            "{\"error\":\"entity not found\"}"
+
        );
+
        assert_matches!(
+
            json::from_str::<CommandResult<Seeds>>(
+
                r#"[{"nid":"z6Mkux1aUQD2voWWukVb5nNUR7thrHveQG4pDQua8nVhib7Z","addrs":[],"state":{"connected":{"since":1699636852107,"fetching":[]}}}]"#
+
            ),
+
            Ok(CommandResult::Okay(_))
+
        );
+
    }
}
modified radicle/src/node/address/types.rs
@@ -6,9 +6,9 @@ use localtime::LocalTime;
use nonempty::NonEmpty;

use crate::collections::RandomMap;
-
use crate::node;
use crate::node::{Address, Alias};
use crate::prelude::Timestamp;
+
use crate::{node, profile};

/// A map with the ability to randomly select values.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -59,8 +59,8 @@ impl<K: hash::Hash + Eq, V> AddressBook<K, V> {
    }
}

-
impl<K: hash::Hash + Eq + Ord, V> AddressBook<K, V> {
-
    /// Return a shuffled iterator over the keys.
+
impl<K: hash::Hash + Eq + Ord + Copy, V> AddressBook<K, V> {
+
    /// Return a shuffled iterator.
    pub fn shuffled(&self) -> std::vec::IntoIter<(&K, &V)> {
        let mut items = self.inner.iter().collect::<Vec<_>>();
        items.sort_by_key(|(k, _)| *k);
@@ -69,12 +69,36 @@ impl<K: hash::Hash + Eq + Ord, V> AddressBook<K, V> {
        items.into_iter()
    }

+
    /// Turn this object into a shuffled iterator.
+
    pub fn into_shuffled(self) -> impl Iterator<Item = (K, V)> {
+
        let mut items = self.inner.into_iter().collect::<Vec<_>>();
+
        items.sort_by_key(|(k, _)| *k);
+
        self.rng.borrow_mut().shuffle(&mut items);
+

+
        items.into_iter()
+
    }
+

    /// Cycle through the keys at random. The random cycle repeats ad-infintum.
    pub fn cycle(&self) -> impl Iterator<Item = &K> {
        self.shuffled().map(|(k, _)| k).cycle()
    }
}

+
impl<K: hash::Hash + Eq, V> FromIterator<(K, V)> for AddressBook<K, V> {
+
    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
+
        let rng = profile::env::rng();
+
        let mut inner = RandomMap::with_hasher(rng.clone().into());
+

+
        for (k, v) in iter {
+
            inner.insert(k, v);
+
        }
+
        Self {
+
            inner,
+
            rng: RefCell::new(rng),
+
        }
+
    }
+
}
+

impl<K: hash::Hash + Eq, V> Deref for AddressBook<K, V> {
    type Target = RandomMap<K, V>;