Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
radicle/node: Create submodule "command"
Lorenz Leutgeb committed 7 months ago
commit 53f71384b64b6d20b868512db71a37a5eaa4a282
parent ed5fcd5e462a1156bbeacebf127ebf413eec6725
2 files changed +299 -213
modified crates/radicle/src/node.rs
@@ -3,6 +3,7 @@
mod features;

pub mod address;
+
pub mod command;
pub mod config;
pub mod db;
pub mod device;
@@ -43,6 +44,7 @@ use crate::storage::refs::RefsAt;
use crate::storage::RefUpdate;

pub use address::KnownAddress;
+
pub use command::{Command, CommandResult, ConnectOptions, Success, DEFAULT_TIMEOUT};
pub use config::Config;
pub use cyphernet::addr::{HostName, PeerAddr};
pub use db::Database;
@@ -55,8 +57,6 @@ pub use timestamp::Timestamp;
pub const PROTOCOL_VERSION: u8 = 1;
/// Default radicle protocol port.
pub const DEFAULT_PORT: u16 = 8776;
-
/// Default timeout when waiting for the node to respond with data.
-
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(30);
/// Default timeout when waiting for an event to be received on the
/// [`Handle::subscribe`] channel.
pub const DEFAULT_SUBSCRIBE_TIMEOUT: time::Duration = time::Duration::from_secs(5);
@@ -436,97 +436,6 @@ impl TryFrom<&sqlite::Value> for Alias {
    }
}

-
/// Options passed to the "connect" node command.
-
#[derive(Debug, Clone, Serialize, Deserialize)]
-
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
-
pub struct ConnectOptions {
-
    /// Establish a persistent connection.
-
    pub persistent: bool,
-
    /// How long to wait for the connection to be established.
-
    pub timeout: time::Duration,
-
}
-

-
impl Default for ConnectOptions {
-
    fn default() -> Self {
-
        Self {
-
            persistent: false,
-
            timeout: DEFAULT_TIMEOUT,
-
        }
-
    }
-
}
-

-
/// Result of a command, on the node control socket.
-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-
#[serde(untagged)]
-
pub enum CommandResult<T> {
-
    /// Response on node socket indicating that a command was carried out successfully.
-
    Okay(T),
-
    /// Response on node socket indicating that an error occured.
-
    Error {
-
        /// The reason for the error.
-
        #[serde(rename = "error")]
-
        reason: String,
-
    },
-
}
-

-
impl<T, E> From<Result<T, E>> for CommandResult<T>
-
where
-
    E: std::error::Error,
-
{
-
    fn from(result: Result<T, E>) -> Self {
-
        match result {
-
            Ok(t) => Self::Okay(t),
-
            Err(e) => Self::Error {
-
                reason: e.to_string(),
-
            },
-
        }
-
    }
-
}
-

-
impl From<Event> for CommandResult<Event> {
-
    fn from(event: Event) -> Self {
-
        Self::Okay(event)
-
    }
-
}
-

-
/// A success response.
-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
-
pub struct Success {
-
    /// Whether something was updated.
-
    #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")]
-
    updated: bool,
-
}
-

-
impl CommandResult<Success> {
-
    /// Create an "updated" response.
-
    pub fn updated(updated: bool) -> Self {
-
        Self::Okay(Success { updated })
-
    }
-

-
    /// Create an "ok" response.
-
    pub fn ok() -> Self {
-
        Self::Okay(Success { updated: false })
-
    }
-
}
-

-
impl CommandResult<()> {
-
    /// Create an error result.
-
    pub fn error(err: impl std::error::Error) -> Self {
-
        Self::Error {
-
            reason: err.to_string(),
-
        }
-
    }
-
}
-

-
impl<T: Serialize> CommandResult<T> {
-
    /// Write this command result to a stream, including a terminating LF character.
-
    pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
-
        json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
-
        w.write_all(b"\n")
-
    }
-
}
-

