Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: Change behavior on conflicting connections
Merged did:key:z6MksFqX...wzpT opened 2 years ago

Instead of dropping the existing connection(s), we drop the same connection on both sides, by using a simple rule that yields the same result whether the node sees the connection as inbound or outbound.

The trick is to use something that both nodes agree on, for instance whos public key is greater than the other, and use that information to close the same connection on both sides.

7 files changed +273 -113 e695d06b 0bcec941
modified radicle-node/src/runtime/handle.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
use std::{fmt, io, time};

use crossbeam_channel as chan;
-
use radicle::node::{ConnectOptions, ConnectResult, Seeds};
+
use radicle::node::{ConnectOptions, ConnectResult, Link, Seeds};
use radicle::storage::refs::RefsAt;
use reactor::poller::popol::PopolWaker;
use thiserror::Error;
@@ -264,6 +264,11 @@ impl radicle::node::Handle for Handle {
                .iter()
                .map(|(nid, s)| radicle::node::Session {
                    nid: *nid,
+
                    link: if s.link.is_inbound() {
+
                        Link::Inbound
+
                    } else {
+
                        Link::Outbound
+
                    },
                    addr: s.addr.clone(),
                    state: s.state.clone(),
                })
modified radicle-node/src/service.rs
@@ -1148,8 +1148,19 @@ where
        }
    }

-
    pub fn disconnected(&mut self, remote: NodeId, reason: &DisconnectReason) {
+
    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
        let since = self.local_time();
+
        let Some(session) = self.sessions.get_mut(&remote) else {
+
            // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
+
            // disconnection event once the transport is dropped.
+
            trace!(target: "service", "Redundant disconnection for {} ({})", remote, reason);
+
            return;
+
        };
+
        // In cases of connection conflicts, there may be disconnections of one of the two
+
        // connections. In that case we don't want the service to remove the session.
+
        if session.link != link {
+
            return;
+
        }

        info!(target: "service", "Disconnected from {} ({})", remote, reason);
        self.emitter.emit(Event::PeerDisconnected {
@@ -1157,13 +1168,6 @@ where
            reason: reason.to_string(),
        });

-
        let Some(session) = self.sessions.get_mut(&remote) else {
-
            if cfg!(debug_assertions) {
-
                panic!("Service::disconnected: unknown session {remote}");
-
            } else {
-
                return;
-
            }
-
        };
        let link = session.link;
        let addr = session.addr.clone();

@@ -1210,7 +1214,9 @@ where
                    }
                }
                DisconnectReason::Session(e) => e.severity(),
-
                DisconnectReason::Command => Severity::Low,
+
                DisconnectReason::Command
+
                | DisconnectReason::Conflict
+
                | DisconnectReason::SelfConnection => Severity::Low,
            };

            if let Err(e) = self
@@ -2307,6 +2313,10 @@ pub enum DisconnectReason {
    Fetch(FetchError),
    /// Session error.
    Session(session::Error),
+
    /// Session conflicts with existing session.
+
    Conflict,
+
    /// Connection to self.
+
    SelfConnection,
    /// User requested disconnect
    Command,
}
@@ -2333,6 +2343,8 @@ impl fmt::Display for DisconnectReason {
            Self::Dial(err) => write!(f, "{err}"),
            Self::Connection(err) => write!(f, "{err}"),
            Self::Command => write!(f, "command"),
+
            Self::SelfConnection => write!(f, "self-connection"),
+
            Self::Conflict => write!(f, "conflict"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
modified radicle-node/src/test/simulator.rs
@@ -396,7 +396,8 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            log::error!(target: "sim", "Connection is attempted and connected at the same time");
                        }
                        if attempt || connection {
-
                            p.disconnected(id, &reason);
+
                            p.disconnected(id, Link::Inbound, &reason);
+
                            p.disconnected(id, Link::Outbound, &reason);
                        }
                    }
                    Input::Wake => p.wake(),
