#![allow(clippy::too_many_arguments)]
#![deny(clippy::unwrap_used)]
pub mod command;
pub use command::{Command, QueryState};
pub mod filter;
pub mod gossip;
pub mod io;
pub mod limiter;
pub mod message;
pub mod session;
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, net, time};
use crossbeam_channel as chan;
use fastrand::Rng;
use localtime::{LocalDuration, LocalTime};
use log::*;
use nonempty::NonEmpty;
use radicle::identity::Doc;
use radicle::node;
use radicle::node::address;
use radicle::node::address::Store as _;
use radicle::node::address::{AddressBook, AddressType, KnownAddress};
use radicle::node::config::{PeerConfig, RateLimit};
use radicle::node::device::Device;
use radicle::node::refs::Store as _;
use radicle::node::routing::Store as _;
use radicle::node::seed;
use radicle::node::seed::Store as _;
use radicle::node::{Penalty, Severity};
use radicle::storage::refs::{FeatureLevel, SIGREFS_BRANCH};
use radicle::storage::{RepositoryError, RepositoryInfo, SignedRefsInfo};
use radicle_fetch::policy::SeedingPolicy;
use crate::fetcher;
use crate::fetcher::FetcherState;
use crate::fetcher::RefsToFetch;
use crate::fetcher::service::FetcherService;
use crate::service::gossip::Store as _;
use crate::service::message::{
Announcement, AnnouncementMessage, Info, NodeAnnouncement, Ping, RefsAnnouncement, RefsStatus,
};
use crate::service::policy::{Scope, store::Write};
use radicle::identity::RepoId;
use radicle::node::events::Emitter;
use radicle::node::routing;
use radicle::node::routing::InsertResult;
use radicle::node::{Address, Features, FetchResult, HostName, Seed, Seeds, SyncStatus, SyncedAt};
use radicle::prelude::*;
use radicle::storage;
use radicle::storage::{Namespaces, ReadStorage, refs::RefsAt};
use radicle::crypto;
use radicle::node::Link;
use radicle::node::PROTOCOL_VERSION;
use crate::bounded::BoundedVec;
use crate::service::filter::Filter;
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::{QueuedFetch, Session};
use crate::worker::FetchError;
use radicle::node::events::{Event, Events};
use radicle::node::{Config, NodeId};
use radicle::node::policy::config as policy;
use self::io::Outbox;
use self::limiter::RateLimiter;
use self::message::InventoryAnnouncement;
use self::policy::NamespacesError;
/// How often to run the "idle" task.
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
/// How often to run the "gossip" task.
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
/// How often to run the "announce" task.
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
/// How often to run the "sync" task.
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
/// How often to run the "prune" task.
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
/// Duration to wait on an unresponsive peer before dropping its connection.
pub const STALE_CONNECTION_TIMEOUT: LocalDuration = LocalDuration::from_mins(2);
/// How much time should pass after a peer was last active for a *ping* to be sent.
pub const KEEP_ALIVE_DELTA: LocalDuration = LocalDuration::from_mins(1);
/// Maximum number of latency values to keep for a session.
pub const MAX_LATENCIES: usize = 16;
/// Maximum time difference between the local time and an announcement timestamp.
pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Maximum attempts to connect to a peer before we give up.
pub const MAX_CONNECTION_ATTEMPTS: usize = 3;
/// How far back from the present time we should request gossip messages when connecting to a
/// peer, if we are coming online for the first time.
pub const INITIAL_SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(60 * 24);
/// The margin of error we give ourselves when subscribing. A higher delta means we ask for
/// messages further back than strictly necessary, to account for missed messages.
pub const SUBSCRIBE_BACKLOG_DELTA: LocalDuration = LocalDuration::from_mins(3);
/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
/// Maximum amount of time to wait before reconnecting to a peer.
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
/// Connection retry delta used for ephemeral peers that failed to connect previously.
pub const CONNECTION_RETRY_DELTA: LocalDuration = LocalDuration::from_mins(10);
/// How long to wait for a fetch to stall before aborting it. The default is 30 seconds.
pub const FETCH_TIMEOUT: time::Duration = time::Duration::from_secs(30);
/// Target number of peers to maintain connections to.
pub const TARGET_OUTBOUND_PEERS: usize = 8;
/// Maximum external address limit imposed by message size limits.
pub use message::ADDRESS_LIMIT;
/// Maximum inventory limit imposed by message size limits.
pub use message::INVENTORY_LIMIT;
/// Maximum number of project git references imposed by message size limits.
pub use message::REF_REMOTE_LIMIT;
/// Metrics we track.
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
/// Metrics for each peer.
pub peers: HashMap<NodeId, PeerMetrics>,
/// Number of tasks queued in the worker queue.
pub worker_queue_size: usize,
/// Current open channel count.
pub open_channels: usize,
}
impl Metrics {
/// Get metrics for the given peer.
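///
/// A minimal sketch of how a per-peer counter might be bumped (hypothetical
/// `metrics: Metrics` and `nid: NodeId` in scope):
/// ```ignore
/// metrics.peer(nid).received_gossip_messages += 1;
/// ```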
pub fn peer(&mut self, nid: NodeId) -> &mut PeerMetrics {
self.peers.entry(nid).or_default()
}
}
/// Per-peer metrics we track.
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerMetrics {
pub received_git_bytes: usize,
pub received_fetch_requests: usize,
pub received_bytes: usize,
pub received_gossip_messages: usize,
pub sent_bytes: usize,
pub sent_fetch_requests: usize,
pub sent_git_bytes: usize,
pub sent_gossip_messages: usize,
pub streams_opened: usize,
pub inbound_connection_attempts: usize,
pub outbound_connection_attempts: usize,
pub disconnects: usize,
}
/// Result of syncing our routing table with a node's inventory.
#[derive(Default)]
struct SyncedRouting {
/// Repo entries added.
added: Vec<RepoId>,
/// Repo entries removed.
removed: Vec<RepoId>,
/// Repo entries whose timestamp was updated.
updated: Vec<RepoId>,
}
impl SyncedRouting {
fn is_empty(&self) -> bool {
self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
}
}
/// A peer we can connect to.
#[derive(Debug, Clone)]
struct Peer {
nid: NodeId,
addresses: Vec<KnownAddress>,
penalty: Penalty,
}
/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
Git(#[from] radicle::git::raw::Error),
#[error(transparent)]
Storage(#[from] storage::Error),
#[error(transparent)]
Gossip(#[from] gossip::Error),
#[error(transparent)]
Refs(#[from] storage::refs::Error),
#[error(transparent)]
Routing(#[from] routing::Error),
#[error(transparent)]
Address(#[from] address::Error),
#[error(transparent)]
Database(#[from] node::db::Error),
#[error(transparent)]
Seeds(#[from] seed::Error),
#[error(transparent)]
Policy(#[from] policy::Error),
#[error(transparent)]
Repository(#[from] radicle::storage::RepositoryError),
#[error("namespaces error: {0}")]
Namespaces(Box<NamespacesError>),
}
impl From<NamespacesError> for Error {
fn from(e: NamespacesError) -> Self {
Self::Namespaces(Box::new(e))
}
}
#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
#[error("attempted connection to peer {nid} which already has a session")]
SessionExists { nid: NodeId },
#[error("attempted connection to self")]
SelfConnection,
#[error("outbound connection limit reached when attempting {nid} ({addr})")]
LimitReached { nid: NodeId, addr: Address },
#[error(
"attempted connection to {nid} via {addr}, but addresses of this kind are not supported"
)]
UnsupportedAddress { nid: NodeId, addr: Address },
#[error("attempted connection with blocked peer {nid}")]
Blocked { nid: NodeId },
}
/// A store for all node data.
pub trait Store:
address::Store + gossip::Store + routing::Store + seed::Store + node::refs::Store
{
}
impl Store for radicle::node::Database {}
/// Fetch state for an ongoing fetch.
#[derive(Debug)]
pub struct FetchState {
/// Node we're fetching from.
pub from: NodeId,
/// What refs we're fetching.
pub refs_at: Vec<RefsAt>,
/// Channels waiting for fetch results.
pub subscribers: Vec<chan::Sender<FetchResult>>,
}
/// Holds all node stores.
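///
/// A minimal usage sketch (assuming a `Stores<radicle::node::Database>` named `stores`
/// and a `NodeId` named `nid` are in scope):
/// ```ignore
/// let inventory = stores.routing().get_inventory(&nid)?;
/// let cached_refs = stores.refs().count()?;
/// ```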
#[derive(Debug)]
pub struct Stores<D>(D);
impl<D> Stores<D>
where
D: Store,
{
/// Get the database as a routing store.
pub fn routing(&self) -> &impl routing::Store {
&self.0
}
/// Get the database as a routing store, mutably.
pub fn routing_mut(&mut self) -> &mut impl routing::Store {
&mut self.0
}
/// Get the database as an address store.
pub fn addresses(&self) -> &impl address::Store {
&self.0
}
/// Get the database as an address store, mutably.
pub fn addresses_mut(&mut self) -> &mut impl address::Store {
&mut self.0
}
/// Get the database as a gossip store.
pub fn gossip(&self) -> &impl gossip::Store {
&self.0
}
/// Get the database as a gossip store, mutably.
pub fn gossip_mut(&mut self) -> &mut impl gossip::Store {
&mut self.0
}
/// Get the database as a seed store.
pub fn seeds(&self) -> &impl seed::Store {
&self.0
}
/// Get the database as a seed store, mutably.
pub fn seeds_mut(&mut self) -> &mut impl seed::Store {
&mut self.0
}
/// Get the database as a refs db.
pub fn refs(&self) -> &impl node::refs::Store {
&self.0
}
/// Get the database as a refs db, mutably.
pub fn refs_mut(&mut self) -> &mut impl node::refs::Store {
&mut self.0
}
}
impl<D> AsMut<D> for Stores<D> {
fn as_mut(&mut self) -> &mut D {
&mut self.0
}
}
impl<D> From<D> for Stores<D> {
fn from(db: D) -> Self {
Self(db)
}
}
/// The node service.
#[derive(Debug)]
pub struct Service<D, S, G> {
/// Service configuration.
config: Config,
/// Our cryptographic signer and key.
signer: Device<G>,
/// Project storage.
storage: S,
/// Node database.
db: Stores<D>,
/// Policy configuration.
policies: policy::Config<Write>,
/// Peer sessions, currently or recently connected.
sessions: Sessions,
/// Clock. Tells the time.
clock: LocalTime,
/// Who relayed what announcement to us. We keep track of this to ensure that
/// we don't relay messages to nodes that already know about these messages.
relayed_by: HashMap<gossip::AnnouncementId, Vec<NodeId>>,
/// I/O outbox.
outbox: Outbox,
/// Cached local node announcement.
node: NodeAnnouncement,
/// Cached local inventory announcement.
inventory: InventoryAnnouncement,
/// Source of entropy.
rng: Rng,
fetcher: FetcherService<command::Responder<FetchResult>>,
/// Request/connection rate limiter.
limiter: RateLimiter,
/// Current seeded repositories bloom filter.
filter: Filter,
/// Last time the service was idle.
last_idle: LocalTime,
/// Last time the gossip messages were relayed.
last_gossip: LocalTime,
/// Last time the service synced.
last_sync: LocalTime,
/// Last time the service routing table was pruned.
last_prune: LocalTime,
/// Last time the announcement task was run.
last_announce: LocalTime,
/// Timestamp of last local inventory announced.
last_inventory: LocalTime,
/// Last timestamp used for announcements.
last_timestamp: Timestamp,
/// Time when the service was initialized, or `None` if it wasn't initialized.
started_at: Option<LocalTime>,
/// Time when the service was last online, or `None` if this is the first time.
last_online_at: Option<LocalTime>,
/// Publishes events to subscribers.
emitter: Emitter<Event>,
/// Local listening addresses.
listening: Vec<net::SocketAddr>,
/// Latest metrics for all nodes connected to since the last start.
metrics: Metrics,
}
impl<D, S, G> Service<D, S, G> {
/// Get the local node id.
pub fn node_id(&self) -> NodeId {
*self.signer.public_key()
}
/// Get the local service time.
pub fn local_time(&self) -> LocalTime {
self.clock
}
pub fn emitter(&self) -> Emitter<Event> {
self.emitter.clone()
}
}
impl<D, S, G> Service<D, S, G>
where
D: Store,
S: WriteStorage + 'static,
G: crypto::signature::Signer<crypto::Signature>,
{
/// Initialize service with current time. Call this once.
pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
debug!(target: "service", "Init @{}", time.as_millis());
assert_ne!(time, LocalTime::default());
let nid = self.node_id();
self.clock = time;
self.started_at = Some(time);
self.last_online_at = match self.db.gossip().last() {
Ok(Some(last)) => Some(last.to_local_time()),
Ok(None) => None,
Err(e) => {
warn!(target: "service", "Failed to get the latest gossip message from db: {e}");
None
}
};
// Populate refs database. This is only useful as part of the upgrade process for nodes
// that have been online since before the refs database was created.
match self.db.refs().count() {
Ok(0) => {
info!(target: "service", "Empty refs database, populating from storage..");
if let Err(e) = self.db.refs_mut().populate(&self.storage) {
warn!(target: "service", "Failed to populate refs database: {e}");
}
}
Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
Err(e) => {
warn!(target: "service", "Failed to retrieve count of refs from database: {e}")
}
}
let announced = self
.db
.seeds()
.seeded_by(&nid)?
.collect::<Result<HashMap<_, _>, _>>()?;
let mut inventory = BTreeSet::new();
let mut private = BTreeSet::new();
for repo in self.storage.repositories()? {
let repo = self.upgrade_sigrefs(repo)?;
let rid = repo.rid;
// If we're not seeding this repo, just skip it.
if !self.policies.is_seeding(&rid)? {
debug!(target: "service", "Local repository {rid} is not seeded");
continue;
}
// Add public repositories to inventory.
if repo.doc.is_public() {
inventory.insert(rid);
} else {
private.insert(rid);
}
// If we have no owned refs for this repo, then there's nothing to announce.
let Some(updated_at) = repo.synced_at else {
continue;
};
// Skip this repo if the sync status matches what we have in storage.
if let Some(announced) = announced.get(&rid) {
if updated_at.oid == announced.oid {
continue;
}
}
// Make sure our local node's sync status is up to date with storage.
if self.db.seeds_mut().synced(
&rid,
&nid,
updated_at.oid,
updated_at.timestamp.into(),
)? {
debug!(target: "service", "Saved local sync status for {rid}..");
}
// If we got here, it likely means a repo was updated while the node was stopped.
// Therefore, we pre-load a refs announcement for this repo, so that it is included in
// the historical gossip messages when a node connects and subscribes to this repo.
if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
self.db.gossip_mut().announced(&nid, &ann)?;
}
}
// Ensure that our inventory is recorded in our routing table, and that we are seeding
// all of it. Inventory can end up not being properly seeded if, for example, the user
// creates a new repository while the node is stopped.
self.db
.routing_mut()
.add_inventory(inventory.iter(), nid, time.into())?;
self.inventory = gossip::inventory(self.timestamp(), inventory);
// Ensure that private repositories are not in our inventory. It's possible that
// a repository was public and then it was made private.
self.db
.routing_mut()
.remove_inventories(private.iter(), &nid)?;
// Setup subscription filter for seeded repos.
self.filter = Filter::allowed_by(self.policies.seed_policies()?);
// Connect to configured peers.
let addrs = self.config.connect.clone();
for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
if let Err(e) = self.connect(id, addr) {
debug!(target: "service", "Service::initialization connection error: {e}");
}
}
// Try to establish some connections.
self.maintain_connections();
// Start periodic tasks.
self.outbox.wakeup(IDLE_INTERVAL);
self.outbox.wakeup(GOSSIP_INTERVAL);
Ok(())
}
fn upgrade_sigrefs(&mut self, mut info: RepositoryInfo) -> Result<RepositoryInfo, Error> {
if !matches!(info.refs, SignedRefsInfo::NeedsMigration) {
return Ok(info);
}
let rid = info.rid;
log::info!(
"Migrating `rad/sigrefs` of {rid} to force feature level {}.",
FeatureLevel::LATEST
);
let repo = self.storage.repository_mut(rid)?;
// NOTE: We assume that re-signing the refs brings them to `FeatureLevel::LATEST`.
let refs = repo.force_sign_refs(&self.signer)?;
let repo = self.storage.repository(rid)?;
let synced_at = SyncedAt::new(refs.at, &repo)?;
info.synced_at = Some(synced_at);
info.refs = SignedRefsInfo::Some(refs);
Ok(info)
}
}
impl<D, S, G> Service<D, S, G>
where
D: Store,
S: ReadStorage + 'static,
G: crypto::signature::Signer<crypto::Signature>,
{
pub fn new(
config: Config,
db: Stores<D>,
storage: S,
policies: policy::Config<Write>,
signer: Device<G>,
rng: Rng,
node: NodeAnnouncement,
emitter: Emitter<Event>,
) -> Self {
let sessions = Sessions::new(rng.clone());
let limiter = RateLimiter::new(config.peers());
let last_timestamp = node.timestamp;
let clock = LocalTime::default(); // Updated on initialize.
let inventory = gossip::inventory(clock.into(), []); // Updated on initialize.
let fetcher = {
let config = fetcher::Config::new()
.with_max_concurrency(
std::num::NonZeroUsize::new(config.limits.fetch_concurrency.into())
.expect("fetch concurrency was zero, must be at least 1"),
)
.with_max_capacity(fetcher::MaxQueueSize::default());
FetcherService::new(config)
};
Self {
config,
storage,
policies,
signer,
rng,
inventory,
node,
clock,
db,
outbox: Outbox::default(),
limiter,
sessions,
fetcher,
filter: Filter::empty(),
relayed_by: HashMap::default(),
last_idle: LocalTime::default(),
last_gossip: LocalTime::default(),
last_sync: LocalTime::default(),
last_prune: LocalTime::default(),
last_timestamp,
last_announce: LocalTime::default(),
last_inventory: LocalTime::default(),
started_at: None, // Updated on initialize.
last_online_at: None, // Updated on initialize.
emitter,
listening: vec![],
metrics: Metrics::default(),
}
}
/// Whether the service was started (initialized) and if so, at what time.
pub fn started(&self) -> Option<LocalTime> {
self.started_at
}
/// Return the next i/o action to execute.
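///
/// A sketch of the driving loop (hypothetical `service`); the actual runtime dispatches
/// each `io::Io` to the transport and worker layers:
/// ```ignore
/// while let Some(io) = service.next() {
///     // Handle the I/O action, e.g. write a message, open or close a
///     // connection, schedule a wakeup, or hand a fetch to a worker.
/// }
/// ```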
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Option<io::Io> {
self.outbox.next()
}
/// Seed a repository.
/// Returns whether or not the repo policy was updated.
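///
/// A minimal usage sketch (assuming a `service` and a `RepoId` named `rid` are in scope):
/// ```ignore
/// let updated = service.seed(&rid, Scope::All)?;
/// ```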
pub fn seed(&mut self, id: &RepoId, scope: Scope) -> Result<bool, policy::Error> {
let updated = self.policies.seed(id, scope)?;
self.filter.insert(id);
Ok(updated)
}
/// Unseed a repository.
/// Returns whether or not the repo policy was updated.
/// Note that when unseeding, we don't announce anything to the network. This is because by
/// simply not announcing it anymore, it will eventually be pruned by nodes.
pub fn unseed(&mut self, id: &RepoId) -> Result<bool, policy::Error> {
let updated = self.policies.unseed(id)?;
if updated {
// Nb. This is potentially slow if we have lots of repos. We should probably
// only re-compute the filter when we've unseeded a certain number of repos
// and the filter is really out of date.
self.filter = Filter::allowed_by(self.policies.seed_policies()?);
// Update and announce new inventory.
if let Err(e) = self.remove_inventory(id) {
warn!(target: "service", "Failed to update inventory after unseed: {e}");
}
}
Ok(updated)
}
/// Find the closest `n` peers by proximity in the seeding graph.
/// Returns a list sorted from the closest peer to the furthest.
/// Peers with more seedings in common score higher.
#[allow(unused)]
pub fn closest_peers(&self, n: usize) -> Vec<NodeId> {
todo!()
}
/// Get the database.
pub fn database(&self) -> &Stores<D> {
&self.db
}
/// Get the mutable database.
pub fn database_mut(&mut self) -> &mut Stores<D> {
&mut self.db
}
/// Get the storage instance.
pub fn storage(&self) -> &S {
&self.storage
}
/// Get the mutable storage instance.
pub fn storage_mut(&mut self) -> &mut S {
&mut self.storage
}
/// Get the node policies.
pub fn policies(&self) -> &policy::Config<Write> {
&self.policies
}
/// Get the local signer.
pub fn signer(&self) -> &Device<G> {
&self.signer
}
/// Subscribe to the inner `Emitter`'s events.
pub fn events(&mut self) -> Events {
Events::from(self.emitter.subscribe())
}
pub fn fetcher(&self) -> &FetcherState {
self.fetcher.state()
}
/// Get I/O outbox.
pub fn outbox(&mut self) -> &mut Outbox {
&mut self.outbox
}
/// Get configuration.
pub fn config(&self) -> &Config {
&self.config
}
/// Lookup a repository, both locally and in the routing table.
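///
/// A minimal usage sketch (assuming a `service` and a `RepoId` named `rid` are in scope):
/// ```ignore
/// let Lookup { local, remote } = service.lookup(rid)?;
/// ```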
pub fn lookup(&self, rid: RepoId) -> Result<Lookup, LookupError> {
let this = self.nid();
let local = self.storage.get(rid)?;
let remote = self
.db
.routing()
.get(&rid)?
.iter()
.filter(|nid| nid != &this)
.cloned()
.collect();
Ok(Lookup { local, remote })
}
pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
trace!(
target: "service",
"Tick +{}",
now - self.started_at.expect("Service::tick: service must be initialized")
);
if now >= self.clock {
self.clock = now;
} else {
// Nb. In tests, we often move the clock forwards in time to test different behaviors,
// so this warning isn't applicable there.
#[cfg(not(test))]
warn!(
target: "service",
"System clock is not monotonic: {now} is not greater than or equal to {}", self.clock
);
}
self.metrics = metrics.clone();
}
pub fn wake(&mut self) {
let now = self.clock;
trace!(
target: "service",
"Wake +{}",
now - self.started_at.expect("Service::wake: service must be initialized")
);
if now - self.last_idle >= IDLE_INTERVAL {
trace!(target: "service", "Running 'idle' task...");
self.keep_alive(&now);
self.disconnect_unresponsive_peers(&now);
self.idle_connections();
self.maintain_connections();
self.dequeue_fetches();
self.outbox.wakeup(IDLE_INTERVAL);
self.last_idle = now;
}
if now - self.last_gossip >= GOSSIP_INTERVAL {
trace!(target: "service", "Running 'gossip' task...");
if let Err(e) = self.relay_announcements() {
warn!(target: "service", "Failed to relay stored announcements: {e}");
}
self.outbox.wakeup(GOSSIP_INTERVAL);
self.last_gossip = now;
}
if now - self.last_sync >= SYNC_INTERVAL {
trace!(target: "service", "Running 'sync' task...");
if let Err(e) = self.fetch_missing_repositories() {
warn!(target: "service", "Failed to fetch missing inventory: {e}");
}
self.outbox.wakeup(SYNC_INTERVAL);
self.last_sync = now;
}
if now - self.last_announce >= ANNOUNCE_INTERVAL {
trace!(target: "service", "Running 'announce' task...");
self.announce_inventory();
self.outbox.wakeup(ANNOUNCE_INTERVAL);
self.last_announce = now;
}
if now - self.last_prune >= PRUNE_INTERVAL {
trace!(target: "service", "Running 'prune' task...");
if let Err(err) = self.prune_routing_entries(&now) {
warn!(target: "service", "Failed to prune routing entries: {err}");
}
if let Err(err) = self
.db
.gossip_mut()
.prune((now - LocalDuration::from(self.config.limits.gossip_max_age)).into())
{
warn!(target: "service", "Failed to prune gossip entries: {err}");
}
self.outbox.wakeup(PRUNE_INTERVAL);
self.last_prune = now;
}
// Always check whether there are persistent peers that need reconnecting.
self.maintain_persistent();
}
pub fn command(&mut self, cmd: Command) {
info!(target: "service", "Received command {cmd:?}");
match cmd {
Command::Connect(nid, addr, opts) => {
if opts.persistent {
self.config.connect.insert((nid, addr.clone()).into());
}
if let Err(e) = self.connect(nid, addr) {
match e {
ConnectError::SessionExists { nid } => {
self.emitter.emit(Event::PeerConnected { nid });
}
e => {
// N.b. we rely on the fact that the caller of `connect` waits for a peer
// event; emitting a disconnect event here informs it of the failure.
self.emitter.emit(Event::PeerDisconnected {
nid,
reason: e.to_string(),
});
}
}
}
}
Command::Disconnect(nid) => {
self.outbox.disconnect(nid, DisconnectReason::Command);
}
Command::Config(resp) => {
resp.ok(self.config.clone()).ok();
}
Command::ListenAddrs(resp) => {
resp.ok(self.listening.clone()).ok();
}
Command::Seeds(rid, namespaces, resp) => match self.seeds(&rid, namespaces) {
Ok(seeds) => {
let (connected, disconnected) = seeds.partition();
debug!(
target: "service",
"Found {} connected seed(s) and {} disconnected seed(s) for {}",
connected.len(), disconnected.len(), rid
);
resp.ok(seeds).ok();
}
Err(e) => {
warn!(target: "service", "Failed to get seeds for {rid}: {e}");
resp.err(e).ok();
}
},
Command::Fetch(rid, seed, timeout, signed_references_minimum_feature_level, resp) => {
let feature_level = signed_references_minimum_feature_level
.unwrap_or(self.config.fetch.feature_level_min());
let config = self
.fetch_config()
.with_timeout(timeout)
.with_minimum_feature_level(feature_level);
self.fetch(rid, seed, vec![], config, Some(resp));
}
Command::Seed(rid, scope, resp) => {
// Update our seeding policy.
let seeded = self
.seed(&rid, scope)
.expect("Service::command: error seeding repository");
resp.ok(seeded).ok();
// Let all our peers know that we're interested in this repo from now on.
self.outbox.broadcast(
Message::subscribe(self.filter(), self.clock.into(), Timestamp::MAX),
self.sessions.connected().map(|(_, s)| s),
);
}
Command::Unseed(id, resp) => {
let updated = self
.unseed(&id)
.expect("Service::command: error unseeding repository");
resp.ok(updated).ok();
}
Command::Follow(id, alias, resp) => {
let seeded = self
.policies
.follow(&id, alias.as_ref())
.expect("Service::command: error following node");
resp.ok(seeded).ok();
}
Command::Unfollow(id, resp) => {
let updated = self
.policies
.unfollow(&id)
.expect("Service::command: error unfollowing node");
resp.ok(updated).ok();
}
Command::Block(id, resp) => {
let updated = self
.policies
.set_follow_policy(&id, policy::Policy::Block)
.expect("Service::command: error blocking node");
if updated {
self.outbox.disconnect(id, DisconnectReason::Policy);
}
resp.send(updated).ok();
}
Command::AnnounceRefs(id, namespaces, resp) => {
let doc = match self.storage.get(id) {
Ok(Some(doc)) => doc,
Ok(None) => {
warn!(target: "service", "Failed to announce refs: repository {id} not found");
resp.err(command::Error::custom(format!("repository {id} not found")))
.ok();
return;
}
Err(e) => {
warn!(target: "service", "Failed to announce refs: doc error: {e}");
resp.err(e).ok();
return;
}
};
match self.announce_own_refs(id, doc, namespaces) {
Ok((refs, _timestamp)) => {
// TODO(finto): currently the command caller only
// expects one `RefsAt`, this should be fixed in the
// trait, eventually.
if let Some(refs) = refs.first() {
resp.ok(*refs).ok();
} else {
resp.err(command::Error::custom(format!(
"no refs were announced for {id}"
)))
.ok();
}
}
Err(err) => {
warn!(target: "service", "Failed to announce refs: {err}");
resp.err(err).ok();
}
}
}
Command::AnnounceInventory => {
self.announce_inventory();
}
Command::AddInventory(rid, resp) => match self.add_inventory(rid) {
Ok(updated) => {
resp.ok(updated).ok();
}
Err(e) => {
warn!(target: "service", "Failed to add {rid} to inventory: {e}");
resp.err(e).ok();
}
},
Command::QueryState(query, sender) => {
sender.send(query(self)).ok();
}
}
}
/// Initiate an outgoing fetch for some repository, based on another node's announcement.
/// Returns `true` if the fetch was initiated and `false` if it was skipped.
fn fetch_refs_at(
&mut self,
rid: RepoId,
from: NodeId,
refs: NonEmpty<RefsAt>,
scope: Scope,
config: fetcher::FetchConfig,
) -> bool {
match self.refs_status_of(rid, refs, &scope) {
Ok(status) => {
if status.want.is_empty() {
debug!(target: "service", "Skipping fetch for {rid}, all refs are already in storage");
} else {
self.fetch(rid, from, status.want, config, None);
return true;
}
}
Err(e) => {
warn!(target: "service", "Failed to get the refs status of {rid}: {e}");
}
}
// We didn't try to fetch anything.
false
}
fn fetch(
&mut self,
rid: RepoId,
from: NodeId,
refs_at: Vec<RefsAt>,
config: fetcher::FetchConfig,
channel: Option<command::Responder<FetchResult>>,
) {
let session = {
let reason = format!("peer {from} is not connected; cannot initiate fetch");
let Some(session) = self.sessions.get_mut(&from) else {
if let Some(c) = channel {
c.ok(FetchResult::Failed { reason }).ok();
}
return;
};
if !session.is_connected() {
if let Some(c) = channel {
c.ok(FetchResult::Failed { reason }).ok();
}
return;
}
session
};
let cmd = fetcher::state::command::Fetch {
from,
rid,
refs: refs_at.into(),
config,
};
let fetcher::service::FetchInitiated { event, rejected } = self.fetcher.fetch(cmd, channel);
if let Some(c) = rejected {
c.ok(FetchResult::Failed {
reason: "fetch queue at capacity".to_string(),
})
.ok();
}
match event {
fetcher::state::event::Fetch::Started {
rid,
from,
refs: refs_at,
config,
} => {
debug!(target: "service", "Starting fetch for {rid} from {from}");
self.outbox.fetch(
session,
rid,
refs_at.into(),
self.config.limits.fetch_pack_receive,
config,
);
}
fetcher::state::event::Fetch::Queued { rid, from } => {
debug!(target: "service", "Queued fetch for {rid} from {from}");
}
fetcher::state::event::Fetch::AlreadyFetching { rid, from } => {
debug!(target: "service", "Already fetching {rid} from {from}");
}
fetcher::state::event::Fetch::QueueAtCapacity { rid, from, .. } => {
debug!(target: "service", "Queue at capacity for {from}, rejected {rid}");
}
}
}
pub fn fetched(
&mut self,
rid: RepoId,
from: NodeId,
result: Result<crate::worker::fetch::FetchResult, crate::worker::FetchError>,
) {
let cmd = fetcher::state::command::Fetched { from, rid };
let fetcher::service::FetchCompleted { event, subscribers } = self.fetcher.fetched(cmd);
// Dequeue next fetches
self.dequeue_fetches();
match event {
fetcher::state::event::Fetched::NotFound { from, rid } => {
debug!(target: "service", "Unexpected fetch result for {rid} from {from}");
}
fetcher::state::event::Fetched::Completed { from, rid, refs: _ } => {
// Notify responders
let fetch_result = match &result {
Ok(success) => FetchResult::Success {
updated: success.updated.clone(),
namespaces: success.namespaces.clone(),
clone: success.clone,
},
Err(e) => FetchResult::Failed {
reason: e.to_string(),
},
};
for responder in subscribers {
responder.ok(fetch_result.clone()).ok();
}
match result {
Ok(crate::worker::fetch::FetchResult {
updated,
canonical,
namespaces,
clone,
doc,
}) => {
info!(target: "service", "Fetched {rid} from {from} successfully");
// Update our routing table in case this fetch was user-initiated and doesn't
// come from an announcement.
self.seed_discovered(rid, from, self.clock.into());
for update in &updated {
if update.is_skipped() {
trace!(target: "service", "Ref skipped: {update} for {rid}");
} else {
debug!(target: "service", "Ref updated: {update} for {rid}");
}
}
self.emitter.emit(Event::RefsFetched {
remote: from,
rid,
updated: updated.clone(),
});
self.emitter
.emit_all(canonical.into_iter().map(|(refname, target)| {
Event::CanonicalRefUpdated {
rid,
refname,
target,
}
}));
// Announce our new inventory if this fetch was a full clone.
// Only update and announce inventory for public repositories.
if clone && doc.is_public() {
debug!(target: "service", "Updating and announcing inventory for cloned repository {rid}..");
if let Err(e) = self.add_inventory(rid) {
warn!(target: "service", "Failed to announce inventory for {rid}: {e}");
}
}
// It's possible for a fetch to succeed but nothing was updated.
if updated.is_empty() || updated.iter().all(|u| u.is_skipped()) {
debug!(target: "service", "Nothing to announce, no refs were updated..");
} else {
// Finally, announce the refs. This is useful for nodes to know what we've synced,
// beyond just knowing that we have added an item to our inventory.
if let Err(e) = self.announce_refs(rid, doc.into(), namespaces, false) {
warn!(target: "service", "Failed to announce new refs: {e}");
}
}
}
Err(err) => {
warn!(target: "service", "Fetch failed for {rid} from {from}: {err}");
// For now, we only disconnect the peer in case of a timeout. In the future,
// there may be other reasons to disconnect.
if err.is_timeout() {
self.outbox.disconnect(from, DisconnectReason::Fetch(err));
}
}
}
}
}
}
/// Attempt to dequeue fetches from all peers.
/// At most one fetch is dequeued per peer. If the fetch cannot be processed,
/// it is put back on the queue for that peer.
///
/// Fetches are queued for two reasons:
/// 1. The RID was already being fetched.
/// 2. The session was already at fetch capacity.
pub fn dequeue_fetches(&mut self) {
let sessions = self
.sessions
.shuffled()
.map(|(k, _)| *k)
.collect::<Vec<_>>();
for nid in sessions {
#[allow(clippy::unwrap_used)]
let sess = self.sessions.get_mut(&nid).unwrap();
if !sess.is_connected() {
continue;
}
let Some(fetcher::QueuedFetch {
rid,
refs: refs_at,
config,
}) = self.fetcher.dequeue(&nid)
else {
continue;
};
// Check seeding policy
let repo_entry = self
.policies
.seed_policy(&rid)
.expect("error accessing repo seeding configuration");
let SeedingPolicy::Allow { scope } = repo_entry.policy else {
debug!(target: "service", "Repository {} no longer seeded, skipping", rid);
continue;
};
debug!(target: "service", "Dequeued fetch for {} from {}", rid, nid);
match refs_at {
RefsToFetch::Refs(refs) => {
self.fetch_refs_at(rid, nid, refs, scope, config);
}
RefsToFetch::All => {
// Channel is `None` since they will already be
// registered with the fetcher service.
self.fetch(rid, nid, vec![], config, None);
}
}
}
}
/// Inbound connection attempt.
pub fn accepted(&mut self, ip: IpAddr) -> bool {
// Always accept localhost connections, even if we already reached
// our inbound connection limit.
if ip.is_loopback() || ip.is_unspecified() {
return true;
}
// Check for inbound connection limit.
if self.sessions.inbound().count() >= self.config.limits.connection.inbound.into() {
return false;
}
match self.db.addresses().is_ip_banned(ip) {
Ok(banned) => {
if banned {
debug!(target: "service", "Rejecting inbound connection from banned ip {ip}");
return false;
}
}
Err(e) => warn!(target: "service", "Failed to query ban status for {ip}: {e}"),
}
let host: HostName = ip.into();
let tokens = self.config.limits.rate.inbound;
if self.limiter.limit(host.clone(), None, &tokens, self.clock) {
trace!(target: "service", "Rate limiting inbound connection from {host}..");
return false;
}
true
}
pub fn attempted(&mut self, nid: NodeId, addr: Address) {
debug!(target: "service", "Attempted connection to {nid} ({addr})");
if let Some(sess) = self.sessions.get_mut(&nid) {
sess.to_attempted();
} else {
#[cfg(debug_assertions)]
panic!("Service::attempted: unknown session {nid}@{addr}");
}
}
pub fn listening(&mut self, local_addr: net::SocketAddr) {
info!(target: "node", "Listening on {local_addr}..");
self.listening.push(local_addr);
}
pub fn connected(&mut self, remote: NodeId, addr: Address, link: Link) {
if let Ok(true) = self.policies.is_blocked(&remote) {
self.emitter.emit(Event::PeerDisconnected {
nid: remote,
reason: format!("{remote} is blocked"),
});
info!(target: "service", "Disconnecting blocked inbound peer {remote}");
self.outbox.disconnect(remote, DisconnectReason::Policy);
return;
}
info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
self.emitter.emit(Event::PeerConnected { nid: remote });
let msgs = self.initial(link);
if link.is_outbound() {
if let Some(peer) = self.sessions.get_mut(&remote) {
peer.to_connected(self.clock);
self.outbox.write_all(peer, msgs);
}
} else {
match self.sessions.entry(remote) {
Entry::Occupied(mut e) => {
// In this scenario, it's possible that our peer is persistent, and
// disconnected. We get an inbound connection before we attempt a re-connection,
// and therefore we treat it as a regular inbound connection.
//
// It's also possible that a disconnection hasn't gone through yet and our
// peer is still in connected state here, while a new inbound connection from
// that same peer is made. This results in a new connection from a peer that is
// already connected from the perspective of the service. This appears to be
// a bug in the underlying networking library.
let peer = e.get_mut();
debug!(
target: "service",
"Connecting peer {remote} already has a session open ({peer})"
);
peer.link = link;
peer.to_connected(self.clock);
self.outbox.write_all(peer, msgs);
}
Entry::Vacant(e) => {
if let HostName::Ip(ip) = addr.host {
if !address::is_local(&ip) {
if let Err(e) =
self.db
.addresses_mut()
.record_ip(&remote, ip, self.clock.into())
{
log::debug!(target: "service", "Failed to record IP address for {remote}: {e}");
}
}
}
let peer = e.insert(Session::inbound(
remote,
addr,
self.config.is_persistent(&remote),
self.rng.clone(),
self.clock,
));
self.outbox.write_all(peer, msgs);
}
}
}
}
pub fn disconnected(&mut self, remote: NodeId, link: Link, reason: &DisconnectReason) {
let since = self.local_time();
let Some(session) = self.sessions.get_mut(&remote) else {
// Since the service sometimes disconnects peers eagerly, it's not unusual to get a second
// disconnection event once the transport is dropped.
trace!(target: "service", "Redundant disconnection for {remote} ({reason})");
return;
};
// In cases of connection conflicts, there may be disconnections of one of the two
// connections. In that case we don't want the service to remove the session.
if session.link != link {
return;
}
info!(target: "service", "Disconnected from {remote} ({reason})");
self.emitter.emit(Event::PeerDisconnected {
nid: remote,
reason: reason.to_string(),
});
let link = session.link;
let addr = session.addr.clone();
let cmd = fetcher::state::command::Cancel { from: remote };
let fetcher::service::FetchesCancelled { event, orphaned } = self.fetcher.cancel(cmd);
match event {
fetcher::state::event::Cancel::Unexpected { from } => {
debug!(target: "service", "No fetches to cancel for {from}");
}
fetcher::state::event::Cancel::Canceled {
from,
active,
queued,
} => {
debug!(target: "service", "Cancelled {} ongoing, {} queued for {from}", active.len(), queued.len());
}
}
// Notify orphaned responders
for (rid, responder) in orphaned {
responder
.ok(FetchResult::Failed {
reason: format!("failed fetch to {rid}, peer disconnected: {reason}"),
})
.ok();
}
// Attempt to re-connect to persistent peers.
if self.config.is_persistent(&remote) {
let delay = LocalDuration::from_secs(2u64.saturating_pow(session.attempts() as u32))
.clamp(MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA);
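// E.g., after 3 attempts the delay is 2^3 = 8 seconds; the result is always clamped
// to the [MIN_RECONNECTION_DELTA, MAX_RECONNECTION_DELTA] range.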
// Nb. We always try to reconnect to persistent peers, even when the error appears
// to not be transient.
session.to_disconnected(since, since + delay);
debug!(target: "service", "Reconnecting to {remote} in {delay}..");
self.outbox.wakeup(delay);
} else {
debug!(target: "service", "Dropping peer {remote}..");
self.sessions.remove(&remote);
let severity = match reason {
DisconnectReason::Dial(_)
| DisconnectReason::Fetch(_)
| DisconnectReason::Connection(_) => {
if self.is_online() {
// If we're "online", there's something wrong with this
// peer connection specifically.
Severity::Medium
} else {
Severity::Low
}
}
DisconnectReason::Session(e) => e.severity(),
DisconnectReason::Command
| DisconnectReason::Conflict
| DisconnectReason::Policy
| DisconnectReason::SelfConnection => Severity::Low,
};
if let Err(e) = self
.db
.addresses_mut()
.disconnected(&remote, &addr, severity)
{
debug!(target: "service", "Failed to update address store: {e}");
}
// Only re-attempt outbound connections, since we don't care if an inbound connection
// is dropped.
if link.is_outbound() {
self.maintain_connections();
}
}
self.dequeue_fetches();
}
pub fn received_message(&mut self, remote: NodeId, message: Message) {
if let Err(err) = self.handle_message(&remote, message) {
// If there's an error, stop processing messages from this peer.
// However, we still relay messages returned up to this point.
self.outbox
.disconnect(remote, DisconnectReason::Session(err));
// FIXME: The peer should be set in a state such that we don't
// process further messages.
}
}
/// Handle an announcement message.
///
/// Returns the stored announcement's id if it should be relayed to connected peers,
/// and `None` if it should not be relayed.
pub fn handle_announcement(
&mut self,
relayer: &NodeId,
relayer_addr: &Address,
announcement: &Announcement,
) -> Result<Option<gossip::AnnouncementId>, session::Error> {
if !announcement.verify() {
return Err(session::Error::Misbehavior);
}
let Announcement {
node: announcer,
message,
..
} = announcement;
// Ignore our own announcements, in case the relayer sent one by mistake.
if announcer == self.nid() {
return Ok(None);
}
let now = self.clock;
let timestamp = message.timestamp();
// Don't allow messages from too far in the future.
if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
return Err(session::Error::InvalidTimestamp(timestamp));
}
// We don't process announcements from nodes we don't know, since the node announcement is
// what provides DoS protection.
//
// Note that it's possible to *not* receive the node announcement, but receive the
// subsequent announcements of a node in the case of historical gossip messages requested
// from the `subscribe` message. This can happen if the cut-off time is after the node
// announcement timestamp, but before the other announcements. In that case, we simply
// ignore all announcements of that node until we get a node announcement.
if let AnnouncementMessage::Inventory(_) | AnnouncementMessage::Refs(_) = message {
match self.db.addresses().get(announcer) {
Ok(node) => {
if node.is_none() {
debug!(target: "service", "Ignoring announcement from unknown node {announcer} (t={timestamp})");
return Ok(None);
}
}
Err(e) => {
debug!(target: "service", "Failed to look up node in address book: {e}");
return Ok(None);
}
}
}
// Discard announcement messages we've already seen, otherwise update our last seen time.
let relay = match self.db.gossip_mut().announced(announcer, announcement) {
Ok(Some(id)) => {
log::debug!(
target: "service",
"Stored announcement from {announcer} to be broadcast in {} (t={timestamp})",
(self.last_gossip + GOSSIP_INTERVAL) - self.clock
);
// Keep track of who relayed the message for later.
self.relayed_by.entry(id).or_default().push(*relayer);
// Decide whether or not to relay this message, if it's fresh.
// To avoid spamming peers on startup with historical gossip messages,
// don't relay messages that are too old. We make an exception for node announcements,
// since they are cached, and will hence often carry old timestamps.
let relay = message.is_node_announcement()
|| now - timestamp.to_local_time() <= MAX_TIME_DELTA;
relay.then_some(id)
}
Ok(None) => {
// FIXME: Still mark as relayed by this peer.
// FIXME: Refs announcements should not be delayed, since they are only sent
// to subscribers.
debug!(target: "service", "Ignoring stale announcement from {announcer} (t={timestamp})");
return Ok(None);
}
Err(e) => {
debug!(target: "service", "Failed to update gossip entry from {announcer}: {e}");
return Ok(None);
}
};
match message {
// Process a peer inventory update announcement by (maybe) fetching.
AnnouncementMessage::Inventory(message) => {
self.emitter.emit(Event::InventoryAnnounced {
nid: *announcer,
inventory: message.inventory.to_vec(),
timestamp: message.timestamp,
});
match self.sync_routing(
message.inventory.iter().cloned(),
*announcer,
message.timestamp,
) {
Ok(synced) => {
if synced.is_empty() {
trace!(target: "service", "No routes updated by inventory announcement from {announcer}");
return Ok(None);
}
}
Err(e) => {
debug!(target: "service", "Failed to process inventory from {announcer}: {e}");
return Ok(None);
}
}
let mut missing = Vec::new();
let nid = *self.nid();
// Here we handle the special case where the inventory we received is that of
// a connected peer, as opposed to being relayed to us.
if let Some(sess) = self.sessions.get_mut(announcer) {
for id in message.inventory.as_slice() {
// If we are connected to the announcer of this inventory, update the peer's
// subscription filter to include all inventory items. This way, we'll
// relay messages relating to the peer's inventory.
if let Some(sub) = &mut sess.subscribe {
sub.filter.insert(id);
}
// If we're seeding and connected to the announcer, and we don't have
// the inventory, fetch it from the announcer.
if self.policies.is_seeding(id).expect(
"Service::handle_announcement: error accessing seeding configuration",
) {
// Only if we do not have the repository locally do we fetch here.
// If we do have it, only fetch after receiving a ref announcement.
match self.db.routing().entry(id, &nid) {
Ok(entry) => {
if entry.is_none() {
missing.push(*id);
}
}
Err(e) => debug!(
target: "service",
"Error checking local inventory for {id}: {e}"
),
}
}
}
}
// Since we have limited fetch capacity, it may be that we can't fetch an entire
// inventory from a peer. Therefore we randomize the order of the RIDs to fetch
// different RIDs from different peers in case multiple peers announce the same
// RIDs.
self.rng.shuffle(&mut missing);
for rid in missing {
debug!(target: "service", "Missing seeded inventory {rid}; initiating fetch..");
self.fetch(rid, *announcer, vec![], self.fetch_config(), None);
}
return Ok(relay);
}
AnnouncementMessage::Refs(message) => {
self.emitter.emit(Event::RefsAnnounced {
nid: *announcer,
rid: message.rid,
refs: message.refs.to_vec(),
timestamp: message.timestamp,
});
// Empty announcements can be safely ignored.
let Some(refs) = NonEmpty::from_vec(message.refs.to_vec()) else {
debug!(target: "service", "Skipping fetch, no refs in announcement for {} (t={timestamp})", message.rid);
return Ok(None);
};
// We update inventories when receiving ref announcements, as these could come
// from a new repository being initialized.
self.seed_discovered(message.rid, *announcer, message.timestamp);
// Update sync status of announcer for this repo.
if let Some(refs) = refs.iter().find(|r| &r.remote == self.nid()) {
debug!(
target: "service",
"Refs announcement of {announcer} for {} contains our own remote at {} (t={})",
message.rid, refs.at, message.timestamp
);
match self.db.seeds_mut().synced(
&message.rid,
announcer,
refs.at,
message.timestamp,
) {
Ok(updated) => {
if updated {
debug!(
target: "service",
"Updating sync status of {announcer} for {} to {}",
message.rid, refs.at
);
self.emitter.emit(Event::RefsSynced {
rid: message.rid,
remote: *announcer,
at: refs.at,
});
} else {
debug!(
target: "service",
"Sync status of {announcer} was not updated for {}",
message.rid,
);
}
}
Err(e) => {
debug!(target: "service", "Failed to update sync status for {}: {e}", message.rid);
}
}
}
let repo_entry = self.policies.seed_policy(&message.rid).expect(
"Service::handle_announcement: error accessing repo seeding configuration",
);
let SeedingPolicy::Allow { scope } = repo_entry.policy else {
debug!(
target: "service",
"Ignoring refs announcement from {announcer}: repository {} isn't seeded (t={timestamp})",
message.rid
);
return Ok(None);
};
// Ref announcements may be relayed by peers who don't have the
// actual refs in storage, therefore we only check whether we
// are connected to the *announcer*, which is required by the
// protocol to only announce refs it has.
//
// TODO(Ade): Perhaps it makes sense to establish connections to
// followed but unconnected peers. Consider:
// Connections: Alice ←→ Bob ←→ Eve
// Follows: Alice ←→ Eve
// Eve announces refs, and Bob relays these announcements to Alice.
// Then, Alice might determine that Bob does not have Eve's refs,
// and therefore connect directly to Eve in order to fetch.
let Some(remote) = self.sessions.get(announcer).cloned() else {
trace!(
target: "service",
"Skipping fetch of {}, no sessions connected to {announcer}",
message.rid
);
return Ok(relay);
};
// Finally, start the fetch.
self.fetch_refs_at(message.rid, remote.id, refs, scope, self.fetch_config());
return Ok(relay);
}
AnnouncementMessage::Node(
ann @ NodeAnnouncement {
features,
addresses,
..
},
) => {
self.emitter.emit(Event::NodeAnnounced {
nid: *announcer,
alias: ann.alias.clone(),
timestamp: ann.timestamp,
features: *features,
addresses: addresses.to_vec(),
});
// If this node isn't a seed, we're not interested in adding it
// to our address book, but other nodes may be, so we relay the message anyway.
if !features.has(Features::SEED) {
return Ok(relay);
}
match self.db.addresses_mut().insert(
announcer,
ann.version,
ann.features,
&ann.alias,
ann.work(),
&ann.agent,
timestamp,
addresses
.iter()
// Ignore non-routable addresses unless received from a local network
// peer. This allows the node to function in a local network.
.filter(|a| a.is_routable() || relayer_addr.is_local())
.map(|a| KnownAddress::new(a.clone(), address::Source::Peer)),
) {
Ok(updated) => {
// Only relay if we received new information.
if updated {
debug!(
target: "service",
"Address store entry for node {announcer} updated at {timestamp}"
);
return Ok(relay);
}
}
Err(err) => {
// An error here is due to a fault in our address store.
warn!(target: "service", "Failed to process node announcement from {announcer}: {err}");
}
}
}
}
Ok(None)
}
pub fn handle_info(&mut self, remote: NodeId, info: &Info) -> Result<(), session::Error> {
match info {
// Nb. We don't currently send this message.
Info::RefsAlreadySynced { rid, at } => {
debug!(target: "service", "Refs already synced for {rid} by {remote}");
self.emitter.emit(Event::RefsSynced {
rid: *rid,
remote,
at: *at,
});
}
}
Ok(())
}
pub fn handle_message(
&mut self,
remote: &NodeId,
message: Message,
) -> Result<(), session::Error> {
let local = self.node_id();
let relay = self.config.is_relay();
let Some(peer) = self.sessions.get_mut(remote) else {
debug!(target: "service", "Session not found for {remote}");
return Ok(());
};
peer.last_active = self.clock;
let limit: RateLimit = match peer.link {
Link::Outbound => self.config.limits.rate.outbound.into(),
Link::Inbound => self.config.limits.rate.inbound.into(),
};
if self
.limiter
.limit(peer.addr.clone().into(), Some(remote), &limit, self.clock)
{
debug!(target: "service", "Rate limiting message from {remote} ({})", peer.addr);
return Ok(());
}
message.log(log::Level::Debug, remote, Link::Inbound);
let connected = match &mut peer.state {
session::State::Disconnected { .. } => {
debug!(target: "service", "Ignoring message from disconnected peer {}", peer.id);
return Ok(());
}
// In case of a discrepancy between the service state and the state of the underlying
// wire protocol, we may receive a message from a peer that we consider not fully connected
// at the service level. To remedy this, we simply transition the peer to a connected state.
//
// This is not ideal, but until the wire protocol and service are unified, it's the simplest
// solution to converge towards the same state.
session::State::Attempted | session::State::Initial => {
debug!(target: "service", "Received unexpected message from connecting peer {}", peer.id);
debug!(target: "service", "Transitioning peer {} to 'connected' state", peer.id);
peer.to_connected(self.clock);
None
}
session::State::Connected {
ping, latencies, ..
} => Some((ping, latencies)),
};
trace!(target: "service", "Received message {message:?} from {remote}");
match message {
// Process a peer announcement.
Message::Announcement(ann) => {
let relayer = remote;
let relayer_addr = peer.addr.clone();
if let Some(id) = self.handle_announcement(relayer, &relayer_addr, &ann)? {
if self.config.is_relay() {
if let AnnouncementMessage::Inventory(_) = ann.message {
if let Err(e) = self
.database_mut()
.gossip_mut()
.set_relay(id, gossip::RelayStatus::Relay)
{
warn!(target: "service", "Failed to set relay flag for message: {e}");
return Ok(());
}
} else {
self.relay(id, ann);
}
}
}
}
Message::Subscribe(subscribe) => {
// Filter announcements by interest.
match self
.db
.gossip()
.filtered(&subscribe.filter, subscribe.since, subscribe.until)
{
Ok(anns) => {
for ann in anns {
let ann = match ann {
Ok(a) => a,
Err(e) => {
debug!(target: "service", "Failed to read gossip message from store: {e}");
continue;
}
};
// Don't send announcements authored by the remote, back to the remote.
if ann.node == *remote {
continue;
}
// Only send messages if we're a relay, or it's our own messages.
if relay || ann.node == local {
self.outbox.write(peer, ann.into());
}
}
}
Err(e) => {
warn!(target: "service", "Failed to query gossip messages from store: {e}");
}
}
peer.subscribe = Some(subscribe);
}
Message::Info(info) => {
self.handle_info(*remote, &info)?;
}
Message::Ping(Ping { ponglen, .. }) => {
// Ignore pings which ask for too much data.
if ponglen > Ping::MAX_PONG_ZEROES {
return Ok(());
}
self.outbox.write(
peer,
Message::Pong {
zeroes: ZeroBytes::new(ponglen),
},
);
}
Message::Pong { zeroes } => {
if let Some((ping, latencies)) = connected {
if let session::PingState::AwaitingResponse {
len: ponglen,
since,
} = *ping
{
if (ponglen as usize) == zeroes.len() {
*ping = session::PingState::Ok;
// Keep track of peer latency.
latencies.push_back(self.clock - since);
if latencies.len() > MAX_LATENCIES {
latencies.pop_front();
}
}
}
}
}
}
Ok(())
}
/// A convenience method to check whether we should fetch from a `RefsAnnouncement` with `scope`.
fn refs_status_of(
&self,
rid: RepoId,
refs: NonEmpty<RefsAt>,
scope: &policy::Scope,
) -> Result<RefsStatus, Error> {
let mut refs = RefsStatus::new(rid, refs, self.db.refs())?;
// Check that there's something we want.
if refs.want.is_empty() {
return Ok(refs);
}
// Check scope.
let mut refs = match scope {
policy::Scope::All => refs,
policy::Scope::Followed => match self.policies.namespaces_for(&self.storage, &rid) {
Ok(Namespaces::All) => refs,
Ok(Namespaces::Followed(followed)) => {
refs.want.retain(|r| followed.contains(&r.remote));
refs
}
Err(e) => return Err(e.into()),
},
};
// Remove our own remote, we don't want to fetch that.
refs.want.retain(|r| r.remote != self.node_id());
Ok(refs)
}
/// Add a seed to our routing table.
fn seed_discovered(&mut self, rid: RepoId, nid: NodeId, time: Timestamp) {
if let Ok(result) = self.db.routing_mut().add_inventory([&rid], nid, time) {
if let &[(_, InsertResult::SeedAdded)] = result.as_slice() {
self.emitter.emit(Event::SeedDiscovered { rid, nid });
debug!(target: "service", "Routing table updated for {rid} with seed {nid}");
}
}
}
/// Set of initial messages to send to a peer.
fn initial(&mut self, _link: Link) -> Vec<Message> {
let now = self.clock();
let filter = self.filter();
// TODO: Only subscribe to outbound connections, otherwise we will consume too
// much bandwidth.
// If we've been previously connected to the network, we'll have received gossip messages.
// Instead of simply taking the last timestamp, we subtract a margin to ensure we don't
// miss any messages due to unsynchronized clocks.
//
// If this is our first connection to the network, we just ask for a fixed backlog
// of messages to get us started.
let since = if let Some(last) = self.last_online_at {
Timestamp::from(last - SUBSCRIBE_BACKLOG_DELTA)
} else {
(*now - INITIAL_SUBSCRIBE_BACKLOG_DELTA).into()
};
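// E.g., if we were last online at 14:00, we subscribe to messages from 13:57 onwards
// (SUBSCRIBE_BACKLOG_DELTA is three minutes); on a first run we go back a full day.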
debug!(target: "service", "Subscribing to messages since timestamp {since}..");
vec![
Message::node(self.node.clone(), &self.signer),
Message::inventory(self.inventory.clone(), &self.signer),
Message::subscribe(filter, since, Timestamp::MAX),
]
}
/// Try to guess whether we're online or not.
fn is_online(&self) -> bool {
self.sessions
.connected()
.filter(|(_, s)| s.addr.is_routable() && s.last_active >= self.clock - IDLE_INTERVAL)
.count()
> 0
}
/// Remove a local repository from our inventory.
fn remove_inventory(&mut self, rid: &RepoId) -> Result<bool, Error> {
let node = self.node_id();
let now = self.timestamp();
let removed = self.db.routing_mut().remove_inventory(rid, &node)?;
if removed {
self.refresh_and_announce_inventory(now)?;
}
Ok(removed)
}
/// Add a local repository to our inventory.
fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
let node = self.node_id();
let now = self.timestamp();
if !self.storage.contains(&rid)? {
debug!(target: "service", "Attempt to add non-existing inventory {rid}: repository not found in storage");
return Ok(false);
}
// Add to our local inventory.
let updates = self.db.routing_mut().add_inventory([&rid], node, now)?;
let updated = !updates.is_empty();
if updated {
self.refresh_and_announce_inventory(now)?;
}
Ok(updated)
}
/// Update cached inventory message, and announce new inventory to peers.
fn refresh_and_announce_inventory(&mut self, time: Timestamp) -> Result<(), Error> {
let inventory = self.inventory()?;
self.inventory = gossip::inventory(time, inventory);
self.announce_inventory();
Ok(())
}
/// Get our local inventory.
///
/// A node's inventory is the advertised list of repositories offered by a node.
///
/// A node's inventory consists of *public* repositories that are seeded and available locally
/// in the node's storage. We use the routing table as the canonical state of all inventories,
/// including the local node's.
///
/// When a repository is unseeded, it is also removed from the inventory. Private repositories
/// are *not* part of a node's inventory.
fn inventory(&self) -> Result<HashSet<RepoId>, Error> {
self.db
.routing()
.get_inventory(self.nid())
.map_err(Error::from)
}
/// Process a peer inventory announcement by updating our routing table.
/// This function expects the peer's full inventory, and prunes entries that are not in the
/// given inventory.
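///
/// Illustrative example: if our routing table currently maps the peer to
/// repositories `{A, B}` and the announced inventory is `{B, C}`, then `C` is
/// added, `A` is removed, and `B` is either time-updated or left untouched.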
fn sync_routing(
&mut self,
inventory: impl IntoIterator<Item = RepoId>,
from: NodeId,
timestamp: Timestamp,
) -> Result<SyncedRouting, Error> {
let mut synced = SyncedRouting::default();
let included = inventory.into_iter().collect::<BTreeSet<_>>();
let mut events = Vec::new();
for (rid, result) in
self.db
.routing_mut()
.add_inventory(included.iter(), from, timestamp)?
{
match result {
InsertResult::SeedAdded => {
debug!(target: "service", "Routing table updated for {rid} with seed {from}");
events.push(Event::SeedDiscovered { rid, nid: from });
if self
.policies
.is_seeding(&rid)
.expect("Service::process_inventory: error accessing seeding configuration")
{
// TODO: We should fetch here if we're already connected, in case this seed has
// refs we don't have.
}
synced.added.push(rid);
}
InsertResult::TimeUpdated => {
synced.updated.push(rid);
}
InsertResult::NotUpdated => {}
}
}
synced.removed.extend(
self.db
.routing()
.get_inventory(&from)?
.into_iter()
.filter(|rid| !included.contains(rid)),
);
self.db
.routing_mut()
.remove_inventories(&synced.removed, &from)?;
events.extend(
synced
.removed
.iter()
.map(|&rid| Event::SeedDropped { rid, nid: from }),
);
self.emitter.emit_all(events);
Ok(synced)
}
/// Return a refs announcement including the given remotes.
fn refs_announcement_for(
&mut self,
rid: RepoId,
remotes: impl IntoIterator<Item = NodeId>,
) -> Result<(Announcement, Vec<RefsAt>), Error> {
let repo = self.storage.repository(rid)?;
let timestamp = self.timestamp();
let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();
for remote_id in remotes.into_iter() {
let refs_at = RefsAt::new(&repo, remote_id).map_err(|err| {
radicle::storage::Error::Refs(radicle::storage::refs::Error::Read(err))
})?;
if refs.push(refs_at).is_err() {
warn!(
target: "service",
"refs announcement limit ({REF_REMOTE_LIMIT}) exceeded, peers will see only some of your repository references",
);
break;
}
}
let msg = AnnouncementMessage::from(RefsAnnouncement {
rid,
refs: refs.clone(),
timestamp,
});
Ok((msg.signed(&self.signer), refs.into()))
}
/// Announce our own refs for the given repo.
fn announce_own_refs(
&mut self,
rid: RepoId,
doc: Doc,
namespaces: impl IntoIterator<Item = NodeId>,
) -> Result<(Vec<RefsAt>, Timestamp), Error> {
let (refs, timestamp) = self.announce_refs(rid, doc, namespaces, true)?;
// Update refs database with our signed refs branches.
// This isn't strictly necessary for now: we only use the database for fetches, and we
// never fetch our own announced refs. We keep it up to date for good measure.
for r in refs.iter() {
self.emitter.emit(Event::LocalRefsAnnounced {
rid,
refs: *r,
timestamp,
});
if let Err(e) = self.database_mut().refs_mut().set(
&rid,
&r.remote,
&SIGREFS_BRANCH,
r.at,
timestamp.to_local_time(),
) {
warn!(
target: "service",
"Error updating refs database for `rad/sigrefs` of {} in {rid}: {e}",
r.remote
);
}
}
Ok((refs, timestamp))
}
/// Announce local refs for the given repo.
fn announce_refs(
&mut self,
rid: RepoId,
doc: Doc,
remotes: impl IntoIterator<Item = NodeId>,
own: bool,
) -> Result<(Vec<RefsAt>, Timestamp), Error> {
let (ann, refs) = self.refs_announcement_for(rid, remotes)?;
let timestamp = ann.timestamp();
let peers = self.sessions.connected().map(|(_, p)| p);
// Update our sync status for our own refs. This is useful for determining if refs were
// updated while the node was stopped.
for r in refs.iter().filter(|r| own || r.remote == ann.node) {
info!(
target: "service",
"Announcing refs {rid}/{r} to peers (t={timestamp})..",
);
// Update our local node's sync status to mark the refs as announced.
if let Err(e) = self.db.seeds_mut().synced(&rid, &ann.node, r.at, timestamp) {
warn!(target: "service", "Failed to update sync status for local node: {e}");
} else {
debug!(target: "service", "Saved local sync status for {rid}..");
}
}
self.outbox.announce(
ann,
peers.filter(|p| {
// Only announce to peers who are allowed to view this repo.
doc.is_visible_to(&p.id.into())
}),
self.db.gossip_mut(),
);
Ok((refs, timestamp))
}
fn reconnect(&mut self, nid: NodeId, addr: Address) -> bool {
if let Some(sess) = self.sessions.get_mut(&nid) {
sess.to_initial();
self.outbox.connect(nid, addr);
return true;
}
false
}
fn connect(&mut self, nid: NodeId, addr: Address) -> Result<(), ConnectError> {
debug!(target: "service", "Connecting to {nid} ({addr})..");
if nid == self.node_id() {
return Err(ConnectError::SelfConnection);
}
if let Ok(true) = self.policies.is_blocked(&nid) {
return Err(ConnectError::Blocked { nid });
}
if !self.is_supported_address(&addr) {
return Err(ConnectError::UnsupportedAddress { nid, addr });
}
if self.sessions.contains_key(&nid) {
return Err(ConnectError::SessionExists { nid });
}
if self.sessions.outbound().count() >= self.config.limits.connection.outbound.into() {
return Err(ConnectError::LimitReached { nid, addr });
}
let persistent = self.config.is_persistent(&nid);
let timestamp: Timestamp = self.clock.into();
if let Err(e) = self.db.addresses_mut().attempted(&nid, &addr, timestamp) {
warn!(target: "service", "Failed to update address book with connection attempt: {e}");
}
self.sessions.insert(
nid,
Session::outbound(nid, addr.clone(), persistent, self.rng.clone()),
);
self.outbox.connect(nid, addr);
Ok(())
}
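/// Return the known seeds of a repository, along with their sync status relative
/// to the given namespaces, when that information is available.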
fn seeds(&self, rid: &RepoId, namespaces: HashSet<PublicKey>) -> Result<Seeds, Error> {
let mut seeds = Seeds::new(self.rng.clone());
// First, build a list of peers that have synced refs for `namespaces`, if any.
// This step is skipped:
// 1. For the repository (and thus all `namespaces`), if it does not exist in storage.
// 2. For each namespace in `namespaces` that does not exist in storage.
if let Ok(repo) = self.storage.repository(*rid) {
for namespace in namespaces.iter() {
let Ok(local) = RefsAt::new(&repo, *namespace) else {
continue;
};
for seed in self.db.seeds().seeds_for(rid)? {
let seed = seed?;
let state = self.sessions.get(&seed.nid).map(|s| s.state.clone());
let synced = if local.at == seed.synced_at.oid {
SyncStatus::Synced { at: seed.synced_at }
} else {
let local = SyncedAt::new(local.at, &repo)?;
SyncStatus::OutOfSync {
local,
remote: seed.synced_at,
}
};
seeds.insert(Seed::new(seed.nid, seed.addresses, state, Some(synced)));
}
}
}
// Then, add peers we know about but for which we have no sync status information.
// These peers have announced that they seed the repository via an inventory
// announcement, but we haven't received any ref announcements from them.
for nid in self.db.routing().get(rid)? {
if namespaces.contains(&nid) {
continue;
}
if seeds.contains(&nid) {
// We already have a richer entry for this node.
continue;
}
let addrs = self.db.addresses().addresses_of(&nid)?;
let state = self.sessions.get(&nid).map(|s| s.state.clone());
seeds.insert(Seed::new(nid, addrs, state, None));
}
Ok(seeds)
}
/// Return a new filter object, based on our seeding policy.
fn filter(&self) -> Filter {
if self.config.seeding_policy.is_allow() {
// TODO: Remove bits for blocked repos.
Filter::default()
} else {
self.filter.clone()
}
}
/// Get a timestamp for use in announcements.
/// Never returns the same timestamp twice.
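///
/// Illustrative sketch (`service` is assumed to be a mutable service handle):
///
/// ```ignore
/// let a = service.timestamp();
/// let b = service.timestamp();
/// assert!(b > a); // Strictly increasing, even within the same clock tick.
/// ```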
fn timestamp(&mut self) -> Timestamp {
let now = Timestamp::from(self.clock);
if *now > *self.last_timestamp {
self.last_timestamp = now;
} else {
self.last_timestamp = self.last_timestamp + 1;
}
self.last_timestamp
}
fn relay(&mut self, id: gossip::AnnouncementId, ann: Announcement) {
let announcer = ann.node;
let relayed_by = self.relayed_by.get(&id);
let rid = if let AnnouncementMessage::Refs(RefsAnnouncement { rid, .. }) = ann.message {
Some(rid)
} else {
None
};
// Choose peers we should relay this message to.
// 1. Don't relay to a peer who sent us this message.
// 2. Don't relay to the peer who signed this announcement.
// 3. For refs announcements, only relay to peers allowed to see the repository.
let relay_to = self
.sessions
.connected()
.filter(|(id, _)| {
relayed_by
.map(|relayers| !relayers.contains(id))
.unwrap_or(true) // If there are no relayers we let it through.
})
.filter(|(id, _)| **id != announcer)
.filter(|(id, _)| {
if let Some(rid) = rid {
// Only relay this message if the peer is allowed to know about the
// repository. If we don't have the repository, return `false` because
// we can't determine if it's private or public.
self.storage
.get(rid)
.ok()
.flatten()
.map(|doc| doc.is_visible_to(&(*id).into()))
.unwrap_or(false)
} else {
// Announcement doesn't concern a specific repository, let it through.
true
}
})
.map(|(_, p)| p);
self.outbox.relay(ann, relay_to);
}
////////////////////////////////////////////////////////////////////////////
// Periodic tasks
////////////////////////////////////////////////////////////////////////////
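/// Relay stored gossip announcements that are due to be relayed, skipping our own.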
fn relay_announcements(&mut self) -> Result<(), Error> {
let now = self.clock.into();
let rows = self.database_mut().gossip_mut().relays(now)?;
let local = self.node_id();
for (id, msg) in rows {
let announcer = msg.node;
if announcer == local {
// Don't relay our own stored gossip messages.
continue;
}
self.relay(id, msg);
}
Ok(())
}
/// Announce our inventory to all connected peers, unless it was already announced.
fn announce_inventory(&mut self) {
let timestamp = self.inventory.timestamp.to_local_time();
if self.last_inventory == timestamp {
debug!(target: "service", "Skipping redundant inventory announcement (t={})", self.inventory.timestamp);
return;
}
let msg = AnnouncementMessage::from(self.inventory.clone());
self.outbox.announce(
msg.signed(&self.signer),
self.sessions.connected().map(|(_, p)| p),
self.db.gossip_mut(),
);
self.last_inventory = timestamp;
}
fn prune_routing_entries(&mut self, now: &LocalTime) -> Result<(), routing::Error> {
let count = self.db.routing().len()?;
if count <= self.config.limits.routing_max_size.into() {
return Ok(());
}
let delta = count - usize::from(self.config.limits.routing_max_size);
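// E.g. (illustrative): with 1100 entries and a limit of 1000, we prune up to 100
// entries that are older than the maximum routing age.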
let nid = self.node_id();
self.db.routing_mut().prune(
(*now - LocalDuration::from(self.config.limits.routing_max_age)).into(),
Some(delta),
&nid,
)?;
Ok(())
}
fn disconnect_unresponsive_peers(&mut self, now: &LocalTime) {
let stale = self
.sessions
.connected()
.filter(|(_, session)| *now - session.last_active >= STALE_CONNECTION_TIMEOUT);
for (_, session) in stale {
debug!(target: "service", "Disconnecting unresponsive peer {}..", session.id);
// TODO: Should we switch the session state to "disconnected" even before receiving
// an official "disconnect"? Otherwise we keep pinging until we get the disconnection.
self.outbox.disconnect(
session.id,
DisconnectReason::Session(session::Error::Timeout),
);
}
}
/// Ensure connection health by pinging connected peers.
fn keep_alive(&mut self, now: &LocalTime) {
let inactive_sessions = self
.sessions
.connected_mut()
.filter(|(_, session)| *now - session.last_active >= KEEP_ALIVE_DELTA)
.map(|(_, session)| session);
for session in inactive_sessions {
session.ping(self.clock, &mut self.outbox).ok();
}
}
/// Get a list of peers available to connect to, sorted by lowest penalty.
fn available_peers(&mut self) -> Vec<Peer> {
match self.db.addresses().entries() {
Ok(entries) => {
// Nb. we don't want to connect to any peers that already have a session with us,
// even if it's in a disconnected state. Those sessions are re-attempted automatically.
let mut peers = entries
.filter(|entry| entry.version == PROTOCOL_VERSION)
.filter(|entry| !entry.address.banned)
.filter(|entry| !entry.penalty.is_connect_threshold_reached())
.filter(|entry| !self.sessions.contains_key(&entry.node))
.filter(|entry| !self.policies.is_blocked(&entry.node).unwrap_or(false))
.filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
.filter(|entry| &entry.node != self.nid())
.filter(|entry| self.is_supported_address(&entry.address.addr))
.fold(HashMap::new(), |mut acc, entry| {
acc.entry(entry.node)
.and_modify(|e: &mut Peer| e.addresses.push(entry.address.clone()))
.or_insert_with(|| Peer {
nid: entry.node,
addresses: vec![entry.address],
penalty: entry.penalty,
});
acc
})
.into_values()
.collect::<Vec<_>>();
peers.sort_by_key(|p| p.penalty);
peers
}
Err(e) => {
warn!(target: "service", "Unable to lookup available peers in address book: {e}");
Vec::new()
}
}
}
/// Fetch all repositories that are seeded but missing from storage.
fn fetch_missing_repositories(&mut self) -> Result<(), Error> {
let policies = self.policies.seed_policies()?.collect::<Vec<_>>();
for policy in policies {
let policy = match policy {
Ok(policy) => policy,
Err(err) => {
debug!(target: "protocol::filter", "Failed to read seed policy: {err}");
continue;
}
};
let rid = policy.rid;
if !policy.is_allow() {
continue;
}
match self.storage.contains(&rid) {
Ok(exists) => {
if exists {
continue;
}
}
Err(err) => {
log::debug!(target: "protocol::filter", "Failed to check if {rid} exists: {err}");
continue;
}
}
match self.seeds(&rid, [self.node_id()].into()) {
Ok(seeds) => {
if let Some(connected) = NonEmpty::from_vec(seeds.connected().collect()) {
for seed in connected {
self.fetch(rid, seed.nid, vec![], self.fetch_config(), None);
}
} else {
// TODO: We should make sure that this fetch is retried later, either
// when we connect to a seed, or when we discover a new seed.
// Since new connections and routing table updates are both conditions for
// fetching, we should trigger fetches when those conditions appear.
// Another way to handle this would be to update our database, saying
// that we're trying to fetch a certain repo. We would then just
// iterate over those entries in the above circumstances. This is
// merely an optimization though, we can also iterate over all seeded
// repos and check which ones are not in our inventory.
debug!(target: "service", "No connected seeds found for {rid}..");
}
}
Err(e) => {
debug!(target: "service", "Couldn't fetch missing repo {rid}: failed to lookup seeds: {e}");
}
}
}
Ok(())
}
/// Run idle task for all connections.
fn idle_connections(&mut self) {
for (_, sess) in self.sessions.iter_mut() {
sess.idle(self.clock);
if sess.is_stable() {
// Mark as connected once connection is stable.
if let Err(e) =
self.db
.addresses_mut()
.connected(&sess.id, &sess.addr, self.clock.into())
{
warn!(target: "service", "Failed to update address book with connection: {e}");
}
}
}
}
/// Try to maintain a target number of connections.
fn maintain_connections(&mut self) {
let PeerConfig::Dynamic = self.config.peers else {
return;
};
trace!(target: "service", "Maintaining connections..");
let target = TARGET_OUTBOUND_PEERS;
let now = self.clock;
let outbound = self
.sessions
.values()
.filter(|s| s.link.is_outbound())
.filter(|s| s.is_connected() || s.is_connecting())
.count();
let wanted = target.saturating_sub(outbound);
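// E.g. (illustrative): with a target of 8 outbound peers and 5 healthy outbound
// sessions, we attempt up to 3 new connections.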
// Don't connect to more peers than needed.
if wanted == 0 {
return;
}
// Peers available to connect to.
let available = self
.available_peers()
.into_iter()
.filter_map(|peer| {
peer.addresses
.into_iter()
.find(|ka| match (ka.last_success, ka.last_attempt) {
// If we succeeded the last time we tried, this is a good address.
// If enough time has passed since the last failed attempt, we also try again.
(Some(success), Some(attempt)) => {
success >= attempt || now - attempt >= CONNECTION_RETRY_DELTA
}
// If we haven't succeeded yet, and we waited long enough, we can try this address.
(None, Some(attempt)) => now - attempt >= CONNECTION_RETRY_DELTA,
// If we have no failed attempts for this address, it's worth a try.
(_, None) => true,
})
.map(|ka| (peer.nid, ka))
})
.filter(|(_, ka)| self.is_supported_address(&ka.addr));
// Peers we are going to attempt connections to.
let connect = available.take(wanted).collect::<Vec<_>>();
if connect.len() < wanted {
log::debug!(
target: "service",
"Not enough available peers to connect to (available={}, wanted={wanted})",
connect.len()
);
}
for (id, ka) in connect {
if let Err(e) = self.connect(id, ka.addr.clone()) {
warn!(target: "service", "Service::maintain_connections connection error: {e}");
}
}
}
/// Maintain persistent peer connections.
fn maintain_persistent(&mut self) {
trace!(target: "service", "Maintaining persistent peers..");
let now = self.local_time();
let mut reconnect = Vec::new();
for (nid, session) in self.sessions.iter_mut() {
if self.config.is_persistent(nid) {
if self.policies.is_blocked(nid).unwrap_or(false) {
continue;
}
if let session::State::Disconnected { retry_at, .. } = &mut session.state {
// TODO: Try to reconnect only if the peer was attempted. A disconnect without
// even a successful attempt means that we're unlikely to be able to reconnect.
if now >= *retry_at {
reconnect.push((*nid, session.addr.clone(), session.attempts()));
}
}
}
}
for (nid, addr, attempts) in reconnect {
if self.reconnect(nid, addr) {
debug!(target: "service", "Reconnecting to {nid} (attempts={attempts})...");
}
}
}
/// Checks whether connecting to the given [`Address`] is supported.
///
/// # IPv4/IPv6/DNS
///
/// Always returns `true`.
///
/// # Tor
///
/// If the [`Address`] is an `.onion` address and the service supports onion
/// routing then this will return `true`.
///
/// # I2P
///
/// If the [`Address`] is an I2P address and the service supports I2P
/// connections then this will return `true`.
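///
/// Note that onion and I2P support is compile-time gated behind the `tor` and
/// `i2p` features respectively, and further controlled by the node
/// configuration: an address type configured as `AddressConfig::Drop` is not
/// supported.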
fn is_supported_address(&self, address: &Address) -> bool {
match AddressType::from(address) {
// Only consider onion addresses if configured.
#[cfg(feature = "tor")]
AddressType::Onion => self.config.onion != radicle::node::config::AddressConfig::Drop,
#[cfg(feature = "i2p")]
AddressType::I2p => self.config.i2p != radicle::node::config::AddressConfig::Drop,
AddressType::Dns | AddressType::Ipv4 | AddressType::Ipv6 => true,
}
}
fn fetch_config(&self) -> fetcher::FetchConfig {
fetcher::FetchConfig::default()
.with_minimum_feature_level(self.config.fetch.feature_level_min())
}
}
/// Gives read access to the service state.
pub trait ServiceState {
/// Get the Node ID.
fn nid(&self) -> &NodeId;
/// Get the existing sessions.
fn sessions(&self) -> &Sessions;
/// Get fetch state.
fn fetching(&self) -> &FetcherState;
/// Get outbox.
fn outbox(&self) -> &Outbox;
/// Get rate limiter.
fn limiter(&self) -> &RateLimiter;
/// Get event emitter.
fn emitter(&self) -> &Emitter<Event>;
/// Get a repository from storage.
fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError>;
/// Get the clock.
fn clock(&self) -> &LocalTime;
/// Get the clock mutably.
fn clock_mut(&mut self) -> &mut LocalTime;
/// Get service configuration.
fn config(&self) -> &Config;
/// Get service metrics.
fn metrics(&self) -> &Metrics;
}
impl<D, S, G> ServiceState for Service<D, S, G>
where
D: routing::Store,
G: crypto::signature::Signer<crypto::Signature>,
S: ReadStorage,
{
fn nid(&self) -> &NodeId {
self.signer.public_key()
}
fn sessions(&self) -> &Sessions {
&self.sessions
}
fn fetching(&self) -> &FetcherState {
self.fetcher.state()
}
fn outbox(&self) -> &Outbox {
&self.outbox
}
fn limiter(&self) -> &RateLimiter {
&self.limiter
}
fn emitter(&self) -> &Emitter<Event> {
&self.emitter
}
fn get(&self, rid: RepoId) -> Result<Option<Doc>, RepositoryError> {
self.storage.get(rid)
}
fn clock(&self) -> &LocalTime {
&self.clock
}
fn clock_mut(&mut self) -> &mut LocalTime {
&mut self.clock
}
fn config(&self) -> &Config {
&self.config
}
fn metrics(&self) -> &Metrics {
&self.metrics
}
}
/// Disconnect reason.
#[derive(Debug)]
pub enum DisconnectReason {
/// Error while dialing the remote. This error occurs before a connection is
/// even established. Errors of this kind are usually not transient.
Dial(Arc<dyn std::error::Error + Sync + Send>),
/// Error with an underlying established connection. Sometimes, reconnecting
/// after such an error is possible.
Connection(Arc<dyn std::error::Error + Sync + Send>),
/// Error with a fetch.
Fetch(FetchError),
/// Session error.
Session(session::Error),
/// Session conflicts with existing session.
Conflict,
/// Connection to self.
SelfConnection,
/// Peer is blocked by policy.
Policy,
/// User requested disconnect.
Command,
}
impl DisconnectReason {
pub fn is_dial_err(&self) -> bool {
matches!(self, Self::Dial(_))
}
pub fn is_connection_err(&self) -> bool {
matches!(self, Self::Connection(_))
}
pub fn connection() -> Self {
DisconnectReason::Connection(Arc::new(std::io::Error::from(
std::io::ErrorKind::ConnectionReset,
)))
}
}
impl fmt::Display for DisconnectReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Dial(err) => write!(f, "{err}"),
Self::Connection(err) => write!(f, "{err}"),
Self::Command => write!(f, "command"),
Self::SelfConnection => write!(f, "self-connection"),
Self::Conflict => write!(f, "conflict"),
Self::Policy => write!(f, "policy"),
Self::Session(err) => write!(f, "{err}"),
Self::Fetch(err) => write!(f, "fetch: {err}"),
}
}
}
/// Result of a project lookup.
#[derive(Debug)]
pub struct Lookup {
/// Whether the project was found locally or not.
pub local: Option<Doc>,
/// A list of remote peers on which the project is known to exist.
pub remote: Vec<NodeId>,
}
#[derive(thiserror::Error, Debug)]
pub enum LookupError {
#[error(transparent)]
Routing(#[from] routing::Error),
#[error(transparent)]
Repository(#[from] RepositoryError),
}
#[derive(Debug, Clone)]
/// Holds currently (or recently) connected peers.
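///
/// A node without a session entry is reported as disconnected (and therefore
/// connectable). Illustrative sketch, assuming an `Rng` and a `NodeId` are in
/// scope:
///
/// ```ignore
/// let sessions = Sessions::new(rng);
/// assert!(!sessions.is_connected(&nid));
/// assert!(sessions.is_disconnected(&nid));
/// ```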
pub struct Sessions(AddressBook<NodeId, Session>);
impl Sessions {
pub fn new(rng: Rng) -> Self {
Self(AddressBook::new(rng))
}
/// Iterator over fully connected peers.
pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
self.0
.iter()
.filter_map(move |(id, sess)| match &sess.state {
session::State::Connected { .. } => Some((id, sess)),
_ => None,
})
}
/// Iterator over connected inbound peers.
pub fn inbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
self.connected().filter(|(_, s)| s.link.is_inbound())
}
/// Iterator over outbound peers.
pub fn outbound(&self) -> impl Iterator<Item = (&NodeId, &Session)> + Clone {
self.connected().filter(|(_, s)| s.link.is_outbound())
}
/// Iterator over mutable fully connected peers.
pub fn connected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
self.0.iter_mut().filter(move |(_, s)| s.is_connected())
}
/// Iterator over disconnected peers.
pub fn disconnected_mut(&mut self) -> impl Iterator<Item = (&NodeId, &mut Session)> {
self.0.iter_mut().filter(move |(_, s)| s.is_disconnected())
}
/// Return whether this node has a fully established session.
pub fn is_connected(&self, id: &NodeId) -> bool {
self.0.get(id).map(|s| s.is_connected()).unwrap_or(false)
}
/// Return whether this node can be connected to.
pub fn is_disconnected(&self, id: &NodeId) -> bool {
self.0.get(id).map(|s| s.is_disconnected()).unwrap_or(true)
}
}
impl Deref for Sessions {
type Target = AddressBook<NodeId, Session>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for Sessions {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}