/// Peer public protocol address.
#[derive(Clone, Eq, PartialEq, Debug, Hash, From, Wrapper, WrapperMut, Serialize, Deserialize)]
#[wrapper(Deref, Display, FromStr)]
@@ -608,126 +517,6 @@ impl From<Address> for HostName {
    }
}

-
/// Command name.
-
#[derive(Debug, Clone, Serialize, Deserialize)]
-
#[serde(rename_all = "camelCase", tag = "command")]
-
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
-
pub enum Command {
-
    /// Announce repository references for given repository to peers.
-
    #[serde(rename_all = "camelCase")]
-
    AnnounceRefs { rid: RepoId },
-

-
    /// Announce local repositories to peers.
-
    #[serde(rename_all = "camelCase")]
-
    AnnounceInventory,
-

-
    /// Update node's inventory.
-
    AddInventory { rid: RepoId },
-

-
    /// Get the current node condiguration.
-
    Config,
-

-
    /// Get the node's listen addresses.
-
    ListenAddrs,
-

-
    /// Connect to node with the given address.
-
    #[serde(rename_all = "camelCase")]
-
    Connect {
-
        addr: config::ConnectAddress,
-
        opts: ConnectOptions,
-
    },
-

-
    /// Disconnect from a node.
-
    #[serde(rename_all = "camelCase")]
-
    Disconnect {
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
    },
-

-
    /// Lookup seeds for the given repository in the routing table.
-
    #[serde(rename_all = "camelCase")]
-
    Seeds { rid: RepoId },
-

-
    /// Get the current peer sessions.
-
    Sessions,
-

-
    /// Get a specific peer session.
-
    Session {
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
    },
-

-
    /// Fetch the given repository from the network.
-
    #[serde(rename_all = "camelCase")]
-
    Fetch {
-
        rid: RepoId,
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
        timeout: time::Duration,
-
    },
-

-
    /// Seed the given repository.
-
    #[serde(rename_all = "camelCase")]
-
    Seed { rid: RepoId, scope: policy::Scope },
-

-
    /// Unseed the given repository.
-
    #[serde(rename_all = "camelCase")]
-
    Unseed { rid: RepoId },
-

-
    /// Follow the given node.
-
    #[serde(rename_all = "camelCase")]
-
    Follow {
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
        alias: Option<Alias>,
-
    },
-

-
    /// Unfollow the given node.
-
    #[serde(rename_all = "camelCase")]
-
    Unfollow {
-
        #[cfg_attr(
-
            feature = "schemars",
-
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
-
        )]
-
        nid: NodeId,
-
    },
-

-
    /// Get the node's status.
-
    Status,
-

-
    /// Get node debug information.
-
    Debug,
-

-
    /// Get the node's NID.
-
    NodeId,
-

-
    /// Shutdown the node.
-
    Shutdown,
-

-
    /// Subscribe to events.
-
    Subscribe,
-
}
-

-
impl Command {
-
    /// Write this command to a stream, including a terminating LF character.
-
    pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
-
        json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
-
        w.write_all(b"\n")
-
    }
-
}
-

/// Connection link direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
added crates/radicle/src/node/command.rs
@@ -0,0 +1,297 @@
+
//! Commands sent to the node via the control socket, and auxiliary types, as
+
//! well as their results (responses on the socket).
+

+
use std::io;
+
use std::time;
+

+
use serde::{Deserialize, Serialize};
+
use serde_json as json;
+

+
use crate::identity::RepoId;
+

+
use super::events::Event;
+
use super::NodeId;
+

+
/// Default timeout when waiting for the node to respond with data.
+
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(30);
+

