Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Implement IP-based banning
Merged did:key:z6MksFqX...wzpT opened 1 year ago

The previous banning implementation didn’t work, since it included port numbers in the address value.

Here, we keep a separate table with (node, ip) pairs and update this table when a node connects or is banned.

8 files changed +129 -34 72c71501 a6cb4d0f
modified radicle-node/src/service.rs
@@ -11,6 +11,7 @@ pub mod session;

use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, VecDeque};
+
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, net, time};
@@ -1161,25 +1162,25 @@ where
    }

    /// Inbound connection attempt.
-
    pub fn accepted(&mut self, addr: Address) -> bool {
-
        // Always accept trusted connections, even if we already reached
+
    pub fn accepted(&mut self, ip: IpAddr) -> bool {
+
        // Always accept localhost connections, even if we already reached
        // our inbound connection limit.
-
        if addr.is_trusted() {
+
        if ip.is_loopback() || ip.is_unspecified() {
            return true;
        }
        // Check for inbound connection limit.
        if self.sessions.inbound().count() >= self.config.limits.connection.inbound {
            return false;
        }
-
        match self.db.addresses().is_banned(&addr) {
+
        match self.db.addresses().is_ip_banned(ip) {
            Ok(banned) => {
                if banned {
                    return false;
                }
            }
-
            Err(e) => error!(target: "service", "Error querying ban status for {addr}: {e}"),
+
            Err(e) => error!(target: "service", "Error querying ban status for {ip}: {e}"),
        }
-
        let host: HostName = addr.into();
+
        let host: HostName = ip.into();

        if self.limiter.limit(
            host.clone(),
@@ -1211,7 +1212,7 @@ where
    }

    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
-
        info!(target: "service", "Connected to {} ({:?})", remote, link);
+
        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

        let msgs = self.initial(link);
@@ -1230,6 +1231,17 @@ where
                    );
                }
                Entry::Vacant(e) => {
+
                    if let HostName::Ip(ip) = addr.host {
+
                        if !ip.is_loopback() && !self.config.is_proxy_ip(ip) {
+
                            if let Err(e) =
+
                                self.db
+
                                    .addresses_mut()
+
                                    .record_ip(&remote, ip, self.clock.into())
+
                            {
+
                                log::error!(target: "service", "Error recording IP address for {remote}: {e}");
+
                            }
+
                        }
+
                    }
                    let peer = e.insert(Session::inbound(
                        remote,
                        addr,
modified radicle-node/src/wire/protocol.rs
@@ -514,26 +514,31 @@ where
    ) {
        match event {
            ListenerEvent::Accepted(connection) => {
-
                let Ok(addr) = connection.remote_addr() else {
+
                let Ok(remote) = connection.remote_addr() else {
                    log::warn!(target: "wire", "Accepted connection doesn't have remote address; dropping..");
                    drop(connection);

                    return;
                };
-
                let addr: NetAddr<HostName> = addr.into();
+
                let InetHost::Ip(ip) = remote.host else {
+
                    log::error!(target: "wire", "Unexpected host type for inbound connection {remote}; dropping..");
+
                    drop(connection);
+

+
                    return;
+
                };
                let fd = connection.as_raw_fd();
-
                log::debug!(target: "wire", "Accepting inbound connection from {addr} (fd={fd})..");
+
                log::debug!(target: "wire", "Inbound connection from {remote} (fd={fd})..");

                // If the service doesn't want to accept this connection,
                // we drop the connection here, which disconnects the socket.
-
                if !self.service.accepted(addr.clone().into()) {
-
                    log::debug!(target: "wire", "Rejecting inbound connection from {addr} (fd={fd})..");
+
                if !self.service.accepted(ip) {
+
                    log::debug!(target: "wire", "Rejecting inbound connection from {ip} (fd={fd})..");
                    drop(connection);

                    return;
                }

-
                let session = accept::<G>(addr.clone(), connection, self.signer.clone());
+
                let session = accept::<G>(remote.clone().into(), connection, self.signer.clone());
                let transport = match NetTransport::with_session(session, Link::Inbound) {
                    Ok(transport) => transport,
                    Err(err) => {
@@ -541,8 +546,15 @@ where
                        return;
                    }
                };
+
                log::debug!(target: "wire", "Accepted inbound connection from {remote} (fd={fd})..");

-
                self.inbound.insert(fd, Inbound { id: None, addr });
+
                self.inbound.insert(
+
                    fd,
+
                    Inbound {
+
                        id: None,
+
                        addr: remote.into(),
+
                    },
+
                );
                self.actions
                    .push_back(reactor::Action::RegisterTransport(transport))
            }
modified radicle/src/node.rs
@@ -414,15 +414,6 @@ impl Address {
        }
    }

-
    /// Check whether this address is trusted.
-
    /// Returns true if the address is 127.0.0.1 or 0.0.0.0.
-
    pub fn is_trusted(&self) -> bool {
-
        match self.0.host {
-
            HostName::Ip(ip) => ip.is_loopback() || ip.is_unspecified(),
-
            _ => false,
-
        }
-
    }
-

    /// Check whether this address is globally routable.
    pub fn is_routable(&self) -> bool {
        match self.0.host {
modified radicle/src/node/address/store.rs
@@ -1,3 +1,4 @@
+
use std::net::IpAddr;
use std::str::FromStr;

use localtime::LocalTime;
@@ -65,13 +66,17 @@ pub trait Store {
    }
    /// Check if an address is banned. Also returns `true` if the node this address belongs
    /// to is banned.
-
    fn is_banned(&self, addr: &Address) -> Result<bool, Error>;
+
    fn is_addr_banned(&self, addr: &Address) -> Result<bool, Error>;
+
    /// Check if an IP is banned.
+
    fn is_ip_banned(&self, ip: IpAddr) -> Result<bool, Error>;
    /// Get the address entries in the store.
    fn entries(&self) -> Result<Box<dyn Iterator<Item = AddressEntry>>, Error>;
    /// Mark a node as attempted at a certain time.
    fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
    /// Mark a node as successfully connected at a certain time.
    fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
+
    /// Record a node IP address and connection time.
+
    fn record_ip(&self, nid: &NodeId, ip: IpAddr, time: Timestamp) -> Result<(), Error>;
    /// Mark a node as disconnected.
    fn disconnected(
        &mut self,
@@ -112,12 +117,12 @@ impl Store for Database {
        }
    }

-
    fn is_banned(&self, addr: &Address) -> Result<bool, Error> {
+
    fn is_addr_banned(&self, addr: &Address) -> Result<bool, Error> {
        let mut stmt = self.db.prepare(
            "SELECT a.banned, n.banned
             FROM addresses AS a
             JOIN nodes AS n ON a.node = n.id
-
             WHERE value = ?1 AND type =?2",
+
             WHERE value = ?1 AND type = ?2",
        )?;
        stmt.bind((1, addr))?;
        stmt.bind((2, AddressType::from(addr)))?;
@@ -133,6 +138,17 @@ impl Store for Database {
        }
    }

+
    fn is_ip_banned(&self, ip: IpAddr) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "SELECT banned
+
             FROM ips
+
             WHERE ip = ?1 AND banned > 0",
+
        )?;
+
        stmt.bind((1, ip.to_string().as_str()))?;
+

+
        Ok(stmt.into_iter().next().is_some())
+
    }
+

    fn addresses_of(&self, node: &NodeId) -> Result<Vec<KnownAddress>, Error> {
        let mut addrs = Vec::new();
        let mut stmt = self.db.prepare(
@@ -325,10 +341,26 @@ impl Store for Database {
        })
    }

+
    fn record_ip(&self, nid: &NodeId, ip: IpAddr, time: Timestamp) -> Result<(), Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO ips (ip, node, last_attempt)
+
             VALUES (?1, ?2, ?3)
+
             ON CONFLICT DO UPDATE
+
             SET last_attempt = ?3
+
             WHERE last_attempt < ?3",
+
        )?;
+
        stmt.bind((1, ip.to_string().as_str()))?;
+
        stmt.bind((2, nid))?;
+
        stmt.bind((3, &time))?;
+
        stmt.next()?;
+

+
        Ok(())
+
    }
+

    fn disconnected(
        &mut self,
        nid: &NodeId,
-
        _addr: &Address,
+
        addr: &Address,
        severity: Severity,
    ) -> Result<(), Error> {
        transaction(&self.db, |db| {
@@ -348,7 +380,11 @@ impl Store for Database {
                stmt.bind((1, nid))?;
                stmt.next()?;

-
                let mut stmt = db.prepare("UPDATE `addresses` SET banned = 1 WHERE node = ?1")?;
+
                let mut stmt = db.prepare("UPDATE `addresses` SET banned = 1 WHERE value = ?1")?;
+
                stmt.bind((1, addr))?;
+
                stmt.next()?;
+

+
                let mut stmt = db.prepare("UPDATE `ips` SET banned = 1 WHERE node = ?1")?;
                stmt.bind((1, nid))?;
                stmt.next()?;
            }
@@ -438,6 +474,7 @@ mod test {

    use super::*;
    use crate::test::arbitrary;
+
    use cyphernet::addr::NetAddr;
    use localtime::LocalTime;

    #[test]
@@ -725,8 +762,18 @@ mod test {
    #[test]
    fn test_disconnected_ban() {
        let alice = arbitrary::gen::<NodeId>(1);
+
        let ip1: net::Ipv4Addr = [8, 8, 8, 8].into();
+
        let ip2: net::Ipv4Addr = [9, 9, 9, 9].into();
        let ka1 = arbitrary::gen::<KnownAddress>(1);
+
        let ka1 = KnownAddress {
+
            addr: Address::from(NetAddr::new(ip1.into(), 8776)),
+
            ..ka1
+
        };
        let ka2 = arbitrary::gen::<KnownAddress>(1);
+
        let ka2 = KnownAddress {
+
            addr: Address::from(NetAddr::new(ip2.into(), 8776)),
+
            ..ka2
+
        };
        let mut db = Database::memory().unwrap();
        let features = node::Features::SEED;
        let timestamp = Timestamp::from(LocalTime::now());
@@ -740,6 +787,9 @@ mod test {
            [ka1.clone(), ka2.clone()],
        )
        .unwrap();
+
        db.record_ip(&alice, ip1.into(), timestamp).unwrap();
+
        db.record_ip(&alice, ip2.into(), timestamp).unwrap();
+

        let node = db.get(&alice).unwrap().unwrap();
        assert_eq!(node.penalty, Penalty::default());

@@ -758,9 +808,15 @@ mod test {
        assert!(node.banned);

        for addr in node.addrs {
-
            assert!(addr.banned);
+
            if addr.addr == ka1.addr {
+
                assert!(addr.banned);
+
            } else {
+
                assert!(!addr.banned);
+
            }
        }
-
        assert!(db.is_banned(&ka1.addr).unwrap());
-
        assert!(db.is_banned(&ka2.addr).unwrap()); // Banned because node is banned.
+
        assert!(db.is_addr_banned(&ka1.addr).unwrap());
+
        assert!(db.is_addr_banned(&ka2.addr).unwrap()); // Banned because node is banned.
+
        assert!(db.is_ip_banned(ip1.into()).unwrap());
+
        assert!(db.is_ip_banned(ip2.into()).unwrap());
    }
}
modified radicle/src/node/config.rs
@@ -7,7 +7,7 @@ use localtime::LocalDuration;

use crate::node;
use crate::node::policy::{Policy, Scope};
-
use crate::node::{db, Address, Alias, NodeId};
+
use crate::node::{address, db, Address, Alias, NodeId};

/// Target number of peers to maintain connections to.
pub const TARGET_OUTBOUND_PEERS: usize = 8;
@@ -359,6 +359,15 @@ impl Config {
        self.peer(id).is_some()
    }

+
    /// Check if the given IP address is our proxy.
+
    pub fn is_proxy_ip(&self, ip: net::IpAddr) -> bool {
+
        match self.tor {
+
            None => false,
+
            Some(TorConfig::Proxy { address }) => address.ip() == ip,
+
            Some(TorConfig::Transparent) => address::is_local(&ip),
+
        }
+
    }
+

    /// Are we a relay node? This determines what we do with gossip messages from other peers.
    pub fn is_relay(&self) -> bool {
        match self.relay {
modified radicle/src/node/db.rs
@@ -28,6 +28,7 @@ const MIGRATIONS: &[&str] = &[
    include_str!("db/migrations/1.sql"),
    include_str!("db/migrations/2.sql"),
    include_str!("db/migrations/3.sql"),
+
    include_str!("db/migrations/4.sql"),
];

#[derive(Error, Debug)]
added radicle/src/node/db/migrations/4.sql
@@ -0,0 +1,14 @@
+
-- Peer IP addresses.
+
create table if not exists "ips" (
+
  -- IP address. This *omits* the port.
+
  "ip"            text      primary key not null,
+
  -- Node this address belongs to. Can be `null` if the session is not yet
+
  -- established.
+
  "node"          text      references "nodes" ("id") on delete cascade,
+
  -- When this connection was last attempted by the peer.
+
  "last_attempt"  integer   not null,
+
  -- If this IP is banned. Used as a boolean.
+
  "banned"        integer   default false
+
  --
+
) strict;
+

modified radicle/src/node/db/schema.sql
@@ -15,13 +15,13 @@ create table if not exists "nodes" (
  --
) strict;

-
-- Node addresses.
+
-- Node addresses. These are adresses advertized by a node.
create table if not exists "addresses" (
  -- Node ID.
  "node"               text      not null references "nodes" ("id") on delete cascade,
  -- Address type.
  "type"               text      not null,
-
  -- Address value.
+
  -- Address value, including port.
  "value"              text      not null,
  -- Where we got this address from.
  "source"             text      not null,