Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Keep track of connection attempts
Alexis Sellier committed 3 years ago
commit ca0abc4356e4c48f9ef6ae4d0e74a79b5c08afc3
parent b5a00dfc4c67010f6da1b14c4a2dc32cd3a9453c
8 files changed +102 -44
modified radicle-node/src/address/store.rs
@@ -145,7 +145,7 @@ impl Store for Book {
                )?;
                stmt.bind((1, node))?;
                stmt.bind((2, AddressType::from(&addr.addr)))?;
-
                stmt.bind((3, addr.addr))?;
+
                stmt.bind((3, &addr.addr))?;
                stmt.bind((4, addr.source))?;
                stmt.bind((5, timestamp as i64))?;
                stmt.next()?;
@@ -197,6 +197,42 @@ impl Store for Book {
        }
        Ok(Box::new(entries.into_iter()))
    }
+

+
    fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error> {
+
        let mut stmt = self.db.prepare(
+
            "UPDATE `addresses`
+
             SET last_attempt = ?1
+
             WHERE node = ?2
+
             AND type = ?3
+
             AND value = ?4",
+
        )?;
+

+
        stmt.bind((1, time as i64))?;
+
        stmt.bind((2, nid))?;
+
        stmt.bind((3, AddressType::from(addr)))?;
+
        stmt.bind((4, addr))?;
+
        stmt.next()?;
+

+
        Ok(())
+
    }
+

+
    fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error> {
+
        let mut stmt = self.db.prepare(
+
            "UPDATE `addresses`
+
             SET last_success = ?1
+
             WHERE node = ?2
+
             AND type = ?3
+
             AND value = ?4",
+
        )?;
+

+
        stmt.bind((1, time as i64))?;
+
        stmt.bind((2, nid))?;
+
        stmt.bind((3, AddressType::from(addr)))?;
+
        stmt.bind((4, addr))?;
+
        stmt.next()?;
+

+
        Ok(())
+
    }
}

/// Address store.
@@ -226,6 +262,10 @@ pub trait Store {
    }
    /// Get the address entries in the store.
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error>;
+
    /// Mark a node as attempted at a certain time.
+
    fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
+
    /// Mark a node as successfully connected at a certain time.
+
    fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
}

impl TryFrom<&sql::Value> for Source {
modified radicle-node/src/service.rs
@@ -672,11 +672,11 @@ where
        // Inbound connection attempt.
    }

