Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Always send node announcement on handshake
Alexis Sellier committed 2 years ago
commit ff7338de17e2ebf7325d3b01ad4a526bcd152a02
parent e86e061a121495fb6318b8f5b87c2600421c0930
9 files changed +182 -86
modified radicle-node/src/runtime.rs
@@ -4,7 +4,7 @@ use std::io::{BufRead, BufReader};
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
-
use std::{io, net, thread, time};
+
use std::{fs, io, net, thread, time};

use crossbeam_channel as chan;
use cyphernet::Ecdh;
@@ -16,16 +16,17 @@ use thiserror::Error;
use radicle::git;
use radicle::node::address;
use radicle::node::Handle as _;
-
use radicle::node::{ADDRESS_DB_FILE, ROUTING_DB_FILE, TRACKING_DB_FILE};
+
use radicle::node::{ADDRESS_DB_FILE, NODE_ANNOUNCEMENT_FILE, ROUTING_DB_FILE, TRACKING_DB_FILE};
use radicle::profile::Home;
use radicle::Storage;

use crate::control;
use crate::crypto::Signer;
use crate::node::{routing, NodeId};
+
use crate::service::message::NodeAnnouncement;
use crate::service::{tracking, Event};
-
use crate::wire;
use crate::wire::Wire;
+
use crate::wire::{self, Decode};
use crate::worker;
use crate::{service, LocalTime};

@@ -147,6 +148,31 @@ impl Runtime {

        log::info!(target: "node", "Default tracking policy set to '{}'", &config.policy);
        log::info!(target: "node", "Initializing service ({:?})..", network);
+

+
        let announcement = if let Some(ann) = fs::read(&node_dir.join(NODE_ANNOUNCEMENT_FILE))
+
            .ok()
+
            .and_then(|ann| NodeAnnouncement::decode(&mut ann.as_slice()).ok())
+
            .and_then(|ann| {
+
                if config.matches(&ann) {
+
                    Some(ann)
+
                } else {
+
                    None
+
                }
+
            }) {
+
            log::info!(
+
                target: "node",
+
                "Loaded existing node announcement from file (timestamp={}, work={})",
+
                ann.timestamp,
+
                ann.work(),
+
            );
+
            ann
+
        } else {
+
            config
+
                .node(clock.as_secs())
+
                .solve(Default::default())
+
                .expect("Runtime::init: unable to solve proof-of-work puzzle")
+
        };
+

        let emitter: Emitter<Event> = Default::default();
        let service = service::Service::new(
            config,
@@ -157,6 +183,7 @@ impl Runtime {
            tracking,
            signer.clone(),
            rng,
+
            announcement,
            emitter.clone(),
        );

modified radicle-node/src/service.rs
@@ -12,7 +12,7 @@ use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
-
use std::{fmt, net, str};
+
use std::{fmt, net};

use crossbeam_channel as chan;
use fastrand::Rng;
@@ -26,7 +26,6 @@ use crate::crypto;
use crate::crypto::{Signer, Verified};
use crate::identity::IdentityError;
use crate::identity::{Doc, Id};
-
use crate::node;
use crate::node::routing;
use crate::node::routing::InsertResult;
use crate::node::{Address, Features, FetchResult, Seed, Seeds};
@@ -197,6 +196,8 @@ pub struct Service<R, A, S, G> {
    clock: LocalTime,
    /// I/O outbox.
    outbox: Outbox,
+
    /// Cached local node announcement.
+
    node: NodeAnnouncement,
    /// Source of entropy.
    rng: Rng,
    /// Fetch requests initiated by user, which are waiting for results.
@@ -248,6 +249,7 @@ where
        tracking: tracking::Config,
        signer: G,
        rng: Rng,
+
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
        let sessions = Sessions::new(rng.clone());
@@ -259,6 +261,7 @@ where
            tracking,
            signer,
            rng,
+
            node,
            clock,
            routing,
            gossip: Gossip::default(),
@@ -397,6 +400,21 @@ where
                }
            }
        }
+
        // Ensure that our local node is in our address database.
+
        self.addresses
+
            .insert(
+
                &self.node_id(),
+
                self.node.features,
+
                self.node.alias().unwrap_or_default(),
+
                self.node.work(),
+
                self.node.timestamp,
+
                self.node
+
                    .addresses
+
                    .iter()
+
                    .map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
+
            )
+
            .expect("Service::initialize: error adding local node to address database");
