use std::collections::VecDeque;
use std::{fmt, time};
use crossbeam_channel as chan;
use radicle::node::{FetchResult, Severity};
use radicle::node::{Link, Timestamp};
pub use radicle::node::{PingState, State};
use radicle::storage::refs::RefsAt;
use crate::service::message;
use crate::service::message::Message;
use crate::service::{Address, LocalDuration, LocalTime, NodeId, Outbox, RepoId, Rng};
/// Time after which a connection is considered stable.
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
/// Maximum items in the fetch queue.
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
/// The remote peer sent an invalid announcement timestamp,
/// for eg. a timestamp far in the future.
#[error("invalid announcement timestamp: {0}")]
InvalidTimestamp(Timestamp),
/// The remote peer sent git protocol messages while we were expecting
/// gossip messages. Or vice-versa.
#[error("protocol mismatch")]
ProtocolMismatch,
/// The remote peer did something that violates the protocol rules.
#[error("peer misbehaved")]
Misbehavior,
/// The remote peer timed out.
#[error("peer timed out")]
Timeout,
}
impl Error {
/// Return the severity for this error.
pub fn severity(&self) -> Severity {
match self {
Self::InvalidTimestamp(_) => Severity::High,
Self::ProtocolMismatch => Severity::High,
Self::Misbehavior => Severity::High,
Self::Timeout => Severity::Low,
}
}
}
/// Error when trying to queue a fetch.
#[derive(thiserror::Error, Debug, Clone)]
pub enum QueueError {
/// The item already exists in the queue.
#[error("item is already queued")]
Duplicate(QueuedFetch),
/// The queue is at capacity.
#[error("queue capacity reached")]
CapacityReached(QueuedFetch),
}
impl QueueError {
/// Get the inner [`QueuedFetch`].
pub fn inner(&self) -> &QueuedFetch {
match self {
Self::Duplicate(f) => f,
Self::CapacityReached(f) => f,
}
}
}
/// Fetch waiting to be processed, in the fetch queue.
#[derive(Debug, Clone)]
pub struct QueuedFetch {
/// Repo being fetched.
pub rid: RepoId,
/// Peer being fetched from.
pub from: NodeId,
/// Refs being fetched.
pub refs_at: Vec<RefsAt>,
/// The timeout given for the fetch request.
pub timeout: time::Duration,
/// Result channel.
pub channel: Option<chan::Sender<FetchResult>>,
}
impl PartialEq for QueuedFetch {
fn eq(&self, other: &Self) -> bool {
self.rid == other.rid
&& self.from == other.from
&& self.refs_at == other.refs_at
&& self.channel.is_none()
&& other.channel.is_none()
}
}
/// A peer session. Each connected peer will have one session.
#[derive(Debug, Clone)]
pub struct Session {
/// Peer id.
pub id: NodeId,
/// Peer address.
pub addr: Address,
/// Connection direction.
pub link: Link,
/// Whether we should attempt to re-connect
/// to this peer upon disconnection.
pub persistent: bool,
/// Peer connection state.
pub state: State,
/// Peer subscription.
pub subscribe: Option<message::Subscribe>,
/// Last time a message was received from the peer.
pub last_active: LocalTime,
/// Connection attempts. For persistent peers, Tracks
/// how many times we've attempted to connect. We reset this to zero
/// upon successful connection, once the connection is stable.
attempts: usize,
/// Source of entropy.
rng: Rng,
}
impl fmt::Display for Session {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut attrs = Vec::new();
let state = self.state.to_string();
if self.link.is_inbound() {
attrs.push("inbound");
} else {
attrs.push("outbound");
}
if self.persistent {
attrs.push("persistent");
}
attrs.push(state.as_str());
write!(f, "{} [{}]", self.id, attrs.join(" "))
}
}
impl From<&Session> for radicle::node::Session {
fn from(s: &Session) -> Self {
Self {
nid: s.id,
link: if s.link.is_inbound() {
radicle::node::Link::Inbound
} else {
radicle::node::Link::Outbound
},
addr: s.addr.clone(),
state: s.state.clone(),
}
}
}
impl Session {
pub fn outbound(id: NodeId, addr: Address, persistent: bool, rng: Rng) -> Self {
Self {
id,
addr,
state: State::Initial,
link: Link::Outbound,
subscribe: None,
persistent,
last_active: LocalTime::default(),
attempts: 1,
rng,
}
}
pub fn inbound(id: NodeId, addr: Address, persistent: bool, rng: Rng, time: LocalTime) -> Self {
Self {
id,
addr,
state: State::Connected {
since: time,
ping: PingState::default(),
latencies: VecDeque::default(),
stable: false,
},
link: Link::Inbound,
subscribe: None,
persistent,
last_active: time,
attempts: 0,
rng,
}
}
pub fn is_connecting(&self) -> bool {
matches!(self.state, State::Attempted)
}
pub fn is_stable(&self) -> bool {
matches!(self.state, State::Connected { stable: true, .. })
}
pub fn is_connected(&self) -> bool {
self.state.is_connected()
}
pub fn is_disconnected(&self) -> bool {
matches!(self.state, State::Disconnected { .. })
}
pub fn is_initial(&self) -> bool {
matches!(self.state, State::Initial)
}
pub fn attempts(&self) -> usize {
self.attempts
}
/// Run 'idle' task for session.
pub fn idle(&mut self, now: LocalTime) {
if let State::Connected {
since,
ref mut stable,
..
} = self.state
{
if now >= since && now.duration_since(since) >= CONNECTION_STABLE_THRESHOLD {
*stable = true;
// Reset number of attempts for stable connections.
self.attempts = 0;
}
}
}
pub fn to_attempted(&mut self) {
assert!(
self.is_initial(),
"Can only transition to 'attempted' state from 'initial' state"
);
self.state = State::Attempted;
self.attempts += 1;
}
pub fn to_connected(&mut self, since: LocalTime) {
self.last_active = since;
if let State::Connected { .. } = &self.state {
log::debug!(target: "service", "Session {} is already in 'connected' state, resetting..", self.id);
};
self.state = State::Connected {
since,
ping: PingState::default(),
latencies: VecDeque::default(),
stable: false,
};
}
/// Move the session state to "disconnected". Returns any pending RID
/// that was requested.
pub fn to_disconnected(&mut self, since: LocalTime, retry_at: LocalTime) {
self.state = State::Disconnected { since, retry_at };
}
/// Return to initial state from disconnected state. This state transition
/// happens when we attempt to re-connect to a disconnected peer.
pub fn to_initial(&mut self) {
assert!(
self.is_disconnected(),
"Can only transition to 'initial' state from 'disconnected' state"
);
self.state = State::Initial;
}
pub fn ping(&mut self, since: LocalTime, reactor: &mut Outbox) -> Result<(), Error> {
if let State::Connected { ping, .. } = &mut self.state {
let msg = message::Ping::new(&mut self.rng);
*ping = PingState::AwaitingResponse {
len: msg.ponglen,
since,
};
reactor.write(self, Message::Ping(msg));
}
Ok(())
}
}