+
/// Command name.
+
#[derive(Debug, Clone, Serialize, Deserialize)]
+
#[serde(rename_all = "camelCase", tag = "command")]
+
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
+
pub enum Command {
+
    /// Announce repository references for given repository to peers.
+
    #[serde(rename_all = "camelCase")]
+
    AnnounceRefs { rid: RepoId },
+

+
    /// Announce local repositories to peers.
+
    #[serde(rename_all = "camelCase")]
+
    AnnounceInventory,
+

+
    /// Update node's inventory.
+
    AddInventory { rid: RepoId },
+

+
    /// Get the current node condiguration.
+
    Config,
+

+
    /// Get the node's listen addresses.
+
    ListenAddrs,
+

+
    /// Connect to node with the given address.
+
    #[serde(rename_all = "camelCase")]
+
    Connect {
+
        addr: super::config::ConnectAddress,
+
        opts: ConnectOptions,
+
    },
+

+
    /// Disconnect from a node.
+
    #[serde(rename_all = "camelCase")]
+
    Disconnect {
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
    },
+

+
    /// Lookup seeds for the given repository in the routing table.
+
    #[serde(rename_all = "camelCase")]
+
    Seeds { rid: RepoId },
+

+
    /// Get the current peer sessions.
+
    Sessions,
+

+
    /// Get a specific peer session.
+
    Session {
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
    },
+

+
    /// Fetch the given repository from the network.
+
    #[serde(rename_all = "camelCase")]
+
    Fetch {
+
        rid: RepoId,
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
        timeout: time::Duration,
+
    },
+

+
    /// Seed the given repository.
+
    #[serde(rename_all = "camelCase")]
+
    Seed {
+
        rid: RepoId,
+
        scope: super::policy::Scope,
+
    },
+

+
    /// Unseed the given repository.
+
    #[serde(rename_all = "camelCase")]
+
    Unseed { rid: RepoId },
+

+
    /// Follow the given node.
+
    #[serde(rename_all = "camelCase")]
+
    Follow {
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
        alias: Option<super::Alias>,
+
    },
+

+
    /// Unfollow the given node.
+
    #[serde(rename_all = "camelCase")]
+
    Unfollow {
+
        #[cfg_attr(
+
            feature = "schemars",
+
            schemars(with = "crate::schemars_ext::crypto::PublicKey")
+
        )]
+
        nid: NodeId,
+
    },
+

+
    /// Get the node's status.
+
    Status,
+

+
    /// Get node debug information.
+
    Debug,
+

+
    /// Get the node's NID.
+
    NodeId,
+

+
    /// Shutdown the node.
+
    Shutdown,
+

+
    /// Subscribe to events.
+
    Subscribe,
+
}
+

+
impl Command {
+
    /// Write this command to a stream, including a terminating LF character.
+
    pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
+
        json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
+
        w.write_all(b"\n")
+
    }
+
}
+

+
/// Options passed to the "connect" node command.
+
#[derive(Debug, Clone, Serialize, Deserialize)]
+
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
+
pub struct ConnectOptions {
+
    /// Establish a persistent connection.
+
    pub persistent: bool,
+
    /// How long to wait for the connection to be established.
+
    pub timeout: time::Duration,
+
}
+

+
impl Default for ConnectOptions {
+
    fn default() -> Self {
+
        Self {
+
            persistent: false,
+
            timeout: DEFAULT_TIMEOUT,
+
        }
+
    }
+
}
+

+
/// Result of a command, on the node control socket.
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
#[serde(untagged)]
+
pub enum CommandResult<T> {
+
    /// Response on node socket indicating that a command was carried out successfully.
+
    Okay(T),
+
    /// Response on node socket indicating that an error occured.
+
    Error {
+
        /// The reason for the error.
+
        #[serde(rename = "error")]
+
        reason: String,
+
    },
+
}
+

+
impl<T, E> From<Result<T, E>> for CommandResult<T>
+
where
+
    E: std::error::Error,
+
{
+
    fn from(result: Result<T, E>) -> Self {
+
        match result {
+
            Ok(t) => Self::Okay(t),
+
            Err(e) => Self::Error {
+
                reason: e.to_string(),
+
            },
+
        }
+
    }
+
}
+

