Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Introduce connection penalty system
cloudhead committed 2 years ago
commit 6f7c2dca91a0c721ef44562ef70b130c58a905fe
parent bdcaa6047441d6a7652a1929b27909c16eb86ba6
6 files changed +212 -83
modified radicle-node/src/service.rs
@@ -29,7 +29,7 @@ use radicle::node::config::PeerConfig;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
-
use radicle::node::ConnectOptions;
+
use radicle::node::{ConnectOptions, Penalty, Severity};
use radicle::storage::RepositoryError;

use crate::crypto;
@@ -118,6 +118,14 @@ impl SyncedRouting {
    }
}

+
/// A peer we can connect to.
+
#[derive(Debug, Clone)]
+
struct Peer {
+
    nid: NodeId,
+
    addresses: Vec<KnownAddress>,
+
    penalty: Penalty,
+
}
+

/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
@@ -1029,6 +1037,7 @@ where
            }
        };
        let link = session.link;
+
        let addr = session.addr.clone();

        self.fetching.retain(|_, fetching| {
            if fetching.from != remote {
@@ -1058,15 +1067,31 @@ where
            self.outbox.wakeup(delay);
        } else {
            debug!(target: "service", "Dropping peer {remote}..");
+
            self.sessions.remove(&remote);
+

+
            let severity = match reason {
+
                DisconnectReason::Dial(_)
+
                | DisconnectReason::Fetch(_)
+
                | DisconnectReason::Connection(_) => {
+
                    if self.is_online() {
+
                        // If we're "online", there's something wrong with this
+
                        // peer connection specifically.
+
                        Severity::Medium
+
                    } else {
+
                        Severity::Low
+
                    }
+
                }
+
                DisconnectReason::Session(e) => e.severity(),
+
                DisconnectReason::Command => Severity::Low,
+
            };

-
            if let Err(e) =
-
                self.db
-
                    .addresses_mut()
-
                    .disconnected(&remote, &session.addr, reason.is_transient())
+
            if let Err(e) = self
+
                .db
+
                .addresses_mut()
+
                .disconnected(&remote, &addr, severity)
            {
                error!(target: "service", "Error updating address store: {e}");
            }
-
            self.sessions.remove(&remote);
            // Only re-attempt outbound connections, since we don't care if an inbound connection
            // is dropped.
            if link.is_outbound() {
@@ -1590,6 +1615,15 @@ where
        ]
    }

+
    /// Try to guess whether we're online or not.
+
    ///
+
    /// We consider ourselves online if at least one connected session has a
+
    /// routable address and was active within the last `IDLE_INTERVAL`.
+
    fn is_online(&self) -> bool {
+
        self.sessions
+
            .connected()
+
            // `any` stops at the first match instead of counting every session.
+
            .any(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
+
    }
+

    /// Update our routing table with our local node's inventory.
    fn sync_inventory(&mut self) -> Result<SyncedRouting, Error> {
        let inventory = self.storage.inventory()?;
@@ -1892,24 +1926,35 @@ where
        }
    }

-
    /// Get a list of peers available to connect to.
-
    fn available_peers(&mut self) -> HashMap<NodeId, Vec<KnownAddress>> {
+
    /// Get a list of peers available to connect to, sorted by lowest penalty.
+
    fn available_peers(&mut self) -> Vec<Peer> {
        match self.db.addresses().entries() {
            Ok(entries) => {
                // Nb. we don't want to connect to any peers that already have a session with us,
                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
-
                entries
-
                    .filter(|(_, ka)| !ka.banned)
-
                    .filter(|(nid, _)| !self.sessions.contains_key(nid))
-
                    .filter(|(nid, _)| nid != &self.node_id())
-
                    .fold(HashMap::new(), |mut acc, (nid, addr)| {
-
                        acc.entry(nid).or_default().push(addr);
+
                let mut peers = entries
+
                    .filter(|entry| !entry.address.banned)
+
                    .filter(|entry| !entry.penalty.is_threshold_reached())
+
                    .filter(|entry| !self.sessions.contains_key(&entry.node))
+
                    .filter(|entry| &entry.node != self.nid())
+
                    .fold(HashMap::new(), |mut acc, entry| {
+
                        acc.entry(entry.node)
+
                            .and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
+
                            .or_insert_with(|| Peer {
+
                                nid: entry.node,
+
                                addresses: vec![entry.address],
+
                                penalty: entry.penalty,
+
                            });
                        acc
                    })
+
                    .into_values()
+
                    .collect::<Vec<_>>();
+
                peers.sort_by_key(|p| p.penalty);
+
                peers
            }
            Err(e) => {
                error!(target: "service", "Unable to lookup available peers in address book: {e}");
-
                HashMap::new()
+
                Vec::new()
            }
        }
    }
@@ -1974,8 +2019,9 @@ where
        for (id, ka) in self
            .available_peers()
            .into_iter()
-
            .filter_map(|(nid, kas)| {
-
                kas.into_iter()
+
            .filter_map(|peer| {
+
                peer.addresses
+
                    .into_iter()
                    .find(|ka| match (ka.last_success, ka.last_attempt) {
                        // If we succeeded the last time we tried, this is a good address.
                        // TODO: This will always be hit after a success, and never re-attempted after
@@ -1986,7 +2032,7 @@ where
                        // If we've never tried this address, it's worth a try.
                        (None, None) => true,
                    })
-
                    .map(|ka| (nid, ka))
+
                    .map(|ka| (peer.nid, ka))
            })
            .take(wanted)
        {
@@ -2094,18 +2140,6 @@ impl DisconnectReason {
    pub fn is_connection_err(&self) -> bool {
        matches!(self, Self::Connection(_))
    }
-

-
    // TODO: These aren't quite correct, since dial errors *can* be transient, eg.
-
    // temporary DNS issue.
-
    pub fn is_transient(&self) -> bool {
-
        match self {
-
            Self::Dial(_) => false,
-
            Self::Connection(_) => true,
-
            Self::Command => false,
-
            Self::Fetch(_) => true,
-
            Self::Session(err) => err.is_transient(),
-
        }
-
    }
}

impl fmt::Display for DisconnectReason {
modified radicle-node/src/service/session.rs
@@ -2,6 +2,7 @@ use std::collections::HashSet;
use std::fmt;

use crate::node::config::Limits;
+
use crate::node::Severity;
use crate::service::message;
use crate::service::message::Message;
use crate::service::{Address, Id, LocalTime, NodeId, Outbox, Rng};
@@ -9,7 +10,7 @@ use crate::Link;

pub use crate::node::{PingState, State};

-
#[derive(thiserror::Error, Debug)]
+
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
    /// The remote peer sent an invalid announcement timestamp,
    /// for eg. a timestamp far in the future.
@@ -28,13 +29,13 @@ pub enum Error {
}

impl Error {
-
    /// Check whether this error is transient.
-
    pub fn is_transient(&self) -> bool {
+
    /// Return the severity for this error.
+
    pub fn severity(&self) -> Severity {
        match self {
-
            Self::InvalidTimestamp(_) => false,
-
            Self::ProtocolMismatch => true,
-
            Self::Misbehavior => false,
-
            Self::Timeout => true,
+
            Self::InvalidTimestamp(_) => Severity::High,
+
            Self::ProtocolMismatch => Severity::High,
+
            Self::Misbehavior => Severity::High,
+
            Self::Timeout => Severity::Low,
        }
    }
}
modified radicle-node/src/tests.rs
@@ -9,6 +9,7 @@ use std::time;
use crossbeam_channel as chan;
use netservices::Direction as Link;
use radicle::identity::Visibility;
+
use radicle::node::address::Store;
use radicle::node::routing::Store as _;
use radicle::node::{ConnectOptions, DEFAULT_TIMEOUT};
use radicle::storage::refs::RefsAt;
@@ -898,7 +899,12 @@ fn test_refs_announcement_offline() {

    // Now we restart Alice's node. It should pick up that something's changed in storage.
    alice.elapse(LocalDuration::from_secs(60));
-
    alice.disconnected(bob.id, &DisconnectReason::Command);
+
    alice
+
        .database_mut()
+
        .addresses_mut()
+
        .remove(&bob.id)
+
        .unwrap(); // Make sure we don't reconnect automatically.
+
    alice.disconnected(bob.id, &DisconnectReason::Session(session::Error::Timeout));
    alice.outbox().for_each(drop);
    alice.restart();
    alice.connect_to(&bob);
@@ -1157,11 +1163,9 @@ fn test_maintain_connections() {
    alice.import_addresses(&unconnected);

    // A high-severity session error such as this will cause Alice to attempt a different peer.
-
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
+
    let error = session::Error::Misbehavior;
    for peer in connected.iter() {
-
        let reason = DisconnectReason::Dial(error.clone());
-
        assert!(!reason.is_transient());
-
        alice.disconnected(peer.id(), &reason);
+
        alice.disconnected(peer.id(), &DisconnectReason::Session(error));

        let id = alice
            .outbox()
@@ -1210,8 +1214,6 @@ fn test_maintain_connections_failed_attempt() {
    let reason =
        DisconnectReason::Connection(Arc::new(io::Error::from(io::ErrorKind::ConnectionReset)));

-
    assert!(reason.is_transient());
-

    alice.connect_to(&eve);
    // Make sure Alice knows about Eve.
    alice.disconnected(eve.id(), &reason);
modified radicle/src/node.rs
@@ -45,6 +45,8 @@ pub const DEFAULT_PORT: u16 = 8776;
pub const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(9);
/// Maximum length in bytes of a node alias.
pub const MAX_ALIAS_LENGTH: usize = 32;
+
/// Penalty threshold at which point we avoid connecting to this node.
+
pub const PENALTY_THRESHOLD: u8 = 32;
/// Filename of node database under the node directory.
pub const NODE_DB_FILE: &str = "node.db";
/// Filename of policies database under the node directory.
@@ -128,6 +130,26 @@ impl fmt::Display for State {
    }
}

+
/// Severity of a peer misbehavior or a connection problem.
+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+
pub enum Severity {
+
    Low = 0,
+
    Medium = 1,
+
    High = 8,
+
}
+

+
/// Node connection penalty. Nodes with a high penalty are deprioritized as peers.
+
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord)]
+
pub struct Penalty(u8);
+

+
impl Penalty {
+
    /// If the penalty threshold is reached, at which point we should just avoid
+
    /// connecting to this node.
+
    pub fn is_threshold_reached(&self) -> bool {
+
        self.0 >= PENALTY_THRESHOLD
+
    }
+
}
+

/// Repository sync status for our own refs.
#[derive(Debug, PartialEq, Eq, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "status")]
modified radicle/src/node/address.rs
@@ -10,7 +10,7 @@ use localtime::LocalTime;
use nonempty::NonEmpty;

use crate::collections::RandomMap;
-
use crate::node::{Address, Alias};
+
use crate::node::{Address, Alias, Penalty};
use crate::prelude::Timestamp;
use crate::{node, profile};

@@ -130,6 +130,8 @@ pub struct Node {
    pub pow: u32,
    /// When this data was published.
    pub timestamp: Timestamp,
+
    /// Node connection penalty.
+
    pub penalty: Penalty,
}

/// A known address.
modified radicle/src/node/address/store.rs
@@ -6,7 +6,7 @@ use thiserror::Error;

use crate::node;
use crate::node::address::{AddressType, KnownAddress, Node, Source};
-
use crate::node::{Address, Alias, AliasError, AliasStore, Database, NodeId};
+
use crate::node::{Address, Alias, AliasError, AliasStore, Database, NodeId, Penalty, Severity};
use crate::prelude::Timestamp;
use crate::sql::transaction;

@@ -22,6 +22,17 @@ pub enum Error {
    NoRows,
}

+
/// An entry returned by the store.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct AddressEntry {
+
    /// Node ID.
+
    pub node: NodeId,
+
    /// Node penalty.
+
    pub penalty: Penalty,
+
    /// Node address.
+
    pub address: KnownAddress,
+
}
+

/// Address store.
///
/// Used to store node addresses and metadata.
@@ -51,20 +62,25 @@ pub trait Store {
        self.len().map(|l| l == 0)
    }
    /// Get the address entries in the store.
-
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error>;
+
    fn entries(&self) -> Result<Box<dyn Iterator<Item = AddressEntry>>, Error>;
    /// Mark a node as attempted at a certain time.
    fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
    /// Mark a node as successfully connected at a certain time.
    fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
    /// Mark a node as disconnected.
-
    fn disconnected(&mut self, nid: &NodeId, addr: &Address, transient: bool) -> Result<(), Error>;
+
    fn disconnected(
+
        &mut self,
+
        nid: &NodeId,
+
        addr: &Address,
+
        severity: Severity,
+
    ) -> Result<(), Error>;
}

impl Store for Database {
    fn get(&self, node: &NodeId) -> Result<Option<Node>, Error> {
        let mut stmt = self
            .db
-
            .prepare("SELECT features, alias, pow, timestamp FROM nodes WHERE id = ?")?;
+
            .prepare("SELECT features, alias, pow, penalty, timestamp FROM nodes WHERE id = ?")?;

        stmt.bind((1, node))?;

@@ -73,6 +89,8 @@ impl Store for Database {
            let alias = Alias::from_str(row.read::<&str, _>("alias"))?;
            let timestamp = row.read::<i64, _>("timestamp") as Timestamp;
            let pow = row.read::<i64, _>("pow") as u32;
+
            let penalty = row.read::<i64, _>("penalty").min(u8::MAX as i64);
+
            let penalty = Penalty(penalty as u8);
            let addrs = self.addresses_of(node)?;

            Ok(Some(Node {
@@ -80,6 +98,7 @@ impl Store for Database {
                alias,
                pow,
                timestamp,
+
                penalty,
                addrs,
            }))
        } else {
@@ -183,10 +202,15 @@ impl Store for Database {
        Ok(self.db.change_count() > 0)
    }

-
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error> {
+
    fn entries(&self) -> Result<Box<dyn Iterator<Item = AddressEntry>>, Error> {
        let mut stmt = self
            .db
-
            .prepare("SELECT node, type, value, source, last_success, last_attempt, banned FROM addresses ORDER BY node")?
+
            .prepare(
+
                "SELECT a.node, a.type, a.value, a.source, a.last_success, a.last_attempt, a.banned, n.penalty
+
                 FROM addresses AS a
+
                 JOIN nodes AS n ON a.node = n.id
+
                 ORDER BY n.penalty ASC, n.id ASC",
+
            )?
            .into_iter();
        let mut entries = Vec::new();

@@ -200,17 +224,20 @@ impl Store for Database {
            let last_success = last_success.map(|t| LocalTime::from_millis(t as u128));
            let last_attempt = last_attempt.map(|t| LocalTime::from_millis(t as u128));
            let banned = row.read::<i64, _>("banned").is_positive();
+
            let penalty = row.read::<i64, _>("penalty");
+
            let penalty = Penalty(penalty as u8); // NOTE: `as u8` truncates; unlike `get`, this does not clamp to `u8::MAX`.

-
            entries.push((
+
            entries.push(AddressEntry {
                node,
-
                KnownAddress {
+
                penalty,
+
                address: KnownAddress {
                    addr,
                    source,
                    last_success,
                    last_attempt,
                    banned,
                },
-
            ));
+
            });
        }
        Ok(Box::new(entries.into_iter()))
    }
@@ -234,37 +261,47 @@ impl Store for Database {
    }

    fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error> {
-
        let mut stmt = self.db.prepare(
-
            "UPDATE `addresses`
-
             SET last_success = ?1
-
             WHERE node = ?2
-
             AND type = ?3
-
             AND value = ?4",
-
        )?;
-

-
        stmt.bind((1, time as i64))?;
-
        stmt.bind((2, nid))?;
-
        stmt.bind((3, AddressType::from(addr)))?;
-
        stmt.bind((4, addr))?;
-
        stmt.next()?;
-

-
        Ok(())
-
    }
-

-
    fn disconnected(&mut self, nid: &NodeId, addr: &Address, transient: bool) -> Result<(), Error> {
-
        // Ban address if not a transient failure.
-
        if !transient {
-
            let mut stmt = self.db.prepare(
+
        // Record the successful connection on the address and relax the node's
+
        // penalty; both updates go through the `transaction` helper.
+
        transaction(&self.db, |db| {
+
            let mut stmt = db.prepare(
                "UPDATE `addresses`
-
                 SET banned = 1
-
                 WHERE node = ?1 AND type = ?2 AND value = ?3",
+
                 SET last_success = ?1
+
                 WHERE node = ?2
+
                 AND type = ?3
+
                 AND value = ?4",
            )?;

+
            stmt.bind((1, time as i64))?;
+
            stmt.bind((2, nid))?;
+
            stmt.bind((3, AddressType::from(addr)))?;
+
            stmt.bind((4, addr))?;
+
            stmt.next()?;
+

+
            // Reduce penalty by half on successful connect.
+
            // Nb. this must be a fresh statement (the previous one targets `addresses`),
+
            // and the `nodes` table is keyed by `id`, not `node`.
+
            let mut stmt = db.prepare("UPDATE `nodes` SET penalty = penalty / 2 WHERE id = ?1")?;
+

            stmt.bind((1, nid))?;
-
            stmt.bind((2, AddressType::from(addr)))?;
-
            stmt.bind((3, addr))?;
            stmt.next()?;
-
        }
+

+
            Ok(())
+
        })
+
    }
+

+
    fn disconnected(
+
        &mut self,
+
        nid: &NodeId,
+
        _addr: &Address,
+
        severity: Severity,
+
    ) -> Result<(), Error> {
+
        // Add the severity's numeric value to the node's penalty. The sum is
+
        // saturated at `u8::MAX` so the stored value always fits in `Penalty`,
+
        // whichever way it is later read back.
+
        let mut stmt = self.db.prepare(
+
            "UPDATE `nodes`
+
             SET penalty = MIN(penalty + ?2, ?3)
+
             WHERE id = ?1",
+
        )?;
+

+
        stmt.bind((1, nid))?;
+
        stmt.bind((2, severity as i64))?;
+
        stmt.bind((3, u8::MAX as i64))?;
+
        stmt.next()?;
+

        Ok(())
    }
}
@@ -584,7 +621,11 @@ mod test {
                last_attempt: None,
                banned: false,
            };
-
            expected.push((id, ka.clone()));
+
            expected.push(AddressEntry {
+
                node: id,
+
                penalty: Penalty::default(),
+
                address: ka.clone(),
+
            });
            cache
                .insert(&id, features, alias.clone(), 0, timestamp, [ka])
                .unwrap();
@@ -592,10 +633,37 @@ mod test {

        let mut actual = cache.entries().unwrap().collect::<Vec<_>>();

-
        actual.sort_by_key(|(i, _)| *i);
-
        expected.sort_by_key(|(i, _)| *i);
+
        actual.sort_by_key(|ae| ae.node);
+
        expected.sort_by_key(|ae| ae.node);

        assert_eq!(cache.len().unwrap(), actual.len());
        assert_eq!(actual, expected);
    }
+

+
    #[test]
+
    fn test_disconnected() {
+
        let alice = arbitrary::gen::<NodeId>(1);
+
        let addr = arbitrary::gen::<Address>(1);
+
        let mut cache = Database::memory().unwrap();
+
        let features = node::Features::SEED;
+
        let timestamp = LocalTime::now().as_millis();
+

+
        cache
+
            .insert(&alice, features, Alias::new("alice"), 16, timestamp, [])
+
            .unwrap();
+
        let node = cache.get(&alice).unwrap().unwrap();
+
        assert_eq!(node.penalty, Penalty::default());
+

+
        cache.disconnected(&alice, &addr, Severity::Low).unwrap();
+
        let node = cache.get(&alice).unwrap().unwrap();
+
        assert_eq!(node.penalty, Penalty::default());
+

+
        cache.disconnected(&alice, &addr, Severity::Medium).unwrap();
+
        let node = cache.get(&alice).unwrap().unwrap();
+
        assert_eq!(node.penalty, Penalty(1));
+

+
        cache.disconnected(&alice, &addr, Severity::High).unwrap();
+
        let node = cache.get(&alice).unwrap().unwrap();
+
        assert_eq!(node.penalty, Penalty(9));
+
    }
}