+

        // Setup subscription filter for tracked repos.
        self.filter = Filter::new(
            self.tracking
@@ -968,7 +986,6 @@ where
            AnnouncementMessage::Node(
                ann @ NodeAnnouncement {
                    features,
-
                    alias,
                    addresses,
                    ..
                },
@@ -980,12 +997,7 @@ where
                    return Ok(false);
                }

-
                if !ann.validate() {
-
                    warn!(target: "service", "Dropping node announcement from {announcer}: invalid proof-of-work");
-
                    return Ok(false);
-
                }
-

-
                let alias = match str::from_utf8(alias) {
+
                let alias = match ann.alias() {
                    Ok(s) => s,
                    Err(e) => {
                        warn!(target: "service", "Dropping node announcement from {announcer}: invalid alias: {e}");
@@ -1003,6 +1015,7 @@ where
                    announcer,
                    *features,
                    alias,
+
                    ann.work(),
                    timestamp,
                    addresses
                        .iter()
@@ -1152,11 +1165,11 @@ where
        // much bandwidth.

        gossip::handshake(
+
            self.node.clone(),
            self.clock.as_millis(),
            &self.storage,
            &self.signer,
            filter,
-
            &self.config,
        )
    }

@@ -1287,6 +1300,10 @@ where
            warn!(target: "service", "Attempted connection to peer {nid} which already has a session");
            return false;
        }
+
        if nid == self.node_id() {
+
            error!(target: "service", "Attempted connection to self");
+
            return false;
+
        }
        let persistent = self.config.is_persistent(&nid);

        if let Err(e) = self.addresses.attempted(&nid, &addr, self.time()) {
@@ -1426,6 +1443,7 @@ where
                // even if it's in a disconnected state. Those sessions are re-attempted automatically.
                entries
                    .filter(|(nid, _)| !self.sessions.contains_key(nid))
+
                    .filter(|(nid, _)| nid != &self.node_id())
                    .take(wanted)
                    .collect()
            }
@@ -1796,11 +1814,11 @@ mod gossip {
    }

    pub fn handshake<G: Signer, S: ReadStorage>(
+
        node: NodeAnnouncement,
        now: Timestamp,
        storage: &S,
        signer: &G,
        filter: Filter,
-
        config: &Config,
    ) -> Vec<Message> {
        let inventory = match storage.inventory() {
            Ok(i) => i,
@@ -1812,44 +1830,15 @@ mod gossip {
            }
        };

-
        let mut msgs = vec![
+
        vec![
+
            Message::node(node, signer),
            Message::inventory(gossip::inventory(now, inventory), signer),
            Message::subscribe(
                filter,
                now - SUBSCRIBE_BACKLOG_DELTA.as_millis() as u64,
                Timestamp::MAX,
            ),
-
        ];
-
        if let Some(m) = gossip::node(now, config) {
-
            msgs.push(Message::node(m, signer));
-
        };
-

-
        msgs
-
    }
-

-
    pub fn node(timestamp: Timestamp, config: &Config) -> Option<NodeAnnouncement> {
-
        let features = node::Features::SEED;
-
        let alias = config.alias();
-
        let addresses: BoundedVec<_, ADDRESS_LIMIT> = config
-
            .external_addresses
-
            .clone()
-
            .try_into()
-
            .expect("external addresses are within the limit");
-

-
        if addresses.is_empty() {
-
            return None;
-
        }
-

-
        Some(
-
            NodeAnnouncement {
-
                features,
-
                timestamp,
-
                alias,
-
                addresses,
-
                nonce: 0,
-
            }
-
            .solve(),
-
        )
+
        ]
    }

    pub fn inventory(timestamp: Timestamp, inventory: Vec<Id>) -> InventoryAnnouncement {
modified radicle-node/src/service/config.rs
@@ -1,7 +1,10 @@
use localtime::LocalDuration;

+
use radicle::node;
use radicle::node::Address;

+
use crate::bounded::BoundedVec;
+
use crate::service::message::{NodeAnnouncement, ADDRESS_LIMIT};
use crate::service::tracking::{Policy, Scope};
use crate::service::NodeId;

@@ -88,13 +91,43 @@ impl Config {
        self.connect.iter().any(|(i, _)| i == id)
    }

+
    pub fn features(&self) -> node::Features {
+
        node::Features::SEED
+
    }
+

+
    /// Check if a node announcement matches this configuration.
+
    pub fn matches(&self, other: &NodeAnnouncement) -> bool {
+
        let ann = self.node(other.timestamp);
+

+
        ann.features == other.features
+
            && ann.alias == other.alias
+
            && ann.addresses == other.addresses
+
    }
+

    pub fn alias(&self) -> [u8; 32] {
        let mut alias = [0u8; 32];

        if let Some(name) = &self.alias {
            alias[..name.len()].copy_from_slice(name.as_bytes());
        }
-

        alias
    }
+

+
    pub fn node(&self, timestamp: node::Timestamp) -> NodeAnnouncement {
+
        let features = self.features();
+
        let alias = self.alias();
+
        let addresses: BoundedVec<_, ADDRESS_LIMIT> = self
+
            .external_addresses
+
            .clone()
+
            .try_into()
+
            .expect("external addresses are within the limit");
+

+
        NodeAnnouncement {
+
            features,
+
            timestamp,
+
            alias,
+
            addresses,
+
            nonce: 0,
+
        }
+
    }
}
modified radicle-node/src/service/message.rs
@@ -1,4 +1,4 @@
-
use std::{fmt, io, mem};
+
use std::{fmt, io, mem, str};

use crate::crypto;
use crate::crypto::Unverified;
@@ -66,17 +66,22 @@ pub struct NodeAnnouncement {
}

impl NodeAnnouncement {
-
    /// Validate a node announcement message.
+
    /// Calculate the amount of work that went into creating this announcement.
    ///
-
    /// Checks that the proof-of-work is valid, by generating a single byte that
-
    /// must be zero.
+
    /// Proof-of-work uses the [`scrypt`] algorithm with the parameters in
+
    /// [`Announcement::POW_PARAMS`]. The "work" is calculated by counting the number of leading
+
    /// zero bits after running `scrypt` on a serialized [`NodeAnnouncement`] using
+
    /// [`wire::serialize`].
    ///
-
    /// `scrypt(encode(announcement)) == 0`
+
    /// In other words, `work = leading-zeros(scrypt(serialize(announcement)))`.
    ///
-
    pub fn validate(&self) -> bool {
+
    /// Higher numbers mean higher difficulty. For each increase in work, difficulty is doubled.
+
    /// For instance, an output of `7` is *four* times more work than an output of `5`.
+
    ///
+
    pub fn work(&self) -> u32 {
        let (n, r, p) = Announcement::POW_PARAMS;
        let params = scrypt::Params::new(n, r, p).expect("proof-of-work parameters are valid");
-
        let mut output = [0; 1];
+
        let mut output = vec![0; 32];

        scrypt::scrypt(
            wire::serialize(self).as_ref(),
@@ -86,26 +91,37 @@ impl NodeAnnouncement {
        )
        .expect("proof-of-work output vector is a valid length");

-
        output == [0]
+
        // Calculate the number of leading zero bits in the output vector.
+
        if let Some((zero_bytes, non_zero)) = output.iter().enumerate().find(|(_, &x)| x != 0) {
+
            zero_bytes as u32 * 8 + non_zero.leading_zeros()
+
        } else {
+
            output.len() as u32 * 8
+
        }
    }

-
    /// Solve the proof-of-work of a node announcement by iterating through different nonces.
-
    pub fn solve(mut self) -> Self {
+
    /// Solve the proof-of-work of a node announcement for the given target, by iterating through
+
    /// different nonces.
+
    ///
+
    /// If the given difficulty target is too high, there may not be a result. In that case, `None`
+
    /// is returned.
+
    pub fn solve(mut self, target: u32) -> Option<Self> {
        loop {
            if let Some(nonce) = self.nonce.checked_add(1) {
                self.nonce = nonce;

-
                if self.validate() {
+
                if self.work() >= target {
                    break;
                }
            } else {
-
                // If a very high difficulty is chosen, it's possible to iterate through all
-
                // possible values of the nonce without solving the puzzle. However, with "normal"
-
                // values, this is virtually impossible.
-
                panic!("could not solve proof-of-work!");
+
                return None;
            }
        }
-
        self
+
        Some(self)
+
    }
+

+
    /// Get the alias as a UTF-8 string.
+
    pub fn alias(&self) -> Result<&str, std::str::Utf8Error> {
+
        str::from_utf8(&self.alias)
    }
}

@@ -307,9 +323,9 @@ impl Announcement {
    ///
    /// `15, 8, 1` are usually the recommended parameters.
    ///
-
    #[cfg(test)]
+
    #[cfg(debug_assertions)]
    pub const POW_PARAMS: (u8, u32, u32) = (1, 1, 1);
-
    #[cfg(not(test))]
+
    #[cfg(not(debug_assertions))]
    pub const POW_PARAMS: (u8, u32, u32) = (15, 8, 1);
    /// Salt used for generating PoW.
    pub const POW_SALT: &[u8] = &[b'r', b'a', b'd'];
@@ -608,7 +624,9 @@ mod tests {
            nonce: 0,
        };

-
        assert!(!ann.validate());
-
        assert!(ann.solve().validate());
+
        assert_eq!(ann.work(), 0);
+
        assert_eq!(ann.clone().solve(1).unwrap().work(), 4);
+
        assert_eq!(ann.clone().solve(8).unwrap().work(), 9);
+
        assert_eq!(ann.solve(14).unwrap().work(), 16);
    }
}
modified radicle-node/src/test/peer.rs
@@ -162,6 +162,7 @@ where
        // Make sure the peer address is advertized.
        config.config.external_addresses.push(local_addr.into());

+
        let announcement = config.config.node(config.local_time.as_secs());
        let emitter: Emitter<Event> = Default::default();
        let service = Service::new(
            config.config,
@@ -172,6 +173,7 @@ where
            tracking,
            config.signer,
            config.rng.clone(),
+
            announcement,
            emitter,
        );

@@ -213,6 +215,7 @@ where
                    &peer.node_id(),
                    radicle::node::Features::default(),
                    peer.name,
+
                    0,
                    timestamp,
                    Some(known_address),
                )
@@ -266,7 +269,8 @@ where
                addresses: Some(net::SocketAddr::from((self.ip, node::DEFAULT_PORT)).into()).into(),
                nonce: 0,
            }
-
            .solve(),
+
            .solve(0)
+
            .unwrap(),
            self.signer(),
        )
    }
modified radicle/src/node.rs
@@ -37,6 +37,12 @@ pub const ROUTING_DB_FILE: &str = "routing.db";
pub const ADDRESS_DB_FILE: &str = "addresses.db";
/// Filename of tracking table database under the node directory.
pub const TRACKING_DB_FILE: &str = "tracking.db";
+
/// Filename of last node announcement, when running in debug mode.
+
#[cfg(debug_assertions)]
+
pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire.debug";
+
/// Filename of last node announcement.
+
#[cfg(not(debug_assertions))]
+
pub const NODE_ANNOUNCEMENT_FILE: &str = "announcement.wire";

/// Milliseconds since epoch.
pub type Timestamp = u64;
modified radicle/src/node/address/schema.sql
@@ -8,6 +8,8 @@ create table if not exists "nodes" (
  "features"           integer   not null,
  -- Node alias.
  "alias"              text      default null,
+
  --- Node announcement proof-of-work.
+
  "pow"                integer   default 0,
  -- Node announcement timestamp.
  "timestamp"          integer   not null
  --
modified radicle/src/node/address/store.rs
@@ -71,7 +71,7 @@ impl Store for Book {
    fn get(&self, node: &NodeId) -> Result<Option<types::Node>, Error> {
        let mut stmt = self
            .db
-
            .prepare("SELECT features, alias, timestamp FROM nodes WHERE id = ?")?;
+
            .prepare("SELECT features, alias, pow, timestamp FROM nodes WHERE id = ?")?;

        stmt.bind((1, node))?;

@@ -79,6 +79,7 @@ impl Store for Book {
            let features = row.read::<node::Features, _>("features");
            let alias = row.read::<&str, _>("alias").to_owned();
            let timestamp = row.read::<i64, _>("timestamp") as Timestamp;
+
            let pow = row.read::<i64, _>("pow") as u32;
            let mut addrs = Vec::new();

            let mut stmt = self
@@ -103,6 +104,7 @@ impl Store for Book {
            Ok(Some(types::Node {
                features,
                alias,
+
                pow,
                timestamp,
                addrs,
            }))
@@ -129,22 +131,24 @@ impl Store for Book {
        node: &NodeId,
        features: node::Features,
        alias: &str,
+
        pow: u32,
        timestamp: Timestamp,
        addrs: impl IntoIterator<Item = KnownAddress>,
    ) -> Result<bool, Error> {
        transaction(&self.db, move |db| {
            let mut stmt = db.prepare(
-
                "INSERT INTO nodes (id, features, alias, timestamp)
-
                 VALUES (?1, ?2, ?3, ?4)
+
                "INSERT INTO nodes (id, features, alias, pow, timestamp)
+
                 VALUES (?1, ?2, ?3, ?4, ?5)
                 ON CONFLICT DO UPDATE
-
                 SET features = ?2, alias = ?3, timestamp = ?4
-
                 WHERE timestamp < ?4",
+
                 SET features = ?2, alias = ?3, pow = ?4, timestamp = ?5
+
                 WHERE timestamp < ?5",
            )?;

            stmt.bind((1, node))?;
            stmt.bind((2, features))?;
            stmt.bind((3, alias))?;
-
            stmt.bind((4, timestamp as i64))?;
+
            stmt.bind((4, pow as i64))?;
+
            stmt.bind((5, timestamp as i64))?;
            stmt.next()?;

            for addr in addrs {
@@ -275,6 +279,7 @@ pub trait Store {
        node: &NodeId,
        features: node::Features,
        alias: &str,
+
        pow: u32,
        timestamp: Timestamp,
        addrs: impl IntoIterator<Item = KnownAddress>,
    ) -> Result<bool, Error>;
@@ -405,13 +410,14 @@ mod test {
            last_attempt: None,
        };
        let inserted = cache
-
            .insert(&alice, features, "alice", timestamp, [ka.clone()])
+
            .insert(&alice, features, "alice", 16, timestamp, [ka.clone()])
            .unwrap();
        assert!(inserted);

        let node = cache.get(&alice).unwrap().unwrap();

        assert_eq!(node.features, features);
+
        assert_eq!(node.pow, 16);
        assert_eq!(node.timestamp, timestamp);
        assert_eq!(node.alias.as_str(), "alice");
        assert_eq!(node.addrs, vec![ka]);
@@ -431,12 +437,12 @@ mod test {
            last_attempt: None,
        };
        let inserted = cache
-
            .insert(&alice, features, "alice", timestamp, [ka.clone()])
+
            .insert(&alice, features, "alice", 0, timestamp, [ka.clone()])
            .unwrap();
        assert!(inserted);

        let inserted = cache
-
            .insert(&alice, features, "alice", timestamp, [ka])
+
            .insert(&alice, features, "alice", 0, timestamp, [ka])
            .unwrap();
        assert!(!inserted);

@@ -457,31 +463,39 @@ mod test {
        };

        let updated = cache
-
            .insert(&alice, features, "alice", timestamp, [ka.clone()])
+
            .insert(&alice, features, "alice", 0, timestamp, [ka.clone()])
            .unwrap();
        assert!(updated);

        let updated = cache
-
            .insert(&alice, features, "~alice~", timestamp, [])
+
            .insert(&alice, features, "~alice~", 0, timestamp, [])
            .unwrap();
        assert!(!updated, "Can't update using the same timestamp");

        let updated = cache
-
            .insert(&alice, features, "~alice~", timestamp - 1, [])
+
            .insert(&alice, features, "~alice~", 0, timestamp - 1, [])
            .unwrap();
-
        assert!(!updated, "Can't update using  a smaller timestamp");
+
        assert!(!updated, "Can't update using a smaller timestamp");

        let node = cache.get(&alice).unwrap().unwrap();
        assert_eq!(node.alias, "alice");
        assert_eq!(node.timestamp, timestamp);
+
        assert_eq!(node.pow, 0);

        let updated = cache
-
            .insert(&alice, features, "~alice~", timestamp + 1, [])
+
            .insert(&alice, features, "~alice~", 0, timestamp + 1, [])
            .unwrap();
        assert!(updated, "Can update with a larger timestamp");

        let updated = cache
-
            .insert(&alice, node::Features::NONE, "~alice~", timestamp + 2, [])
+
            .insert(
+
                &alice,
+
                node::Features::NONE,
+
                "~alice~",
+
                1,
+
                timestamp + 2,
+
                [],
+
            )
            .unwrap();
        assert!(updated);

@@ -489,6 +503,7 @@ mod test {
        assert_eq!(node.features, node::Features::NONE);
        assert_eq!(node.alias, "~alice~");
        assert_eq!(node.timestamp, timestamp + 2);
+
        assert_eq!(node.pow, 1);
        assert_eq!(node.addrs, vec![ka]);
    }

@@ -512,10 +527,10 @@ mod test {
                last_attempt: None,
            };
            cache
-
                .insert(&alice, features, "alice", timestamp, [ka.clone()])
+
                .insert(&alice, features, "alice", 0, timestamp, [ka.clone()])
                .unwrap();
            cache
-
                .insert(&bob, features, "bob", timestamp, [ka])
+
                .insert(&bob, features, "bob", 0, timestamp, [ka])
                .unwrap();
        }
        assert_eq!(cache.len().unwrap(), 6);
@@ -550,7 +565,7 @@ mod test {
            };
            expected.push((id, ka.clone()));
            cache
-
                .insert(&id, features, "alias", timestamp, [ka])
+
                .insert(&id, features, "alias", 0, timestamp, [ka])
                .unwrap();
        }

modified radicle/src/node/address/types.rs
@@ -83,6 +83,8 @@ pub struct Node {
    pub features: node::Features,
    /// Advertized addresses
    pub addrs: Vec<KnownAddress>,
+
    /// Proof-of-work included in node announcement.
+
    pub pow: u32,
    /// When this data was published.
    pub timestamp: Timestamp,
}