Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement `sessions` command
Alexis Sellier committed 2 years ago
commit 1a1e63d99858f6c67c41d360361c01d887a19c34
parent f73389fc626e561cd4ece48c5103ef2e14c84dcc
17 files changed +304 -135
modified Cargo.lock
@@ -1442,9 +1442,12 @@ checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f"

[[package]]
name = "localtime"
-
version = "1.2.0"
+
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "b2f2e37f115cdc432fcf760063d45a928f0ea80bcf10be3a6e516cbba883b15e"
+
checksum = "71c67b83b03434bb31132aef0b314b8a49a0db55ce195c7e3c29d27bbf003819"
+
dependencies = [
+
 "serde",
+
]

[[package]]
name = "log"
@@ -1935,6 +1938,7 @@ dependencies = [
 "git-ref-format",
 "json-color",
 "lexopt",
+
 "localtime",
 "log",
 "nonempty 0.8.1",
 "pretty_assertions",
modified radicle-cli/Cargo.toml
@@ -16,6 +16,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "std"
git-ref-format = { version = "0.2.3", features = ["macro"] }
json-color = { version = "0.7" }
lexopt = { version = "0.2" }
+
localtime = { version = "1.2.0" }
log = { version = "0.4", features = ["std"] }
nonempty = { version = "0.8" }
# N.b. this is required to use macros, even though it's re-exported
modified radicle-cli/examples/rad-node.md
@@ -17,7 +17,7 @@ node status` command (or just `rad node` for short):

```
$ rad node status
-
✓ The node is running
+
✓ Node is running
```

The node also allows us to connect with other nodes in the
modified radicle-cli/src/commands/node/control.rs
@@ -4,12 +4,15 @@ use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::{process, thread, time};

use anyhow::Context as _;
+
use localtime::LocalTime;

+
use radicle::node;
use radicle::node::{Address, Handle as _, NodeId};
use radicle::Node;
use radicle::{profile, Profile};

use crate::terminal as term;
+
use crate::terminal::Element as _;

pub fn start(daemon: bool, options: Vec<OsString>, profile: &Profile) -> anyhow::Result<()> {
    // Ask passphrase here, otherwise it'll be a fatal error when running the daemon
@@ -119,10 +122,17 @@ pub fn connect(node: &mut Node, nid: NodeId, addr: Address) -> anyhow::Result<()

pub fn status(node: &Node, profile: &Profile) -> anyhow::Result<()> {
    if node.is_running() {
-
        term::success!("The node is {}", term::format::positive("running"));
+
        term::success!("Node is {}", term::format::positive("running"));
    } else {
-
        term::info!("The node is {}", term::format::negative("stopped"));
+
        term::info!("Node is {}", term::format::negative("stopped"));
    }
+

+
    let sessions = sessions(node)?;
+
    if let Some(table) = sessions {
+
        term::blank();
+
        table.print();
+
    }
+

    if profile.home.node().join("node.log").exists() {
        term::blank();
        // If we're running the node via `systemd` for example, there won't be a log file
@@ -131,3 +141,48 @@ pub fn status(node: &Node, profile: &Profile) -> anyhow::Result<()> {
    }
    Ok(())
}
+

+
pub fn sessions(node: &Node) -> Result<Option<term::Table<4, term::Label>>, node::Error> {
+
    let sessions = node.sessions()?;
+
    if sessions.is_empty() {
+
        return Ok(None);
+
    }
+
    let mut table = term::Table::new(term::table::TableOptions::bordered());
+
    let now = LocalTime::now();
+

+
    table.push([
+
        term::format::bold("Peer").into(),
+
        term::format::bold("Address").into(),
+
        term::format::bold("State").into(),
+
        term::format::bold("Since").into(),
+
    ]);
+
    table.divider();
+

+
    for sess in sessions {
+
        let nid = term::format::tertiary(sess.nid).into();
+
        let (addr, state, time) = match sess.state {
+
            node::State::Initial => (
+
                term::Label::blank(),
+
                term::Label::from(term::format::dim("initial")),
+
                term::Label::blank(),
+
            ),
+
            node::State::Attempted { addr } => (
+
                addr.to_string().into(),
+
                term::Label::from(term::format::tertiary("attempted")),
+
                term::Label::blank(),
+
            ),
+
            node::State::Connected { addr, since, .. } => (
+
                addr.to_string().into(),
+
                term::Label::from(term::format::positive("connected")),
+
                term::format::dim(now - since).into(),
+
            ),
+
            node::State::Disconnected { retry_at, .. } => (
+
                term::Label::blank(),
+
                term::Label::from(term::format::negative("disconnected")),
+
                term::format::dim(retry_at - now).into(),
+
            ),
+
        };
+
        table.push([nid, addr, state, time]);
+
    }
+
    Ok(Some(table))
+
}
modified radicle-node/src/control.rs
@@ -34,7 +34,10 @@ pub enum Error {
pub fn listen<H: Handle<Error = runtime::HandleError> + 'static>(
    listener: UnixListener,
    handle: H,
-
) -> Result<(), Error> {
+
) -> Result<(), Error>
+
where
+
    H::Sessions: serde::Serialize,
+
{
    log::debug!(target: "control", "Control thread listening on socket..");
    let nid = handle.nid()?;

@@ -79,7 +82,10 @@ enum CommandError {
fn command<H: Handle<Error = runtime::HandleError> + 'static>(
    stream: &UnixStream,
    mut handle: H,
-
) -> Result<(), CommandError> {
+
) -> Result<(), CommandError>
+
where
+
    H::Sessions: serde::Serialize,
+
{
    let mut reader = BufReader::new(stream);
    let mut writer = LineWriter::new(stream);
    let mut line = String::new();
@@ -109,6 +115,11 @@ fn command<H: Handle<Error = runtime::HandleError> + 'static>(

            json::to_writer(writer, &seeds)?;
        }
+
        CommandName::Sessions => {
+
            let sessions = handle.sessions()?;
+

+
            json::to_writer(writer, &sessions)?;
+
        }
        CommandName::TrackRepo => {
            let (rid, scope) = parse::args(cmd)?;

modified radicle-node/src/runtime/handle.rs
@@ -14,9 +14,9 @@ use crate::profile::Home;
use crate::runtime::Emitter;
use crate::service;
use crate::service::tracking;
+
use crate::service::NodeId;
use crate::service::{CommandError, QueryState};
use crate::service::{Event, Events};
-
use crate::service::{NodeId, Sessions};
use crate::wire;
use crate::wire::StreamId;
use crate::worker::TaskResult;
@@ -121,7 +121,7 @@ impl Handle {
}

impl radicle::node::Handle for Handle {
-
    type Sessions = Sessions;
+
    type Sessions = Vec<radicle::node::Session>;
    type Error = Error;

    fn nid(&self) -> Result<NodeId, Self::Error> {
@@ -211,7 +211,16 @@ impl radicle::node::Handle for Handle {
    fn sessions(&self) -> Result<Self::Sessions, Error> {
        let (sender, receiver) = chan::unbounded();
        let query: Arc<QueryState> = Arc::new(move |state| {
-
            sender.send(state.sessions().clone()).ok();
+
            let sessions = state
+
                .sessions()
+
                .iter()
+
                .map(|(nid, s)| radicle::node::Session {
+
                    nid: *nid,
+
                    state: s.state.clone(),
+
                })
+
                .collect();
+
            sender.send(sessions).ok();
+

            Ok(())
        });
        let (err_sender, err_receiver) = chan::bounded(1);
modified radicle-node/src/service.rs
@@ -710,7 +710,7 @@ where
        }
    }

-
    pub fn connected(&mut self, remote: NodeId, link: Link) {
+
    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
        info!(target: "service", "Connected to {} ({:?})", remote, link);
        self.emitter.emit(Event::PeerConnected { nid: remote });

@@ -736,6 +736,7 @@ where
                Entry::Vacant(e) => {
                    let peer = e.insert(Session::inbound(
                        remote,
+
                        addr,
                        self.config.is_persistent(&remote),
                        self.rng.clone(),
                        self.clock,
@@ -1538,7 +1539,7 @@ where
pub trait ServiceState {
    /// Get the Node ID.
    fn nid(&self) -> &NodeId;
-
    /// Get the connected peers.
+
    /// Get the existing sessions.
    fn sessions(&self) -> &Sessions;
    /// Get a repository from storage, using the local node's key.
    fn get(&self, proj: Id) -> Result<Option<Doc<Verified>>, IdentityError>;
modified radicle-node/src/service/session.rs
@@ -1,5 +1,5 @@
use std::collections::{HashSet, VecDeque};
-
use std::{fmt, mem};
+
use std::fmt;

use crate::service::config::Limits;
use crate::service::message;
@@ -7,60 +7,7 @@ use crate::service::message::Message;
use crate::service::{Address, Id, LocalTime, NodeId, Outbox, Rng};
use crate::Link;

-
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
-
pub enum PingState {
-
    #[default]
-
    /// The peer has not been sent a ping.
-
    None,
-
    /// A ping has been sent and is waiting on the peer's response.
-
    AwaitingResponse(u16),
-
    /// The peer was successfully pinged.
-
    Ok,
-
}
-

-
#[derive(Debug, Clone)]
-
#[allow(clippy::large_enum_variant)]
-
pub enum State {
-
    /// Initial state for outgoing connections.
-
    Initial,
-
    /// Connection attempted successfully.
-
    Attempted { addr: Address },
-
    /// Initial state after handshake protocol hand-off.
-
    Connected {
-
        /// Connected since this time.
-
        since: LocalTime,
-
        /// Ping state.
-
        ping: PingState,
-
        /// Ongoing fetches.
-
        fetching: HashSet<Id>,
-
    },
-
    /// When a peer is disconnected.
-
    Disconnected {
-
        /// Since when has this peer been disconnected.
-
        since: LocalTime,
-
        /// When to retry the connection.
-
        retry_at: LocalTime,
-
    },
-
}
-

-
impl fmt::Display for State {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-
        match self {
-
            Self::Initial => {
-
                write!(f, "initial")
-
            }
-
            Self::Attempted { .. } => {
-
                write!(f, "attempted")
-
            }
-
            Self::Connected { .. } => {
-
                write!(f, "connected")
-
            }
-
            Self::Disconnected { .. } => {
-
                write!(f, "disconnected")
-
            }
-
        }
-
    }
-
}
+
pub use crate::node::{PingState, State};

/// Return value of [`Session::fetch`].
#[derive(Debug)]
@@ -171,6 +118,7 @@ impl Session {

    pub fn inbound(
        id: NodeId,
+
        addr: Address,
        persistent: bool,
        rng: Rng,
        time: LocalTime,
@@ -179,6 +127,7 @@ impl Session {
        Self {
            id,
            state: State::Connected {
+
                addr,
                since: time,
                ping: PingState::default(),
                fetching: HashSet::default(),
@@ -256,19 +205,18 @@ impl Session {
    pub fn to_connected(&mut self, since: LocalTime) -> Address {
        self.attempts = 0;

-
        let previous = mem::replace(
-
            &mut self.state,
-
            State::Connected {
-
                since,
-
                ping: PingState::default(),
-
                fetching: HashSet::default(),
-
            },
-
        );
-
        if let State::Attempted { addr } = previous {
-
            addr
+
        let addr = if let State::Attempted { addr } = &self.state {
+
            addr.clone()
        } else {
-
            panic!("Session::to_connected: can only transition to 'connected' state from 'connecting' state");
-
        }
+
            panic!("Session::to_connected: can only transition to 'connected' state from 'attempted' state");
+
        };
+
        self.state = State::Connected {
+
            addr: addr.clone(),
+
            since,
+
            ping: PingState::default(),
+
            fetching: HashSet::default(),
+
        };
+
        addr
    }

    /// Move the session state to "disconnected". Returns any pending RID
modified radicle-node/src/test/handle.rs
@@ -6,8 +6,8 @@ use std::{io, time};
use crate::identity::Id;
use crate::node::{Event, FetchResult, Seeds};
use crate::runtime::HandleError;
+
use crate::service::tracking;
use crate::service::NodeId;
-
use crate::service::{self, tracking};

#[derive(Default, Clone)]
pub struct Handle {
@@ -18,7 +18,7 @@ pub struct Handle {

impl radicle::node::Handle for Handle {
    type Error = HandleError;
-
    type Sessions = service::Sessions;
+
    type Sessions = Vec<radicle::node::Session>;

    fn nid(&self) -> Result<NodeId, Self::Error> {
        Ok(NodeId::from_str("z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK").unwrap())
modified radicle-node/src/test/peer.rs
@@ -304,7 +304,8 @@ where
        let remote_id = simulator::Peer::<S, G>::id(peer);

        self.initialize();
-
        self.service.connected(remote_id, Link::Inbound);
+
        self.service
+
            .connected(remote_id, peer.address(), Link::Inbound);

        let mut msgs = self.messages(remote_id);
        msgs.find(|m| {
@@ -334,8 +335,9 @@ where
            .find(|o| matches!(o, Io::Connect { .. }))
            .unwrap();

-
        self.service.attempted(remote_id, remote_addr);
-
        self.service.connected(remote_id, Link::Outbound);
+
        self.service.attempted(remote_id, remote_addr.clone());
+
        self.service
+
            .connected(remote_id, remote_addr, Link::Outbound);

        let mut msgs = self.messages(remote_id);
        msgs.find(|m| {
modified radicle-node/src/test/simulator.rs
@@ -8,7 +8,7 @@ use std::marker::PhantomData;
use std::ops::{Deref, DerefMut, Range};
use std::rc::Rc;
use std::sync::Arc;
-
use std::{fmt, io};
+
use std::{fmt, io, net};

use localtime::{LocalDuration, LocalTime};
use log::*;
@@ -56,6 +56,8 @@ pub enum Input {
    Connected {
        /// Remote peer id.
        id: NodeId,
+
        /// Remote peer address.
+
        addr: Address,
        /// Link direction.
        link: Link,
    },
@@ -381,13 +383,13 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            p.attempted(id, addr);
                        }
                    }
-
                    Input::Connected { id, link } => {
+
                    Input::Connected { id, addr, link } => {
                        let conn = (node, id);

                        let attempted = link.is_outbound() && self.attempts.remove(&conn);
                        if attempted || link.is_inbound() {
                            if self.connections.insert(conn) {
-
                                p.connected(id, link);
+
                                p.connected(id, addr, link);
                            }
                        }
                    }
@@ -498,7 +500,10 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    Scheduled {
                        node,
                        remote,
-
                        input: Input::Connecting { id: remote, addr },
+
                        input: Input::Connecting {
+
                            id: remote,
+
                            addr: addr.clone(),
+
                        },
                    },
                );

@@ -535,6 +540,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        remote: node,
                        input: Input::Connected {
                            id: node,
+
                            addr: Address::from(net::SocketAddr::from(([0, 0, 0, 0], 0))),
                            link: Link::Inbound,
                        },
                    },
@@ -547,6 +553,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        node,
                        input: Input::Connected {
                            id: remote,
+
                            addr,
                            link: Link::Outbound,
                        },
                    },
modified radicle-node/src/tests.rs
@@ -967,7 +967,7 @@ fn test_persistent_peer_reconnect_success() {
        .expect("Alice attempts a re-connection");

    alice.attempted(bob.id(), bob.addr());
-
    alice.connected(bob.id(), Link::Outbound);
+
    alice.connected(bob.id(), bob.addr(), Link::Outbound);
}

#[test]
modified radicle-node/src/tests/e2e.rs
@@ -729,8 +729,18 @@ fn test_connection_crossing() {

    thread::sleep(time::Duration::from_secs(1));

-
    let s1 = alice.handle.sessions().unwrap().contains_key(&bob.id);
-
    let s2 = bob.handle.sessions().unwrap().contains_key(&alice.id);
+
    let s1 = alice
+
        .handle
+
        .sessions()
+
        .unwrap()
+
        .iter()
+
        .any(|s| s.nid == bob.id);
+
    let s2 = bob
+
        .handle
+
        .sessions()
+
        .unwrap()
+
        .iter()
+
        .any(|s| s.nid == alice.id);

    assert!(s1 ^ s2, "Exactly one session should be established");
}
modified radicle-node/src/wire/protocol.rs
@@ -141,12 +141,17 @@ impl Streams {
/// Peer connection state machine.
enum Peer {
    /// The initial state of an inbound peer before handshake is completed.
-
    Inbound {},
+
    Inbound { addr: NetAddr<HostName> },
    /// The initial state of an outbound peer before handshake is completed.
-
    Outbound { id: NodeId },
+
    Outbound {
+
        addr: NetAddr<HostName>,
+
        nid: NodeId,
+
    },
    /// The state after handshake is completed.
    /// Peers in this state are handled by the underlying service.
    Connected {
+
        #[allow(dead_code)]
+
        addr: NetAddr<HostName>,
        link: Link,
        nid: NodeId,
        inbox: Deserializer<Frame>,
@@ -155,7 +160,7 @@ enum Peer {
    /// The peer was scheduled for disconnection. Once the transport is handed over
    /// by the reactor, we can consider it disconnected.
    Disconnecting {
-
        id: Option<NodeId>,
+
        nid: Option<NodeId>,
        reason: DisconnectReason,
    },
}
@@ -163,8 +168,8 @@ enum Peer {
impl std::fmt::Debug for Peer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
-
            Self::Inbound {} => write!(f, "Inbound"),
-
            Self::Outbound { id } => write!(f, "Outbound({id})"),
+
            Self::Inbound { addr } => write!(f, "Inbound({addr})"),
+
            Self::Outbound { nid, .. } => write!(f, "Outbound({nid})"),
            Self::Connected { link, nid, .. } => write!(f, "Connected({link:?}, {nid})"),
            Self::Disconnecting { .. } => write!(f, "Disconnecting"),
        }
@@ -175,49 +180,58 @@ impl Peer {
    /// Return the peer's id, if any.
    fn id(&self) -> Option<&NodeId> {
        match self {
-
            Peer::Outbound { id }
-
            | Peer::Connected { nid: id, .. }
-
            | Peer::Disconnecting { id: Some(id), .. } => Some(id),
-
            Peer::Inbound {} => None,
-
            Peer::Disconnecting { id: None, .. } => None,
+
            Peer::Outbound { nid, .. }
+
            | Peer::Connected { nid, .. }
+
            | Peer::Disconnecting { nid: Some(nid), .. } => Some(nid),
+
            Peer::Inbound { .. } => None,
+
            Peer::Disconnecting { nid: None, .. } => None,
        }
    }

    /// Return a new inbound connecting peer.
-
    fn inbound() -> Self {
-
        Self::Inbound {}
+
    fn inbound(addr: NetAddr<HostName>) -> Self {
+
        Self::Inbound { addr }
    }

-
    /// Return a new inbound connecting peer.
-
    fn outbound(id: NodeId) -> Self {
-
        Self::Outbound { id }
+
    /// Return a new outbound connecting peer.
+
    fn outbound(addr: NetAddr<HostName>, nid: NodeId) -> Self {
+
        Self::Outbound { addr, nid }
    }

    /// Switch to connected state.
-
    fn connected(&mut self, id: NodeId) -> Link {
-
        if let Self::Inbound {} = self {
+
    fn connected(&mut self, nid: NodeId) -> (NetAddr<HostName>, Link) {
+
        if let Self::Inbound { addr } = self {
            let link = Link::Inbound;
+
            let addr = addr.clone();

            *self = Self::Connected {
                link,
-
                nid: id,
+
                addr: addr.clone(),
+
                nid,
                inbox: Deserializer::default(),
                streams: Streams::new(link),
            };
-
            link
-
        } else if let Self::Outbound { id: expected } = self {
-
            assert_eq!(id, *expected);
+
            (addr, link)
+
        } else if let Self::Outbound {
+
            addr,
+
            nid: expected,
+
        } = self
+
        {
+
            assert_eq!(nid, *expected);
+

            let link = Link::Outbound;
+
            let addr = addr.clone();

            *self = Self::Connected {
                link,
-
                nid: id,
+
                addr: addr.clone(),
+
                nid,
                inbox: Deserializer::default(),
                streams: Streams::new(link),
            };
-
            link
+
            (addr, link)
        } else {
-
            panic!("Peer::connected: session for {id} is already established");
+
            panic!("Peer::connected: session for {nid} is already established");
        }
    }

@@ -227,14 +241,14 @@ impl Peer {
            streams.shutdown();

            *self = Self::Disconnecting {
-
                id: Some(*nid),
+
                nid: Some(*nid),
                reason,
            };
-
        } else if let Self::Inbound {} = self {
-
            *self = Self::Disconnecting { id: None, reason };
-
        } else if let Self::Outbound { id } = self {
+
        } else if let Self::Inbound { .. } = self {
+
            *self = Self::Disconnecting { nid: None, reason };
+
        } else if let Self::Outbound { nid, .. } = self {
            *self = Self::Disconnecting {
-
                id: Some(*id),
+
                nid: Some(*nid),
                reason,
            };
        } else {
@@ -280,9 +294,9 @@ impl Peers {

    fn active(&self) -> impl Iterator<Item = (RawFd, &NodeId)> {
        self.0.iter().filter_map(|(fd, peer)| match peer {
-
            Peer::Inbound {} => None,
-
            Peer::Outbound { id } => Some((*fd, id)),
-
            Peer::Connected { nid: id, .. } => Some((*fd, id)),
+
            Peer::Inbound { .. } => None,
+
            Peer::Outbound { nid, .. } => Some((*fd, nid)),
+
            Peer::Connected { nid, .. } => Some((*fd, nid)),
            Peer::Disconnecting { .. } => None,
        })
    }
@@ -463,7 +477,10 @@ where
                    "Accepting inbound peer connection from {}..",
                    connection.remote_addr()
                );
-
                self.peers.insert(connection.as_raw_fd(), Peer::inbound());
+
                self.peers.insert(
+
                    connection.as_raw_fd(),
+
                    Peer::inbound(connection.remote_addr().into()),
+
                );

                let session = accept::<G>(connection, self.signer.clone());
                let transport = match NetTransport::with_session(session, Link::Inbound) {
@@ -519,9 +536,9 @@ where
                    log::error!(target: "wire", "Session not found for fd {fd}");
                    return;
                };
-
                let link = peer.connected(id);
+
                let (addr, link) = peer.connected(id);

-
                self.service.connected(id, link);
+
                self.service.connected(id, addr.into(), link);
            }
            SessionEvent::Data(data) => {
                if let Some(Peer::Connected {
@@ -691,7 +708,9 @@ where
        match self.peers.entry(fd) {
            Entry::Occupied(e) => {
                match e.get() {
-
                    Peer::Disconnecting { id, reason, .. } => {
+
                    Peer::Disconnecting {
+
                        nid: id, reason, ..
+
                    } => {
                        // Disconnect TCP stream.
                        drop(transport);

@@ -771,11 +790,13 @@ where
                        NetTransport::<WireSession<G>>::with_session(session, Link::Outbound)
                    }) {
                        Ok(transport) => {
-
                            self.service.attempted(node_id, addr);
+
                            self.service.attempted(node_id, addr.clone());
                            // TODO: Keep track of peer address for when peer disconnects before
                            // handshake is complete.
-
                            self.peers
-
                                .insert(transport.as_raw_fd(), Peer::outbound(node_id));
+
                            self.peers.insert(
+
                                transport.as_raw_fd(),
+
                                Peer::outbound(addr.to_inner(), node_id),
+
                            );

                            self.actions
                                .push_back(reactor::Action::RegisterTransport(transport));
modified radicle/Cargo.toml
@@ -15,7 +15,7 @@ crossbeam-channel = { version = "0.5.6" }
cyphernet = { version = "0.2.0", features = ["tor", "dns", "ed25519"] }
fastrand = { version = "1.9.0" }
multibase = { version = "0.9.1" }
-
localtime = { version = "1.2.0" }
+
localtime = { version = "1.2.0", features = ["serde"] }
log = { version = "0.4.17", features = ["std"] }
nonempty = { version = "0.8.1", features = ["serialize"] }
once_cell = { version = "1.13" }
modified radicle/src/node.rs
@@ -14,6 +14,7 @@ use std::{fmt, io, net, thread, time};

use amplify::WrapperMut;
use cyphernet::addr::{HostName, NetAddr};
+
use localtime::LocalTime;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json as json;
@@ -48,6 +49,64 @@ pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire";
/// Milliseconds since epoch.
pub type Timestamp = u64;

+
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
+
pub enum PingState {
+
    #[default]
+
    /// The peer has not been sent a ping.
+
    None,
+
    /// A ping has been sent and is waiting on the peer's response.
+
    AwaitingResponse(u16),
+
    /// The peer was successfully pinged.
+
    Ok,
+
}
+

+
#[derive(Debug, Clone, Serialize, Deserialize)]
+
#[allow(clippy::large_enum_variant)]
+
pub enum State {
+
    /// Initial state for outgoing connections.
+
    Initial,
+
    /// Connection attempted successfully.
+
    Attempted { addr: Address },
+
    /// Initial state after handshake protocol hand-off.
+
    Connected {
+
        /// Remote address.
+
        addr: Address,
+
        /// Connected since this time.
+
        since: LocalTime,
+
        /// Ping state.
+
        #[serde(skip)]
+
        ping: PingState,
+
        /// Ongoing fetches.
+
        fetching: HashSet<Id>,
+
    },
+
    /// When a peer is disconnected.
+
    Disconnected {
+
        /// Since when has this peer been disconnected.
+
        since: LocalTime,
+
        /// When to retry the connection.
+
        retry_at: LocalTime,
+
    },
+
}
+

+
impl fmt::Display for State {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match self {
+
            Self::Initial => {
+
                write!(f, "initial")
+
            }
+
            Self::Attempted { .. } => {
+
                write!(f, "attempted")
+
            }
+
            Self::Connected { .. } => {
+
                write!(f, "connected")
+
            }
+
            Self::Disconnected { .. } => {
+
                write!(f, "disconnected")
+
            }
+
        }
+
    }
+
}
+

/// Result of a command, on the node control socket.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status")]
@@ -101,10 +160,10 @@ impl From<CommandResult> for Result<bool, Error> {
}

/// Peer public protocol address.
-
#[derive(Wrapper, WrapperMut, Clone, Eq, PartialEq, Debug, From)]
+
#[derive(Wrapper, WrapperMut, Clone, Eq, PartialEq, Debug, From, Serialize, Deserialize)]
#[wrapper(Deref, Display, FromStr)]
#[wrapper_mut(DerefMut)]
-
pub struct Address(NetAddr<HostName>);
+
pub struct Address(#[serde(with = "crate::serde_ext::string")] NetAddr<HostName>);

impl cyphernet::addr::Host for Address {
    fn requires_proxy(&self) -> bool {
@@ -141,6 +200,8 @@ pub enum CommandName {
    Connect,
    /// Lookup seeds for the given repository in the routing table.
    Seeds,
+
    /// Get the current peer sessions.
+
    Sessions,
    /// Fetch the given repository from the network.
    Fetch,
    /// Track the given repository.
@@ -207,6 +268,13 @@ impl Command {
    }
}

+
/// An established network connection with a peer.
+
#[derive(Debug, Clone, Serialize, Deserialize)]
+
pub struct Session {
+
    pub nid: NodeId,
+
    pub state: State,
+
}
+

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(tag = "state", content = "id")]
@@ -541,7 +609,7 @@ impl Node {
// TODO(finto): repo_policies, node_policies, and routing should all
// attempt to return iterators instead of allocating vecs.
impl Handle for Node {
-
    type Sessions = ();
+
    type Sessions = Vec<Session>;
    type Error = Error;

    fn nid(&self) -> Result<NodeId, Error> {
@@ -694,7 +762,14 @@ impl Handle for Node {
    }

    fn sessions(&self) -> Result<Self::Sessions, Error> {
-
        todo!();
+
        let sessions = self
+
            .call::<&str, Vec<Session>>(CommandName::Sessions, [], DEFAULT_TIMEOUT)?
+
            .next()
+
            .ok_or(Error::EmptyResponse {
+
                cmd: CommandName::Sessions,
+
            })??;
+

+
        Ok(sessions)
    }

    fn shutdown(self) -> Result<(), Error> {
modified radicle/src/serde_ext.rs
@@ -24,6 +24,31 @@ pub mod string {
    }
}

+
/// Unlike the default `serde` instance for `LocalTime`, this encodes and decodes using seconds
+
/// instead of milliseconds.
+
pub mod localtime {
+
    use localtime::LocalTime;
+
    use serde::{de, Deserialize, Deserializer, Serializer};
+

+
    pub fn serialize<S>(value: &LocalTime, serializer: S) -> Result<S::Ok, S::Error>
+
    where
+
        S: Serializer,
+
    {
+
        serializer.collect_str(&value.as_secs())
+
    }
+

+
    pub fn deserialize<'de, D>(deserializer: D) -> Result<LocalTime, D::Error>
+
    where
+
        D: Deserializer<'de>,
+
    {
+
        let seconds: u64 = String::deserialize(deserializer)?
+
            .parse()
+
            .map_err(de::Error::custom)?;
+

+
        Ok(LocalTime::from_secs(seconds))
+
    }
+
}
+

/// Return true if the given value is the default for that type.
pub fn is_default<T: Default + PartialEq>(t: &T) -> bool {
    t == &T::default()