Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Add commnads to query internal service state
Alexis Sellier committed 3 years ago
commit 15f525299e05d508380f0b12ae639612f79cd241
parent efdc8c52c77fa6d3afbc602b9334a8f11cedf77f
6 files changed +191 -48
modified radicle-node/src/client/handle.rs
@@ -1,4 +1,5 @@
use std::net;
+
use std::sync::Arc;

use crossbeam_channel as chan;
use nakamoto_net::Waker;
@@ -6,7 +7,8 @@ use thiserror::Error;

use crate::identity::Id;
use crate::service;
-
use crate::service::{CommandError, FetchLookup};
+
use crate::service::{CommandError, FetchLookup, QueryState};
+
use crate::service::{NodeId, Session};

/// An error resulting from a handle method.
#[derive(Error, Debug)]
@@ -88,6 +90,45 @@ impl<W: Waker> traits::Handle for Handle<W> {
        Ok(())
    }

+
    fn routing(&self) -> Result<chan::Receiver<(Id, Vec<NodeId>)>, Error> {
+
        let (sender, receiver) = chan::unbounded();
+
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            for (id, nodes) in state.routing().iter() {
+
                if sender.send((*id, nodes.iter().cloned().collect())).is_err() {
+
                    break;
+
                }
+
            }
+
            Ok(())
+
        });
+
        let (err_sender, err_receiver) = chan::bounded(1);
+
        self.command(service::Command::QueryState(query, err_sender))?;
+
        err_receiver.recv()??;
+

+
        Ok(receiver)
+
    }
+

+
    fn sessions(&self) -> Result<chan::Receiver<(NodeId, Session)>, Error> {
+
        // TODO: This can be implemented once we have real peer sessions.
+
        todo!()
+
    }
+

+
    fn inventory(&self) -> Result<chan::Receiver<Id>, Error> {
+
        let (sender, receiver) = chan::unbounded();
+
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            for id in state.inventory()?.iter() {
+
                if sender.send(*id).is_err() {
+
                    break;
+
                }
+
            }
+
            Ok(())
+
        });
+
        let (err_sender, err_receiver) = chan::bounded(1);
+
        self.command(service::Command::QueryState(query, err_sender))?;
+
        err_receiver.recv()??;
+

+
        Ok(receiver)
+
    }
+

    /// Ask the client to shutdown.
    fn shutdown(self) -> Result<(), Error> {
        self.shutdown.send(())?;
@@ -114,5 +155,11 @@ pub mod traits {
        fn command(&self, cmd: service::Command) -> Result<(), Error>;
        /// Ask the client to shutdown.
        fn shutdown(self) -> Result<(), Error>;
+
        /// Query the routing table.
+
        fn routing(&self) -> Result<chan::Receiver<(Id, Vec<NodeId>)>, Error>;
+
        /// Query the peer session state.
+
        fn sessions(&self) -> Result<chan::Receiver<(NodeId, Session)>, Error>;
+
        /// Query the inventory.
+
        fn inventory(&self) -> Result<chan::Receiver<Id>, Error>;
    }
}
modified radicle-node/src/control.rs
@@ -52,8 +52,6 @@ enum DrainError {
    InvalidCommandArg(String),
    #[error("unknown command `{0}`")]
    UnknownCommand(String),
-
    #[error("invalid command")]
-
    InvalidCommand,
    #[error("client error: {0}")]
    Client(#[from] client::handle::Error),
    #[error("i/o error: {0}")]
@@ -101,7 +99,38 @@ fn drain<H: Handle>(stream: &UnixStream, handle: &H) -> Result<(), DrainError> {
                }
            }
            Some((cmd, _)) => return Err(DrainError::UnknownCommand(cmd.to_owned())),
-
            None => return Err(DrainError::InvalidCommand),
+

+
            // Commands with no arguments.
+
            None => match line.as_str() {
+
                "routing" => match handle.routing() {
+
                    Ok(c) => {
+
                        let mut writer = LineWriter::new(stream);
+

+
                        for (id, seeds) in c.iter() {
+
                            let seeds = seeds
+
                                .into_iter()
+
                                .map(String::from)
+
                                .collect::<Vec<_>>()
+
                                .join(" ");
+
                            writeln!(writer, "{id} {seeds}",)?;
+
                        }
+
                    }
+
                    Err(e) => return Err(DrainError::Client(e)),
+
                },
+
                "inventory" => match handle.inventory() {
+
                    Ok(c) => {
+
                        let mut writer = LineWriter::new(stream);
+

+
                        for id in c.iter() {
+
                            writeln!(writer, "{id}")?;
+
                        }
+
                    }
+
                    Err(e) => return Err(DrainError::Client(e)),
+
                },
+
                _ => {
+
                    return Err(DrainError::UnknownCommand(line));
+
                }
+
            },
        }
    }
    Ok(())
