Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: keep a healthy connection to the network
Slack Coder committed 3 years ago
commit f3d867af4d4192fab694404f9c4ba9f0e2fe1cb4
parent ff045e0813f912ef2b1177b2998e74902c6c1467
2 files changed +105 -6
modified radicle-node/src/service.rs
@@ -945,14 +945,66 @@ where
        }
    }

-
    fn maintain_connections(&mut self) {
-
        // TODO: Connect to all potential seeds.
-
        if self.sessions.len() < TARGET_OUTBOUND_PEERS {
-
            let delta = TARGET_OUTBOUND_PEERS - self.sessions.len();
+
    fn next_connections(
+
        rng: &Rng,
+
        sessions: &Sessions,
+
        address_pool: &mut dyn Iterator<Item = (NodeId, address::KnownAddress)>,
+
    ) -> Vec<Address> {
+
        let mut initializing: Vec<Address> = Vec::new();
+
        let mut negotiated: HashMap<NodeId, &Session> = HashMap::new();
+
        for (_, s) in sessions.iter() {
+
            if s.link != Link::Outbound {
+
                continue;
+
            }
+
            match s.state {
+
                session::State::Initial => {
+
                    initializing.push(s.addr.into());
+
                }
+
                session::State::Negotiated { .. } => {
+
                    let node_id = s
+
                        .node_id()
+
                        .expect("negotiated sessions must have a node ID");
+
                    negotiated.insert(node_id, s);
+
                }
+
                _ => continue,
+
            };
+
        }
+

+
        let wanted = TARGET_OUTBOUND_PEERS
+
            .saturating_sub(initializing.len())
+
            .saturating_sub(negotiated.len());
+
        if wanted == 0 {
+
            return Vec::new();
+
        }

-
            for _ in 0..delta {
-
                // TODO: Connect to random peer.
+
        // All nodes are considered equal
+
        let mut address_pool: Vec<_> = address_pool
+
            .filter(|(node_id, s)| {
+
                !initializing.contains(&s.addr) && !negotiated.contains_key(node_id)
+
            })
+
            .take(wanted)
+
            .collect();
+

+
        let mut next = Vec::new();
+
        loop {
+
            if address_pool.is_empty() {
+
                break;
            }
+
            let i = rng.usize(0..address_pool.len());
+
            let (_, addr) = address_pool.swap_remove(i);
+
            next.push(addr.addr);
+
        }
+
        next
+
    }
+

+
    fn maintain_connections(&mut self) {
+
        let mut address_pool = self
+
            .addresses
+
            .entries()
+
            .expect("address store be accessible");
+
        let next = Self::next_connections(&self.rng, &self.sessions, &mut address_pool);
+
        for addr in next {
+
            self.reactor.connect(addr.clone());
        }
    }
}
modified radicle-node/src/tests.rs
@@ -649,6 +649,53 @@ fn test_persistent_peer_reconnect() {
}

#[test]
+
fn test_maintain_connections() {
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
+

+
    let connected = vec![
+
        Peer::new("connected 1", [8, 8, 8, 1], MockStorage::empty()),
+
        Peer::new("connected 2", [8, 8, 8, 2], MockStorage::empty()),
+
        Peer::new("connected 3", [8, 8, 8, 3], MockStorage::empty()),
+
    ];
+
    let mut unconnected = vec![
+
        Peer::new("new 1", [9, 9, 9, 1], MockStorage::empty()),
+
        Peer::new("new 2", [9, 9, 9, 2], MockStorage::empty()),
+
        Peer::new("new 3", [9, 9, 9, 3], MockStorage::empty()),
+
    ];
+

+
    for peer in connected.iter() {
+
        alice.connect_to(peer);
+
    }
+
    assert_eq!(
+
        connected.len(),
+
        alice.sessions().len(),
+
        "alice should be connected to all peers"
+
    );
+

+
    for peer in unconnected.iter() {
+
        alice.receive(&connected[0].addr(), peer.node_announcement());
+
    }
+

+
    for peer in connected.iter() {
+
        alice.disconnected(
+
            &peer.addr(),
+
            &nakamoto::DisconnectReason::Protocol(DisconnectReason::User),
+
        );
+

+
        let addr = alice
+
            .outbox()
+
            .find_map(|o| match o {
+
                Io::Connect(addr) => Some(addr),
+
                _ => None,
+
            })
+
            .expect("Alice connects to a new peer");
+
        assert!(addr != peer.addr());
+
        unconnected.retain(|p| p.addr() != addr);
+
    }
+
    assert!(unconnected.is_empty());
+
}
+

+
#[test]
fn test_push_and_pull() {
    let tempdir = tempfile::tempdir().unwrap();