Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Only process messages from known nodes
cloudhead committed 2 years ago
commit ea4104dbd7315f6bd22a28454bb2e946b9f76759
parent 0ae3c00cd718bd087a7d714a688e6588edb8bbdf
9 files changed +213 -30
modified radicle-node/src/service.rs
@@ -872,6 +872,12 @@ where
        } else {
            debug!(target: "service", "Dropping peer {remote}..");

+
            if let Err(e) =
+
                self.addresses
+
                    .disconnected(&remote, &session.addr, reason.is_transient())
+
            {
+
                error!(target: "service", "Error updating address store: {e}");
+
            }
            self.sessions.remove(&remote);
            // Only re-attempt outbound connections, since we don't care if an inbound connection
            // is dropped.
@@ -925,6 +931,29 @@ where
            return Err(session::Error::InvalidTimestamp(timestamp));
        }

+
        // We don't process announcements from nodes we don't know, since the node announcement is
+
        // what provides DoS protection.
+
        //
+
        // Note that it's possible to *not* receive the node announcement, but receive the
+
        // subsequent announcements of a node in the case of historical gossip messages requested
+
        // from the `subscribe` message. This can happen if the cut-off time is after the node
+
        // announcement timestamp, but before the other announcements. In that case, we simply
+
        // ignore all announcements of that node until we get a node announcement.
+
        if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
+
            match self.addresses.get(announcer) {
+
                Ok(node) => {
+
                    if node.is_none() {
+
                        debug!(target: "service", "Ignoring announcement from unknown node {announcer}");
+
                        return Ok(false);
+
                    }
+
                }
+
                Err(e) => {
+
                    error!(target: "service", "Error looking up node in address book: {e}");
+
                    return Ok(false);
+
                }
+
            }
+
        }
+

        // Discard announcement messages we've already seen, otherwise update out last seen time.
        match self.gossip.announced(announcer, announcement) {
            Ok(fresh) => {
@@ -1286,6 +1315,8 @@ where
            }
        };

+
        debug!(target: "service", "Subscribing to messages since timestamp {since}..");