modified radicle-node/src/tests.rs
@@ -900,7 +900,11 @@ fn test_refs_announcement_offline() {
        .addresses_mut()
        .remove(&bob.id)
        .unwrap(); // Make sure we don't reconnect automatically.
-
    alice.disconnected(bob.id, &DisconnectReason::Session(session::Error::Timeout));
+
    alice.disconnected(
+
        bob.id,
+
        Link::Outbound,
+
        &DisconnectReason::Session(session::Error::Timeout),
+
    );
    alice.outbox().for_each(drop);
    alice.restart();
    alice.connect_to(&bob);
@@ -1079,7 +1083,7 @@ fn test_persistent_peer_reconnect_attempt() {
    let reason = DisconnectReason::Session(session::Error::Misbehavior);

    for _ in 0..3 {
-
        alice.disconnected(bob.id(), &reason);
+
        alice.disconnected(bob.id(), Link::Outbound, &reason);
        alice.elapse(service::MAX_RECONNECTION_DELTA);
        alice
            .outbox()
@@ -1116,7 +1120,11 @@ fn test_persistent_peer_reconnect_success() {

    // A transient error such as this will cause Alice to attempt a reconnection.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
-
    alice.disconnected(bob.id(), &DisconnectReason::Connection(error));
+
    alice.disconnected(
+
        bob.id(),
+
        Link::Outbound,
+
        &DisconnectReason::Connection(error),
+
    );
    alice.elapse(service::MIN_RECONNECTION_DELTA);
    alice.elapse(service::MIN_RECONNECTION_DELTA); // Trigger a second wakeup to test idempotence.

@@ -1163,7 +1171,7 @@ fn test_maintain_connections() {
    // A non-transient error such as this will cause Alice to attempt a different peer.
    let error = session::Error::Misbehavior;
    for peer in connected.iter() {
-
        alice.disconnected(peer.id(), &DisconnectReason::Session(error));
+
        alice.disconnected(peer.id(), Link::Outbound, &DisconnectReason::Session(error));

        let id = alice
            .outbox()
@@ -1197,7 +1205,11 @@ fn test_maintain_connections_transient() {
    // A transient error such as this will cause Alice to attempt a reconnection.
    let error = Arc::new(io::Error::from(io::ErrorKind::ConnectionReset));
    for peer in connected.iter() {
-
        alice.disconnected(peer.id(), &DisconnectReason::Connection(error.clone()));
+
        alice.disconnected(
+
            peer.id(),
+
            Link::Outbound,
+
            &DisconnectReason::Connection(error.clone()),
+
        );
        alice
            .outbox()
            .find(|o| matches!(o, Io::Connect(id, _) if id == &peer.id()))
@@ -1212,9 +1224,9 @@ fn test_maintain_connections_failed_attempt() {
    let reason =
        DisconnectReason::Connection(Arc::new(io::Error::from(io::ErrorKind::ConnectionReset)));

-
    alice.connect_to(&eve);
    // Make sure Alice knows about Eve.
-
    alice.disconnected(eve.id(), &reason);
+
    alice.connect_to(&eve);
+
    alice.disconnected(eve.id(), Link::Outbound, &reason);
    alice
        .outbox()
        .find(|o| matches!(o, Io::Connect(id, _) if id == &eve.id))
@@ -1222,7 +1234,7 @@ fn test_maintain_connections_failed_attempt() {
    alice.attempted(eve.id, eve.addr());

    // Disconnect Eve and make sure Alice doesn't try to re-connect immediately.
-
    alice.disconnected(eve.id(), &reason);
+
    alice.disconnected(eve.id(), Link::Outbound, &reason);
    assert_matches!(
        alice.outbox().find(|o| matches!(o, Io::Connect(_, _))),
        None
@@ -1236,7 +1248,7 @@ fn test_maintain_connections_failed_attempt() {
        .expect("Alice attempts Eve again");

    // Disconnect Eve and make sure Alice doesn't try to re-connect immediately.
-
    alice.disconnected(eve.id(), &reason);
+
    alice.disconnected(eve.id(), Link::Outbound, &reason);
    assert!(!alice.outbox().any(|o| matches!(o, Io::Connect(_, _))));
    // Or even after some short time..
    alice.elapse(MIN_RECONNECTION_DELTA);
modified radicle-node/src/tests/e2e.rs
@@ -1,7 +1,7 @@
use std::{collections::HashSet, thread, time};

use radicle::crypto::{test::signer::MockSigner, Signer};
-
use radicle::node::{Alias, FetchResult, Handle as _, DEFAULT_TIMEOUT};
+
use radicle::node::{Alias, ConnectResult, FetchResult, Handle as _, DEFAULT_TIMEOUT};
use radicle::storage::{
    ReadRepository, ReadStorage, RefUpdate, RemoteRepository, SignRepository, ValidateRepository,
    WriteRepository, WriteStorage,
@@ -769,10 +769,6 @@ fn test_concurrent_fetches() {
}

#[test]
-
#[ignore = "failing"]
-
#[should_panic]
-
// TODO: This test currently passes but the behavior is wrong. The test should not panic.
-
// We should figure out why we end up with no sessions established.
fn test_connection_crossing() {
    logger::init(log::Level::Debug);

@@ -780,33 +776,61 @@ fn test_connection_crossing() {
    let alice = Node::init(tmp.path(), Config::test(Alias::new("alice")));
    let bob = Node::init(tmp.path(), Config::test(Alias::new("bob")));

-
    let mut alice = alice.spawn();
-
    let mut bob = bob.spawn();
+
    let alice = alice.spawn();
+
    let bob = bob.spawn();
+
    let preferred = alice.id.max(bob.id);

-
    alice
-
        .handle
-
        .connect(bob.id, bob.addr.into(), ConnectOptions::default())
-
        .unwrap();
-
    bob.handle
-
        .connect(alice.id, alice.addr.into(), ConnectOptions::default())
-
        .unwrap();
+
    log::debug!(target: "test", "Preferred peer is {preferred}");
+

+
    let t1 = thread::spawn({
+
        let mut alice = alice.handle.clone();
+

+
        move || {
+
            alice
+
                .connect(bob.id, bob.addr.into(), ConnectOptions::default())
+
                .unwrap()
+
        }
+
    });
+
    let t2 = thread::spawn({
+
        let mut bob = bob.handle.clone();
+
        move || {
+
            bob.connect(alice.id, alice.addr.into(), ConnectOptions::default())
+
                .unwrap()
+
        }
+
    });
+

+
    let r1 = t1.join().unwrap();
+
    let r2 = t2.join().unwrap();
+

+
    // Note that the non-preferred peer will have their outbound connection fail, and this
+
    // could already show up as the result of the call here (but not always).
+
    if preferred == alice.id {
+
        assert_matches!(r1, ConnectResult::Connected);
+
    } else {
+
        assert_matches!(r2, ConnectResult::Connected);
+
    }

    thread::sleep(time::Duration::from_secs(1));

-
    let s1 = alice
-
        .handle
-
        .sessions()
-
        .unwrap()
-
        .iter()
-
        .any(|s| s.nid == bob.id);
-
    let s2 = bob
-
        .handle
-
        .sessions()
-
        .unwrap()
-
        .iter()
-
        .any(|s| s.nid == alice.id);
+
    let alice_s = alice.handle.sessions().unwrap();
+
    let bob_s = bob.handle.sessions().unwrap();

-
    assert!(s1 ^ s2, "Exactly one session should be established");
+
    // Both sessions are established.
+
    let s1 = alice_s.iter().find(|s| s.nid == bob.id).unwrap();
+
    let s2 = bob_s.iter().find(|s| s.nid == alice.id).unwrap();
+

+
    log::debug!(target: "test", "{:?}", alice.handle.sessions());
+
    log::debug!(target: "test", "{:?}", bob.handle.sessions());
+

+
    if preferred == alice.id {
+
        assert_eq!(s1.link, radicle::node::Link::Outbound);
+
        assert_eq!(s2.link, radicle::node::Link::Inbound);
+
    } else {
+
        assert_eq!(s1.link, radicle::node::Link::Inbound);
+
        assert_eq!(s2.link, radicle::node::Link::Outbound);
+
    }
+
    assert_eq!(alice_s.len(), 1);
+
    assert_eq!(bob_s.len(), 1);
}

#[test]
modified radicle-node/src/wire/protocol.rs
@@ -178,6 +178,7 @@ enum Peer {
    /// The peer was scheduled for disconnection. Once the transport is handed over
    /// by the reactor, we can consider it disconnected.
    Disconnecting {
+
        link: Link,
        nid: Option<NodeId>,
        reason: DisconnectReason,
    },
@@ -201,6 +202,13 @@ impl Peer {
        }
    }

+
    fn link(&self) -> Link {
+
        match self {
+
            Peer::Connected { link, .. } => *link,
+
            Peer::Disconnecting { link, .. } => *link,
+
        }
+
    }
+

    /// Connected peer.
    fn connected(nid: NodeId, addr: NetAddr<HostName>, link: Link) -> Self {
        Self::Connected {
@@ -211,20 +219,6 @@ impl Peer {
            streams: Streams::new(link),
        }
    }
-

-
    /// Switch to disconnecting state.
-
    fn disconnecting(&mut self, reason: DisconnectReason) {
-
        if let Self::Connected { nid, streams, .. } = self {
-
            streams.shutdown();
-

-
            *self = Self::Disconnecting {
-
                nid: Some(*nid),
-
                reason,
-
            };
-
        } else {
-
            panic!("Peer::disconnected: session is not connected ({self:?})");
-
        }
-
    }
}

/// Holds connected peers.
@@ -263,9 +257,9 @@ impl Peers {
            .map(|(fd, peer)| (*fd, peer))
    }

-
    fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId)> {
+
    fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId, Link)> {
        self.0.iter().filter_map(|(id, peer)| match peer {
-
            Peer::Connected { nid, .. } => Some((*id, nid)),
+
            Peer::Connected { nid, link, .. } => Some((*id, nid, *link)),
            Peer::Disconnecting { .. } => None,
        })
    }
@@ -336,22 +330,44 @@ where
        self.actions.push_back(Action::RegisterListener(socket));
    }

-
    fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) {
-
        match self.peers.get_mut(&id) {
-
            Some(Peer::Disconnecting { .. }) => {
-
                log::error!(target: "wire", "Peer with id={id} is already disconnecting");
-
            }
-
            Some(peer) => {
-
                log::debug!(target: "wire", "Disconnecting peer with id={id}: {reason}");
-

-
                peer.disconnecting(reason);
-
                self.actions.push_back(Action::UnregisterTransport(id));
-
            }
-
            None => {
+
    fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) -> Option<(NodeId, Link)> {
+
        match self.peers.entry(id) {
+
            Entry::Vacant(_) => {
                // Connecting peer with no session.
                log::debug!(target: "wire", "Disconnecting pending peer with id={id}: {reason}");
                self.actions.push_back(Action::UnregisterTransport(id));
+

+
                // Check for attempted outbound connections. Unestablished inbound connections don't
+
                // have an NID yet.
+
                self.outbound
+
                    .values()
+
                    .find(|o| o.id == Some(id))
+
                    .map(|o| (o.nid, Link::Outbound))
            }
+
            Entry::Occupied(mut e) => match e.get_mut() {
+
                Peer::Disconnecting { nid, link, .. } => {
+
                    log::error!(target: "wire", "Peer with id={id} is already disconnecting");
+

+
                    nid.map(|n| (n, *link))
+
                }
+
                Peer::Connected {
+
                    nid, streams, link, ..
+
                } => {
+
                    log::debug!(target: "wire", "Disconnecting peer with id={id}: {reason}");
+
                    let nid = *nid;
+
                    let link = *link;
+

+
                    streams.shutdown();
+
                    e.insert(Peer::Disconnecting {
+
                        nid: Some(nid),
+
                        link,
+
                        reason,
+
                    });
+
                    self.actions.push_back(Action::UnregisterTransport(id));
+

+
                    Some((nid, link))
+
                }
+
            },
        }
    }

@@ -435,8 +451,11 @@ where
            log::debug!(target: "wire", "Cleaning up inbound peer state with id={id} (fd={fd})");
        } else if let Some(outbound) = self.outbound.remove(&fd) {
            log::debug!(target: "wire", "Cleaning up outbound peer state with id={id} (fd={fd})");
-
            self.service
-
                .disconnected(outbound.nid, &DisconnectReason::connection());
+
            self.service.disconnected(
+
                outbound.nid,
+
                Link::Outbound,
+
                &DisconnectReason::connection(),
+
            );
        } else {
            log::warn!(target: "wire", "Tried to cleanup unknown peer with id={id} (fd={fd})");
        }
@@ -542,36 +561,10 @@ where
                // Make sure we don't try to connect to ourselves by mistake.
                if &nid == self.signer.public_key() {
                    log::error!(target: "wire", "Self-connection detected, disconnecting..");
+
                    self.disconnect(id, DisconnectReason::SelfConnection);

-
                    self.disconnect(
-
                        id,
-
                        DisconnectReason::Dial(Arc::new(io::Error::from(
-
                            io::ErrorKind::AlreadyExists,
-
                        ))),
-
                    );
                    return;
                }
-
                log::debug!(target: "wire", "Session established with {nid} (id={id}) (fd={fd})");
-

-
                let conflicting = self
-
                    .peers
-
                    .active()
-
                    .filter(|(other, d)| **d == nid && *other != id)
-
                    .map(|(id, _)| id)
-
                    .collect::<Vec<_>>();
-

-
                for id in conflicting {
-
                    log::warn!(
-
                        target: "wire", "Closing conflicting session with {nid} (id={id})"
-
                    );
-
                    self.disconnect(
-
                        id,
-
                        DisconnectReason::Dial(Arc::new(io::Error::from(
-
                            io::ErrorKind::AlreadyExists,
-
                        ))),
-
                    );
-
                }
-

                let (addr, link) = if let Some(peer) = self.inbound.remove(&fd) {
                    (peer.addr, Link::Inbound)
                } else if let Some(peer) = self.outbound.remove(&fd) {
@@ -581,9 +574,97 @@ where
                    log::error!(target: "wire", "Session for {nid} (id={id}) not found");
                    return;
                };
-
                self.peers
-
                    .insert(id, Peer::connected(nid, addr.clone(), link));
-
                self.service.connected(nid, addr.into(), link);
+
                log::debug!(
+
                    target: "wire",
+
                    "Session established with {nid} (id={id}) (fd={fd}) ({})",
+
                    if link.is_inbound() { "inbound" } else { "outbound" }
+
                );
+

+
                // Connections to close.
+
                let mut disconnect = Vec::new();
+

+
                // Handle conflicting connections.
+
                // This is typical when nodes have mutually configured their nodes to connect to
+
                // each other on startup. We handle this by deterministically choosing one node
+
                // whos outbound connection is the one that is kept. The other connections are
+
                // dropped.
+
                {
+
                    // Whether we have precedence in case of conflicting connections.
+
                    // Having precedence means that our outbound connection will win over
+
                    // the other node's outbound connection.
+
                    let precedence = *self.signer.public_key() > nid;
+

+
                    // Pre-existing connections that conflict with this newly established session.
+
                    // Note that we can't know whether a connection is conflicting before we get the
+
                    // remote static key.
+
                    let mut conflicting = Vec::new();
+

+
                    // Active sessions with the same NID but a different Resource ID are conflicting.
+
                    conflicting.extend(
+
                        self.peers
+
                            .active()
+
                            .filter(|(c_id, d, _)| **d == nid && *c_id != id)
+
                            .map(|(c_id, _, link)| (c_id, link)),
+
                    );
+

+
                    // Outbound connection attempts with the same remote key but a different file
+
                    // descriptor are conflicting.
+
                    conflicting.extend(self.outbound.iter().filter_map(|(c_fd, other)| {
+
                        if other.nid == nid && *c_fd != fd {
+
                            other.id.map(|c_id| (c_id, Link::Outbound))
+
                        } else {
+
                            None
+
                        }
+
                    }));
+

+
                    for (c_id, c_link) in conflicting {
+
                        // If we have precedence, the inbound connection is closed.
+
                        // In the case where both connections are inbound or outbound,
+
                        // we close the newer connection, ie. the one with the higher
+
                        // resource id.
+
                        let close = match (link, c_link) {
+
                            (Link::Inbound, Link::Outbound) => {
+
                                if precedence {
+
                                    id
+
                                } else {
+
                                    c_id
+
                                }
+
                            }
+
                            (Link::Outbound, Link::Inbound) => {
+
                                if precedence {
+
                                    c_id
+
                                } else {
+
                                    id
+
                                }
+
                            }
+
                            (Link::Inbound, Link::Inbound) => id.max(c_id),
+
                            (Link::Outbound, Link::Outbound) => id.max(c_id),
+
                        };
+

+
                        log::warn!(
+
                            target: "wire", "Established session (id={id}) conflicts with existing session for {nid} (id={c_id})"
+
                        );
+
                        disconnect.push(close);
+
                    }
+
                }
+
                for id in &disconnect {
+
                    log::warn!(
+
                        target: "wire", "Closing conflicting session (id={id}) with {nid}.."
+
                    );
+
                    // Disconnect and return the associated NID of the peer, if available.
+
                    if let Some((nid, link)) = self.disconnect(*id, DisconnectReason::Conflict) {
+
                        // We disconnect the session eagerly because otherwise we will get the new
+
                        // `connected` event before the `disconnect`, resulting in a duplicate
+
                        // connection.
+
                        self.service
+
                            .disconnected(nid, link, &DisconnectReason::Conflict);
+
                    }
+
                }
+
                if !disconnect.contains(&id) {
+
                    self.peers
+
                        .insert(id, Peer::connected(nid, addr.clone(), link));
+
                    self.service.connected(nid, addr.into(), link);
+
                }
            }
            SessionEvent::Data(data) => {
                if let Some(Peer::Connected {
@@ -726,8 +807,11 @@ where
                        }

                        if let Some(id) = peer.id() {
-
                            self.service
-
                                .disconnected(*id, &DisconnectReason::connection());
+
                            self.service.disconnected(
+
                                *id,
+
                                peer.link(),
+
                                &DisconnectReason::connection(),
+
                            );
                        } else {
                            log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
                        }
@@ -748,7 +832,9 @@ where
        match self.peers.entry(id) {
            Entry::Occupied(e) => {
                match e.get() {
-
                    Peer::Disconnecting { nid, reason, .. } => {
+
                    Peer::Disconnecting {
+
                        nid, reason, link, ..
+
                    } => {
                        log::debug!(target: "wire", "Transport handover for disconnecting peer with id={id} (fd={fd})");

                        // Disconnect TCP stream.
@@ -756,7 +842,13 @@ where

                        // If there is no NID, the service is not aware of the peer.
                        if let Some(nid) = nid {
-
                            self.service.disconnected(*nid, reason);
+
                            // In the case of a conflicting connection, there will be two resources
+
                            // for the peer. However, at the service level, there is only one, and
+
                            // it is identified by NID.
+
                            //
+
                            // Therefore, we specify which of the connections we're closing by
+
                            // passing the `link`.
+
                            self.service.disconnected(*nid, *link, reason);
                        }
                        e.remove();
                    }
@@ -847,8 +939,11 @@ where
                        Err(err) => {
                            log::error!(target: "wire", "Error establishing connection to {addr}: {err}");

-
                            self.service
-
                                .disconnected(node_id, &DisconnectReason::Dial(Arc::new(err)));
+
                            self.service.disconnected(
+
                                node_id,
+
                                Link::Outbound,
+
                                &DisconnectReason::Dial(Arc::new(err)),
+
                            );
                        }
                    }
                }
modified radicle/src/node.rs
@@ -506,10 +506,21 @@ impl Command {
    }
}

+
/// Connection link direction.
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
#[serde(rename_all = "camelCase")]
+
pub enum Link {
+
    /// Outgoing connection.
+
    Outbound,
+
    /// Incoming connection.
+
    Inbound,
+
}
+

/// An established network connection with a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    pub nid: NodeId,
+
    pub link: Link,
    pub addr: Address,
    pub state: State,
}