Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Make user connections persistent
Alexis Sellier committed 2 years ago
commit 41bff1b3bb9913c29355095f013503ef7a84bc2e
parent 840fc496b8a07a99ccacbe4ed3d592c9016beb62
12 files changed +126 -47
modified radicle-cli/src/commands/node/control.rs
@@ -127,7 +127,7 @@ pub fn connect(node: &mut Node, nid: NodeId, addr: Address) -> anyhow::Result<()
        "Connecting to {}@{addr}...",
        term::format::node(&nid)
    ));
-
    if let Err(err) = node.connect(nid, addr.clone()) {
+
    if let Err(err) = node.connect(nid, addr.clone(), node::ConnectOptions { persistent: true }) {
        spinner.error(format!(
            "Failed to connect to {}@{}: {}",
            term::format::node(&nid),
modified radicle-node/src/control.rs
@@ -92,9 +92,9 @@ where
    let cmd: Command = json::from_str(input)?;

    match cmd {
-
        Command::Connect { addr } => {
+
        Command::Connect { addr, opts } => {
            let (nid, addr) = addr.into();
-
            if let Err(e) = handle.connect(nid, addr) {
+
            if let Err(e) = handle.connect(nid, addr, opts) {
                return Err(CommandError::Runtime(e));
            } else {
                CommandResult::Okay { updated: true }.to_writer(writer)?;
modified radicle-node/src/main.rs
@@ -57,7 +57,7 @@ impl Options {
            match arg {
                Long("connect") => {
                    let peer: PeerAddr<NodeId, Address> = parser.value()?.parse()?;
-
                    config.connect.push(peer.into());
+
                    config.connect.insert(peer.into());
                }
                Long("external-address") => {
                    let addr = parser.value()?.parse()?;
modified radicle-node/src/runtime/handle.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use std::{fmt, io, time};

use crossbeam_channel as chan;
-
use radicle::node::Seeds;
+
use radicle::node::{ConnectOptions, Seeds};
use reactor::poller::popol::PopolWaker;
use thiserror::Error;

@@ -143,8 +143,13 @@ impl radicle::node::Handle for Handle {
        true
    }

-
    fn connect(&mut self, node: NodeId, addr: radicle::node::Address) -> Result<(), Error> {
-
        self.command(service::Command::Connect(node, addr))?;
+
    fn connect(
+
        &mut self,
+
        node: NodeId,
+
        addr: radicle::node::Address,
+
        opts: ConnectOptions,
+
    ) -> Result<(), Error> {
+
        self.command(service::Command::Connect(node, addr, opts))?;

        Ok(())
    }
modified radicle-node/src/service.rs
@@ -20,6 +20,7 @@ use log::*;

use radicle::node::address;
use radicle::node::address::{AddressBook, KnownAddress};
+
use radicle::node::ConnectOptions;

use crate::crypto;
use crate::crypto::{Signer, Verified};
@@ -123,7 +124,7 @@ pub enum Command {
    /// Announce local inventory to peers.
    SyncInventory(chan::Sender<bool>),
    /// Connect to node with the given address.
-
    Connect(NodeId, Address),
+
    Connect(NodeId, Address, ConnectOptions),
    /// Disconnect from node.
    Disconnect(NodeId),
    /// Lookup seeds for the given repository in the routing table.
@@ -148,7 +149,7 @@ impl fmt::Debug for Command {
            Self::AnnounceRefs(id) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::SyncInventory(_) => write!(f, "SyncInventory(..)"),
-
            Self::Connect(id, addr) => write!(f, "Connect({id}, {addr})"),
+
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
            Self::Disconnect(id) => write!(f, "Disconnect({id})"),
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _) => write!(f, "Fetch({id}, {node})"),
@@ -490,7 +491,10 @@ where
        info!(target: "service", "Received command {:?}", cmd);

        match cmd {
-
            Command::Connect(nid, addr) => {
+
            Command::Connect(nid, addr, opts) => {
+
                if opts.persistent {
+
                    self.config.connect.insert((nid, addr.clone()).into());
+
                }
                self.connect(nid, addr);
            }
            Command::Disconnect(nid) => {
modified radicle-node/src/test/environment.rs
@@ -21,8 +21,8 @@ use radicle::identity::Id;
use radicle::node::address::Book;
use radicle::node::routing::Store;
use radicle::node::tracking::store as TrackingStore;
-
use radicle::node::Handle as _;
use radicle::node::{Alias, ADDRESS_DB_FILE, TRACKING_DB_FILE};
+
use radicle::node::{ConnectOptions, Handle as _};
use radicle::profile;
use radicle::profile::Home;
use radicle::profile::Profile;
@@ -164,7 +164,9 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
        let local_events = self.handle.events();
        let remote_events = remote.handle.events();

-
        self.handle.connect(remote.id, remote.addr.into()).unwrap();
+
        self.handle
+
            .connect(remote.id, remote.addr.into(), ConnectOptions::default())
+
            .unwrap();

        local_events
            .iter()
modified radicle-node/src/test/handle.rs
@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
use std::{io, time};

use crate::identity::Id;
-
use crate::node::{Alias, Event, FetchResult, Seeds};
+
use crate::node::{Alias, ConnectOptions, Event, FetchResult, Seeds};
use crate::runtime::HandleError;
use crate::service::tracking;
use crate::service::NodeId;
@@ -28,7 +28,12 @@ impl radicle::node::Handle for Handle {
        true
    }

-
    fn connect(&mut self, _node: NodeId, _addr: radicle::node::Address) -> Result<(), Self::Error> {
+
    fn connect(
+
        &mut self,
+
        _node: NodeId,
+
        _addr: radicle::node::Address,
+
        _opts: ConnectOptions,
+
    ) -> Result<(), Self::Error> {
        unimplemented!();
    }

modified radicle-node/src/test/peer.rs
@@ -7,7 +7,7 @@ use std::str::FromStr;
use log::*;

use radicle::node::address::Store;
-
use radicle::node::{address, Alias};
+
use radicle::node::{address, Alias, ConnectOptions};
use radicle::rad;
use radicle::storage::ReadRepository;
use radicle::Storage;
@@ -326,8 +326,11 @@ where
        let remote_addr = simulator::Peer::<T, H>::addr(peer);

        self.initialize();
-
        self.service
-
            .command(Command::Connect(remote_id, remote_addr.clone()));
+
        self.service.command(Command::Connect(
+
            remote_id,
+
            remote_addr.clone(),
+
            ConnectOptions::default(),
+
        ));

        self.outbox()
            .find(|o| matches!(o, Io::Connect { .. }))
modified radicle-node/src/tests.rs
@@ -9,6 +9,7 @@ use std::time;
use crossbeam_channel as chan;
use netservices::Direction as Link;
use radicle::node::routing::Store as _;
+
use radicle::node::ConnectOptions;
use radicle::storage::ReadRepository;

use crate::collections::{HashMap, HashSet};
@@ -116,10 +117,11 @@ fn test_disconnecting_unresponsive_peer() {
fn test_redundant_connect() {
    let mut alice = Peer::new("alice", [8, 8, 8, 8]);
    let bob = Peer::new("bob", [9, 9, 9, 9]);
+
    let opts = ConnectOptions::default();

-
    alice.command(Command::Connect(bob.id(), bob.address()));
-
    alice.command(Command::Connect(bob.id(), bob.address()));
-
    alice.command(Command::Connect(bob.id(), bob.address()));
+
    alice.command(Command::Connect(bob.id(), bob.address(), opts.clone()));
+
    alice.command(Command::Connect(bob.id(), bob.address(), opts.clone()));
+
    alice.command(Command::Connect(bob.id(), bob.address(), opts));

    // Only one connection attempt is made.
    assert_matches!(
@@ -141,7 +143,11 @@ fn test_connection_kept_alive() {
    )
    .initialize([&mut alice, &mut bob]);

-
    alice.command(service::Command::Connect(bob.id(), bob.address()));
+
    alice.command(service::Command::Connect(
+
        bob.id(),
+
        bob.address(),
+
        ConnectOptions::default(),
+
    ));
    sim.run_while([&mut alice, &mut bob], |s| !s.is_settled());
    assert_eq!(1, alice.sessions().connected().count(), "bob connects");

@@ -201,18 +207,21 @@ fn test_inbound_connection() {

#[test]
fn test_persistent_peer_connect() {
+
    use std::collections::HashSet;
+

    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
+
    let connect = HashSet::<ConnectAddress>::from_iter([
+
        (bob.id(), bob.address()).into(),
+
        (eve.id(), eve.address()).into(),
+
    ]);
    let mut alice = Peer::config(
        "alice",
        [7, 7, 7, 7],
        MockStorage::empty(),
        peer::Config {
            config: Config {
-
                connect: vec![
-
                    (bob.id(), bob.address()).into(),
-
                    (eve.id(), eve.address()).into(),
-
                ],
+
                connect,
                ..Config::new(node::Alias::new("alice"))
            },
            ..peer::Config::default()
@@ -221,10 +230,15 @@ fn test_persistent_peer_connect() {

    alice.initialize();

-
    let mut outbox = alice.outbox();
-
    assert_matches!(outbox.next(), Some(Io::Connect(a, _)) if a == bob.id());
-
    assert_matches!(outbox.next(), Some(Io::Connect(a, _)) if a == eve.id());
-
    assert_matches!(outbox.find(|o| matches!(o, Io::Connect { .. })), None);
+
    let outbox = alice.outbox().collect::<Vec<_>>();
+
    outbox
+
        .iter()
+
        .find(|o| matches!(o, Io::Connect(a, _) if *a == bob.id()))
+
        .unwrap();
+
    outbox
+
        .iter()
+
        .find(|o| matches!(o, Io::Connect(a, _) if *a == eve.id()))
+
        .unwrap();
}

#[test]
@@ -886,6 +900,8 @@ fn test_inventory_relay() {

#[test]
fn test_persistent_peer_reconnect_attempt() {
+
    use std::collections::HashSet;
+

    let mut bob = Peer::new("bob", [8, 8, 8, 8]);
    let mut eve = Peer::new("eve", [9, 9, 9, 9]);
    let mut alice = Peer::config(
@@ -894,10 +910,10 @@ fn test_persistent_peer_reconnect_attempt() {
        MockStorage::empty(),
        peer::Config {
            config: Config {
-
                connect: vec![
+
                connect: HashSet::from_iter([
                    (bob.id(), bob.address()).into(),
                    (eve.id(), eve.address()).into(),
-
                ],
+
                ]),
                ..Config::new(node::Alias::new("alice"))
            },
            ..peer::Config::default()
@@ -943,6 +959,8 @@ fn test_persistent_peer_reconnect_attempt() {

#[test]
fn test_persistent_peer_reconnect_success() {
+
    use std::collections::HashSet;
+

    let bob = Peer::config(
        "bob",
        [9, 9, 9, 9],
@@ -955,7 +973,7 @@ fn test_persistent_peer_reconnect_success() {
        MockStorage::empty(),
        peer::Config {
            config: Config {
-
                connect: vec![(bob.id, bob.addr()).into()],
+
                connect: HashSet::from_iter([(bob.id, bob.addr()).into()]),
                ..Config::new(node::Alias::new("alice"))
            },
            ..peer::Config::default()
@@ -1246,8 +1264,16 @@ fn test_push_and_pull() {
    local::register(alice.storage().clone());

    // Alice and Bob connect to Eve.
-
    alice.command(service::Command::Connect(eve.id(), eve.address()));
-
    bob.command(service::Command::Connect(eve.id(), eve.address()));
+
    alice.command(service::Command::Connect(
+
        eve.id(),
+
        eve.address(),
+
        ConnectOptions::default(),
+
    ));
+
    bob.command(service::Command::Connect(
+
        eve.id(),
+
        eve.address(),
+
        ConnectOptions::default(),
+
    ));

    // Alice creates a new project.
    let (proj_id, _, _) = rad::init(
@@ -1364,9 +1390,21 @@ fn prop_inventory_exchange_dense() {
        }

        // Fully-connected.
-
        bob.command(Command::Connect(alice.id(), alice.address()));
-
        bob.command(Command::Connect(eve.id(), eve.address()));
-
        eve.command(Command::Connect(alice.id(), alice.address()));
+
        bob.command(Command::Connect(
+
            alice.id(),
+
            alice.address(),
+
            ConnectOptions::default(),
+
        ));
+
        bob.command(Command::Connect(
+
            eve.id(),
+
            eve.address(),
+
            ConnectOptions::default(),
+
        ));
+
        eve.command(Command::Connect(
+
            alice.id(),
+
            alice.address(),
+
            ConnectOptions::default(),
+
        ));

        let mut peers: HashMap<_, _> = [
            (alice.node_id(), alice),
modified radicle-node/src/tests/e2e.rs
@@ -8,7 +8,7 @@ use radicle::test::fixtures;
use radicle::{assert_matches, rad};

use crate::node::config::Limits;
-
use crate::node::Config;
+
use crate::node::{Config, ConnectOptions};
use crate::service;
use crate::service::tracking::Scope;
use crate::storage::git::transport;
@@ -734,8 +734,13 @@ fn test_connection_crossing() {
    let mut alice = alice.spawn();
    let mut bob = bob.spawn();

-
    alice.handle.connect(bob.id, bob.addr.into()).unwrap();
-
    bob.handle.connect(alice.id, alice.addr.into()).unwrap();
+
    alice
+
        .handle
+
        .connect(bob.id, bob.addr.into(), ConnectOptions::default())
+
        .unwrap();
+
    bob.handle
+
        .connect(alice.id, alice.addr.into(), ConnectOptions::default())
+
        .unwrap();

    thread::sleep(time::Duration::from_secs(1));

modified radicle/src/node.rs
@@ -196,6 +196,13 @@ impl FromStr for Alias {
    }
}

+
/// Options passed to the "connect" node command.
+
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
+
pub struct ConnectOptions {
+
    /// Establish a persistent connection.
+
    pub persistent: bool,
+
}
+

/// Result of a command, on the node control socket.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status")]
@@ -249,7 +256,7 @@ impl From<CommandResult> for Result<bool, Error> {
}

/// Peer public protocol address.
-
#[derive(Wrapper, WrapperMut, Clone, Eq, PartialEq, Debug, From, Serialize, Deserialize)]
+
#[derive(Wrapper, WrapperMut, Clone, Eq, PartialEq, Debug, Hash, From, Serialize, Deserialize)]
#[wrapper(Deref, Display, FromStr)]
#[wrapper_mut(DerefMut)]
pub struct Address(#[serde(with = "crate::serde_ext::string")] NetAddr<HostName>);
@@ -292,7 +299,10 @@ pub enum Command {

    /// Connect to node with the given address.
    #[serde(rename_all = "camelCase")]
-
    Connect { addr: config::ConnectAddress },
+
    Connect {
+
        addr: config::ConnectAddress,
+
        opts: ConnectOptions,
+
    },

    /// Lookup seeds for the given repository in the routing table.
    #[serde(rename_all = "camelCase")]
@@ -563,7 +573,12 @@ pub trait Handle: Clone + Sync + Send {
    /// Check if the node is running. to a peer.
    fn is_running(&self) -> bool;
    /// Connect to a peer.
-
    fn connect(&mut self, node: NodeId, addr: Address) -> Result<(), Self::Error>;
+
    fn connect(
+
        &mut self,
+
        node: NodeId,
+
        addr: Address,
+
        opts: ConnectOptions,
+
    ) -> Result<(), Self::Error>;
    /// Lookup the seeds of a given repository in the routing table.
    fn seeds(&mut self, id: Id) -> Result<Seeds, Self::Error>;
    /// Fetch a repository from the network.
@@ -700,10 +715,11 @@ impl Handle for Node {
        matches!(result, CommandResult::Okay { .. })
    }

-
    fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), Error> {
+
    fn connect(&mut self, nid: NodeId, addr: Address, opts: ConnectOptions) -> Result<(), Error> {
        self.call::<CommandResult>(
            Command::Connect {
                addr: (nid, addr).into(),
+
                opts,
            },
            DEFAULT_TIMEOUT,
        )?
modified radicle/src/node/config.rs
@@ -1,3 +1,4 @@
+
use std::collections::HashSet;
use std::ops::Deref;

use cyphernet::addr::PeerAddr;
@@ -40,7 +41,7 @@ impl Default for Limits {
}

/// Full address used to connect to a remote node.
-
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Hash)]
#[serde(transparent)]
pub struct ConnectAddress(#[serde(with = "crate::serde_ext::string")] PeerAddr<NodeId, Address>);

@@ -78,7 +79,7 @@ pub struct Config {
    pub alias: Alias,
    /// Peers to connect to on startup.
    /// Connections to these peers will be maintained.
-
    pub connect: Vec<ConnectAddress>,
+
    pub connect: HashSet<ConnectAddress>,
    /// Specify the node's public addresses
    pub external_addresses: Vec<Address>,
    /// Peer-to-peer network.
@@ -97,7 +98,7 @@ impl Config {
    pub fn new(alias: Alias) -> Self {
        Self {
            alias,
-
            connect: Vec::default(),
+
            connect: HashSet::default(),
            external_addresses: vec![],
            network: Network::default(),
            relay: true,