Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Expose listen addresses
cloudhead committed 2 years ago
commit 9c9bbbe7bb54aefd844a4815c04aea3e34e4ab02
parent d86d99e0021258c701deeb293ab3b192ceb1d9aa
9 files changed +83 -18
modified radicle-cli/examples/rad-node.md
@@ -15,7 +15,7 @@ node status` command (or just `rad node` for short):

```
$ rad node status
-
✓ Node is running.
+
✓ Node is running and listening on [..].
```

The node also allows us to query data that it has access to such as
modified radicle-cli/src/commands/node/control.rs
@@ -189,7 +189,21 @@ pub fn connect(

pub fn status(node: &Node, profile: &Profile) -> anyhow::Result<()> {
    if node.is_running() {
-
        term::success!("Node is {}.", term::format::positive("running"));
+
        let listen = node
+
            .listen_addrs()?
+
            .into_iter()
+
            .map(|addr| addr.to_string())
+
            .collect::<Vec<_>>();
+

+
        if listen.is_empty() {
+
            term::success!("Node is {}.", term::format::positive("running"));
+
        } else {
+
            term::success!(
+
                "Node is {} and listening on {}.",
+
                term::format::positive("running"),
+
                listen.join(", ")
+
            );
+
        }
    } else {
        term::info!("Node is {}.", term::format::negative("stopped"));
        term::info!(
modified radicle-node/src/control.rs
@@ -110,6 +110,11 @@ where

            CommandResult::Okay(config).to_writer(writer)?;
        }
+
        Command::ListenAddrs => {
+
            let addrs = handle.listen_addrs()?;
+

+
            CommandResult::Okay(addrs).to_writer(writer)?;
+
        }
        Command::Seeds { rid } => {
            let seeds = handle.seeds(rid)?;

modified radicle-node/src/runtime.rs
@@ -244,8 +244,6 @@ impl Runtime {

            local_addrs.push(local_addr);
            wire.listen(listener);
-

-
            log::info!(target: "node", "Listening on {local_addr}..");
        }
        let reactor = Reactor::named(wire, popol::Poller::new(), thread::name(&id, "service"))?;
        let handle = Handle::new(home.clone(), reactor.controller(), emitter);
modified radicle-node/src/runtime/handle.rs
@@ -1,3 +1,4 @@
+
use std::net;
use std::os::unix::net::UnixStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -190,6 +191,12 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

+
    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::ListenAddrs(sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

    fn fetch(
        &mut self,
        id: RepoId,
modified radicle-node/src/service.rs
@@ -13,7 +13,7 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
-
use std::{fmt, time};
+
use std::{fmt, net, time};

use crossbeam_channel as chan;
use fastrand::Rng;
@@ -179,6 +179,8 @@ pub enum Command {
    Disconnect(NodeId),
    /// Get the node configuration.
    Config(chan::Sender<Config>),
+
    /// Get the node's listen addresses.
+
    ListenAddrs(chan::Sender<Vec<std::net::SocketAddr>>),
    /// Lookup seeds for the given repository in the routing table.
    Seeds(RepoId, chan::Sender<Seeds>),
    /// Fetch the given repository from the network.
@@ -204,6 +206,7 @@ impl fmt::Debug for Command {
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Config(_) => write!(f, "Config"),
+
            Self::ListenAddrs(_) => write!(f, "ListenAddrs"),
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _, _) => write!(f, "Fetch({id}, {node})"),
            Self::Seed(id, scope, _) => write!(f, "Seed({id}, {scope})"),
@@ -370,6 +373,8 @@ pub struct Service<D, S, G> {
    started_at: Option<LocalTime>,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
+
    /// Local listening addresses.
+
    listening: Vec<net::SocketAddr>,
}

impl<D, S, G> Service<D, S, G>
@@ -427,6 +432,7 @@ where
            last_announce: LocalTime::default(),
            started_at: None,
            emitter,
+
            listening: vec![],
        }
    }

@@ -705,6 +711,9 @@ where
            Command::Config(resp) => {
                resp.send(self.config.clone()).ok();
            }
+
            Command::ListenAddrs(resp) => {
+
                resp.send(self.listening.clone()).ok();
+
            }
            Command::Seeds(rid, resp) => match self.seeds(&rid) {
                Ok(seeds) => {
                    let (connected, disconnected) = seeds.partition();
@@ -1077,6 +1086,12 @@ where
        }
    }

+
    pub fn listening(&mut self, local_addr: net::SocketAddr) {
+
        log::info!(target: "node", "Listening on {local_addr}..");
+

+
        self.listening.push(local_addr);
+
    }
+

    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        info!(target: "service", "Connected to {} ({:?})", remote, link);
        self.emitter.emit(Event::PeerConnected { nid: remote });
modified radicle-node/src/test/handle.rs
@@ -31,6 +31,10 @@ impl radicle::node::Handle for Handle {
        true
    }

+
    fn listen_addrs(&self) -> Result<Vec<std::net::SocketAddr>, Self::Error> {
+
        Ok(vec![])
+
    }
+

    fn config(&self) -> Result<Config, Self::Error> {
        Ok(Config::new(Alias::new("acme")))
    }
modified radicle-node/src/wire/protocol.rs
@@ -292,6 +292,8 @@ pub struct Wire<D, S, G: Signer + Ecdh> {
    outbound: RandomMap<RawFd, Outbound>,
    /// Inbound peers without a session.
    inbound: RandomMap<RawFd, Inbound>,
+
    /// Listening addresses that are not yet registered.
+
    listening: RandomMap<RawFd, net::SocketAddr>,
    /// Peer (established) sessions.
    peers: Peers,
    /// SOCKS5 proxy address.
@@ -320,11 +322,14 @@ where
            actions: VecDeque::new(),
            inbound: RandomMap::default(),
            outbound: RandomMap::default(),
+
            listening: RandomMap::default(),
            peers: Peers(RandomMap::default()),
        }
    }

    pub fn listen(&mut self, socket: NetAccept<WireSession<G>>) {
+
        self.listening
+
            .insert(socket.as_raw_fd(), socket.local_addr());
        self.actions.push_back(Action::RegisterListener(socket));
    }

@@ -495,18 +500,23 @@ where
    }

    fn handle_registered(&mut self, fd: RawFd, id: ResourceId, typ: ResourceType) {
-
        if typ == ResourceType::Listener {
-
            // Not interested in listener resource registration.
-
            return;
-
        }
-
        if let Some(outbound) = self.outbound.get_mut(&fd) {
-
            log::debug!(target: "wire", "Outbound peer resource registered for {} with id={id} (fd={fd})", outbound.nid);
-
            outbound.id = Some(id);
-
        } else if let Some(inbound) = self.inbound.get_mut(&fd) {
-
            log::debug!(target: "wire", "Inbound peer resource registered with id={id} (fd={fd})");
-
            inbound.id = Some(id);
-
        } else {
-
            log::warn!(target: "wire", "Unknown peer registered with fd={fd} and id={id}");
+
        match typ {
+
            ResourceType::Listener => {
+
                if let Some(local_addr) = self.listening.remove(&fd) {
+
                    self.service.listening(local_addr);
+
                }
+
            }
+
            ResourceType::Transport => {
+
                if let Some(outbound) = self.outbound.get_mut(&fd) {
+
                    log::debug!(target: "wire", "Outbound peer resource registered for {} with id={id} (fd={fd})", outbound.nid);
+
                    outbound.id = Some(id);
+
                } else if let Some(inbound) = self.inbound.get_mut(&fd) {
+
                    log::debug!(target: "wire", "Inbound peer resource registered with id={id} (fd={fd})");
+
                    inbound.id = Some(id);
+
                } else {
+
                    log::warn!(target: "wire", "Unknown peer registered with fd={fd} and id={id}");
+
                }
+
            }
        }
    }

modified radicle/src/node.rs
@@ -435,6 +435,9 @@ pub enum Command {
    /// Get the current node condiguration.
    Config,

+
    /// Get the node's listen addresses.
+
    ListenAddrs,
+

    /// Connect to node with the given address.
    #[serde(rename_all = "camelCase")]
    Connect {
@@ -808,8 +811,10 @@ pub trait Handle: Clone + Sync + Send {

    /// Get the local Node ID.
    fn nid(&self) -> Result<NodeId, Self::Error>;
-
    /// Check if the node is running. to a peer.
+
    /// Check if the node is running.
    fn is_running(&self) -> bool;
+
    /// Get the node's bound listen addresses.
+
    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error>;
    /// Get the current node configuration.
    fn config(&self) -> Result<config::Config, Self::Error>;
    /// Connect to a peer.
@@ -970,6 +975,13 @@ impl Handle for Node {
            .map_err(Error::from)
    }

+
    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Error> {
+
        self.call::<Vec<net::SocketAddr>>(Command::ListenAddrs, DEFAULT_TIMEOUT)?
+
            .next()
+
            .ok_or(Error::EmptyResponse)?
+
            .map_err(Error::from)
+
    }
+

    fn is_running(&self) -> bool {
        let Ok(mut lines) = self.call::<Success>(Command::Status, DEFAULT_TIMEOUT) else {
            return false;