+

        vec![
            Message::node(self.node.clone(), &self.signer),
            Message::inventory(gossip::inventory(now.as_millis(), inventory), &self.signer),
@@ -1570,6 +1601,7 @@ where
                // Nb. we don't want to connect to any peers that already have a session with us,
                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
                entries
+
                    .filter(|(_, ka)| !ka.banned)
                    .filter(|(nid, _)| !self.sessions.contains_key(nid))
                    .filter(|(nid, _)| nid != &self.node_id())
                    .fold(HashMap::new(), |mut acc, (nid, addr)| {
@@ -1648,6 +1680,8 @@ where
                kas.into_iter()
                    .find(|ka| match (ka.last_success, ka.last_attempt) {
                        // If we succeeded the last time we tried, this is a good address.
+
                        // TODO: This will always be hit after a success, and never re-attempted after
+
                        //       the first failed attempt.
                        (Some(success), attempt) => success >= attempt.unwrap_or_default(),
                        // If we haven't succeeded yet, and we waited long enough, we can try this address.
                        (None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
modified radicle-node/src/service/gossip/store.rs
@@ -124,7 +124,7 @@ impl GossipStore {
            "SELECT node, type, message, signature, timestamp
             FROM announcements
             WHERE timestamp >= ?1 and timestamp < ?2
-
             ORDER BY timestamp",
+
             ORDER BY timestamp, node, type",
        )?;
        assert!(from <= to);

modified radicle-node/src/service/message.rs
@@ -271,7 +271,7 @@ impl From<RefsAnnouncement> for AnnouncementMessage {
impl fmt::Debug for AnnouncementMessage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
-
            Self::Node { .. } => write!(f, "Node(..)"),
+
            Self::Node(message) => write!(f, "Node({})", message.timestamp),
            Self::Inventory(message) => {
                write!(
                    f,
modified radicle-node/src/test/gossip.rs
@@ -1,9 +1,11 @@
use radicle::crypto::test::signer::MockSigner;
+
use radicle::node;
+
use radicle::test::fixtures::gen;

use crate::test::arbitrary;
use crate::{
    prelude::{LocalDuration, LocalTime, Message},
-
    service::message::InventoryAnnouncement,
+
    service::message::{InventoryAnnouncement, NodeAnnouncement},
};

pub fn messages(count: usize, now: LocalTime, delta: LocalDuration) -> Vec<Message> {
@@ -24,13 +26,25 @@ pub fn messages(count: usize, now: LocalTime, delta: LocalDuration) -> Vec<Messa
            }
        };

+
        msgs.push(Message::node(
+
            NodeAnnouncement {
+
                features: node::Features::SEED,
+
                timestamp: time.as_millis(),
+
                alias: node::Alias::new(gen::string(5)),
+
                addresses: None.into(),
+
                nonce: 0,
+
            }
+
            .solve(0)
+
            .unwrap(),
+
            &signer,
+
        ));
        msgs.push(Message::inventory(
            InventoryAnnouncement {
                inventory: arbitrary::vec(3).try_into().unwrap(),
                timestamp: time.as_millis(),
            },
            &signer,
-
        ))
+
        ));
    }
    msgs
}
modified radicle-node/src/test/peer.rs
@@ -31,6 +31,7 @@ use crate::storage::Inventory;
use crate::storage::{Namespaces, RemoteId, WriteStorage};
use crate::test::storage::MockStorage;
use crate::test::{arbitrary, fixtures, simulator};
+
use crate::wire::MessageType;
use crate::Link;
use crate::{LocalDuration, LocalTime};

@@ -324,6 +325,8 @@ where
        self.initialize();
        self.service
            .connected(remote_id, peer.address(), Link::Inbound);
+
        self.service
+
            .received_message(remote_id, peer.node_announcement());

        let mut msgs = self.messages(remote_id);
        msgs.find(|m| {
@@ -359,6 +362,8 @@ where
        self.service.attempted(remote_id, remote_addr.clone());
        self.service
            .connected(remote_id, remote_addr, Link::Outbound);
+
        self.service
+
            .received_message(remote_id, peer.node_announcement());

        let mut msgs = self.messages(remote_id);
        msgs.find(|m| {
@@ -378,7 +383,7 @@ where
        self.service.wake();
    }

-
    /// Drain outgoing messages sent from this peer to the remote address.
+
    /// Drain outgoing messages sent from this peer to the remote peer.
    pub fn messages(&mut self, remote: NodeId) -> impl Iterator<Item = Message> {
        let mut msgs = Vec::new();

@@ -393,6 +398,55 @@ where
        msgs.into_iter()
    }

+
    /// Drain outgoing *relayed* announcements to the remote peer. This doesn't include messages
+
    /// originating from our own node.
+
    pub fn relayed(&mut self, remote: NodeId) -> impl Iterator<Item = Message> {
+
        let mut filtered: Vec<Message> = Vec::new();
+
        let nid = *self.nid();
+

+
        for o in self.service.outbox().queue() {
+
            match o {
+
                Io::Write(a, messages) if *a == remote => {
+
                    let (relayed, other): (Vec<Message>, _) =
+
                        messages.iter().cloned().partition(|m| {
+
                            matches!(
+
                                m,
+
                                Message::Announcement(Announcement { node, .. })
+
                                if *node != nid
+
                            )
+
                        });
+
                    *messages = other;
+
                    filtered.extend(relayed);
+
                }
+
                _ => {}
+
            }
+
        }
+

+
        filtered.into_iter()
+
    }
+

+
    /// Drain outgoing inventories sent from this peer to the remote peer.
+
    pub fn inventory_announcements(&mut self, remote: NodeId) -> impl Iterator<Item = Message> {
+
        let mut invs: Vec<Message> = Vec::new();
+

+
        for o in self.service.outbox().queue() {
+
            match o {
+
                Io::Write(a, messages) if *a == remote => {
+
                    let (inventories, other): (Vec<Message>, _) =
+
                        messages.iter().cloned().partition(|m| {
+
                            MessageType::try_from(m.type_id())
+
                                == Ok(MessageType::InventoryAnnouncement)
+
                        });
+
                    *messages = other;
+
                    invs.extend(inventories);
+
                }
+
                _ => {}
+
            }
+
        }
+

+
        invs.into_iter()
+
    }
+

    /// Get a stream of the peer's emitted events.
    pub fn events(&mut self) -> Events {
        self.service.events()
modified radicle-node/src/tests.rs
@@ -360,6 +360,9 @@ fn test_inventory_pruning() {
        // Tell Alice about the amazing projects available
        alice.connect_to(&bob);
        for num_projs in test.peer_projects {
+
            let peer = Peer::new("other", [9, 9, 9, 9]);
+

+
            alice.receive(bob.id(), peer.node_announcement());
            alice.receive(
                bob.id(),
                Message::inventory(
@@ -367,7 +370,7 @@ fn test_inventory_pruning() {
                        inventory: test::arbitrary::vec::<Id>(num_projs).try_into().unwrap(),
                        timestamp: bob.local_time().as_millis(),
                    },
-
                    &MockSigner::default(),
+
                    peer.signer(),
                ),
            );
        }
@@ -437,13 +440,16 @@ fn test_announcement_rebroadcast() {
    let eve = Peer::new("eve", [9, 9, 9, 9]);

    alice.connect_to(&bob);
+
    alice.connect_from(&eve);
+
    alice.outbox().for_each(drop);
+

+
    log::debug!(target: "test", "Receiving gossips..");

    let received = test::gossip::messages(6, alice.local_time(), MAX_TIME_DELTA);
    for msg in received.iter().cloned() {
        alice.receive(bob.id(), msg);
    }

-
    alice.connect_from(&eve);
    alice.receive(
        eve.id(),
        Message::Subscribe(Subscribe {
@@ -454,8 +460,12 @@ fn test_announcement_rebroadcast() {
    );

    let relayed = alice.messages(eve.id()).collect::<BTreeSet<_>>();
-
    let received = received.into_iter().collect::<BTreeSet<_>>();
+
    let received = received
+
        .into_iter()
+
        .chain(Some(bob.node_announcement()))
+
        .collect::<BTreeSet<_>>();

+
    assert_eq!(relayed.len(), received.len());
    assert_eq!(relayed, received);
}

@@ -468,6 +478,7 @@ fn test_announcement_rebroadcast_duplicates() {
    let rids = arbitrary::set::<Id>(3..=3);

    alice.connect_to(&bob);
+
    alice.receive(bob.id, carol.node_announcement());

    // These are not expected to be relayed.
    let stale = {
@@ -489,6 +500,7 @@ fn test_announcement_rebroadcast_duplicates() {
        carol.elapse(LocalDuration::from_mins(1));
        anns.insert(carol.inventory_announcement());
        anns.insert(carol.node_announcement());
+
        anns.insert(bob.node_announcement());

        for rid in rids {
            alice.track_repo(&rid, tracking::Scope::All).unwrap();
@@ -519,7 +531,7 @@ fn test_announcement_rebroadcast_duplicates() {

    let relayed = alice.messages(eve.id()).collect::<BTreeSet<_>>();

-
    assert_eq!(relayed.len(), 8);
+
    assert_eq!(relayed.len(), 9);
    assert_eq!(relayed, expected);
}

@@ -557,8 +569,11 @@ fn test_announcement_rebroadcast_timestamp_filtered() {
        }),
    );

-
    let relayed = alice.messages(eve.id()).collect::<BTreeSet<_>>();
-
    let second = second.into_iter().collect::<BTreeSet<_>>();
+
    let relayed = alice.relayed(eve.id()).collect::<BTreeSet<_>>();
+
    let second = second
+
        .into_iter()
+
        .chain(Some(bob.node_announcement()))
+
        .collect::<BTreeSet<_>>();

    assert_eq!(relayed.len(), second.len());
    assert_eq!(relayed, second);
@@ -849,7 +864,7 @@ fn test_inventory_relay() {
        ),
    );
    assert_matches!(
-
        alice.messages(eve.id()).next(),
+
        alice.inventory_announcements(eve.id()).next(),
        Some(Message::Announcement(Announcement {
            node,
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
@@ -858,7 +873,7 @@ fn test_inventory_relay() {
        if node == bob.node_id() && timestamp == now
    );
    assert_matches!(
-
        alice.messages(bob.id()).next(),
+
        alice.inventory_announcements(bob.id()).next(),
        None,
        "The inventory is not sent back to Bob"
    );
@@ -874,7 +889,7 @@ fn test_inventory_relay() {
        ),
    );
    assert_matches!(
-
        alice.messages(eve.id()).next(),
+
        alice.inventory_announcements(eve.id()).next(),
        None,
        "Sending the same inventory again doesn't trigger a relay"
    );
@@ -890,7 +905,7 @@ fn test_inventory_relay() {
        ),
    );
    assert_matches!(
-
        alice.messages(eve.id()).next(),
+
        alice.inventory_announcements(eve.id()).next(),
        Some(Message::Announcement(Announcement {
            node,
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
@@ -912,7 +927,7 @@ fn test_inventory_relay() {
        ),
    );
    assert_matches!(
-
        alice.messages(bob.id()).next(),
+
        alice.inventory_announcements(bob.id()).next(),
        Some(Message::Announcement(Announcement {
            node,
            message: AnnouncementMessage::Inventory(InventoryAnnouncement { timestamp, .. }),
@@ -1051,10 +1066,12 @@ fn test_maintain_connections() {
    // We now import the other addresses.
    alice.import_addresses(&unconnected);

-
    // A transient error such as this will cause Alice to attempt a reconnection.
+
    // A non-transient error such as this will cause Alice to attempt a different peer.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
    for peer in connected.iter() {
-
        alice.disconnected(peer.id(), &DisconnectReason::Connection(error.clone()));
+
        let reason = DisconnectReason::Dial(error.clone());
+
        assert!(!reason.is_transient());
+
        alice.disconnected(peer.id(), &reason);

        let id = alice
            .outbox()
@@ -1063,7 +1080,7 @@ fn test_maintain_connections() {
                _ => None,
            })
            .expect("Alice connects to a new peer");
-
        assert!(id != peer.id());
+
        assert_ne!(id, peer.id());
        unconnected.retain(|p| p.id() != id);
    }
    assert!(
@@ -1073,15 +1090,41 @@ fn test_maintain_connections() {
}

#[test]
+
fn test_maintain_connections_transient() {
+
    // Peers alice starts out connected to.
+
    let connected = vec![
+
        Peer::new("connected", [8, 8, 8, 1]),
+
        Peer::new("connected", [8, 8, 8, 2]),
+
        Peer::new("connected", [8, 8, 8, 3]),
+
    ];
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
+

+
    for peer in connected.iter() {
+
        alice.connect_to(peer);
+
    }
+
    // A transient error such as this will cause Alice to attempt a reconnection.
+
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
+
    for peer in connected.iter() {
+
        alice.disconnected(peer.id(), &DisconnectReason::Connection(error.clone()));
+
        alice
+
            .outbox()
+
            .find(|o| matches!(o, Io::Connect(id, _) if id == &peer.id()))
+
            .unwrap();
+
    }
+
}
+

+
#[test]
fn test_maintain_connections_failed_attempt() {
-
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
+
    let reason =
+
        DisconnectReason::Connection(Arc::new(io::Error::from(io::ErrorKind::ConnectionReset)));

-
    alice.connect_to(&bob);
+
    assert!(reason.is_transient());
+

+
    alice.connect_to(&eve);
    // Make sure Alice knows about Eve.
-
    alice.receive(bob.id, eve.node_announcement());
-
    alice.disconnected(bob.id(), &DisconnectReason::Command);
+
    alice.disconnected(eve.id(), &reason);
    alice
        .outbox()
        .find(|o| matches!(o, Io::Connect(id, _) if id == &eve.id))
@@ -1089,8 +1132,11 @@ fn test_maintain_connections_failed_attempt() {
    alice.attempted(eve.id, eve.addr());

    // Disconnect Eve and make sure Alice doesn't try to re-connect immediately.
-
    alice.disconnected(eve.id(), &DisconnectReason::Command);
-
    assert!(!alice.outbox().any(|o| matches!(o, Io::Connect(_, _))));
+
    alice.disconnected(eve.id(), &reason);
+
    assert_matches!(
+
        alice.outbox().find(|o| matches!(o, Io::Connect(_, _))),
+
        None
+
    );

    // Now pass some time and try again.
    alice.elapse(MAX_RECONNECTION_DELTA);
@@ -1100,7 +1146,7 @@ fn test_maintain_connections_failed_attempt() {
        .expect("Alice attempts Eve again");

    // Disconnect Eve and make sure Alice doesn't try to re-connect immediately.
-
    alice.disconnected(eve.id(), &DisconnectReason::Command);
+
    alice.disconnected(eve.id(), &reason);
    assert!(!alice.outbox().any(|o| matches!(o, Io::Connect(_, _))));
    // Or even after some short time..
    alice.elapse(MIN_RECONNECTION_DELTA);
modified radicle/src/node/address/schema.sql
@@ -11,7 +11,9 @@ create table if not exists "nodes" (
  --- Node announcement proof-of-work.
  "pow"                integer   default 0,
  -- Node announcement timestamp.
-
  "timestamp"          integer   not null
+
  "timestamp"          integer   not null,
+
  -- If this node is banned. Used as a boolean.
+
  "banned"             integer   default false
  --
) strict;

@@ -30,6 +32,8 @@ create table if not exists "addresses" (
  "last_attempt"       integer   default null,
  -- Local time at which we successfully connected to this node.
  "last_success"       integer   default null,
+
  -- If this address is banned from use. Used as a boolean.
+
  "banned"             integer   default false,
  -- Nb. This constraint allows more than one node to share the same address.
  -- This is useful in circumstances when a node wants to rotate its key, but
  -- remain reachable at the same address. The old entry will eventually be
modified radicle/src/node/address/store.rs
@@ -115,7 +115,7 @@ impl Store for Book {
    fn addresses(&self, node: &NodeId) -> Result<Vec<KnownAddress>, Error> {
        let mut addrs = Vec::new();
        let mut stmt = self.db.prepare(
-
            "SELECT type, value, source, last_attempt, last_success FROM addresses WHERE node = ?",
+
            "SELECT type, value, source, last_attempt, last_success, banned FROM addresses WHERE node = ?",
        )?;
        stmt.bind((1, node))?;

@@ -130,12 +130,14 @@ impl Store for Book {
            let last_success = row
                .read::<Option<i64>, _>("last_success")
                .map(|t| LocalTime::from_millis(t as u128));
+
            let banned = row.read::<i64, _>("banned").is_positive();

            addrs.push(KnownAddress {
                addr,
                source,
                last_success,
                last_attempt,
+
                banned,
            });
        }
        Ok(addrs)
@@ -267,7 +269,7 @@ impl Store for Book {
    fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error> {
        let mut stmt = self
            .db
-
            .prepare("SELECT node, type, value, source, last_success, last_attempt FROM addresses ORDER BY node")?
+
            .prepare("SELECT node, type, value, source, last_success, last_attempt, banned FROM addresses ORDER BY node")?
            .into_iter();
        let mut entries = Vec::new();

@@ -280,6 +282,7 @@ impl Store for Book {
            let last_attempt = row.read::<Option<i64>, _>("last_attempt");
            let last_success = last_success.map(|t| LocalTime::from_millis(t as u128));
            let last_attempt = last_attempt.map(|t| LocalTime::from_millis(t as u128));
+
            let banned = row.read::<i64, _>("banned").is_positive();

            entries.push((
                node,
@@ -288,6 +291,7 @@ impl Store for Book {
                    source,
                    last_success,
                    last_attempt,
+
                    banned,
                },
            ));
        }
@@ -329,6 +333,23 @@ impl Store for Book {

        Ok(())
    }
+

+
    fn disconnected(&mut self, nid: &NodeId, addr: &Address, transient: bool) -> Result<(), Error> {
+
        // Ban address if not a transient failure.
+
        if !transient {
+
            let mut stmt = self.db.prepare(
+
                "UPDATE `addresses`
+
                 SET banned = 1
+
                 WHERE node = ?1 AND type = ?2 AND value = ?3",
+
            )?;
+

+
            stmt.bind((1, nid))?;
+
            stmt.bind((2, AddressType::from(addr)))?;
+
            stmt.bind((3, addr))?;
+
            stmt.next()?;
+
        }
+
        Ok(())
+
    }
}

impl AliasStore for Book {
@@ -361,7 +382,7 @@ pub trait Store {
        timestamp: Timestamp,
        addrs: impl IntoIterator<Item = KnownAddress>,
    ) -> Result<bool, Error>;
-
    /// Remove an address from the store.
+
    /// Remove a node from the store.
    fn remove(&mut self, id: &NodeId) -> Result<bool, Error>;
    /// Mark a repo as synced on the given node.
    fn synced(
@@ -388,6 +409,8 @@ pub trait Store {
    fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
    /// Mark a node as successfully connected at a certain time.
    fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
+
    /// Mark a node as disconnected.
+
    fn disconnected(&mut self, nid: &NodeId, addr: &Address, transient: bool) -> Result<(), Error>;
}

impl TryFrom<&sql::Value> for Source {
@@ -519,6 +542,7 @@ mod test {
            source: Source::Peer,
            last_success: None,
            last_attempt: None,
+
            banned: false,
        };
        let inserted = cache
            .insert(
@@ -554,6 +578,7 @@ mod test {
            source: Source::Peer,
            last_success: None,
            last_attempt: None,
+
            banned: false,
        };
        let inserted = cache
            .insert(&alice, features, alias.clone(), 0, timestamp, [ka.clone()])
@@ -581,6 +606,7 @@ mod test {
            source: Source::Peer,
            last_success: None,
            last_attempt: None,
+
            banned: false,
        };

        let updated = cache
@@ -641,6 +667,7 @@ mod test {
                source: Source::Peer,
                last_success: None,
                last_attempt: None,
+
                banned: false,
            };
            cache
                .insert(
@@ -686,6 +713,7 @@ mod test {
                // TODO: Test times as well.
                last_success: None,
                last_attempt: None,
+
                banned: false,
            };
            expected.push((id, ka.clone()));
            cache
modified radicle/src/node/address/types.rs
@@ -140,6 +140,8 @@ pub struct KnownAddress {
    pub last_success: Option<LocalTime>,
    /// Last time this address was tried.
    pub last_attempt: Option<LocalTime>,
+
    /// Whether this address has been banned.
+
    pub banned: bool,
}

impl KnownAddress {
@@ -150,6 +152,7 @@ impl KnownAddress {
            source,
            last_success: None,
            last_attempt: None,
+
            banned: false,
        }
    }
}