modified radicle-node/src/service.rs
@@ -6,6 +6,7 @@ pub mod reactor;

use std::collections::BTreeMap;
use std::ops::{Deref, DerefMut};
+
use std::sync::Arc;
use std::{fmt, net, net::IpAddr};

use crossbeam_channel as chan;
@@ -30,12 +31,13 @@ use crate::identity::{Doc, Id};
use crate::service::config::ProjectTracking;
use crate::service::message::{Address, Announcement, AnnouncementMessage};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
-
use crate::service::peer::{Session, SessionError, SessionState};
+
use crate::service::peer::{SessionError, SessionState};
use crate::storage;
use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, WriteStorage};

pub use crate::service::config::{Config, Network};
pub use crate::service::message::{Envelope, Message};
+
pub use crate::service::peer::Session;

use self::gossip::Gossip;
use self::message::{InventoryAnnouncement, NodeFeatures};
@@ -109,19 +111,44 @@ pub enum FetchResult {
    },
}

+
/// Function used to query internal service state.
+
pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Send + Sync;
+

/// Commands sent to the service by the operator.
-
#[derive(Debug)]
pub enum Command {
+
    /// Announce repository references for given project id to peers.
    AnnounceRefs(Id),
+
    /// Connect to node with the given address.
    Connect(net::SocketAddr),
+
    /// Fetch the given project from the network.
    Fetch(Id, chan::Sender<FetchLookup>),
+
    /// Track the given project.
    Track(Id, chan::Sender<bool>),
+
    /// Untrack the given project.
    Untrack(Id, chan::Sender<bool>),
+
    /// Query the internal service state.
+
    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
+
}
+

+
impl fmt::Debug for Command {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match self {
+
            Self::AnnounceRefs(id) => write!(f, "AnnounceRefs({})", id),
+
            Self::Connect(addr) => write!(f, "Connect({})", addr),
+
            Self::Fetch(id, _) => write!(f, "Fetch({})", id),
+
            Self::Track(id, _) => write!(f, "Track({})", id),
+
            Self::Untrack(id, _) => write!(f, "Untrack({})", id),
+
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
+
        }
+
    }
}

/// Command-related errors.
#[derive(thiserror::Error, Debug)]
-
pub enum CommandError {}
+
pub enum CommandError {
+
    #[error(transparent)]
+
    Storage(#[from] storage::Error),
+
}

#[derive(Debug)]
pub struct Service<A, S, G> {
@@ -163,6 +190,21 @@ pub struct Service<A, S, G> {

impl<A, S, G> Service<A, S, G>
where
+
    G: crypto::Signer,
+
{
+
    /// Get the local node id.
+
    pub fn node_id(&self) -> NodeId {
+
        *self.signer.public_key()
+
    }
+

+
    /// Get the local service time.
+
    pub fn local_time(&self) -> LocalTime {
+
        self.clock.local_time()
+
    }
+
}
+

+
impl<A, S, G> Service<A, S, G>
+
where
    A: address_book::Store,
    S: WriteStorage + 'static,
    G: crypto::Signer,
@@ -201,10 +243,6 @@ where
        }
    }

-
    pub fn node_id(&self) -> NodeId {
-
        *self.signer.public_key()
-
    }
-