+
impl From<Event> for CommandResult<Event> {
+
    fn from(event: Event) -> Self {
+
        Self::Okay(event)
+
    }
+
}
+

+
/// A success response.
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
+
pub struct Success {
+
    /// Whether something was updated.
+
    #[serde(default, skip_serializing_if = "crate::serde_ext::is_default")]
+
    pub(super) updated: bool,
+
}
+

+
impl CommandResult<Success> {
+
    /// Create an "updated" response.
+
    pub fn updated(updated: bool) -> Self {
+
        Self::Okay(Success { updated })
+
    }
+

+
    /// Create an "ok" response.
+
    pub fn ok() -> Self {
+
        Self::Okay(Success { updated: false })
+
    }
+
}
+

+
impl CommandResult<()> {
+
    /// Create an error result.
+
    pub fn error(err: impl std::error::Error) -> Self {
+
        Self::Error {
+
            reason: err.to_string(),
+
        }
+
    }
+
}
+

+
impl<T: Serialize> CommandResult<T> {
+
    /// Write this command result to a stream, including a terminating LF character.
+
    pub fn to_writer(&self, mut w: impl io::Write) -> io::Result<()> {
+
        json::to_writer(&mut w, self).map_err(|_| io::ErrorKind::InvalidInput)?;
+
        w.write_all(b"\n")
+
    }
+
}
+

+
#[cfg(test)]
+
#[allow(clippy::unwrap_used)]
+
mod test {
+
    use super::*;
+
    use std::collections::VecDeque;
+

+
    use localtime::LocalTime;
+

+
    use crate::assert_matches;
+
    use crate::node::{Seeds, State};
+

+
    #[test]
+
    fn command_result() {
+
        #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+
        struct Test {
+
            value: u32,
+
        }
+

+
        assert_eq!(json::to_string(&CommandResult::Okay(true)).unwrap(), "true");
+
        assert_eq!(
+
            json::to_string(&CommandResult::Okay(Test { value: 42 })).unwrap(),
+
            "{\"value\":42}"
+
        );
+
        assert_eq!(
+
            json::from_str::<CommandResult<Test>>("{\"value\":42}").unwrap(),
+
            CommandResult::Okay(Test { value: 42 })
+
        );
+
        assert_eq!(json::to_string(&CommandResult::ok()).unwrap(), "{}");
+
        assert_eq!(
+
            json::to_string(&CommandResult::updated(true)).unwrap(),
+
            "{\"updated\":true}"
+
        );
+
        assert_eq!(
+
            json::to_string(&CommandResult::error(io::Error::from(
+
                io::ErrorKind::NotFound
+
            )))
+
            .unwrap(),
+
            "{\"error\":\"entity not found\"}"
+
        );
+

+
        json::from_str::<CommandResult<State>>(
+
            &serde_json::to_string(&CommandResult::Okay(State::Connected {
+
                since: LocalTime::now(),
+
                ping: Default::default(),
+
                fetching: Default::default(),
+
                latencies: VecDeque::default(),
+
                stable: false,
+
            }))
+
            .unwrap(),
+
        )
+
        .unwrap();
+

+
        assert_matches!(
+
            json::from_str::<CommandResult<State>>(
+
                r#"{"connected":{"since":1699636852107,"fetching":[]}}"#
+
            ),
+
            Ok(CommandResult::Okay(_))
+
        );
+
        assert_matches!(
+
            json::from_str::<CommandResult<Seeds>>(
+
                r#"[{"nid":"z6MksmpU5b1dS7oaqF2bHXhQi1DWy2hB7Mh9CuN7y1DN6QSz","addrs":[{"addr":"seed.radicle.example.com:8776","source":"peer","lastSuccess":1699983994234,"lastAttempt":1699983994000,"banned":false}],"state":{"connected":{"since":1699983994,"fetching":[]}}}]"#
+
            ),
+
            Ok(CommandResult::Okay(_))
+
        );
+
    }
+
}