Radicle Heartwood Protocol & Stack
protocol/service: wire up connections management
Fintan Halpenny committed 4 months ago
commit 55efb8e35cff12105905952d98134af4d81dd3ef
parent f5c62118a839dea8c4d249fb5cfd95e3a6b6ce61
5 files changed +406 -711
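
Overview: this commit replaces the service's internal `Sessions` map with a
`connections::state::Connections` state machine. Instead of mutating session
state in place, the service now builds a command (`Accept`, `Attempt`,
`Connect`, `Disconnect`, `Message`, ...), hands it to `Connections` together
with the current clock, and matches exhaustively on the returned event to
decide what to log and which I/O to emit. A minimal sketch of that
command-in/event-out shape follows; the types are simplified stand-ins for
illustration, not the actual `connections` API (which carries addresses,
clocks, and richer session data):

    use std::collections::HashSet;

    type NodeId = u64;

    enum Command {
        Connect { node: NodeId },
        Disconnect { node: NodeId },
    }

    // Event names mirror variants visible in the diff below
    // (`Establish`, `AlreadyConnected`, `Severed`, `MissingSession`).
    enum Event {
        Establish { node: NodeId },
        AlreadyConnected { node: NodeId },
        Severed { node: NodeId },
        MissingSession { node: NodeId },
    }

    struct Connections {
        connected: HashSet<NodeId>,
    }

    impl Connections {
        // Pure transition: no logging or networking happens here. The
        // caller performs I/O based on the returned event, which keeps
        // every outcome enumerable and testable without a network.
        fn apply(&mut self, cmd: Command) -> Event {
            match cmd {
                Command::Connect { node } => {
                    if self.connected.insert(node) {
                        Event::Establish { node }
                    } else {
                        Event::AlreadyConnected { node }
                    }
                }
                Command::Disconnect { node } => {
                    if self.connected.remove(&node) {
                        Event::Severed { node }
                    } else {
                        Event::MissingSession { node }
                    }
                }
            }
        }
    }

The hunks in `service.rs` below follow this pattern: each handler builds a
command, calls into `self.connections`, and matches on the returned event.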
modified crates/radicle-node/src/runtime/handle.rs
@@ -293,8 +293,8 @@ impl radicle::node::Handle for Handle {
        let query: Arc<QueryState> = Arc::new(move |state| {
            let sessions = state
                .sessions()
-                .values()
-                .map(radicle::node::Session::from)
+                .iter()
+                .map(|(_, s)| radicle::node::Session::from(s))
                .collect();
            sender.send(sessions).ok();

@@ -312,7 +312,10 @@ impl radicle::node::Handle for Handle {
    fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        let query: Arc<QueryState> = Arc::new(move |state| {
-            let session = state.sessions().get(&nid).map(radicle::node::Session::from);
+            let session = state
+                .sessions()
+                .get_session(&nid)
+                .map(radicle::node::Session::from);
            sender.send(session).ok();

            Ok(())
modified crates/radicle-node/src/tests.rs
@@ -23,6 +23,8 @@ use radicle::storage::RefUpdate;
use radicle::test::arbitrary::gen;
use radicle::test::storage::MockRepository;
use radicle_protocol::bounded::BoundedVec;
+use radicle_protocol::connections::config::KEEP_ALIVE_DELTA;
+use radicle_protocol::connections::config::STALE_CONNECTION_TIMEOUT;

use crate::collections::{RandomMap, RandomSet};
use crate::identity::RepoId;
@@ -129,7 +131,7 @@ fn test_disconnecting_unresponsive_peer() {
    let bob = Peer::new("bob", [9, 9, 9, 9]);

    alice.connect_to(&bob);
-    assert_eq!(1, alice.sessions().connected().count(), "bob connects");
+    assert_eq!(1, alice.sessions().connected().len(), "bob connects");
    alice.elapse(STALE_CONNECTION_TIMEOUT + LocalDuration::from_secs(1));
    alice
        .outbox()
@@ -173,7 +175,7 @@ fn test_connection_kept_alive() {
        ConnectOptions::default(),
    ));
    sim.run_while([&mut alice, &mut bob], |s| !s.is_settled());
-    assert_eq!(1, alice.sessions().connected().count(), "bob connects");
+    assert_eq!(1, alice.sessions().connected().len(), "bob connects");

    let mut elapsed: LocalDuration = LocalDuration::from_secs(0);
    let step: LocalDuration = STALE_CONNECTION_TIMEOUT / 10;
@@ -185,8 +187,16 @@ fn test_connection_kept_alive() {
        elapsed = elapsed + step;
    }

-    assert_eq!(1, alice.sessions().len(), "alice remains connected to Bob");
-    assert_eq!(1, bob.sessions().len(), "bob remains connected to Alice");
+    assert_eq!(
+        1,
+        alice.sessions().connected().len(),
+        "alice remains connected to Bob"
+    );
+    assert_eq!(
+        1,
+        bob.sessions().connected().len(),
+        "bob remains connected to Alice"
+    );
}

#[test]
@@ -202,7 +212,8 @@ fn test_outbound_connection() {
        .service
        .sessions()
        .connected()
-        .map(|(id, _)| *id)
+        .node_ids()
+        .copied()
        .collect::<Vec<_>>();

    assert!(peers.contains(&eve.id()));
@@ -222,7 +233,8 @@ fn test_inbound_connection() {
        .service
        .sessions()
        .connected()
-        .map(|(id, _)| *id)
+        .node_ids()
+        .copied()
        .collect::<Vec<_>>();

    assert!(peers.contains(&eve.id()));
@@ -1215,7 +1227,8 @@ fn test_persistent_peer_reconnect_attempt() {
    let ips = alice
        .sessions()
        .connected()
-        .map(|(id, _)| *id)
+        .node_ids()
+        .copied()
        .collect::<Vec<_>>();
    assert!(ips.contains(&bob.id()));
    assert!(ips.contains(&eve.id()));
@@ -1304,7 +1317,7 @@ fn test_maintain_connections() {
    }
    assert_eq!(
        connected.len(),
-        alice.sessions().len(),
+        alice.sessions().connected().len(),
        "alice should be connected to the first set of peers"
    );
    // We now import the other addresses.
@@ -2016,19 +2029,19 @@ fn test_announcement_message_amplification() {
        });

        // Ensure nodes are all connected, otherwise skip this test run.
-        if alice.sessions().connected().count() != 4 {
+        if alice.sessions().connected().len() != 4 {
            continue;
        }
-        if bob.sessions().connected().count() != 4 {
+        if bob.sessions().connected().len() != 4 {
            continue;
        }
-        if eve.sessions().connected().count() != 4 {
+        if eve.sessions().connected().len() != 4 {
            continue;
        }
-        if zod.sessions().connected().count() != 4 {
+        if zod.sessions().connected().len() != 4 {
            continue;
        }
-        if tom.sessions().connected().count() != 4 {
+        if tom.sessions().connected().len() != 4 {
            continue;
        }

modified crates/radicle-protocol/src/service.rs
@@ -9,10 +9,8 @@ pub mod limiter;
pub mod message;
pub mod session;

-use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::net::IpAddr;
-use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, net, time};

@@ -26,18 +24,19 @@ use radicle::identity::Doc;
use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
-use radicle::node::address::{AddressBook, AddressType, KnownAddress};
+use radicle::node::address::{AddressType, KnownAddress};
use radicle::node::config::{PeerConfig, RateLimit};
use radicle::node::device::Device;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
-use radicle::node::{ConnectOptions, Penalty, Severity};
+use radicle::node::{ConnectOptions, Penalty};
use radicle::storage::refs::SIGREFS_BRANCH;
use radicle::storage::RepositoryError;
use radicle_fetch::policy::SeedingPolicy;

+use crate::connections;
use crate::fetcher;
use crate::fetcher::service::FetcherService;
use crate::fetcher::FetcherState;
@@ -50,9 +49,7 @@ use radicle::identity::RepoId;
use radicle::node::events::Emitter;
use radicle::node::routing;
use radicle::node::routing::InsertResult;
-use radicle::node::{
-    Address, Alias, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt,
-};
+use radicle::node::{Address, Alias, Features, FetchResult, Seed, Seeds, SyncStatus, SyncedAt};
use radicle::prelude::*;
use radicle::storage;
use radicle::storage::{refs::RefsAt, Namespaces, ReadStorage};
@@ -65,7 +62,6 @@ use radicle::node::PROTOCOL_VERSION;
use crate::bounded::BoundedVec;
use crate::service::filter::Filter;
pub use crate::service::message::{Message, ZeroBytes};
-pub use crate::service::session::{QueuedFetch, Session};
use crate::worker::FetchError;
use radicle::node::events::{Event, Events};
use radicle::node::{Config, NodeId};
@@ -77,8 +73,6 @@ use self::limiter::RateLimiter;
use self::message::InventoryAnnouncement;
use self::policy::NamespacesError;

-/// How often to run the "idle" task.
-pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "gossip" task.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
@@ -87,10 +81,6 @@ pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
-/// Duration to wait on an unresponsive peer before dropping its connection.
-pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
-/// How much time should pass after a peer was last active for a *ping* to be sent.
-pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency values to keep for a session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum time difference between the local time, and an announcement timestamp.
@@ -111,8 +101,6 @@ pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// How long to wait for a fetch to stall before aborting, default is 3s.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(3);
-/// Target number of peers to maintain connections to.
-pub const TARGET_OUTBOUND_PEERS: usize = 8;

/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
@@ -398,7 +386,7 @@ pub struct Service<D, S, G> {
    /// Policy configuration.
    policies: policy::Config<Write>,
    /// Peer sessions, currently or recently connected.
-    sessions: Sessions,
+    connections: connections::state::Connections,
    /// Clock. Tells the time.
    clock: LocalTime,
    /// Who relayed what announcement to us. We keep track of this to ensure that
@@ -475,11 +463,13 @@ where
        node: NodeAnnouncement,
        emitter: Emitter<Event>,
    ) -> Self {
-        let sessions = Sessions::new(rng.clone());
        let limiter = RateLimiter::new(config.peers());
        let last_timestamp = node.timestamp;
        let clock = LocalTime::default(); // Updated on initialize.
        let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.
+
+        // TODO(finto): `Connections` and `Fetcher` should be configured outside
+        // of `Service::new` so that they can be setup in a fallible environment
        let fetcher = {
            let config = fetcher::Config::new()
                .with_max_concurrency(
@@ -489,6 +479,24 @@ where
                .with_max_capacity(fetcher::MaxQueueSize::default());
            FetcherService::new(config)
        };
+        let connections = {
+            use connections::config::{Durations, Inbound, Outbound};
+            let durations = Durations::default();
+            let inbound = Inbound::from(RateLimit::from(config.limits.rate.inbound));
+            let outbound = Outbound {
+                rate_limit: config.limits.rate.outbound.into(),
+                target: connections::config::TARGET_OUTBOUND_PEERS,
+            };
+            let connections_config = connections::Config {
+                durations,
+                inbound,
+                outbound,
+            };
+            connections::state::Connections::new(
+                connections_config,
+                RateLimiter::new(config.peers()),
+            )
+        };
        Self {
            config,
            storage,
@@ -501,7 +509,7 @@ where
            db,
            outbox: Outbox::default(),
            limiter,
-            sessions,
+            connections,
            fetcher,
            filter: Filter::empty(),
            relayed_by: HashMap::default(),
@@ -740,12 +748,16 @@ where
        // Try to establish some connections.
        self.maintain_connections();
        // Start periodic tasks.
-        self.outbox.wakeup(IDLE_INTERVAL);
+        self.outbox.wakeup(self.idle_interval());
        self.outbox.wakeup(GOSSIP_INTERVAL);

        Ok(())
    }

+    pub fn idle_interval(&self) -> LocalDuration {
+        self.connections.config().idle()
+    }
+
    pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
        trace!(
            target: "service",
@@ -775,7 +787,7 @@ where
            now - self.started_at.expect("Service::wake: service must be initialized")
        );

-        if now - self.last_idle >= IDLE_INTERVAL {
+        if now - self.last_idle >= self.idle_interval() {
            trace!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
@@ -783,7 +795,7 @@ where
            self.idle_connections();
            self.maintain_connections();
            self.dequeue_fetches();
-            self.outbox.wakeup(IDLE_INTERVAL);
+            self.outbox.wakeup(self.idle_interval());
            self.last_idle = now;
        }
        if now - self.last_gossip >= GOSSIP_INTERVAL {
@@ -839,6 +851,8 @@ where
        match cmd {
            Command::Connect(nid, addr, opts) => {
                if opts.persistent {
+                    // TODO(finto): I think these should live in the `Connections`
+                    // as the persisted peers.
                    self.config.connect.insert((nid, addr.clone()).into());
                }
                if let Err(e) = self.connect(nid, addr) {
@@ -892,7 +906,7 @@ where
                // Let all our peers know that we're interested in this repo from now on.
                self.outbox.broadcast(
                    Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
-                    self.sessions.connected().map(|(_, s)| s),
+                    self.connections.sessions().connected().sessions(),
                );
            }
            Command::Unseed(id, resp) => {
@@ -994,18 +1008,12 @@ where
    ) {
        let session = {
            let reason = format!("peer {from} is not connected; cannot initiate fetch");
-            let Some(session) = self.sessions.get_mut(&from) else {
+            let Some(session) = self.connections.get_connected(&from) else {
                if let Some(c) = channel {
                    c.send(FetchResult::Failed { reason }).ok();
                }
                return;
            };
-            if !session.is_connected() {
-                if let Some(c) = channel {
-                    c.send(FetchResult::Failed { reason }).ok();
-                }
-                return;
-            }
            session
        };

@@ -1166,25 +1174,21 @@ where
    /// 1. The RID was already being fetched.
    /// 2. The session was already at fetch capacity.
    pub fn dequeue_fetches(&mut self) {
-        let sessions = self
-            .sessions
-            .shuffled()
-            .map(|(k, _)| *k)
+        let mut connected = self
+            .sessions()
+            .connected()
+            .node_ids()
+            .copied()
            .collect::<Vec<_>>();
+        self.rng.shuffle(&mut connected);

-        for nid in sessions {
-            #[allow(clippy::unwrap_used)]
-            let sess = self.sessions.get_mut(&nid).unwrap();
-            if !sess.is_connected() {
-                continue;
-            }
-
+        for node in connected {
            let Some(fetcher::QueuedFetch {
                rid,
                from,
                refs_at,
                timeout,
-            }) = self.fetcher.dequeue(&nid)
+            }) = self.fetcher.dequeue(&node)
            else {
                continue;
            };
@@ -1214,15 +1218,9 @@ where

    /// Inbound connection attempt.
    pub fn accepted(&mut self, ip: IpAddr) -> bool {
-        // Always accept localhost connections, even if we already reached
-        // our inbound connection limit.
-        if ip.is_loopback() || ip.is_unspecified() {
-            return true;
-        }
-        // Check for inbound connection limit.
-        if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
-            return false;
-        }
+        use connections::state::command;
+        use connections::state::event;
+
        match self.db.addresses().is_ip_banned(ip) {
            Ok(banned) => {
                if banned {
@@ -1230,26 +1228,38 @@ where
                    return false;
                }
            }
-            Err(e) => error!(target: "service", "Error querying ban status for {ip}: {e}"),
+            Err(e) => {
+                error!(target: "service", "Error querying ban status for {ip}: {e}");
+                return false;
+            }
        }
-        let host: HostName = ip.into();
-        let tokens = self.config.limits.rate.inbound;
-
-        if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
-            trace!(target: "service", "Rate limiting inbound connection from {host}..");
-            return false;
+        match self.connections.accept(command::Accept { ip }, self.clock) {
+            event::Accept::LimitExceeded {
+                ip: _,
+                current_inbound: _,
+            } => false,
+            event::Accept::HostLimited { ip } => {
+                trace!(target: "service", "Rate limiting inbound connection from {ip}..");
+                false
+            }
+            // Always accept localhost connections, even if we already reached
+            // our inbound connection limit.
+            event::Accept::LocalHost { ip: _ } => true,
+            event::Accept::Accepted { ip: _ } => true,
        }
-        true
    }

    pub fn attempted(&mut self, nid: NodeId, addr: Address) {
-        debug!(target: "service", "Attempted connection to {nid} ({addr})");
-
-        if let Some(sess) = self.sessions.get_mut(&nid) {
-            sess.to_attempted();
-        } else {
-            #[cfg(debug_assertions)]
-            panic!("Service::attempted: unknown session {nid}@{addr}");
+        use connections::state::command;
+        use connections::state::event;
+        match self.connections.attempted(command::Attempt { node: nid }) {
+            event::Attempted::ConnectionAttempt { session } => {
+                debug!(target: "service", "Attempted connection nid={nid} addr={addr} link={} persistent={}", session.link(), session.persistent());
+            }
+            event::Attempted::MissingSession { node: _ } => {
+                #[cfg(debug_assertions)]
+                panic!("Service::attempted: unknown session {nid}@{addr}");
+            }
        }
    }

@@ -1260,88 +1270,99 @@ where
    }

    pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
-        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
-        self.emitter.emit(Event::PeerConnected { nid: remote });
+        use connections::state::command;
+        use connections::state::event;

        let msgs = self.initial(link);
-
-        if link.is_outbound() {
-            if let Some(peer) = self.sessions.get_mut(&remote) {
-                peer.to_connected(self.clock);
-                self.outbox.write_all(peer, msgs);
+        let connection_type = self.connection_type(&remote);
+        let command = match link {
+            Link::Outbound => command::Connected::Outbound {
+                node: remote,
+                addr: addr.clone(),
+                connection_type,
+            },
+            Link::Inbound => command::Connected::Inbound {
+                node: remote,
+                addr: addr.clone(),
+                connection_type,
+            },
+        };
+        match self.connections.connected(command, self.clock) {
+            event::Connected::Established { session: _ } => {
+                info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
+                self.emitter.emit(Event::PeerConnected { nid: remote });
+                self.outbox.write_all(remote, msgs);
            }
-        } else {
-            match self.sessions.entry(remote) {
-                Entry::Occupied(mut e) => {
-                    // In this scenario, it's possible that our peer is persistent, and
-                    // disconnected. We get an inbound connection before we attempt a re-connection,
-                    // and therefore we treat it as a regular inbound connection.
-                    //
-                    // It's also possible that a disconnection hasn't gone through yet and our
-                    // peer is still in connected state here, while a new inbound connection from
-                    // that same peer is made. This results in a new connection from a peer that is
-                    // already connected from the perspective of the service. This appears to be
-                    // a bug in the underlying networking library.
-                    let peer = e.get_mut();
-                    debug!(
-                        target: "service",
-                        "Connecting peer {remote} already has a session open ({peer})"
-                    );
-                    peer.link = link;
-                    peer.to_connected(self.clock);
-                    self.outbox.write_all(peer, msgs);
-                }
-                Entry::Vacant(e) => {
-                    if let HostName::Ip(ip) = addr.host {
-                        if !address::is_local(&ip) {
-                            if let Err(e) =
-                                self.db
-                                    .addresses_mut()
-                                    .record_ip(&remote, ip, self.clock.into())
-                            {
-                                log::error!(target: "service", "Error recording IP address for {remote}: {e}");
-                            }
-                        }
-                    }
-                    let peer = e.insert(Session::inbound(
-                        remote,
-                        addr,
-                        self.config.is_persistent(&remote),
-                        self.rng.clone(),
-                        self.clock,
-                    ));
-                    self.outbox.write_all(peer, msgs);
-                }
+            event::Connected::MissingSession { node } => {
+                debug!(target: "service", "Could not transition {node} to connect since its session is missing");
            }
        }
    }

    pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
-        let since = self.local_time();
-        let Some(session) = self.sessions.get_mut(&remote) else {
-            // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
-            // disconnection event once the transport is dropped.
-            trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
-            return;
+        use connections::state::command;
+        use connections::state::event;
+
+        let command = command::Disconnect {
+            node: remote,
+            link,
+            connection_type: self.connection_type(&remote),
+            since: self.clock,
        };
-        // In cases of connection conflicts, there may be disconnections of one of the two
-        // connections. In that case we don't want the service to remove the session.
-        if session.link != link {
-            return;
+        match self.connections.disconnected(command, reason) {
+            event::Disconnected::Retry {
+                session: _,
+                retry_at: _,
+                delay,
+            } => {
+                info!(target: "service", "Disconnected from {remote} ({reason})");
+                self.emitter.emit(Event::PeerDisconnected {
+                    nid: remote,
+                    reason: reason.to_string(),
+                });
+                debug!(target: "service", "Reconnecting to {remote} in {delay}..");
+                self.outbox.wakeup(delay);
+            }
+            event::Disconnected::Severed { session, severity } => {
+                info!(target: "service", "Disconnected from {remote} ({reason})");
+                self.emitter.emit(Event::PeerDisconnected {
+                    nid: remote,
+                    reason: reason.to_string(),
+                });
+                if let Err(e) =
+                    self.db
+                        .addresses_mut()
+                        .disconnected(&remote, session.address(), severity)
+                {
+                    error!(target: "service", "Error updating address store: {e}");
+                }
+                // Only re-attempt outbound connections, since we don't care if an inbound connection
+                // is dropped.
+                if link.is_outbound() {
+                    self.maintain_connections();
+                }
+            }
+            event::Disconnected::MissingSession { node } => {
+                debug!(target: "service", "Attempted to disconnect missing session {node}");
+            }
+            event::Disconnected::AlreadyDisconnected { node: _ } => {
+                // Since we sometimes disconnect the service eagerly, it's not unusual to get a second
+                // disconnection event once the transport is dropped.
+                trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
+            }
+            event::Disconnected::LinkConflict {
+                node,
+                found,
+                expected,
+            } => {
+                // In cases of connection conflicts, there may be disconnections of one of the two
+                // connections. In that case we don't want the service to remove the session.
+                trace!(target: "service", "Conflicting sessions {node} found={found} expected={expected}");
+            }
        }

-        info!(target: "service", "Disconnected from {remote} ({reason})");
-        self.emitter.emit(Event::PeerDisconnected {
-            nid: remote,
-            reason: reason.to_string(),
-        });
-
-        let link = session.link;
-        let addr = session.addr.clone();
-
        let cmd = fetcher::state::command::Cancel { from: remote };
        let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);
-
        match event {
            fetcher::state::event::Cancel::Unexpected { from } => {
                debug!(target: "service", "No fetches to cancel for {from}");
@@ -1363,54 +1384,6 @@ where
                })
                .ok();
        }
-
-        // Attempt to re-connect to persistent peers.
-        if self.config.peer(&remote).is_some() {
-            let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
-                .clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
-
-            // Nb. We always try to reconnect to persistent peers, even when the error appears
-            // to not be transient.
-            session.to_disconnected(since, since + delay);
-
-            debug!(target: "service", "Reconnecting to {remote} in {delay}..");
-
-            self.outbox.wakeup(delay);
-        } else {
-            debug!(target: "service", "Dropping peer {remote}..");
-            self.sessions.remove(&remote);
-
-            let severity = match reason {
-                DisconnectReason::Dial(_)
-                | DisconnectReason::Fetch(_)
-                | DisconnectReason::Connection(_) => {
-                    if self.is_online() {
-                        // If we're "online", there's something wrong with this
-                        // peer connection specifically.
-                        Severity::Medium
-                    } else {
-                        Severity::Low
-                    }
-                }
-                DisconnectReason::Session(e) => e.severity(),
-                DisconnectReason::Command
-                | DisconnectReason::Conflict
-                | DisconnectReason::SelfConnection => Severity::Low,
-            };
-
-            if let Err(e) = self
-                .db
-                .addresses_mut()
-                .disconnected(&remote, &addr, severity)
-            {
-                error!(target: "service", "Error updating address store: {e}");
-            }
-            // Only re-attempt outbound connections, since we don't care if an inbound connection
-            // is dropped.
-            if link.is_outbound() {
-                self.maintain_connections();
-            }
-        }
        self.dequeue_fetches();
    }

@@ -1541,15 +1514,18 @@ where

                // Here we handle the special case where the inventory we received is that of
                // a connected peer, as opposed to being relayed to us.
-                if let Some(sess) = self.sessions.get_mut(announcer) {
-                    for id in message.inventory.as_slice() {
-                        // If we are connected to the announcer of this inventory, update the peer's
-                        // subscription filter to include all inventory items. This way, we'll
-                        // relay messages relating to the peer's inventory.
-                        if let Some(sub) = &mut sess.subscribe {
-                            sub.filter.insert(id);
-                        }
-
+                for id in message.inventory.as_slice() {
+                    // If we are connected to the announcer of this inventory, update the peer's
+                    // subscription filter to include all inventory items. This way, we'll
+                    // relay messages relating to the peer's inventory.
+                    let should_route = matches!(
+                        // The logic previously would consider a missing
+                        // subscription as ok
+                        self.connections.subscribe_to(announcer, id),
+                        connections::session::SubscribeTo::NoSubscription
+                            | connections::session::SubscribeTo::Subscribed
+                    );
+                    if should_route {
                        // If we're seeding and connected to the announcer, and we don't have
                        // the inventory, fetch it from the announcer.
                        if self.policies.is_seeding(id).expect(
@@ -1651,7 +1627,7 @@ where
                // Refs can be relayed by peers who don't have the data in storage,
                // therefore we only check whether we are connected to the *announcer*,
                // which is required by the protocol to only announce refs it has.
-                let Some(remote) = self.sessions.get(announcer).cloned() else {
+                let Some(remote) = self.connections.get_connected(announcer) else {
                    trace!(
                        target: "service",
                        "Skipping fetch of {}, no sessions connected to {announcer}",
@@ -1660,7 +1636,7 @@ where
                    return Ok(relay);
                };
                // Finally, start the fetch.
-                self.fetch_refs_at(message.rid, remote.id, refs, scope, FETCH_TIMEOUT, None);
+                self.fetch_refs_at(message.rid, remote.node(), refs, scope, FETCH_TIMEOUT, None);

                return Ok(relay);
            }
@@ -1740,58 +1716,54 @@ where
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
+        use connections::state::command;
+        use connections::state::command::Payload;
+        use connections::state::event::HandledMessage;
+
        let local = self.node_id();
        let relay = self.config.is_relay();
-        let Some(peer) = self.sessions.get_mut(remote) else {
-            warn!(target: "service", "Session not found for {remote}");
-            return Ok(());
-        };
-        peer.last_active = self.clock;

-        let limit: RateLimit = match peer.link {
-            Link::Outbound => self.config.limits.rate.outbound.into(),
-            Link::Inbound => self.config.limits.rate.inbound.into(),
+        let payload = match &message {
+            // TODO(finto): I need to convince myself why this is always from the
+            // sending node and not a relaying node – the previous code assumed so too.
+            Message::Subscribe(subscribe) => Some(Payload::Subscribe(subscribe.clone())),
+            Message::Announcement(_) => None,
+            Message::Info(_) => None,
+            Message::Ping(_) => None,
+            Message::Pong { zeroes } => Some(Payload::pong(zeroes.clone(), self.clock)),
        };
-        if self
-            .limiter
-            .limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
-        {
-            debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
-            return Ok(());
-        }
-        message.log(log::Level::Debug, remote, Link::Inbound);
-
-        let connected = match &mut peer.state {
-            session::State::Disconnected { .. } => {
-                debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
+        let command = command::Message {
+            node: *remote,
+            payload,
+            connection_type: self.connection_type(remote),
+        };
+        let peer = match self.connections.handle_message(command, self.clock) {
+            HandledMessage::MissingSession { node } => {
+                debug!(target: "service", "Dropping message from unknown peer {}", node);
                return Ok(());
            }
-            // In case of a discrepancy between the service state and the state of the underlying
-            // wire protocol, we may receive a message from a peer that we consider not fully connected
-            // at the service level. To remedy this, we simply transition the peer to a connected state.
-            //
-            // This is not ideal, but until the wire protocol and service are unified, it's the simplest
-            // solution to converge towards the same state.
-            session::State::Attempted | session::State::Initial => {
-                debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
-                debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);
-
-                peer.to_connected(self.clock);
-
-                None
+            HandledMessage::Disconnected { node } => {
+                debug!(target: "service", "Ignoring message from disconnected peer {}", node);
+                return Ok(());
            }
-            session::State::Connected {
-                ping, latencies, ..
-            } => Some((ping, latencies)),
+            HandledMessage::RateLimited { node } => {
+                info!(target: "service", "Peer {node} reached rate limit, ignoring message");
+                return Ok(());
+            }
+            HandledMessage::Connected { session } => session,
+            HandledMessage::Subscribed { session } => session,
+            HandledMessage::Pinged { session, pinged: _ } => session,
        };

+        message.log(log::Level::Debug, remote, Link::Inbound);
+
        trace!(target: "service", "Received message {message:?} from {remote}");

        match message {
            // Process a peer announcement.
            Message::Announcement(ann) => {
                let relayer = remote;
-                let relayer_addr = peer.addr.clone();
+                let relayer_addr = peer.address().clone();

                if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
                    if self.config.is_relay() {
@@ -1832,7 +1804,7 @@ where
                            }
                            // Only send messages if we're a relay, or it's our own messages.
                            if relay || ann.node == local {
-                                self.outbox.write(peer, ann.into());
+                                self.outbox.write(&peer, ann.into());
                            }
                        }
                    }
@@ -1840,7 +1812,6 @@ where
                        error!(target: "service", "Error querying gossip messages from store: {e}");
                    }
                }
-                peer.subscribe = Some(subscribe);
            }
            Message::Info(info) => {
                self.handle_info(*remote, &info)?;
@@ -1851,30 +1822,13 @@ where
                    return Ok(());
                }
                self.outbox.write(
-                    peer,
+                    &peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
                );
            }
-            Message::Pong { zeroes } => {
-                if let Some((ping, latencies)) = connected {
-                    if let session::PingState::AwaitingResponse {
-                        len: ponglen,
-                        since,
-                    } = *ping
-                    {
-                        if (ponglen as usize) == zeroes.len() {
-                            *ping = session::PingState::Ok;
-                            // Keep track of peer latency.
-                            latencies.push_back(self.clock - since);
-                            if latencies.len() > MAX_LATENCIES {
-                                latencies.pop_front();
-                            }
-                        }
-                    }
-                }
-            }
+            Message::Pong { zeroes: _ } => {}
        }
        Ok(())
    }
@@ -1947,15 +1901,6 @@ where
        ]
    }

-    /// Try to guess whether we're online or not.
-    fn is_online(&self) -> bool {
-        self.sessions
-            .connected()
-            .filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
-            .count()
-            > 0
-    }
-
    /// Remove a local repository from our inventory.
    fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
        let node = self.node_id();
@@ -2138,7 +2083,7 @@ where
    ) -> Result<(Vec<RefsAt>, Timestamp), Error> {
        let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
        let timestamp = ann.timestamp();
-        let peers = self.sessions.connected().map(|(_, p)| p);
+        let peers = self.connections.sessions().connected().sessions();

        // Update our sync status for our own refs. This is useful for determining if refs were
        // updated while the node was stopped.
@@ -2155,52 +2100,93 @@ where
            }
        }

+        // Store our announcement so that it can be retrieved from us later, just like
+        // announcements we receive from peers.
+        if let Err(e) = self.db.gossip_mut().announced(&ann.node, &ann) {
+            error!(target: "service", "Error updating our gossip store with announced message: {e}");
+        }
+
        self.outbox.announce(
            ann,
            peers.filter(|p| {
                // Only announce to peers who are allowed to view this repo.
-                doc.is_visible_to(&p.id.into())
+                doc.is_visible_to(&p.node().into())
            }),
-            self.db.gossip_mut(),
        );
        Ok((refs, timestamp))
    }

    fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
-        if let Some(sess) = self.sessions.get_mut(&nid) {
-            sess.to_initial();
-            self.outbox.connect(nid, addr);
+        use connections::state::command;
+        use connections::state::event;

-            return true;
+        match self.connections.reconnect(command::Reconnect { node: nid }) {
+            event::Reconnect::Reconnecting { session } => {
+                debug_assert_eq!(nid, session.node());
+                debug_assert_eq!(addr, *session.address());
+                self.outbox
+                    .connect(session.node(), session.address().clone());
+                true
+            }
+            event::Reconnect::MissingSession { node } => {
+                debug!("Reconnecting to missing session for {node}");
+                false
+            }
        }
-        false
    }

    fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
-        debug!(target: "service", "Connecting to {nid} ({addr})..");
+        use connections::state::command;
+        use connections::state::event;

        if nid == self.node_id() {
            return Err(ConnectError::SelfConnection);
        }
-        if self.sessions.contains_key(&nid) {
-            return Err(ConnectError::SessionExists { nid });
-        }
-        if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
-            return Err(ConnectError::LimitReached { nid, addr });
-        }
-        let persistent = self.config.is_persistent(&nid);
-        let timestamp: Timestamp = self.clock.into();
-
-        if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
-            error!(target: "service", "Error updating address book with connection attempt: {e}");
+        let command = command::Connect {
+            node: nid,
+            addr: addr.clone(),
+            connection_type: self.connection_type(&nid),
+        };
+        match self.connections.connect(command, self.clock) {
+            event::Connect::AlreadyConnected { session } => {
+                let node = session.node();
+                trace!(target: "service", "Connected to {node} already");
+                Err(ConnectError::SessionExists { nid: node })
+            }
+            event::Connect::AlreadyConnecting { node } => {
+                trace!(target: "service", "Connecting to {node} already");
+                Err(ConnectError::SessionExists { nid: node })
+            }
+            event::Connect::Disconnected { node } => {
+                trace!(target: "service", "Attempted connect to {node} which is disconnected");
+                Err(ConnectError::SessionExists { nid: node })
+            }
+            event::Connect::Establish {
+                node,
+                connection_type: _,
+                record_ip,
+            } => {
+                if let Some(ip) = record_ip {
+                    if let Err(e) = self
+                        .db
+                        .addresses_mut()
+                        .record_ip(&node, ip, self.clock.into())
+                    {
+                        log::error!(target: "service", "Error recording IP address {ip} for {node}: {e}");
+                    }
+                }
+                if let Err(e) = self
+                    .db
+                    .addresses_mut()
+                    .attempted(&nid, &addr, self.clock.into())
+                {
+                    error!(target: "service", "Error updating address book with connection attempt: {e}");
+                }
+                debug!(target: "service", "Connecting to outbound {nid} ({addr})..");
+                self.outbox.connect(node, addr);
+                Ok(())
+            }
        }
-        self.sessions.insert(
-            nid,
-            Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
-        );
-        self.outbox.connect(nid, addr);
-
-        Ok(())
    }

    fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
@@ -2218,7 +2204,10 @@ where

                for seed in self.db.seeds().seeds_for(rid)? {
                    let seed = seed?;
-                    let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
+                    let state = self
+                        .connections
+                        .session_for(&seed.nid)
+                        .map(|s| node::State::from(s.state().clone()));
                    let synced = if local.at == seed.synced_at.oid {
                        SyncStatus::Synced { at: seed.synced_at }
                    } else {
@@ -2246,7 +2235,10 @@ where
                continue;
            }
            let addrs = self.db.addresses().addresses_of(&nid)?;
-            let state = self.sessions.get(&nid).map(|s| s.state.clone());
+            let state = self
+                .connections
+                .session_for(&nid)
+                .map(|s| node::State::from(s.state().clone()));

            seeds.insert(Seed::new(nid, addrs, state, None));
        }
@@ -2287,8 +2279,10 @@ where
        // 1. Don't relay to a peer who sent us this message.
        // 2. Don't relay to the peer who signed this announcement.
        let relay_to = self
-            .sessions
+            .connections
+            .sessions()
            .connected()
+            .into_iter()
            .filter(|(id, _)| {
                relayed_by
                    .map(|relayers| !relayers.contains(id))
@@ -2345,12 +2339,22 @@ where
            return;
        }
        let msg = AnnouncementMessage::from(self.inventory.clone());
+        let ann = msg.signed(&self.signer);
+        // Store our announcement so that it can be retrieved from us later, just like
+        // announcements we receive from peers.
+        if let Err(e) = self.db.gossip_mut().announced(&ann.node, &ann) {
+            error!(target: "service", "Error updating our gossip store with announced message: {e}");
+        }

-        self.outbox.announce(
-            msg.signed(&self.signer),
-            self.sessions.connected().map(|(_, p)| p),
-            self.db.gossip_mut(),
-        );
+        // Borrow-checker prevents us from passing the borrowed sessions, while
+        // we have a mutable borrow of outbox
+        let peers = self
+            .sessions()
+            .connected()
+            .sessions()
+            .cloned()
+            .collect::<Vec<_>>();
+        self.outbox.announce(ann, peers.iter());
        self.last_inventory = timestamp;
    }

@@ -2371,19 +2375,13 @@ where
    }

    fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
-        let stale = self
-            .sessions
-            .connected()
-            .filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
-
-        for (_, session) in stale {
-            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
+        for (_, session) in self.connections.unresponsive(now) {
+            debug!(target: "service", "Disconnecting unresponsive peer {}..", session.node());

            // TODO: Should we switch the session state to "disconnected" even before receiving
            // an official "disconnect"? Otherwise we keep pinging until we get the disconnection.
-
            self.outbox.disconnect(
-                session.id,
+                session.node(),
                DisconnectReason::Session(session::Error::Timeout),
            );
        }
@@ -2391,13 +2389,13 @@ where

    /// Ensure connection health by pinging connected peers.
    fn keep_alive(&mut self, now: &LocalTime) {
-        let inactive_sessions = self
-            .sessions
-            .connected_mut()
-            .filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
-            .map(|(_, session)| session);
-        for session in inactive_sessions {
-            session.ping(self.clock, &mut self.outbox).ok();
+        use connections::state::event::Ping;
+        for Ping { session, ping } in self
+            .connections
+            .ping(|| message::Ping::new(&mut self.rng), *now)
+        {
+            debug!(target: "service", "Pinging {}@{}", session.node(), session.address());
+            self.outbox.write(&session, Message::Ping(ping));
        }
    }

@@ -2411,7 +2409,7 @@ where
                    .filter(|entry| entry.version == PROTOCOL_VERSION)
                    .filter(|entry| !entry.address.banned)
                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
-                    .filter(|entry| !self.sessions.contains_key(&entry.node))
+                    .filter(|entry| !self.connections.has_session(&entry.node))
                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
                    .filter(|entry| &entry.node != self.nid())
                    .filter(|entry| !entry.address.addr.is_onion() || self.config.onion.is_some())
@@ -2494,18 +2492,14 @@ where

    /// Run idle task for all connections.
    fn idle_connections(&mut self) {
-        for (_, sess) in self.sessions.iter_mut() {
-            sess.idle(self.clock);
-
-            if sess.is_stable() {
-                // Mark as connected once connection is stable.
-                if let Err(e) =
-                    self.db
-                        .addresses_mut()
-                        .connected(&sess.id, &sess.addr, self.clock.into())
-                {
-                    error!(target: "service", "Error updating address book with connection: {e}");
-                }
+        for session in self.connections.stabilise(self.clock) {
+            // Mark as connected once connection is stable.
+            if let Err(e) = self.db.addresses_mut().connected(
+                &session.node(),
+                session.address(),
+                self.clock.into(),
+            ) {
+                error!(target: "service", "Error updating address book with connection: {e}");
            }
        }
    }
@@ -2517,14 +2511,9 @@ where
        };
        trace!(target: "service", "Maintaining connections..");

-        let target = TARGET_OUTBOUND_PEERS;
+        let target = self.connections.config().outbound_target();
        let now = self.clock;
-        let outbound = self
-            .sessions
-            .values()
-            .filter(|s| s.link.is_outbound())
-            .filter(|s| s.is_connected() || s.is_connecting())
-            .count();
+        let outbound = self.connections.number_of_outbound_connections();
        let wanted = target.saturating_sub(outbound);

        // Don't connect to more peers than needed.
@@ -2579,27 +2568,33 @@ where
        trace!(target: "service", "Maintaining persistent peers..");

        let now = self.local_time();
-        let mut reconnect = Vec::new();
-
-        for (nid, session) in self.sessions.iter_mut() {
-            if let Some(addr) = self.config.peer(nid) {
-                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
-                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
-                    // even a successful attempt means that we're unlikely to be able to reconnect.
-
-                    if now >= *retry_at {
-                        reconnect.push((*nid, addr.clone(), session.attempts()));
-                    }
+        let reconnect = self.connections.sessions().disconnected().into_iter().fold(
+            Vec::new(),
+            |mut reconnect, (node, session)| {
+                // TODO: Try to reconnect only if the peer was attempted. A disconnect without
+                // even a successful attempt means that we're unlikely to be able to reconnect.
+                if now >= *session.should_retry_at() {
+                    reconnect.push((*node, session.address().clone(), session.attempts()))
                }
-            }
-        }
+                reconnect
+            },
+        );

+        // TODO(finto): we don't need to iterate over this twice
        for (nid, addr, attempts) in reconnect {
            if self.reconnect(nid, addr) {
                debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
            }
        }
    }
+
+    fn connection_type(&self, node: &NodeId) -> connections::session::ConnectionType {
+        if self.config.is_persistent(node) {
+            connections::session::ConnectionType::Persistent
+        } else {
+            connections::session::ConnectionType::Ephemeral
+        }
+    }
}

/// Gives read access to the service state.
@@ -2607,7 +2602,7 @@ pub trait ServiceState {
    /// Get the Node ID.
    fn nid(&self) -> &NodeId;
    /// Get the existing sessions.
-    fn sessions(&self) -> &Sessions;
+    fn sessions(&self) -> &connections::Sessions;
    /// Get fetch state.
    fn fetching(&self) -> &FetcherState;
    /// Get outbox.
@@ -2638,8 +2633,8 @@ where
        self.signer.public_key()
    }

-    fn sessions(&self) -> &Sessions {
-        &self.sessions
+    fn sessions(&self) -> &connections::Sessions {
+        self.connections.sessions()
    }

    fn fetching(&self) -> &FetcherState {
@@ -2746,67 +2741,3 @@ pub enum LookupError {
    #[error(transparent)]
    Repository(#[from] RepositoryError),
}
-
-#[derive(Debug, Clone)]
-/// Holds currently (or recently) connected peers.
-pub struct Sessions(AddressBook<NodeId, Session>);
-
-impl Sessions {
-    pub fn new(rng: Rng) -> Self {
-        Self(AddressBook::new(rng))
-    }
-
-    /// Iterator over fully connected peers.
-    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
-        self.0
-            .iter()
-            .filter_map(move |(id, sess)| match &sess.state {
-                session::State::Connected { .. } => Some((id, sess)),
-                _ => None,
-            })
-    }
-
-    /// Iterator over connected inbound peers.
-    pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
-        self.connected().filter(|(_, s)| s.link.is_inbound())
-    }
-
-    /// Iterator over outbound peers.
-    pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
-        self.connected().filter(|(_, s)| s.link.is_outbound())
-    }
-
-    /// Iterator over mutable fully connected peers.
-    pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
-        self.0.iter_mut().filter(move |(_, s)| s.is_connected())
-    }
-
-    /// Iterator over disconnected peers.
-    pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
-        self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
-    }
-
-    /// Return whether this node has a fully established session.
-    pub fn is_connected(&self, id: &NodeId) -> bool {
-        self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
-    }
-
-    /// Return whether this node can be connected to.
-    pub fn is_disconnected(&self, id: &NodeId) -> bool {
-        self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
-    }
-}
-
-impl Deref for Sessions {
-    type Target = AddressBook<NodeId, Session>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl DerefMut for Sessions {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.0
-    }
-}
modified crates/radicle-protocol/src/service/io.rs
@@ -9,12 +9,12 @@ use radicle::node::Address;
use radicle::node::NodeId;
use radicle::storage::refs::RefsAt;

+use crate::connections::session;
+use crate::connections::session::Session;
use crate::service::message::Message;
-use crate::service::session::Session;
use crate::service::DisconnectReason;
use crate::service::Link;

-use super::gossip;
use super::message::{Announcement, AnnouncementMessage};

/// I/O operation to execute at the network/wire level.
@@ -61,46 +61,35 @@ impl Outbox {
        self.io.push_back(Io::Disconnect(id, reason));
    }

-    pub fn write(&mut self, remote: &Session, msg: Message) {
+    // TODO(finto): use a `ConnectedNode` token that is a smart constructed
+    // `NodeId`. We can take that instead of `Session<session::Connected>`,
+    // which can relax the borrow-checker.
+    pub fn write(&mut self, remote: &Session<session::Connected>, msg: Message) {
        let level = match &msg {
            Message::Ping(_) | Message::Pong { .. } => log::Level::Trace,
            _ => log::Level::Debug,
        };
-        msg.log(level, &remote.id, Link::Outbound);
+        msg.log(level, &remote.node(), Link::Outbound);
        trace!(target: "service", "Write {:?} to {}", &msg, remote);

-        self.io.push_back(Io::Write(remote.id, vec![msg]));
+        self.io.push_back(Io::Write(remote.node(), vec![msg]));
    }

    /// Announce something to a peer. This is meant for our own announcement messages.
    pub fn announce<'a>(
        &mut self,
        ann: Announcement,
-        peers: impl Iterator<Item = &'a Session>,
-        gossip: &mut impl gossip::Store,
+        peers: impl Iterator<Item = &'a Session<session::Connected>>,
    ) {
-        // Store our announcement so that it can be retrieved from us later, just like
-        // announcements we receive from peers.
-        if let Err(e) = gossip.announced(&ann.node, &ann) {
-            error!(target: "service", "Error updating our gossip store with announced message: {e}");
-        }
-
        for peer in peers {
            if let AnnouncementMessage::Refs(refs) = &ann.message {
-
                if let Some(subscribe) = &peer.subscribe {
-
                    if subscribe.filter.contains(&refs.rid) {
-
                        self.write(peer, ann.clone().into());
-
                    } else {
-
                        debug!(
-
                            target: "service",
-
                            "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
-
                            refs.rid
-
                        );
-
                    }
+
                if peer.is_subscribed_to(&refs.rid) {
+
                    self.write(peer, ann.clone().into());
                } else {
                    debug!(
                        target: "service",
-
                        "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
+
                        "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
+
                        refs.rid
                    );
                }
            } else {
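
The nested subscription checks collapse into a single `peer.is_subscribed_to(&refs.rid)` call. That method's body lives in `crate::connections::session` and isn't part of this diff, so the sketch below only shows the behavior it presumably preserves from the old inline logic, with simplified stand-in types:

// Standalone sketch: `RepoId`, `Filter` and `Subscribe` are simplified
// stand-ins; the real method is defined in the connections module.
#[derive(PartialEq)]
struct RepoId(u64);

struct Filter(Vec<RepoId>);

impl Filter {
    fn contains(&self, rid: &RepoId) -> bool {
        self.0.iter().any(|r| r == rid)
    }
}

struct Subscribe {
    filter: Filter,
}

struct Session {
    subscribe: Option<Subscribe>,
}

impl Session {
    /// A peer that never sent a `subscribe` message gets nothing
    /// relayed to it, matching the inline logic replaced above.
    fn is_subscribed_to(&self, rid: &RepoId) -> bool {
        self.subscribe
            .as_ref()
            .map_or(false, |sub| sub.filter.contains(rid))
    }
}

fn main() {
    let subscribed = Session {
        subscribe: Some(Subscribe { filter: Filter(vec![RepoId(1)]) }),
    };
    assert!(subscribed.is_subscribed_to(&RepoId(1)));
    assert!(!subscribed.is_subscribed_to(&RepoId(2)));
    assert!(!Session { subscribe: None }.is_subscribed_to(&RepoId(1)));
}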
@@ -109,7 +98,7 @@ impl Outbox {
        }
    }

-
    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
+
    pub fn write_all(&mut self, remote: NodeId, msgs: impl IntoIterator<Item = Message>) {
        let msgs = msgs.into_iter().collect::<Vec<_>>();

        for (ix, msg) in msgs.iter().enumerate() {
@@ -121,18 +110,21 @@ impl Outbox {
                ix + 1,
                msgs.len()
            );
-
            msg.log(log::Level::Trace, &remote.id, Link::Outbound);
+
            msg.log(log::Level::Trace, &remote, Link::Outbound);
        }
-
        self.io.push_back(Io::Write(remote.id, msgs));
+
        self.io.push_back(Io::Write(remote, msgs));
    }

    pub fn wakeup(&mut self, after: LocalDuration) {
        self.io.push_back(Io::Wakeup(after));
    }

+
    // TODO(finto): use a `ConnectedNode` token that is a smart-constructed
+
    // `NodeId`. We could take that instead of `Session<session::Connected>`,
+
    // which would ease borrow-checker constraints.
    pub fn fetch(
        &mut self,
-
        peer: &mut Session,
+
        peer: &Session<session::Connected>,
        rid: RepoId,
        refs_at: Vec<RefsAt>,
        timeout: time::Duration,
@@ -152,7 +144,7 @@ impl Outbox {
        self.io.push_back(Io::Fetch {
            rid,
            refs_at,
-
            remote: peer.id,
+
            remote: peer.node(),
            timeout,
            reader_limit,
        });
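
Both TODOs above sketch the same idea: a `ConnectedNode` token minted only from a connected session, so `write` and `fetch` wouldn't need to hold a `&Session<session::Connected>` borrow across the call. The token is not implemented in this commit; everything below is a hypothetical sketch of that direction:

// Hypothetical: `ConnectedNode` and `new_unchecked` do not exist in
// this commit, they only illustrate the TODO.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct NodeId(u64); // placeholder for the real `NodeId`

/// Witness that the wrapped id belonged to a connected peer when the
/// token was constructed.
#[derive(Clone, Copy)]
struct ConnectedNode(NodeId);

impl ConnectedNode {
    /// Would be private to the connections module: only code that has
    /// proven the session connected may mint the token.
    fn new_unchecked(id: NodeId) -> Self {
        ConnectedNode(id)
    }

    fn id(&self) -> NodeId {
        self.0
    }
}

/// An outbox method could take the `Copy` token instead of borrowing
/// the whole session for the duration of the call.
fn write(remote: ConnectedNode) {
    let _target = remote.id(); // e.g. enqueue `Io::Write(remote.id(), msgs)`
}

fn main() {
    let token = ConnectedNode::new_unchecked(NodeId(42));
    write(token);
}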
@@ -162,7 +154,7 @@ impl Outbox {
    pub fn broadcast<'a>(
        &mut self,
        msg: impl Into<Message>,
-
        peers: impl IntoIterator<Item = &'a Session>,
+
        peers: impl IntoIterator<Item = &'a Session<session::Connected>>,
    ) {
        let msg = msg.into();
        for peer in peers {
@@ -171,18 +163,14 @@ impl Outbox {
    }

    /// Relay a message to interested peers.
-
    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
+
    pub fn relay<'a>(
+
        &mut self,
+
        ann: Announcement,
+
        peers: impl IntoIterator<Item = &'a Session<session::Connected>>,
+
    ) {
        if let AnnouncementMessage::Refs(msg) = &ann.message {
            let id = msg.rid;
-
            let peers = peers.into_iter().filter(|p| {
-
                if let Some(subscribe) = &p.subscribe {
-
                    subscribe.filter.contains(&id)
-
                } else {
-
                    // If the peer did not send us a `subscribe` message, we don't
-
                    // relay any messages to them.
-
                    false
-
                }
-
            });
+
            let peers = peers.into_iter().filter(|p| p.is_subscribed_to(&id));
            self.broadcast(ann, peers);
        } else {
            self.broadcast(ann, peers);
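
Throughout this file, `&Session` parameters become `&Session<session::Connected>`: the connection state moves into the type, so `write`, `fetch`, `broadcast` and `relay` can no longer be handed an unconnected session. A simplified, self-contained sketch of that typestate pattern (stand-in types, not the crate's definitions):

use std::marker::PhantomData;

// Zero-sized state markers.
struct Initial;
struct Connected;

struct Session<S> {
    node: u64, // placeholder for the real `NodeId`
    _state: PhantomData<S>,
}

impl Session<Initial> {
    fn new(node: u64) -> Self {
        Session { node, _state: PhantomData }
    }

    /// The only way to obtain a `Session<Connected>` is this
    /// transition, so callees never re-check connectedness at runtime.
    fn connect(self) -> Session<Connected> {
        Session { node: self.node, _state: PhantomData }
    }
}

impl Session<Connected> {
    fn node(&self) -> u64 {
        self.node
    }
}

/// Compiles only for connected sessions; passing a `Session<Initial>`
/// is a type error rather than a runtime branch.
fn write(remote: &Session<Connected>) {
    let _target = remote.node();
}

fn main() {
    let session = Session::<Initial>::new(7).connect();
    write(&session);
}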
modified crates/radicle-protocol/src/service/session.rs
@@ -1,15 +1,8 @@
-
use std::collections::VecDeque;
-
use std::{fmt, time};
-

-
use crossbeam_channel as chan;
-
use radicle::node::{FetchResult, Severity};
-
use radicle::node::{Link, Timestamp};
+
use radicle::node::Severity;
+
use radicle::node::Timestamp;
pub use radicle::node::{PingState, State};
-
use radicle::storage::refs::RefsAt;

-
use crate::service::message;
-
use crate::service::message::Message;
-
use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
+
use crate::service::LocalDuration;

/// Time after which a connection is considered stable.
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
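
`CONNECTION_STABLE_THRESHOLD` is what the removed `idle` below compares against: once a session has been connected for that long it is marked stable and its attempt counter is reset. A self-contained sketch of the check, with std `Instant`/`Duration` standing in for `LocalTime`/`LocalDuration`:

use std::time::{Duration, Instant};

// One minute, mirroring `LocalDuration::from_mins(1)` above.
const CONNECTION_STABLE_THRESHOLD: Duration = Duration::from_secs(60);

/// A connection counts as stable once it has been up for the
/// threshold; stable connections reset their attempt counter.
fn is_stable(since: Instant, now: Instant) -> bool {
    now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD
}

fn main() {
    let since = Instant::now();
    assert!(!is_stable(since, since));
    assert!(is_stable(since, since + CONNECTION_STABLE_THRESHOLD));
}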
@@ -45,236 +38,3 @@ impl Error {
        }
    }
}
-

-
/// Error when trying to queue a fetch.
-
#[derive(thiserror::Error, Debug, Clone)]
-
pub enum QueueError {
-
    /// The item already exists in the queue.
-
    #[error("item is already queued")]
-
    Duplicate(QueuedFetch),
-
    /// The queue is at capacity.
-
    #[error("queue capacity reached")]
-
    CapacityReached(QueuedFetch),
-
}
-

-
impl QueueError {
-
    /// Get the inner [`QueuedFetch`].
-
    pub fn inner(&self) -> &QueuedFetch {
-
        match self {
-
            Self::Duplicate(f) => f,
-
            Self::CapacityReached(f) => f,
-
        }
-
    }
-
}
-

-
/// Fetch waiting to be processed, in the fetch queue.
-
#[derive(Debug, Clone)]
-
pub struct QueuedFetch {
-
    /// Repo being fetched.
-
    pub rid: RepoId,
-
    /// Peer being fetched from.
-
    pub from: NodeId,
-
    /// Refs being fetched.
-
    pub refs_at: Vec<RefsAt>,
-
    /// The timeout given for the fetch request.
-
    pub timeout: time::Duration,
-
    /// Result channel.
-
    pub channel: Option<chan::Sender<FetchResult>>,
-
}
-

-
impl PartialEq for QueuedFetch {
-
    fn eq(&self, other: &Self) -> bool {
-
        self.rid == other.rid
-
            && self.from == other.from
-
            && self.refs_at == other.refs_at
-
            && self.channel.is_none()
-
            && other.channel.is_none()
-
    }
-
}
-

-
/// A peer session. Each connected peer will have one session.
-
#[derive(Debug, Clone)]
-
pub struct Session {
-
    /// Peer id.
-
    pub id: NodeId,
-
    /// Peer address.
-
    pub addr: Address,
-
    /// Connection direction.
-
    pub link: Link,
-
    /// Whether we should attempt to re-connect
-
    /// to this peer upon disconnection.
-
    pub persistent: bool,
-
    /// Peer connection state.
-
    pub state: State,
-
    /// Peer subscription.
-
    pub subscribe: Option<message::Subscribe>,
-
    /// Last time a message was received from the peer.
-
    pub last_active: LocalTime,
-

-
    /// Connection attempts. For persistent peers, this tracks
-
    /// how many times we've attempted to connect. We reset this to
-
    /// zero once the connection is stable.
-
    attempts: usize,
-
    /// Source of entropy.
-
    rng: Rng,
-
}
-

-
impl fmt::Display for Session {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-
        let mut attrs = Vec::new();
-
        let state = self.state.to_string();
-

-
        if self.link.is_inbound() {
-
            attrs.push("inbound");
-
        } else {
-
            attrs.push("outbound");
-
        }
-
        if self.persistent {
-
            attrs.push("persistent");
-
        }
-
        attrs.push(state.as_str());
-

-
        write!(f, "{} [{}]", self.id, attrs.join(" "))
-
    }
-
}
-

-
impl From<&Session> for radicle::node::Session {
-
    fn from(s: &Session) -> Self {
-
        Self {
-
            nid: s.id,
-
            link: if s.link.is_inbound() {
-
                radicle::node::Link::Inbound
-
            } else {
-
                radicle::node::Link::Outbound
-
            },
-
            addr: s.addr.clone(),
-
            state: s.state.clone(),
-
        }
-
    }
-
}
-

-
impl Session {
-
    pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng) -> Self {
-
        Self {
-
            id,
-
            addr,
-
            state: State::Initial,
-
            link: Link::Outbound,
-
            subscribe: None,
-
            persistent,
-
            last_active: LocalTime::default(),
-
            attempts: 1,
-
            rng,
-
        }
-
    }
-

-
    pub fn inbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, time: LocalTime) -> Self {
-
        Self {
-
            id,
-
            addr,
-
            state: State::Connected {
-
                since: time,
-
                ping: PingState::default(),
-
                latencies: VecDeque::default(),
-
                stable: false,
-
            },
-
            link: Link::Inbound,
-
            subscribe: None,
-
            persistent,
-
            last_active: time,
-
            attempts: 0,
-
            rng,
-
        }
-
    }
-

-
    pub fn is_connecting(&self) -> bool {
-
        matches!(self.state, State::Attempted)
-
    }
-

-
    pub fn is_stable(&self) -> bool {
-
        matches!(self.state, State::Connected { stable: true, .. })
-
    }
-

-
    pub fn is_connected(&self) -> bool {
-
        self.state.is_connected()
-
    }
-

-
    pub fn is_disconnected(&self) -> bool {
-
        matches!(self.state, State::Disconnected { .. })
-
    }
-

-
    pub fn is_initial(&self) -> bool {
-
        matches!(self.state, State::Initial)
-
    }
-

-
    pub fn attempts(&self) -> usize {
-
        self.attempts
-
    }
-

-
    /// Run 'idle' task for session.
-
    pub fn idle(&mut self, now: LocalTime) {
-
        if let State::Connected {
-
            since,
-
            ref mut stable,
-
            ..
-
        } = self.state
-
        {
-
            if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
-
                *stable = true;
-
                // Reset number of attempts for stable connections.
-
                self.attempts = 0;
-
            }
-
        }
-
    }
-

-
    pub fn to_attempted(&mut self) {
-
        assert!(
-
            self.is_initial(),
-
            "Can only transition to 'attempted' state from 'initial' state"
-
        );
-
        self.state = State::Attempted;
-
        self.attempts += 1;
-
    }
-

-
    pub fn to_connected(&mut self, since: LocalTime) {
-
        self.last_active = since;
-

-
        if let State::Connected { .. } = &self.state {
-
            log::error!(target: "service", "Session {} is already in 'connected' state, resetting..", self.id);
-
        };
-
        self.state = State::Connected {
-
            since,
-
            ping: PingState::default(),
-
            latencies: VecDeque::default(),
-
            stable: false,
-
        };
-
    }
-

-
    /// Move the session state to "disconnected".
-
    pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
-
        self.state = State::Disconnected { since, retry_at };
-
    }
-

-
    /// Return to initial state from disconnected state. This state transition
-
    /// happens when we attempt to re-connect to a disconnected peer.
-
    pub fn to_initial(&mut self) {
-
        assert!(
-
            self.is_disconnected(),
-
            "Can only transition to 'initial' state from 'disconnected' state"
-
        );
-
        self.state = State::Initial;
-
    }
-

-
    pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
-
        if let State::Connected { ping, .. } = &mut self.state {
-
            let msg = message::Ping::new(&mut self.rng);
-
            *ping = PingState::AwaitingResponse {
-
                len: msg.ponglen,
-
                since,
-
            };
-
            reactor.write(self, Message::Ping(msg));
-
        }
-
        Ok(())
-
    }
-
}
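
For reference, the removed impl enforced these transitions with asserts: `Initial -> Attempted` (`to_attempted`), anything `-> Connected` (`to_connected`, logging if already connected), anything `-> Disconnected` (`to_disconnected`), and `Disconnected -> Initial` (`to_initial`, for re-connection). A compact, runnable summary of that state machine, in enum form for illustration only; this logic presumably now lives alongside the typed sessions in `crate::connections::session`:

#[derive(Debug, PartialEq)]
enum State {
    Initial,
    Attempted,
    Connected { stable: bool },
    Disconnected,
}

impl State {
    /// Legal transitions, mirroring the asserts in the removed methods.
    fn step(self, next: State) -> State {
        let legal = matches!(
            (&self, &next),
            (State::Initial, State::Attempted)          // `to_attempted`
                | (_, State::Connected { .. })          // `to_connected`
                | (_, State::Disconnected)              // `to_disconnected`
                | (State::Disconnected, State::Initial) // `to_initial`
        );
        assert!(legal, "illegal transition: {self:?} -> {next:?}");
        next
    }
}

fn main() {
    // Outbound lifecycle: dial, connect, drop, and return for a retry.
    let state = State::Initial
        .step(State::Attempted)
        .step(State::Connected { stable: false })
        .step(State::Disconnected)
        .step(State::Initial);
    assert_eq!(state, State::Initial);
}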