    pub fn seeds(&self, id: &Id) -> Box<dyn Iterator<Item = (&NodeId, &Session)> + '_> {
        if let Some(peers) = self.routing.get(id) {
            Box::new(
@@ -255,16 +293,6 @@ where
        todo!()
    }

-
    /// Get the connected peers.
-
    pub fn sessions(&self) -> &Sessions {
-
        &self.sessions
-
    }
-

-
    /// Get the current inventory.
-
    pub fn inventory(&self) -> Result<Inventory, storage::Error> {
-
        self.storage.inventory()
-
    }
-

    /// Get the storage instance.
    pub fn storage(&self) -> &S {
        &self.storage
@@ -275,36 +303,11 @@ where
        &mut self.storage
    }

-
    /// Get a project from storage, using the local node's key.
-
    pub fn get(&self, proj: Id) -> Result<Option<Doc<Verified>>, storage::Error> {
-
        self.storage.get(&self.node_id(), proj)
-
    }
-

    /// Get the local signer.
    pub fn signer(&self) -> &G {
        &self.signer
    }

-
    /// Get the clock.
-
    pub fn clock(&self) -> &RefClock {
-
        &self.clock
-
    }
-

-
    /// Get the local service time.
-
    pub fn local_time(&self) -> LocalTime {
-
        self.clock.local_time()
-
    }
-

-
    /// Get service configuration.
-
    pub fn config(&self) -> &Config {
-
        &self.config
-
    }
-

-
    /// Get reference to routing table.
-
    pub fn routing(&self) -> &Routing {
-
        &self.routing
-
    }
-

    /// Get I/O reactor.
    pub fn reactor(&mut self) -> &mut Reactor {
        &mut self.reactor
@@ -463,6 +466,9 @@ where

                self.reactor.broadcast(ann, peers);
            }
+
            Command::QueryState(query, sender) => {
+
                sender.send(query(self)).ok();
+
            }
        }
    }

@@ -789,6 +795,52 @@ where
    }
}

+
/// Gives read access to the service state.
+
pub trait ServiceState {
+
    /// Get the connected peers.
+
    fn sessions(&self) -> &Sessions;
+
    /// Get the current inventory.
+
    fn inventory(&self) -> Result<Inventory, storage::Error>;
+
    /// Get a project from storage, using the local node's key.
+
    fn get(&self, proj: Id) -> Result<Option<Doc<Verified>>, storage::Error>;
+
    /// Get the clock.
+
    fn clock(&self) -> &RefClock;
+
    /// Get service configuration.
+
    fn config(&self) -> &Config;
+
    /// Get reference to routing table.
+
    fn routing(&self) -> &Routing;
+
}
+

+
impl<A, S, G> ServiceState for Service<A, S, G>
+
where
+
    G: Signer,
+
    S: ReadStorage,
+
{
+
    fn sessions(&self) -> &Sessions {
+
        &self.sessions
+
    }
+

+
    fn inventory(&self) -> Result<Inventory, storage::Error> {
+
        self.storage.inventory()
+
    }
+

+
    fn get(&self, proj: Id) -> Result<Option<Doc<Verified>>, storage::Error> {
+
        self.storage.get(&self.node_id(), proj)
+
    }
+

+
    fn clock(&self) -> &RefClock {
+
        &self.clock
+
    }
+

+
    fn config(&self) -> &Config {
+
        &self.config
+
    }
+

+
    fn routing(&self) -> &Routing {
+
        &self.routing
+
    }
+
}
+

#[derive(Debug, Clone)]
pub enum DisconnectReason {
    User,
modified radicle-node/src/service/peer.rs
@@ -1,7 +1,7 @@
use crate::service::message::*;
use crate::service::*;

-
#[derive(Debug, Default)]
+
#[derive(Debug, Default, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SessionState {
    /// Initial peer state. For outgoing peers this
@@ -38,7 +38,7 @@ pub enum SessionError {
}

/// A peer session. Each connected peer will have one session.
-
#[derive(Debug)]
+
#[derive(Debug, Clone)]
pub struct Session {
    /// Peer address.
    pub addr: net::SocketAddr,
modified radicle-node/src/test/handle.rs
@@ -1,5 +1,7 @@
use std::sync::{Arc, Mutex};

+
use crossbeam_channel as chan;
+

use crate::client::handle::traits;
use crate::client::handle::Error;
use crate::identity::Id;
@@ -34,6 +36,18 @@ impl traits::Handle for Handle {
        Ok(())
    }

+
    fn routing(&self) -> Result<chan::Receiver<(Id, Vec<service::NodeId>)>, Error> {
+
        unimplemented!();
+
    }
+

+
    fn sessions(&self) -> Result<chan::Receiver<(service::NodeId, service::Session)>, Error> {
+
        unimplemented!();
+
    }
+

+
    fn inventory(&self) -> Result<chan::Receiver<Id>, Error> {
+
        unimplemented!();
+
    }
+

    fn shutdown(self) -> Result<(), Error> {
        Ok(())
    }
modified radicle-node/src/test/tests.rs
@@ -11,6 +11,7 @@ use crate::service::filter::Filter;
use crate::service::message::*;
use crate::service::peer::*;
use crate::service::reactor::Io;
+
use crate::service::ServiceState as _;
use crate::service::*;
use crate::storage::git::Storage;
use crate::storage::ReadStorage;