-
    pub fn attempted(&mut self, nid: NodeId, addr: &Address) {
+
    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
        debug!(target: "service", "Attempted connection to {nid} ({addr})");

        if let Some(sess) = self.sessions.get_mut(&nid) {
-
            sess.to_attempted();
+
            sess.to_attempted(addr);
        } else {
            #[cfg(debug_assertions)]
            panic!("Service::attempted: unknown session {nid}@{addr}");
@@ -691,8 +691,12 @@ where

        if link.is_outbound() {
            if let Some(peer) = self.sessions.get_mut(&remote) {
-
                peer.to_connected(self.clock);
+
                let attempted = peer.to_connected(self.clock);
                self.reactor.write_all(peer, msgs);
+

+
                if let Err(e) = self.addresses.connected(&remote, &attempted, self.time()) {
+
                    error!(target: "service", "Error updating address book with connection: {e}");
+
                }
            }
        } else {
            match self.sessions.entry(remote) {
@@ -1276,6 +1280,9 @@ where
        }
        let persistent = self.config.is_persistent(&nid);

+
        if let Err(e) = self.addresses.attempted(&nid, &addr, self.time()) {
+
            error!(target: "service", "Error updating address book with connection attempt: {e}");
+
        }
        self.sessions.insert(
            nid,
            Session::outbound(
@@ -1396,26 +1403,36 @@ where
        }
    }

-
    fn choose_addresses(&mut self) -> Vec<(NodeId, Address)> {
-
        let sessions = self
+
    /// Get a list of peers available to connect to.
+
    fn available_peers(&mut self) -> Vec<(NodeId, Address)> {
+
        let outbound = self
            .sessions
            .values()
-
            .filter(|s| s.is_connected() && s.link.is_outbound())
-
            .map(|s| (s.id, s))
-
            .collect::<HashMap<_, _>>();
+
            .filter(|s| s.link.is_outbound())
+
            .filter(|s| s.is_connected() || s.is_connecting())
+
            .count();

-
        let wanted = TARGET_OUTBOUND_PEERS.saturating_sub(sessions.len());
+
        let wanted = TARGET_OUTBOUND_PEERS.saturating_sub(outbound);
+
        // Don't connect to more peers than needed.
        if wanted == 0 {
            return Vec::new();
        }

-
        self.addresses
-
            .entries()
-
            .unwrap()
-
            .filter(|(node_id, _)| !sessions.contains_key(node_id))
-
            .take(wanted)
-
            .map(|(n, s)| (n, s.addr))
-
            .collect()
+
        match self.addresses.entries() {
+
            Ok(entries) => {
+
                // Nb. we don't want to connect to any peers that already have a session with us,
+
                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
+
                entries
+
                    .filter(|(nid, _)| !self.sessions.contains_key(nid))
+
                    .take(wanted)
+
                    .map(|(n, s)| (n, s.addr))
+
                    .collect()
+
            }
+
            Err(e) => {
+
                error!(target: "service", "Unable to lookup available peers in address book: {e}");
+
                Vec::new()
+
            }
+
        }
    }

    /// Fetch all repositories that are tracked but missing from our inventory.
@@ -1456,11 +1473,7 @@ where
    }

    fn maintain_connections(&mut self) {
-
        let addrs = self.choose_addresses();
-
        if addrs.is_empty() {
-
            debug!(target: "service", "No eligible peers available to connect to");
-
        }
-
        for (id, addr) in addrs {
+
        for (id, addr) in self.available_peers() {
            self.connect(id, addr.clone());
        }
    }
modified radicle-node/src/service/session.rs
@@ -1,10 +1,10 @@
use std::collections::{HashSet, VecDeque};
-
use std::fmt;
+
use std::{fmt, mem};

use crate::service::config::Limits;
use crate::service::message;
use crate::service::message::Message;
-
use crate::service::{Id, LocalTime, NodeId, Reactor, Rng};
+
use crate::service::{Address, Id, LocalTime, NodeId, Reactor, Rng};
use crate::Link;

#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
@@ -24,7 +24,7 @@ pub enum State {
    /// Initial state for outgoing connections.
    Initial,
    /// Connection attempted successfully.
-
    Attempted,
+
    Attempted { addr: Address },
    /// Initial state after handshake protocol hand-off.
    Connected {
        /// Connected since this time.
@@ -49,7 +49,7 @@ impl fmt::Display for State {
            Self::Initial => {
                write!(f, "initial")
            }
-
            Self::Attempted => {
+
            Self::Attempted { .. } => {
                write!(f, "attempted")
            }
            Self::Connected { .. } => {
@@ -244,26 +244,31 @@ impl Session {
        None
    }

-
    pub fn to_attempted(&mut self) {
+
    pub fn to_attempted(&mut self, addr: Address) {
        assert!(
            self.is_initial(),
            "Can only transition to 'attempted' state from 'initial' state"
        );
-
        self.state = State::Attempted;
+
        self.state = State::Attempted { addr };
        self.attempts += 1;
    }

-
    pub fn to_connected(&mut self, since: LocalTime) {
-
        assert!(
-
            self.is_connecting(),
-
            "Can only transition to 'connected' state from 'connecting' state"
-
        );
+
    pub fn to_connected(&mut self, since: LocalTime) -> Address {
        self.attempts = 0;
-
        self.state = State::Connected {
-
            since,
-
            ping: PingState::default(),
-
            fetching: HashSet::default(),
-
        };
+

+
        let previous = mem::replace(
+
            &mut self.state,
+
            State::Connected {
+
                since,
+
                ping: PingState::default(),
+
                fetching: HashSet::default(),
+
            },
+
        );
+
        if let State::Attempted { addr } = previous {
+
            addr
+
        } else {
+
            panic!("Session::to_connected: can only transition to 'connected' state from 'connecting' state");
+
        }
    }

    /// Move the session state to "disconnected". Returns any pending RID
modified radicle-node/src/test/peer.rs
@@ -332,7 +332,7 @@ where
            .find(|o| matches!(o, Io::Connect { .. }))
            .unwrap();

-
        self.service.attempted(remote_id, &remote_addr);
+
        self.service.attempted(remote_id, remote_addr);
        self.service.connected(remote_id, Link::Outbound);

        let mut msgs = self.messages(remote_id);
modified radicle-node/src/test/simulator.rs
@@ -378,7 +378,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    Input::Connecting { id, addr } => {
                        if self.attempts.insert((node, id)) {
                            // TODO: Also call `inbound` for inbound attempts.
-
                            p.attempted(id, &addr);
+
                            p.attempted(id, addr);
                        }
                    }
                    Input::Connected { id, link } => {
modified radicle-node/src/tests.rs
@@ -926,7 +926,7 @@ fn test_persistent_peer_reconnect_attempt() {
            .find(|io| matches!(io, Io::Connect(a, _) if a == &bob.id()))
            .unwrap();

-
        alice.attempted(bob.id(), &bob.address());
+
        alice.attempted(bob.id(), bob.address());
    }
}

@@ -966,7 +966,7 @@ fn test_persistent_peer_reconnect_success() {
        })
        .expect("Alice attempts a re-connection");

-
    alice.attempted(bob.id(), &bob.addr());
+
    alice.attempted(bob.id(), bob.addr());
    alice.connected(bob.id(), Link::Outbound);
}

modified radicle-node/src/wire/protocol.rs
@@ -798,7 +798,7 @@ where
                        NetTransport::<WireSession<G>>::with_session(session, Link::Outbound)
                    }) {
                        Ok(transport) => {
-
                            self.service.attempted(node_id, &addr);
+
                            self.service.attempted(node_id, addr);
                            // TODO: Keep track of peer address for when peer disconnects before
                            // handshake is complete.
                            self.peers
modified radicle/src/sql.rs
@@ -88,7 +88,7 @@ impl TryFrom<&sql::Value> for Address {
    }
}

-
impl sql::BindableWithIndex for Address {
+
impl sql::BindableWithIndex for &Address {
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
        self.to_string().bind(stmt, i)
    }