Radish alpha
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
`radicle-protocol`: Sans-IO-ification
Draft fintohaps opened 9 months ago

Exploring the conversion of radicle-protocol into a true sans-IO approach.
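
To make the intended shape concrete, here is a rough sketch of how the surrounding I/O layer might drive these state machines. It is illustrative only: it assumes the `connections` and `fetcher` modules are exported from the crate, and the constructors, I/O hooks, and `From` conversions are placeholders inferred from how the types are used in this patch.

```rust
// Sketch: driving `Connections` from the I/O layer. The state machine
// performs no I/O itself; it returns effects to perform and events to record.
use std::net::IpAddr;

use localtime::LocalTime;
use radicle_protocol::connections::{effects, Connections};

fn on_inbound(conns: &mut Connections, ip: IpAddr, now: LocalTime) {
    let result = conns.accept(ip, now);
    for effect in result.effects {
        match effect {
            // Accept the socket; localhost bypasses all limits.
            effects::Accept::LocalHost { .. } => { /* accept the stream */ }
            effects::Accept::Accepted { .. } => { /* accept the stream */ }
        }
    }
    for event in result.events {
        // Limit/ban events only need to be logged or surfaced.
        let _ = event;
    }
}
```

The fetcher follows the same pattern: a pure decision function returns a result, and the commands it contains are applied back onto the state:

```rust
// Sketch: `FetcherState::fetch` decides, `handle_command` transitions.
use std::time::Duration;

use radicle::node::NodeId;
use radicle::prelude::RepoId;
use radicle_protocol::fetcher::{Command, FetcherState};

fn start_fetch(state: &mut FetcherState, rid: RepoId, from: NodeId) {
    let result = state.fetch(rid, from, vec![], Duration::from_secs(30));
    for cmd in result.commands {
        // Commands returned here (e.g. fetches that could not be queued) are
        // handed back for the caller to deal with. The `Into<Command>`
        // conversion is assumed from its use inside the patch.
        let _leftover: Vec<Command> = state.handle_command(cmd.into());
    }
}
```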

18 files changed +3097 -1 1cd3ad07 99c847ad
added crates/radicle-protocol/src/connections.rs
@@ -0,0 +1,446 @@
+
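//! Sans-IO connection management. [`Connections`] tracks the lifecycle of
+
//! peer sessions and enforces limits, returning effects and events for the
+
//! caller to act on rather than performing I/O itself.
+

+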
// TODO(finto): command should be something else, perhaps `input`?
+
pub mod command;
+
pub use command::{Connect, Disconnect};
+

+
pub mod commands;
+
pub mod effects;
+
pub mod events;
+

+
use radicle::node::config::RateLimits;
+
use radicle::prelude::RepoId;
+
use session::{HasAttempts as _, Pinged, Session, Sessions};
+
mod session;
+

+
use std::collections::HashSet;
+
use std::net::IpAddr;
+

+
use localtime::{LocalDuration, LocalTime};
+
use radicle::node::address;
+
use radicle::node::{Address, HostName, Link, NodeId, Severity};
+

+
use crate::service::limiter::RateLimiter;
+
use crate::service::{message, DisconnectReason};
+

+
/// Minimum amount of time to wait before reconnecting to a peer.
+
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
+
/// Maximum amount of time to wait before reconnecting to a peer.
+
pub const MAX_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_mins(60);
+

+
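/// Sans-IO state machine for managing peer connections.
+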
pub struct Connections {
+
    /// The state of the connection lifecycle for each node in the network.
+
    sessions: Sessions,
+
    /// Keep track of which node connections are meant to be persistent.
+
    persistent: HashSet<NodeId>,
+
    /// Keep track of banned IP addresses.
+
    banned: HashSet<IpAddr>,
+
    /// Rate limiter for IP hosts.
+
    limiter: RateLimiter,
+
    /// Configuration for managing connections.
+
    config: Config,
+
}
+

+
pub struct Config {
+
    /// Duration for a connection to be considered idle.
+
    idle: LocalDuration,
+
    /// Allowed number of inbound connections.
+
    inbound_limit: usize,
+
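    /// Rate limits to apply per connecting host.
+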
    limits: RateLimits,
+
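    /// Bounds on the exponential back-off used for re-connections.
+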
    reconnection_delay: ReconnectionDelay,
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct ReconnectionDelay {
+
    /// The minimum amount of time to wait before attempting a re-connection.
+
    pub min_delta: LocalDuration,
+
    /// The maximum amount of time to wait before attempting a re-connection.
+
    pub max_delta: LocalDuration,
+
}
+

+
impl Default for ReconnectionDelay {
+
    fn default() -> Self {
+
        Self {
+
            min_delta: MIN_RECONNECTION_DELTA,
+
            max_delta: MAX_RECONNECTION_DELTA,
+
        }
+
    }
+
}
+

+
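/// Events produced by [`Connections::handle_command`].
+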
pub enum CommandEvent {
+
    MissingSession {
+
        node: NodeId,
+
    },
+
    Attempted(Session<session::Attempted>),
+
    Connected(Session<session::Connected>),
+
    Disconnected(Session<session::Disconnected>),
+
    Subscribed {
+
        node: NodeId,
+
        subscription: message::Subscribe,
+
    },
+
    SubscribedTo {
+
        node: NodeId,
+
        rid: RepoId,
+
    },
+
    Pinged {
+
        node: NodeId,
+
        pinged: Option<Pinged>,
+
    },
+
}
+

+
impl Connections {
+
    pub fn handle_command(&mut self, command: commands::Command) -> CommandEvent {
+
        match command {
+
            commands::Command::Attempt(attempt) => self.attempted(attempt),
+
            commands::Command::Connect(connect) => self.connected(connect),
+
            commands::Command::Disconnect(disconnect) => self.disconnected(disconnect),
+
            commands::Command::Subscribe(subscribe) => self.subscribed(subscribe),
+
            commands::Command::SubscribeTo(subscribe) => self.subscribed_to(subscribe),
+
            commands::Command::Pong(pong) => self.pinged(pong),
+
        }
+
    }
+

+
    fn attempted(&mut self, commands::Attempt { node }: commands::Attempt) -> CommandEvent {
+
        self.sessions
+
            .session_to_attempted(&node)
+
            .map(CommandEvent::Attempted)
+
            .unwrap_or(CommandEvent::MissingSession { node })
+
    }
+

+
    fn connected(
+
        &mut self,
+
        commands::Connect {
+
            node,
+
            now,
+
            link,
+
            persistent,
+
        }: commands::Connect,
+
    ) -> CommandEvent {
+
        self.sessions
+
            .session_to_connected(&node, now, link, persistent)
+
            .map(CommandEvent::Connected)
+
            .unwrap_or(CommandEvent::MissingSession { node })
+
    }
+

+
    fn disconnected(
+
        &mut self,
+
        commands::Disconnect {
+
            node,
+
            since,
+
            retry_at,
+
        }: commands::Disconnect,
+
    ) -> CommandEvent {
+
        self.sessions
+
            .session_to_disconnected(&node, since, retry_at)
+
            .map(CommandEvent::Disconnected)
+
            .unwrap_or(CommandEvent::MissingSession { node })
+
    }
+

+
    fn subscribed(
+
        &mut self,
+
        commands::Subscribe { node, subscription }: commands::Subscribe,
+
    ) -> CommandEvent {
+
        if self.sessions.subscribe(&node, subscription.clone()) {
+
            CommandEvent::Subscribed { node, subscription }
+
        } else {
+
            CommandEvent::MissingSession { node }
+
        }
+
    }
+

+
    fn subscribed_to(
+
        &mut self,
+
        commands::SubscribeTo { node, rid }: commands::SubscribeTo,
+
    ) -> CommandEvent {
+
        if self.sessions.subscribe_to(&node, &rid) {
+
            CommandEvent::SubscribedTo { node, rid }
+
        } else {
+
            CommandEvent::MissingSession { node }
+
        }
+
    }
+

+
    fn pinged(&mut self, commands::Pong { node, pong }: commands::Pong) -> CommandEvent {
+
        self.sessions.pinged(&node, pong).map_or_else(
+
            |session::Missing| CommandEvent::MissingSession { node },
+
            |pinged| CommandEvent::Pinged { node, pinged },
+
        )
+
    }
+
}
+

+
impl Connections {
+
    pub fn sessions(&self) -> &Sessions {
+
        &self.sessions
+
    }
+

+
    pub fn accept(&mut self, ip: IpAddr, now: LocalTime) -> AcceptResult {
+
        let mut result = AcceptResult::default();
+
        // Always accept localhost connections, even if we've already reached
+
        // our inbound connection limit.
+
        if ip.is_loopback() || ip.is_unspecified() {
+
            result.local_host(ip);
+
            return result;
+
        }
+

+
        if self.has_reached_inbound_limit() {
+
            result.inbound_limit_exceeded(ip, self.sessions.connected_inbound());
+
            return result;
+
        }
+

+
        if self.is_ip_banned(&ip) {
+
            result.ip_banned(ip);
+
            return result;
+
        }
+

+
        if self.has_reached_ip_limit(&ip, now) {
+
            result.host_limited(ip);
+
            return result;
+
        }
+

+
        result.accepted(ip);
+
        result
+
    }
+

+
    pub fn connect(&self, connect: Connect) -> ConnectResult {
+
        let mut result = ConnectResult::default();
+
        match connect {
+
            Connect::Inbound(command::Inbound {
+
                node,
+
                clock,
+
                persistent,
+
            }) => match self.sessions.get_connected(&node) {
+
                Some(session) => result.already_connected(session.clone(), Link::Inbound),
+
                None => {
+
                    result.connect(node, clock, Link::Inbound, persistent);
+
                    result.send_initial_messages(node, Link::Inbound);
+
                }
+
            },
+
            Connect::Outbound(command::Outbound {
+
                node,
+
                addr,
+
                persistent,
+
                clock,
+
            }) => match self.sessions.get_connected(&node) {
+
                Some(session) => {
+
                    result.already_connected(session.clone(), Link::Outbound);
+
                    result.send_initial_messages(node, *session.link());
+
                }
+
                None => {
+
                    if let HostName::Ip(ip) = addr.host {
+
                        if !address::is_local(&ip) {
+
                            result.record_ip(node, ip, clock);
+
                        }
+
                    }
+
                    result.connect(node, clock, Link::Outbound, persistent);
+
                    result.send_initial_messages(node, Link::Outbound);
+
                }
+
            },
+
        }
+
        result
+
    }
+

+
    pub fn disconnect(&self, disconnect: Disconnect) -> DisconnectResult {
+
        let mut result = DisconnectResult::default();
+
        let Disconnect {
+
            node,
+
            link,
+
            reason,
+
            since,
+
        } = disconnect;
+
        let is_persistent = self.is_persistent(&node);
+
        let Some(session) = self.sessions.get_session(&node) else {
+
            result.already_disconnected(node);
+
            return result;
+
        };
+
        if *session.link() != link {
+
            result.link_conflict(node, *session.link(), link);
+
            return result;
+
        }
+

+
        if is_persistent {
+
            let delay = self.reconnection_delay(session.attempts().as_u32());
+
            let retry_at = since + delay;
+
            result.retry_connection(node, since, retry_at);
+
            result.disconnect(node, since, Some(retry_at));
+
        } else {
+
            let severity = self.reason_severity(reason, since);
+
            result.record_severity(node, session.address().clone(), severity);
+
            result.disconnect(node, since, None);
+
            if link.is_outbound() {
+
                result.maintain_connections();
+
            }
+
        }
+
        result
+
    }
+

+
    fn reason_severity(&self, reason: DisconnectReason, now: LocalTime) -> Severity {
+
        match reason {
+
            DisconnectReason::Dial(_)
+
            | DisconnectReason::Fetch(_)
+
            | DisconnectReason::Connection(_) => {
+
                if self.is_online(now) {
+
                    // If we're "online", there's something wrong with this
+
                    // peer connection specifically.
+
                    Severity::Medium
+
                } else {
+
                    Severity::Low
+
                }
+
            }
+
            DisconnectReason::Session(e) => e.severity(),
+
            DisconnectReason::Command
+
            | DisconnectReason::Conflict
+
            | DisconnectReason::SelfConnection => Severity::Low,
+
        }
+
    }
+

+
    /// Try to guess whether we're online or not.
+
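    /// Heuristic: at least one routable peer active within the idle window.
+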
    fn is_online(&self, now: LocalTime) -> bool {
+
        self.sessions
+
            .connected()
+
            .filter(|(_, s)| s.address().is_routable() && *s.last_active() >= now - self.idle())
+
            .count()
+
            > 0
+
    }
+

+
    fn idle(&self) -> LocalDuration {
+
        self.config.idle
+
    }
+

+
    fn is_persistent(&self, node: &NodeId) -> bool {
+
        self.persistent.contains(node)
+
    }
+

+
    fn is_ip_banned(&self, ip: &IpAddr) -> bool {
+
        self.banned.contains(ip)
+
    }
+

+
    // TODO: limit is harshing my buzz by taking &mut self here
+
    fn has_reached_ip_limit(&mut self, ip: &IpAddr, now: LocalTime) -> bool {
+
        let addr = HostName::from(*ip);
+
        self.limiter
+
            .limit(addr, None, &self.config.limits.inbound, now)
+
    }
+

+
    fn has_reached_inbound_limit(&self) -> bool {
+
        self.sessions.connected_inbound() >= self.config.inbound_limit
+
    }
+

+
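    /// Exponential back-off: `2^attempts` seconds, clamped between the
+
    /// configured minimum and maximum deltas.
+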
    fn reconnection_delay(&self, attempts: u32) -> LocalDuration {
+
        LocalDuration::from_secs(2u64.pow(attempts)).clamp(
+
            self.config.reconnection_delay.min_delta,
+
            self.config.reconnection_delay.max_delta,
+
        )
+
    }
+
}
+

+
#[derive(Debug, Default)]
+
pub struct AcceptResult {
+
    pub effects: Vec<effects::Accept>,
+
    pub events: Vec<events::Accept>,
+
}
+

+
impl AcceptResult {
+
    fn local_host(&mut self, ip: IpAddr) {
+
        self.effects.push(effects::Accept::LocalHost { ip });
+
    }
+

+
    fn inbound_limit_exceeded(&mut self, ip: IpAddr, connected_inbound: usize) {
+
        self.events.push(events::Accept::LimitExceeded {
+
            ip,
+
            current_inbound: connected_inbound,
+
        });
+
    }
+

+
    fn ip_banned(&mut self, ip: IpAddr) {
+
        self.events.push(events::Accept::IpBanned { ip })
+
    }
+

+
    fn host_limited(&mut self, ip: IpAddr) {
+
        self.events.push(events::Accept::HostLimited { ip })
+
    }
+

+
    fn accepted(&mut self, ip: IpAddr) {
+
        self.effects.push(effects::Accept::Accepted { ip })
+
    }
+
}
+

+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
+
pub struct ConnectResult {
+
    pub events: Vec<events::Connect>,
+
    pub effects: Vec<effects::Connect>,
+
    pub commands: Vec<commands::Connect>,
+
}
+

+
impl ConnectResult {
+
    fn already_connected(&mut self, session: Session<session::Connected>, attempted_link: Link) {
+
        self.events.push(events::Connect::AlreadyConnected {
+
            session,
+
            attempted_link,
+
        });
+
    }
+

+
    fn send_initial_messages(&mut self, node: NodeId, link: Link) {
+
        self.effects
+
            .push(effects::Connect::SendInitialMessages { node, link });
+
    }
+

+
    fn record_ip(&mut self, node: NodeId, ip: IpAddr, clock: LocalTime) {
+
        self.effects
+
            .push(effects::Connect::RecordIp { node, ip, clock });
+
    }
+

+
    fn connect(&mut self, node: NodeId, clock: LocalTime, link: Link, persistent: bool) {
+
        self.commands.push(commands::Connect {
+
            node,
+
            now: clock,
+
            link,
+
            persistent,
+
        });
+
    }
+
}
+

+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
+
pub struct DisconnectResult {
+
    pub events: Vec<events::Disconnect>,
+
    pub effects: Vec<effects::Disconnect>,
+
    pub commands: Vec<commands::Disconnect>,
+
}
+

+
impl DisconnectResult {
+
    fn already_disconnected(&mut self, node: NodeId) {
+
        self.events
+
            .push(events::Disconnect::AlreadyDisconnected { node });
+
    }
+

+
    fn link_conflict(&mut self, node: NodeId, found: Link, expected: Link) {
+
        self.events.push(events::Disconnect::LinkConflict {
+
            node,
+
            found,
+
            expected,
+
        });
+
    }
+

+
    fn retry_connection(&mut self, node: NodeId, since: LocalTime, retry_at: LocalTime) {
+
        self.effects.push(effects::Disconnect::RetryConnection {
+
            node,
+
            since,
+
            retry_at,
+
        });
+
    }
+

+
    fn maintain_connections(&mut self) {
+
        self.effects.push(effects::Disconnect::MaintainConnections);
+
    }
+

+
    fn record_severity(&mut self, node: NodeId, address: Address, severity: Severity) {
+
        self.effects.push(effects::Disconnect::RecordSeverity {
+
            node,
+
            address,
+
            severity,
+
        })
+
    }
+

+
    fn disconnect(&mut self, node: NodeId, since: LocalTime, retry_at: Option<LocalTime>) {
+
        self.commands.push(commands::Disconnect {
+
            node,
+
            since,
+
            retry_at,
+
        });
+
    }
+
}
added crates/radicle-protocol/src/connections/command.rs
@@ -0,0 +1,49 @@
+
use localtime::LocalTime;
+

+
use radicle::node::{Address, Link, NodeId};
+

+
use crate::service::DisconnectReason;
+

+
pub enum Connect {
+
    Inbound(Inbound),
+
    Outbound(Outbound),
+
}
+

+
impl Connect {
+
    pub fn inbound(node: NodeId, clock: LocalTime, persistent: bool) -> Self {
+
        Self::Inbound(Inbound {
+
            node,
+
            clock,
+
            persistent,
+
        })
+
    }
+

+
    pub fn outbound(node: NodeId, addr: Address, persistent: bool, clock: LocalTime) -> Self {
+
        Self::Outbound(Outbound {
+
            node,
+
            addr,
+
            persistent,
+
            clock,
+
        })
+
    }
+
}
+

+
pub struct Inbound {
+
    pub(super) node: NodeId,
+
    pub(super) clock: LocalTime,
+
    pub(super) persistent: bool,
+
}
+

+
pub struct Outbound {
+
    pub(super) node: NodeId,
+
    pub(super) addr: Address,
+
    pub(super) persistent: bool,
+
    pub(super) clock: LocalTime,
+
}
+

+
pub struct Disconnect {
+
    pub(super) node: NodeId,
+
    pub(super) link: Link,
+
    pub(super) reason: DisconnectReason,
+
    pub(super) since: LocalTime,
+
}
added crates/radicle-protocol/src/connections/commands.rs
@@ -0,0 +1,92 @@
+
use localtime::LocalTime;
+
use radicle::{
+
    node::{Link, NodeId},
+
    prelude::RepoId,
+
};
+

+
use crate::service::message;
+

+
use super::session;
+

+
pub enum Command {
+
    Attempt(Attempt),
+
    Connect(Connect),
+
    Disconnect(Disconnect),
+
    Subscribe(Subscribe),
+
    SubscribeTo(SubscribeTo),
+
    Pong(Pong),
+
}
+

+
impl From<Attempt> for Command {
+
    fn from(v: Attempt) -> Self {
+
        Self::Attempt(v)
+
    }
+
}
+

+
impl From<Connect> for Command {
+
    fn from(v: Connect) -> Self {
+
        Self::Connect(v)
+
    }
+
}
+

+
impl From<Disconnect> for Command {
+
    fn from(v: Disconnect) -> Self {
+
        Self::Disconnect(v)
+
    }
+
}
+

+
impl From<Subscribe> for Command {
+
    fn from(v: Subscribe) -> Self {
+
        Self::Subscribe(v)
+
    }
+
}
+

+
impl From<SubscribeTo> for Command {
+
    fn from(v: SubscribeTo) -> Self {
+
        Self::SubscribeTo(v)
+
    }
+
}
+

+
impl From<Pong> for Command {
+
    fn from(v: Pong) -> Self {
+
        Self::Pong(v)
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct Attempt {
+
    pub node: NodeId,
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub struct Connect {
+
    pub node: NodeId,
+
    pub now: LocalTime,
+
    pub link: Link,
+
    pub persistent: bool,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Disconnect {
+
    pub node: NodeId,
+
    pub since: LocalTime,
+
    pub retry_at: Option<LocalTime>,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Subscribe {
+
    pub node: NodeId,
+
    pub subscription: message::Subscribe,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct SubscribeTo {
+
    pub node: NodeId,
+
    pub rid: RepoId,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Pong {
+
    pub node: NodeId,
+
    pub pong: session::Pong,
+
}
added crates/radicle-protocol/src/connections/effects.rs
@@ -0,0 +1,100 @@
+
//! External effects that are emitted by interacting with [`Connections`]. These
+
//! effects should be used by the rest of the system to perform side-effects.
+
//!
+
//! [`Connections`]: super::Connections
+

+
use std::net::IpAddr;
+

+
use localtime::LocalTime;
+
use radicle::node::{Address, Link, NodeId, Severity};
+

+
/// All effects that can occur from interacting with [`Connections`].
+
///
+
/// [`Connections`]: super::Connections
+
pub enum Effect {
+
    Accept(Accept),
+
    Connect(Connect),
+
    Disconnect(Disconnect),
+
}
+

+
impl From<Accept> for Effect {
+
    fn from(v: Accept) -> Self {
+
        Self::Accept(v)
+
    }
+
}
+

+
impl From<Connect> for Effect {
+
    fn from(v: Connect) -> Self {
+
        Self::Connect(v)
+
    }
+
}
+

+
impl From<Disconnect> for Effect {
+
    fn from(v: Disconnect) -> Self {
+
        Self::Disconnect(v)
+
    }
+
}
+

+
/// Effects that occur when deciding whether to accept a connection from an
+
/// [`IpAddr`].
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Accept {
+
    /// The [`IpAddr`] is likely a localhost connection.
+
    LocalHost { ip: IpAddr },
+
    /// The [`IpAddr`] should be accepted by the system, and later connected to.
+
    Accepted { ip: IpAddr },
+
}
+

+
impl Accept {
+
    /// The [`Accept::LocalHost`] should be created only if the [`IpAddr`] is
+
    /// either a loopback address or has an 'unspecified' address (see
+
    /// [`IpAddr::is_unspecified`]).
+
    pub fn local_host(ip: IpAddr) -> Option<Self> {
+
        (ip.is_loopback() || ip.is_unspecified()).then_some(Self::LocalHost { ip })
+
    }
+

+
    /// See [`Accept::Accepted`].
+
    pub fn accepted(ip: IpAddr) -> Self {
+
        Self::Accepted { ip }
+
    }
+
}
+

+
/// Effects that occur when connecting a node.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub enum Connect {
+
    /// The set of initial messages should be sent to the [`NodeId`].
+
    SendInitialMessages { node: NodeId, link: Link },
+
    /// The [`IpAddr`] of the [`NodeId`] should be recorded in an external
+
    /// database.
+
    RecordIp {
+
        node: NodeId,
+
        ip: IpAddr,
+
        clock: LocalTime,
+
    },
+
}
+

+
/// Effects that occur when disconnecting a node.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Disconnect {
+
    /// The connection to [`NodeId`] was disconnected, but re-connection should
+
    /// be attempted at the given point in time.
+
    RetryConnection {
+
        /// The node that should be re-connected to.
+
        node: NodeId,
+
        /// When the node was disconnected.
+
        since: LocalTime,
+
        /// When the re-connection attempt should be made.
+
        retry_at: LocalTime,
+
    },
+
    /// Record the severity of the disconnect reason.
+
    RecordSeverity {
+
        /// The node that was disconnected.
+
        node: NodeId,
+
        /// The address of the node that was disconnected.
+
        address: Address,
+
        /// The severity of the disconnect reason.
+
        severity: Severity,
+
    },
+
    /// Try to maintain all connections that are meant to be persistent.
+
    MaintainConnections,
+
}
added crates/radicle-protocol/src/connections/events.rs
@@ -0,0 +1,58 @@
+
use std::net::IpAddr;
+

+
use radicle::node::{Link, NodeId};
+

+
use super::session;
+
use super::session::Session;
+

+
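/// Events emitted by the connections state machine.
+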
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Event {
+
    Accept(Accept),
+
    Connect(Box<Connect>),
+
    Disconnect(Disconnect),
+
}
+

+
impl From<Disconnect> for Event {
+
    fn from(v: Disconnect) -> Self {
+
        Self::Disconnect(v)
+
    }
+
}
+

+
impl From<Connect> for Event {
+
    fn from(v: Connect) -> Self {
+
        Self::Connect(Box::new(v))
+
    }
+
}
+

+
impl From<Accept> for Event {
+
    fn from(v: Accept) -> Self {
+
        Self::Accept(v)
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Accept {
+
    LimitExceeded { ip: IpAddr, current_inbound: usize },
+
    IpBanned { ip: IpAddr },
+
    HostLimited { ip: IpAddr },
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Connect {
+
    AlreadyConnected {
+
        session: Session<session::Connected>,
+
        attempted_link: Link,
+
    },
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+
pub enum Disconnect {
+
    AlreadyDisconnected {
+
        node: NodeId,
+
    },
+
    LinkConflict {
+
        node: NodeId,
+
        found: Link,
+
        expected: Link,
+
    },
+
}
added crates/radicle-protocol/src/connections/session.rs
@@ -0,0 +1,651 @@
+
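//! Session lifecycle state machine. A [`Session`] carries its state in its
+
//! type parameter: one of [`Initial`], [`Attempted`], [`Connected`], or
+
//! [`Disconnected`], so each transition is an explicit method.
+

+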
use std::collections::{HashMap, HashSet, VecDeque};
+

+
use localtime::{LocalDuration, LocalTime};
+
use radicle::{
+
    node::{Address, Link, NodeId, PingState},
+
    prelude::RepoId,
+
};
+

+
use crate::service::{message, ZeroBytes, MAX_LATENCIES};
+

+
/// Time after which a connection is considered stable.
+
#[allow(unused)]
+
pub const CONNECTION_STABLE_THRESHOLD: LocalDuration = LocalDuration::from_mins(1);
+

+
#[derive(Clone, Debug)]
+
pub enum State {
+
    Initial(Initial),
+
    Attempted(Attempted),
+
    Connected(Connected),
+
    Disconnected(Disconnected),
+
}
+

+
impl From<Initial> for State {
+
    fn from(value: Initial) -> Self {
+
        Self::Initial(value)
+
    }
+
}
+

+
impl From<Attempted> for State {
+
    fn from(value: Attempted) -> Self {
+
        Self::Attempted(value)
+
    }
+
}
+

+
impl From<Connected> for State {
+
    fn from(value: Connected) -> Self {
+
        Self::Connected(value)
+
    }
+
}
+

+
impl From<Disconnected> for State {
+
    fn from(value: Disconnected) -> Self {
+
        Self::Disconnected(value)
+
    }
+
}
+

+
impl HasAttempts for State {
+
    fn attempts(&self) -> Attempts {
+
        match self {
+
            State::Initial(initial) => initial.attempts,
+
            State::Attempted(attempted) => attempted.attempts,
+
            State::Connected(connected) => connected.attempts,
+
            State::Disconnected(disconnected) => disconnected.attempts,
+
        }
+
    }
+
}
+

+
/// Marker type for when a [`NodeId`] is missing from [`Sessions`].
+
pub struct Missing;
+

+
pub struct Sessions {
+
    initial: HashMap<NodeId, Session<Initial>>,
+
    attempted: HashMap<NodeId, Session<Attempted>>,
+
    disconnected: HashMap<NodeId, Session<Disconnected>>,
+
    connected: HashMap<NodeId, Session<Connected>>,
+
}
+

+
impl Sessions {
+
    /// Get the number of sessions that are connected and have an [inbound]
+
    /// link.
+
    ///
+
    /// [inbound]: Link::Inbound
+
    pub fn connected_inbound(&self) -> usize {
+
        self.connected
+
            .values()
+
            .filter(|session| session.link().is_inbound())
+
            .count()
+
    }
+

+
    /// Get the number of sessions that are connected and have an [outbound]
+
    /// link.
+
    ///
+
    /// [outbound]: Link::Outbound
+
    pub fn connected_outbound(&self) -> usize {
+
        self.connected
+
            .values()
+
            .filter(|session| session.link().is_outbound())
+
            .count()
+
    }
+

+
    /// Checks whether a [`Session`] exists for the given [`NodeId`].
+
    pub fn has_session_for(&self, node: &NodeId) -> bool {
+
        self.initial.contains_key(node)
+
            || self.attempted.contains_key(node)
+
            || self.disconnected.contains_key(node)
+
            || self.connected.contains_key(node)
+
    }
+

+
    /// Get all [`Session`]s that are in the [`Connected`] state, along with
+
    /// their [`NodeId`]s.
+
    pub fn connected(&self) -> impl Iterator<Item = (&NodeId, &Session<Connected>)> {
+
        self.connected.iter()
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the
+
    /// [`Attempted`] state.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    pub fn session_to_attempted(&mut self, node: &NodeId) -> Option<Session<Attempted>> {
+
        let s = self.initial.remove(node)?.into_attempted();
+
        self.attempted.insert(*node, s.clone());
+
        Some(s)
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the
+
    /// [`Disconnected`] state.
+
    ///
+
    /// The time this [`Session`] was disconnected is marked by `since`, and if
+
    /// the connection should be retried then a `retry_at` value should be
+
    /// provided.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    pub fn session_to_disconnected(
+
        &mut self,
+
        node: &NodeId,
+
        since: LocalTime,
+
        retry_at: Option<LocalTime>,
+
    ) -> Option<Session<Disconnected>> {
+
        match self.remove_session(node) {
+
            None => None,
+
            Some(session) => {
+
                let s = session.into_disconnected(since, retry_at);
+
                self.disconnected.insert(*node, s.clone());
+
                Some(s)
+
            }
+
        }
+
    }
+

+
    /// Transition the [`Session`], identified by the [`NodeId`], to the
+
    /// [`Connected`] state.
+
    ///
+
    /// The [`Session`]'s last active time is set to `now`, its [`Link`]
+
    /// direction is marked by the provided value, and whether the session
+
    /// should be persistent is recorded.
+
    ///
+
    /// If the [`Session`] does not exist, then `None` is returned.
+
    pub fn session_to_connected(
+
        &mut self,
+
        node: &NodeId,
+
        now: LocalTime,
+
        link: Link,
+
        persistent: bool,
+
    ) -> Option<Session<Connected>> {
+
        let s = self.remove_session(node)?;
+
        let state = match s.state {
+
            State::Initial(initial) => Connected::from_initial(initial, now),
+
            State::Attempted(attempted) => Connected::from_attempted(attempted, now),
+
            State::Connected(connected) => connected,
+
            State::Disconnected(disconnected) => Connected::from_disconnected(disconnected, now),
+
        };
+
        Some(Session {
+
            state,
+
            id: s.id,
+
            addr: s.addr,
+
            link,
+
            persistent,
+
            last_active: now,
+
            subscribe: s.subscribe,
+
        })
+
    }
+

+
    /// Transition a [`Disconnected`] [`Session`] into an [`Initial`] state,
+
    /// meaning that it should be re-connected to.
+
    ///
+
    /// If the [`NodeId`] was not in a [`Disconnected`] state then `None` is
+
    /// returned.
+
    pub fn reconnect(&mut self, node: &NodeId) -> Option<Session<Initial>> {
+
        let s = self.disconnected.remove(node)?.into_initial();
+
        self.initial.insert(*node, s.clone());
+
        Some(s)
+
    }
+

+
    /// Get a [`Session`] that can be in any [`State`].
+
    pub fn get_session(&self, node: &NodeId) -> Option<Session<State>> {
+
        self.initial
+
            .get(node)
+
            .cloned()
+
            .map(|s| s.into_any_state())
+
            .or_else(|| {
+
                self.attempted
+
                    .get(node)
+
                    .cloned()
+
                    .map(|s| s.into_any_state())
+
            })
+
            .or_else(|| {
+
                self.disconnected
+
                    .get(node)
+
                    .cloned()
+
                    .map(|s| s.into_any_state())
+
            })
+
            .or_else(|| {
+
                self.connected
+
                    .get(node)
+
                    .cloned()
+
                    .map(|s| s.into_any_state())
+
            })
+
    }
+

+
    pub fn subscribe(&mut self, node: &NodeId, subscription: message::Subscribe) -> bool {
+
        if let Some(session) = self.connected.get_mut(node) {
+
            session.set_subscription(subscription);
+
            return true;
+
        }
+

+
        if let Some(session) = self.disconnected.get_mut(node) {
+
            session.set_subscription(subscription);
+
            return true;
+
        }
+

+
        if let Some(session) = self.attempted.get_mut(node) {
+
            session.set_subscription(subscription);
+
            return true;
+
        }
+

+
        if let Some(session) = self.initial.get_mut(node) {
+
            session.set_subscription(subscription);
+
            return true;
+
        }
+

+
        false
+
    }
+

+
    pub fn subscribe_to(&mut self, node: &NodeId, rid: &RepoId) -> bool {
+
        if let Some(session) = self.connected.get_mut(node) {
+
            session.subscribe_to(rid);
+
            return true;
+
        }
+

+
        if let Some(session) = self.disconnected.get_mut(node) {
+
            session.subscribe_to(rid);
+
            return true;
+
        }
+

+
        if let Some(session) = self.attempted.get_mut(node) {
+
            session.subscribe_to(rid);
+
            return true;
+
        }
+

+
        if let Some(session) = self.initial.get_mut(node) {
+
            session.subscribe_to(rid);
+
            return true;
+
        }
+

+
        false
+
    }
+

+
    pub fn pinged(&mut self, node: &NodeId, pong: Pong) -> Result<Option<Pinged>, Missing> {
+
        let session = self.connected.get_mut(node).ok_or(Missing)?;
+
        Ok(session.pinged(pong))
+
    }
+

+
    fn remove_session(&mut self, node: &NodeId) -> Option<Session<State>> {
+
        self.initial
+
            .remove(node)
+
            .map(|s| s.into_any_state())
+
            .or_else(|| self.attempted.remove(node).map(|s| s.into_any_state()))
+
            .or_else(|| self.disconnected.remove(node).map(|s| s.into_any_state()))
+
            .or_else(|| self.connected.remove(node).map(|s| s.into_any_state()))
+
    }
+

+
    /// Get the [`Session`], for the given [`NodeId`], that is expected to be in
+
    /// the [`Connected`] state.
+
    pub fn get_connected(&self, node: &NodeId) -> Option<&Session<Connected>> {
+
        self.connected.get(node)
+
    }
+

+
    #[allow(unused)]
+
    fn inbound(&mut self, node: NodeId, addr: Address, persistent: bool, now: LocalTime) {
+
        self.connected
+
            .insert(node, Session::inbound(node, addr, persistent, now));
+
    }
+
}
+

+
pub trait HasAttempts {
+
    fn attempts(&self) -> Attempts;
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+
pub struct Attempts {
+
    /// Connection attempts. For persistent peers, this tracks how many times
+
    /// we've attempted to connect. It is reset to zero upon a successful
+
    /// connection, once the connection is stable.
+
    attempts: usize,
+
}
+

+
impl Attempts {
+
    fn new(attempts: usize) -> Self {
+
        Attempts { attempts }
+
    }
+

+
    pub fn attempted(self) -> Self {
+
        Self {
+
            attempts: self.attempts + 1,
+
        }
+
    }
+

+
    pub fn reset(&mut self) {
+
        self.attempts = 0;
+
    }
+

+
    pub fn attempts(&self) -> usize {
+
        self.attempts
+
    }
+

+
    pub(super) fn as_u32(&self) -> u32 {
+
        self.attempts as u32
+
    }
+
}
+

+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct Session<S> {
+
    /// The [`NodeId`] of the session.
+
    id: NodeId,
+
    /// The public protocol [`Address`] for the session.
+
    addr: Address,
+
    /// The [`Link`] direction for the session.
+
    link: Link,
+
    /// Keep track of whether the session should be persisted. That is, if it is
+
    /// disconnected, re-connection attempts should be made.
+
    persistent: bool,
+
    /// Last time a message was received from the peer.
+
    last_active: LocalTime,
+
    /// Peer subscription.
+
    subscribe: Option<message::Subscribe>,
+
    /// The state the session is in. Can be in the following states:
+
    ///   - [`Initial`]
+
    ///   - [`Attempted`]
+
    ///   - [`Disconnected`]
+
    ///   - [`Connected`]
+
    state: S,
+
}
+

+
impl<S: HasAttempts> HasAttempts for Session<S> {
+
    fn attempts(&self) -> Attempts {
+
        self.state.attempts()
+
    }
+
}
+

+
impl<S> Session<S> {
+
    pub fn node(&self) -> NodeId {
+
        self.id
+
    }
+

+
    pub fn address(&self) -> &Address {
+
        &self.addr
+
    }
+

+
    /// Set the [`message::Subscribe`] of this [`Session`].
+
    pub fn set_subscription(&mut self, subscription: message::Subscribe) {
+
        self.subscribe = Some(subscription);
+
    }
+

+
    /// Subscribe to the given [`RepoId`], if the [`message::Subscribe`] has
+
    /// been set.
+
    pub fn subscribe_to(&mut self, rid: &RepoId) {
+
        if let Some(ref mut sub) = self.subscribe {
+
            sub.filter.insert(rid);
+
        }
+
    }
+

+
    pub fn last_active(&self) -> &LocalTime {
+
        &self.last_active
+
    }
+

+
    pub fn link(&self) -> &Link {
+
        &self.link
+
    }
+

+
    pub fn as_outbound(&mut self) {
+
        self.link = Link::Outbound;
+
    }
+

+
    pub fn into_disconnected(
+
        self,
+
        since: LocalTime,
+
        retry_at: Option<LocalTime>,
+
    ) -> Session<Disconnected>
+
    where
+
        S: HasAttempts,
+
    {
+
        self.map(|s| Disconnected {
+
            since,
+
            retry_at,
+
            attempts: s.attempts(),
+
        })
+
    }
+

+
    #[allow(unused)]
+
    fn seen(&mut self, since: LocalTime) {
+
        self.last_active = since;
+
    }
+

+
    fn into_any_state<T>(self) -> Session<T>
+
    where
+
        T: From<S>,
+
    {
+
        self.map(|state| state.into())
+
    }
+

+
    fn map<T, F>(self, f: F) -> Session<T>
+
    where
+
        F: FnOnce(S) -> T,
+
    {
+
        Session {
+
            id: self.id,
+
            addr: self.addr,
+
            link: self.link,
+
            persistent: self.persistent,
+
            last_active: self.last_active,
+
            subscribe: self.subscribe,
+
            state: f(self.state),
+
        }
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+
pub struct Initial {
+
    attempts: Attempts,
+
}
+

+
impl Initial {
+
    pub fn new() -> Self {
+
        Self::with_attempts(Attempts::new(1))
+
    }
+

+
    pub fn with_attempts(attempts: Attempts) -> Self {
+
        Self { attempts }
+
    }
+
}
+

+
impl Default for Initial {
+
    fn default() -> Self {
+
        Self::new()
+
    }
+
}
+

+
impl Session<Initial> {
+
    pub fn outbound(id: NodeId, addr: Address, persistent: bool, last_active: LocalTime) -> Self {
+
        Self {
+
            id,
+
            addr,
+
            link: Link::Outbound,
+
            persistent,
+
            state: Initial::new(),
+
            last_active,
+
            subscribe: None,
+
        }
+
    }
+

+
    /// Transition the [`Session`] to an [`Attempted`] state, incrementing the
+
    /// number of attempts made.
+
    pub fn into_attempted(self) -> Session<Attempted> {
+
        self.map(|s| Attempted::new(s.attempts.attempted()))
+
    }
+

+
    /// Transition the [`Session`] into the [`Connected`] state.
+
    pub fn into_connected(self, since: LocalTime) -> Session<Connected> {
+
        self.map(|s| Connected::new(since, s.attempts))
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Attempted {
+
    attempts: Attempts,
+
}
+

+
impl Attempted {
+
    pub fn new(attempts: Attempts) -> Self {
+
        Attempted { attempts }
+
    }
+
}
+

+
impl Session<Attempted> {
+
    /// Transition the [`Session`] into the [`Connected`] state.
+
    pub fn into_connected(self, since: LocalTime) -> Session<Connected> {
+
        self.map(|s| Connected::new(since, s.attempts))
+
    }
+
}
+

+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct Connected {
+
    /// Connected since this time.
+
    since: LocalTime,
+
    /// Ping state.
+
    ping: PingState,
+
    /// Ongoing fetches.
+
    fetching: HashSet<RepoId>,
+
    /// Measured latencies for this peer.
+
    latencies: VecDeque<LocalDuration>,
+
    /// Whether the connection is stable.
+
    stable: bool,
+
    /// Number of attempts over the lifetime of the connection, including when
+
    /// the connection is degraded back to an [`Initial`] state through
+
    /// [`Sessions::reconnect`].
+
    attempts: Attempts,
+
}
+

+
impl HasAttempts for Connected {
+
    fn attempts(&self) -> Attempts {
+
        self.attempts
+
    }
+
}
+

+
impl Connected {
+
    /// Create a new [`Connected`] state, where `since` is the time of
+
    /// connection, and `attempts` is the number of attempted connections in the
+
    /// lifetime of the [`Session`].
+
    pub fn new(since: LocalTime, attempts: Attempts) -> Self {
+
        Self {
+
            since,
+
            ping: PingState::default(),
+
            fetching: HashSet::default(),
+
            latencies: VecDeque::default(),
+
            stable: false,
+
            attempts,
+
        }
+
    }
+

+
    /// Create a fresh [`Connected`] state, using `since` as the [`LocalTime`] for
+
    /// when this connection was made.
+
    pub fn fresh(since: LocalTime) -> Self {
+
        Self::new(since, Attempts::new(0))
+
    }
+

+
    fn from_initial(initial: Initial, since: LocalTime) -> Self {
+
        Self::new(since, initial.attempts)
+
    }
+

+
    fn from_attempted(attempted: Attempted, since: LocalTime) -> Self {
+
        Self::new(since, attempted.attempts)
+
    }
+

+
    fn from_disconnected(disconnected: Disconnected, since: LocalTime) -> Self {
+
        Self::new(since, disconnected.attempts)
+
    }
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Ping {
+
    pub since: LocalTime,
+
    pub rng: fastrand::Rng,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Pong {
+
    pub now: LocalTime,
+
    pub zeroes: ZeroBytes,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Pinged {
+
    pub latency: LocalDuration,
+
}
+

+
impl Session<Connected> {
+
    pub fn inbound(id: NodeId, addr: Address, persistent: bool, now: LocalTime) -> Self {
+
        Self {
+
            id,
+
            addr,
+
            link: Link::Inbound,
+
            persistent,
+
            last_active: now,
+
            subscribe: None,
+
            state: Connected::fresh(now),
+
        }
+
    }
+

+
    /// Checks if the [`Session`] is inactive, i.e. the time since it was last
+
    /// active is at least `delta`.
+
    pub fn is_inactive(&self, now: &LocalTime, delta: LocalDuration) -> bool {
+
        *now - self.last_active >= delta
+
    }
+

+
    pub fn ping(&mut self, mut ping: Ping) -> message::Ping {
+
        let msg = message::Ping::new(&mut ping.rng);
+
        self.state.ping = PingState::AwaitingResponse {
+
            len: msg.ponglen,
+
            since: ping.since,
+
        };
+
        msg
+
    }
+

+
    pub fn pinged(&mut self, Pong { zeroes, now }: Pong) -> Option<Pinged> {
+
        if let PingState::AwaitingResponse {
+
            len: ponglen,
+
            since,
+
        } = self.state.ping
+
        {
+
            if (ponglen as usize) == zeroes.len() {
+
                self.state.ping = PingState::Ok;
+
                let latency = now - since;
+
                self.state.latencies.push_back(latency);
+
                // TODO(finto): MAX_LATENCIES should likely be configured
+
                // somewhere else
+
                if self.state.latencies.len() > MAX_LATENCIES {
+
                    self.state.latencies.pop_front();
+
                }
+
                return Some(Pinged { latency });
+
            }
+
        }
+
        None
+
    }
+

+
    pub fn idle(&mut self, now: LocalTime, stable_threshold: LocalDuration) {
+
        let Connected {
+
            since,
+
            ref mut stable,
+
            ref mut attempts,
+
            ..
+
        } = self.state;
+
        if now >= since && now.duration_since(since) >= stable_threshold {
+
            *stable = true;
+
            attempts.reset();
+
        }
+
    }
+
}
+

+
#[derive(Clone, Copy, Debug)]
+
pub struct Disconnected {
+
    /// When this peer was disconnected.
+
    since: LocalTime,
+
    /// When to retry the connection.
+
    retry_at: Option<LocalTime>,
+
    /// Number of attempts while disconnected.
+
    attempts: Attempts,
+
}
+

+
impl Session<Disconnected> {
+
    pub fn disconnected_since(&self) -> &LocalTime {
+
        &self.state.since
+
    }
+

+
    pub fn should_retry_at(&self) -> Option<&LocalTime> {
+
        self.state.retry_at.as_ref()
+
    }
+

+
    /// Transition the [`Session`] to an [`Initial`] state.
+
    fn into_initial(self) -> Session<Initial> {
+
        self.map(|s| Initial::with_attempts(s.attempts))
+
    }
+
}
added crates/radicle-protocol/src/fetcher.rs
@@ -0,0 +1,519 @@
+
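//! Sans-IO fetch coordination. [`FetcherState`] tracks ongoing and queued
+
//! fetches, returning the events, effects, and commands for the caller to
+
//! act on.
+

+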
pub mod commands;
+
pub use commands::Command;
+
pub mod effects;
+
pub mod events;
+

+
use std::collections::{BTreeMap, HashSet, VecDeque};
+
use std::time;
+

+
use nonempty::NonEmpty;
+

+
use radicle::identity::DocAt;
+
use radicle::node;
+
use radicle::node::NodeId;
+
use radicle::prelude::RepoId;
+
use radicle::storage::{refs::RefsAt, RefUpdate};
+

+
// TODO(finto): I think this should be defined here, and the worker should
+
// provide the result based on this.
+
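/// The result reported by the worker for a completed fetch.
+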
type WorkerResult = Result<Fetched, crate::worker::FetchError>;
+

+
// TODO(finto): `Service::fetch_refs_at` and the use of `refs_status_of` is a
+
// layer above the `Fetcher` where it would perform I/O, mocked out by a trait,
+
// to check if there are wants and add a fetch to the Fetcher.
+

+
/// Maximum items in the fetch queue.
+
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
+

+
/// Maintain the state of ongoing and queued fetches.
+
pub struct FetcherState {
+
    fetching: BTreeMap<NodeId, Fetching>,
+
    queues: BTreeMap<NodeId, Queue>,
+
    config: Config,
+
}
+

+
impl FetcherState {
+
    /// Transition the state of ongoing fetches by recording new fetches that
+
    /// are occurring, queuing them for the node if required, attempting to
+
    /// dequeue fetches, and marking fetches as complete.
+
    pub fn handle_command(&mut self, command: Command) -> Vec<Command> {
+
        match command {
+
            Command::Fetch(fetch) => self.handle_fetch(fetch),
+
            Command::Fetched(fetched) => self.handle_fetched(fetched),
+
        }
+
    }
+

+
    fn handle_fetch(&mut self, fetch: commands::Fetch) -> Vec<Command> {
+
        let mut commands = Vec::new();
+
        match fetch {
+
            commands::Fetch::Repository { from, rid, .. } => {
+
                let fetching = self.fetching.entry(from).or_default();
+
                fetching.fetching(
+
                    rid,
+
                    FetchingFor {
+
                        from,
+
                        refs_at: vec![],
+
                    },
+
                );
+
            }
+
            commands::Fetch::RefsAt {
+
                from, rid, refs_at, ..
+
            } => {
+
                let fetching = self.fetching.entry(from).or_default();
+
                fetching.fetching(
+
                    rid,
+
                    FetchingFor {
+
                        from,
+
                        refs_at: refs_at.into(),
+
                    },
+
                );
+
            }
+
            commands::Fetch::Queue {
+
                from,
+
                rid,
+
                refs_at,
+
                timeout,
+
            } => {
+
                let queue = self
+
                    .queues
+
                    .entry(from)
+
                    .or_insert(Queue::new(self.config.maximum_queue_size));
+
                match queue.enqueue(QueuedFetch {
+
                    rid,
+
                    from,
+
                    refs_at,
+
                    timeout,
+
                }) {
+
                    Enqueue::CapacityReached(queued_fetch) => {
+
                        commands.push(commands::Fetch::from(queued_fetch).into());
+
                    }
+
                    Enqueue::Duplicate(queued_fetch) => {
+
                        commands.push(commands::Fetch::from(queued_fetch).into());
+
                    }
+
                    Enqueue::Queued => { /* TODO(finto): I think we also want to return events */ }
+
                }
+
            }
+
        }
+
        commands
+
    }
+

+
    fn handle_fetched(&mut self, fetched: commands::Fetched) -> Vec<Command> {
+
        let mut commands = Vec::new();
+
        match fetched {
+
            commands::Fetched::DequeueFetches => {
+
                self.dequeue_fetches(&mut commands);
+
                commands
+
            }
+
            commands::Fetched::Fetched { from, rid } => {
+
                self.fetched_from(&from, &rid);
+
                vec![]
+
            }
+
        }
+
    }
+

+
    fn dequeue_fetches(&mut self, commands: &mut Vec<Command>) {
+
        for queue in self.queues.values_mut() {
+
            if let Some(QueuedFetch {
+
                rid,
+
                from,
+
                refs_at,
+
                timeout,
+
            }) = queue.dequeue()
+
            {
+
                let command = NonEmpty::from_vec(refs_at).map_or(
+
                    commands::Fetch::Repository { from, rid, timeout },
+
                    |refs_at| commands::Fetch::RefsAt {
+
                        from,
+
                        rid,
+
                        refs_at,
+
                        timeout,
+
                    },
+
                );
+
                commands.push(command.into());
+
            }
+
        }
+
    }
+

+
    fn fetched_from(&mut self, node: &NodeId, rid: &RepoId) -> Option<FetchingFor> {
+
        let fetching = self.fetching.get_mut(node)?;
+
        fetching.fetched(rid)
+
    }
+
}
+

+
impl FetcherState {
+
    /// The protocol wishes to fetch the [`RepoId`] from the given [`NodeId`],
+
    /// with the specified set of [`RefsAt`].
+
    ///
+
    /// This will result in a [`FetchResult`], reporting back to the protocol
+
    /// what events occurred and the commands to transition the state using
+
    /// [`FetcherState::handle_command`].
+
    pub fn fetch(
+
        &self,
+
        rid: RepoId,
+
        from: NodeId,
+
        refs_at: Vec<RefsAt>,
+
        timeout: time::Duration,
+
    ) -> FetchResult {
+
        let mut result = FetchResult::default();
+
        if let Some(fetching) = self.is_fetching(&from, &rid) {
+
            result.queue_fetch(rid, from, refs_at, timeout);
+
            result.already_fetching(rid, fetching.clone());
+
            return result;
+
        }
+

+
        if self.is_at_capacity(&from) {
+
            result.queue_fetch(rid, from, refs_at, timeout);
+
            result.capacity_reached();
+
            return result;
+
        }
+

+
        match NonEmpty::from_vec(refs_at) {
+
            Some(refs_at) => result.fetch_refs_at(from, rid, refs_at, timeout),
+
            None => result.fetch_repository(from, rid, timeout),
+
        }
+
        result
+
    }
+

+
    /// The protocol wishes to mark the fetch for [`RepoId`] from the given
+
    /// [`NodeId`] as done, with the specified [`WorkerResult`].
+
    ///
+
    /// This will result in a [`FetchedResult`], reporting back to the protocol
+
    /// what events occurred, which effects should be performed, and the
+
    /// commands to transition the state using [`FetcherState::handle_command`].
+
    pub fn fetched(&self, from: NodeId, rid: RepoId, worker_result: WorkerResult) -> FetchedResult {
+
        let mut result = FetchedResult::default();
+
        if self.is_fetching(&from, &rid).is_none() {
+
            result.unexpected(from, rid);
+
            return result;
+
        };
+
        match worker_result {
+
            Ok(success) => {
+
                if success.clone && success.doc.is_public() {
+
                    result.public_repo(rid);
+
                }
+
                if !success.updated.is_empty() && !success.updated.iter().all(|u| u.is_skipped()) {
+
                    result.announce(rid, success.doc, success.namespaces.clone());
+
                }
+
                result.refs_fetched(from, rid, success.updated.clone());
+
                let node_result = node::FetchResult::Success {
+
                    updated: success.updated,
+
                    namespaces: success.namespaces,
+
                    clone: success.clone,
+
                };
+
                result.notify(from, rid, node_result);
+
            }
+
            Err(e) => {
+
                if e.is_timeout() {
+
                    result.disconnect(from, e.to_string());
+
                }
+
                let node_result = node::FetchResult::Failed {
+
                    reason: e.to_string(),
+
                };
+
                result.notify(from, rid, node_result);
+
            }
+
        };
+

+
        result.dequeue_fetches();
+
        result
+
    }
+

+
    /// Get the [`FetchingFor`] for the provided [`NodeId`] and [`RepoId`],
+
    /// returning `None` if it does not exist.
+
    fn is_fetching(&self, node: &NodeId, rid: &RepoId) -> Option<&FetchingFor> {
+
        self.fetching.get(node).and_then(|f| f.is_fetching(rid))
+
    }
+

+
    /// Check if the number of fetches exceeds the maximum number of concurrent
+
    /// fetches for a given [`NodeId`].
+
    fn is_at_capacity(&self, node: &NodeId) -> bool {
+
        let number_of_fetches = self.fetching.get(node).map_or(0, |f| f.number_of_fetches());
+
        number_of_fetches > self.config.maximum_concurrency
+
    }
+
}
+

+
/// Configuration for the [`FetcherState`].
+
pub struct Config {
+
    /// Maximum number of concurrent fetches per peer connection.
+
    maximum_concurrency: usize,
+
    /// Maximum fetching queue size for a single node.
+
    maximum_queue_size: MaxQueueSize,
+
}
+

+
/// Fetch state for an ongoing fetch.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct FetchingFor {
+
    /// Node we're fetching from.
+
    from: NodeId,
+
    /// What refs we're fetching.
+
    refs_at: Vec<RefsAt>,
+
}
+

+
impl FetchingFor {
+
    pub fn new(from: NodeId, refs_at: Vec<RefsAt>) -> Self {
+
        Self { from, refs_at }
+
    }
+
}
+

+
/// Keep track of which repositories are being fetched, from which node, and
+
/// at which `rad/sigrefs` they are being fetched.
+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
+
struct Fetching {
+
    inner: BTreeMap<RepoId, FetchingFor>,
+
}
+

+
impl Fetching {
+
    /// Get the number of fetches currently happening.
+
    fn number_of_fetches(&self) -> usize {
+
        self.inner.len()
+
    }
+

+
    /// Inspect the [`FetchingFor`] of the [`RepoId`].
+
    fn is_fetching(&self, rid: &RepoId) -> Option<&FetchingFor> {
+
        self.inner.get(rid)
+
    }
+

+
    /// The [`RepoId`] is currently being fetched.
+
    fn fetching(&mut self, rid: RepoId, state: FetchingFor) {
+
        self.inner.insert(rid, state);
+
    }
+

+
    /// The [`RepoId`] has been fetched.
+
    fn fetched(&mut self, rid: &RepoId) -> Option<FetchingFor> {
+
        self.inner.remove(rid)
+
    }
+
}
+

+
/// Fetch waiting to be processed, in the fetch queue.
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct QueuedFetch {
+
    /// Repo being fetched.
+
    pub rid: RepoId,
+
    /// Peer being fetched from.
+
    pub from: NodeId,
+
    /// Refs being fetched.
+
    pub refs_at: Vec<RefsAt>,
+
    /// The timeout given for the fetch request.
+
    pub timeout: time::Duration,
+
}
+

+
impl From<QueuedFetch> for commands::Fetch {
+
    fn from(
+
        QueuedFetch {
+
            rid,
+
            from,
+
            refs_at,
+
            timeout,
+
        }: QueuedFetch,
+
    ) -> Self {
+
        Self::Queue {
+
            from,
+
            rid,
+
            refs_at,
+
            timeout,
+
        }
+
    }
+
}
+

+
/// A queue for keeping track of fetches.
+
///
+
/// It ensures that the queue contains unique items for fetching, and does not
+
/// exceed the provided maximum capacity.
+
struct Queue {
+
    queue: VecDeque<QueuedFetch>,
+
    max_queue_size: MaxQueueSize,
+
}
+

+
/// The maximum number of fetches that can be queued for a single node.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
+
pub struct MaxQueueSize(usize);
+

+
impl MaxQueueSize {
+
    /// Checks if the given `n` meets or exceeds the maximum queue size.
+
    fn is_exceeded_by(&self, n: usize) -> bool {
+
        n >= self.0
+
    }
+
}
+

+
impl Default for MaxQueueSize {
+
    fn default() -> Self {
+
        Self(MAX_FETCH_QUEUE_SIZE)
+
    }
+
}
+

+
/// The result of [`Queue::enqueue`].
+
pub enum Enqueue {
+
    /// The capacity of the queue has been exceeded, and the [`QueuedFetch`] is
+
    /// returned.
+
    CapacityReached(QueuedFetch),
+
    /// The [`QueuedFetch`] was a duplicate.
+
    Duplicate(QueuedFetch),
+
    /// The [`QueuedFetch`] was successfully queued.
+
    Queued,
+
}
+

+
impl Queue {
+
    /// Create the [`Queue`] with the given [`MaxQueueSize`].
+
    fn new(max_queue_size: MaxQueueSize) -> Self {
+
        Self {
+
            queue: VecDeque::with_capacity(max_queue_size.0),
+
            max_queue_size,
+
        }
+
    }
+

+
    /// Enqueues a fetch onto the back of the queue, and will only succeed if
+
    /// the queue has not reached capacity and if the item is unique.
+
    fn enqueue(&mut self, fetch: QueuedFetch) -> Enqueue {
+
        if self.max_queue_size.is_exceeded_by(self.queue.len()) {
+
            Enqueue::CapacityReached(fetch)
+
        } else if self.queue.contains(&fetch) {
+
            Enqueue::Duplicate(fetch)
+
        } else {
+
            self.queue.push_back(fetch);
+
            Enqueue::Queued
+
        }
+
    }
+

+
    /// Dequeues a fetch from the front of the queue.
+
    fn dequeue(&mut self) -> Option<QueuedFetch> {
+
        self.queue.pop_front()
+
    }
+
}
+

+
/// Events and commands that result from [`FetcherState::fetch`].
+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
+
pub struct FetchResult {
+
    /// Events that can be recorded by the rest of the system.
+
    pub events: Vec<events::Fetch>,
+
    /// Commands to progress [`FetcherState`].
+
    pub commands: Vec<commands::Fetch>,
+
}
+

+
impl FetchResult {
+
    fn capacity_reached(&mut self) {
+
        self.events.push(events::Fetch::CapacityReached);
+
    }
+

+
    fn already_fetching(&mut self, rid: RepoId, fetching: FetchingFor) {
+
        self.events
+
            .push(events::Fetch::AlreadyFetching { rid, fetching });
+
    }
+

+
    fn fetch_repository(&mut self, from: NodeId, rid: RepoId, timeout: time::Duration) {
+
        self.commands
+
            .push(commands::Fetch::Repository { from, rid, timeout });
+
    }
+

+
    fn fetch_refs_at(
+
        &mut self,
+
        from: NodeId,
+
        rid: RepoId,
+
        refs_at: NonEmpty<RefsAt>,
+
        timeout: time::Duration,
+
    ) {
+
        self.commands.push(commands::Fetch::RefsAt {
+
            from,
+
            rid,
+
            refs_at,
+
            timeout,
+
        });
+
    }
+

+
    fn queue_fetch(
+
        &mut self,
+
        rid: RepoId,
+
        from: NodeId,
+
        refs_at: Vec<RefsAt>,
+
        timeout: time::Duration,
+
    ) {
+
        self.commands.push(commands::Fetch::Queue {
+
            rid,
+
            from,
+
            refs_at,
+
            timeout,
+
        });
+
    }
+
}
+

+
/// Events, effects, and commands that occur from [`FetchState::fetched`].
+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
+
pub struct FetchedResult {
+
    /// Events that can be recorded by the rest of the system.
+
    pub events: Vec<events::Fetched>,
+
    /// Effects that can be performed by the rest of the system.
+
    pub effects: Vec<effects::Fetched>,
+
    /// Commands to progress [`FetchState`].
+
    pub commands: Vec<commands::Fetched>,
+
}
+

+
impl FetchedResult {
+
    fn unexpected(&mut self, node: NodeId, rid: RepoId) {
+
        self.events
+
            .push(events::Fetched::UnexpectedResult { node, rid });
+
    }
+

+
    fn notify(&mut self, from: NodeId, rid: RepoId, result: node::FetchResult) {
+
        self.effects
+
            .push(effects::Fetched::Notify { from, rid, result })
+
    }
+

+
    fn dequeue_fetches(&mut self) {
+
        self.commands.push(commands::Fetched::DequeueFetches);
+
    }
+

+
    fn disconnect(&mut self, node: NodeId, reason: String) {
+
        self.effects
+
            .push(effects::Fetched::Disconnect { node, reason })
+
    }
+

+
    fn announce(&mut self, rid: RepoId, doc: DocAt, namespaces: HashSet<NodeId>) {
+
        self.effects.push(effects::Fetched::Announce {
+
            rid,
+
            doc,
+
            namespaces,
+
        })
+
    }
+

+
    fn public_repo(&mut self, rid: RepoId) {
+
        self.events.push(events::Fetched::PublicRepo { rid });
+
    }
+

+
    fn refs_fetched(&mut self, node: NodeId, rid: RepoId, updated: Vec<RefUpdate>) {
+
        self.events
+
            .push(events::Fetched::RefsFetched { node, rid, updated });
+
    }
+
}
+

+
#[derive(Debug, Clone)]
+
pub struct Fetched {
+
    /// The set of updated references.
+
    pub updated: Vec<RefUpdate>,
+
    /// The set of remote namespaces that were updated.
+
    pub namespaces: HashSet<NodeId>,
+
    /// The fetch was a full clone.
+
    pub clone: bool,
+
    /// Identity doc of fetched repository.
+
    pub doc: DocAt,
+
}
+

+
impl Fetched {
+
    pub fn new(doc: DocAt) -> Self {
+
        Self {
+
            updated: vec![],
+
            namespaces: HashSet::new(),
+
            clone: false,
+
            doc,
+
        }
+
    }
+
}
+

+
#[cfg(any(test, feature = "test"))]
+
impl qcheck::Arbitrary for Fetched {
+
    fn arbitrary(g: &mut qcheck::Gen) -> Self {
+
        Fetched {
+
            updated: vec![],
+
            namespaces: HashSet::arbitrary(g),
+
            clone: bool::arbitrary(g),
+
            doc: DocAt::arbitrary(g),
+
        }
+
    }
+
}
added crates/radicle-protocol/src/fetcher/commands.rs
@@ -0,0 +1,58 @@
+
use std::time;
+

+
use nonempty::NonEmpty;
+
use radicle::{node::NodeId, prelude::RepoId, storage::refs::RefsAt};
+

+
/// Commands for transitioning the [`FetchState`].
+
///
+
/// [`FetchState`]: super::FetchState
+
pub enum Command {
+
    Fetch(Fetch),
+
    Fetched(Fetched),
+
}
+

+
impl From<Fetch> for Command {
+
    fn from(v: Fetch) -> Self {
+
        Self::Fetch(v)
+
    }
+
}
+

+
impl From<Fetched> for Command {
+
    fn from(v: Fetched) -> Self {
+
        Self::Fetched(v)
+
    }
+
}
+

+
/// Commands that are issued while a repository is being fetched.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Fetch {
+
    /// The repository should be queued for fetching.
+
    Queue {
+
        from: NodeId,
+
        rid: RepoId,
+
        refs_at: Vec<RefsAt>,
+
        timeout: time::Duration,
+
    },
+
    /// The repository should be fetched, and we do not know the references that
+
    /// are required for fetching.
+
    Repository {
+
        from: NodeId,
+
        rid: RepoId,
+
        timeout: time::Duration,
+
    },
+
    /// The repository should be fetched, and only the references stated should
+
    /// be fetched.
+
    RefsAt {
+
        from: NodeId,
+
        rid: RepoId,
+
        refs_at: NonEmpty<RefsAt>,
+
        timeout: time::Duration,
+
    },
+
}
+

+
/// Commands that are issued after a repository has been fetched.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Fetched {
+
    DequeueFetches,
+
    Fetched { from: NodeId, rid: RepoId },
+
}
added crates/radicle-protocol/src/fetcher/effects.rs
@@ -0,0 +1,28 @@
+
use std::collections::HashSet;
+

+
use radicle::{identity::DocAt, node, node::NodeId, prelude::RepoId};
+

+
/// Effects that should be performed after a repository has been fetched.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Fetched {
+
    /// Announce that the namespaces were fetched for this repository.
+
    Announce {
+
        rid: RepoId,
+
        doc: DocAt,
+
        namespaces: HashSet<NodeId>,
+
    },
+
    /// Notify listeners about the result of the fetch.
+
    Notify {
+
        from: NodeId,
+
        rid: RepoId,
+
        result: node::FetchResult,
+
    },
+
    /// The fetch failed due to a timeout, so the [`NodeId`] should likely be
+
    /// disconnected.
+
    Disconnect {
+
        node: NodeId,
+
        // TODO(finto): this was a `FetchError` type; is it ok to have it just as a
+
        // String?
+
        reason: String,
+
    },
+
}
added crates/radicle-protocol/src/fetcher/events.rs
@@ -0,0 +1,30 @@
+
use radicle::{node::NodeId, prelude::RepoId, storage::RefUpdate};
+

+
use super::FetchingFor;
+

+
/// Events that occur when a repository is being fetched.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Fetch {
+
    /// The repository is already being fetched.
+
    AlreadyFetching { rid: RepoId, fetching: FetchingFor },
+
    /// The capacity of the node has been reached.
+
    CapacityReached,
+
}
+

+
/// Events that occur after a repository has been fetched.
+
// TODO(finto): note to self a successful fetch should mark a seed as discovered
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Fetched {
+
    /// There was no ongoing fetch for the given [`NodeId`] and [`RepoId`].
+
    UnexpectedResult { node: NodeId, rid: RepoId },
+
    /// The [`RepoId`] was fetched from the [`NodeId`] with the set of updated
+
    /// references.
+
    RefsFetched {
+
        node: NodeId,
+
        rid: RepoId,
+
        updated: Vec<RefUpdate>,
+
    },
+
    // TODO(finto): this needs to be used to add inventory
+
    /// The fetched repository was a public repository.
+
    PublicRepo { rid: RepoId },
+
}
modified crates/radicle-protocol/src/lib.rs
@@ -4,5 +4,11 @@ pub mod service;
pub mod wire;
pub mod worker;

+
pub mod connections;
+
pub mod fetcher;
+
pub mod node;
+
pub mod routing;
+
pub mod tasks;
+

/// Peer-to-peer protocol version.
pub const PROTOCOL_VERSION: u8 = 1;
added crates/radicle-protocol/src/node.rs
@@ -0,0 +1 @@
+
pub mod events;
added crates/radicle-protocol/src/node/events.rs
@@ -0,0 +1,48 @@
+
pub mod emitter;
+

+
use std::collections::VecDeque;
+

+
use radicle::node::Event;
+

+
/// Keep track of [`Event`]s that occur within the rest of the protocol system.
+
///
+
/// The events are queued with [`Events::push_event`] and removed using
+
/// [`Events::pop_event`] and [`Events::drain_events`].
+
///
+
/// To inspect the events use [`Events::events`].
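+
///
+
/// A sketch of the queue in use (illustrative only; assumes an `Events` value
+
/// and some `event` in hand):
+
/// ```ignore
+
/// events.push_event(event);
+
/// if let Some(event) = events.pop_event() {
+
///     // Handle a single event.
+
/// }
+
/// for event in events.drain_events() {
+
///     // Batch-process the remaining events.
+
/// }
+
/// ```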
+
pub struct Events {
+
    events: VecDeque<Event>,
+
}
+

+
impl Extend<Event> for Events {
+
    fn extend<T: IntoIterator<Item = Event>>(&mut self, iter: T) {
+
        self.events.extend(iter);
+
    }
+
}
+

+
impl Events {
+
    /// Push an [`Event`] onto the events queue.
+
    pub fn push_event(&mut self, event: Event) {
+
        self.events.push_back(event);
+
    }
+

+
    /// Pop the next [`Event`] from the events queue.
+
    pub fn pop_event(&mut self) -> Option<Event> {
+
        self.events.pop_front()
+
    }
+

+
    /// Drain the queue of all its events.
+
    ///
+
    /// This is useful for batch processing the available events.
+
    pub fn drain_events(&mut self) -> impl Iterator<Item = Event> + '_ {
+
        self.events.drain(..)
+
    }
+
}
+

+
impl Events {
+
    /// Get the events that are in the queue currently, without modifying the
+
    /// queue itself.
+
    pub fn events(&self) -> impl Iterator<Item = &Event> {
+
        self.events.iter()
+
    }
+
}
added crates/radicle-protocol/src/node/events/emitter.rs
@@ -0,0 +1,26 @@
+
//! An [`Emitter`] captures the ability to emit [`Event`]s to some subscriber
+
//! mechanism.
+

+
use radicle::node::Event;
+

+
use super::Events;
+

+
/// The ability to emit an [`Event`] to some subscriber mechanism.
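+
///
+
/// A minimal sketch of a channel-backed implementation (illustrative only;
+
/// not part of this patch):
+
/// ```ignore
+
/// use std::sync::mpsc;
+
///
+
/// struct ChannelEmitter(mpsc::Sender<Event>);
+
///
+
/// impl Emitter for ChannelEmitter {
+
///     fn emit(&self, event: Event) {
+
///         // If the receiver hung up, the event is silently dropped.
+
///         let _ = self.0.send(event);
+
///     }
+
/// }
+
/// ```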
+
pub trait Emitter {
+
    /// Emit a single [`Event`], bypassing the need for an events queue.
+
    fn emit(&self, event: Event);
+

+
    /// Emit the next event from the events queue.
+
    fn emit_next(&self, events: &mut Events) {
+
        if let Some(event) = events.pop_event() {
+
            self.emit(event);
+
        }
+
    }
+

+
    /// Emit all the events that are currently on the queue.
+
    fn emit_all(&self, events: &mut Events) {
+
        for event in events.drain_events() {
+
            self.emit(event);
+
        }
+
    }
+
}
added crates/radicle-protocol/src/routing.rs
@@ -0,0 +1,115 @@
+
pub mod effects;
+

+
use std::collections::BTreeSet;
+

+
use radicle::{
+
    node::{NodeId, Timestamp},
+
    prelude::RepoId,
+
};
+

+
/// Evaluate a node's inventory when a new inventory is announced.
+
pub struct Routing {
+
    node: NodeId,
+
    inventory: BTreeSet<RepoId>,
+
}
+

+
impl Routing {
+
    /// Construct the [`Routing`] from the node's existing inventory.
+
    pub fn new(node: NodeId, inventory: BTreeSet<RepoId>) -> Self {
+
        Self { node, inventory }
+
    }
+

+
    /// When a new inventory has been found, evaluate it against the existing
+
    /// inventory to see which repositories were added or removed.
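+
    ///
+
    /// A sketch of the evaluate-then-apply flow (illustrative only; assumes a
+
    /// `node: NodeId` and `RepoId`s `a`, `b`, and `c` in hand):
+
    /// ```ignore
+
    /// let mut routing = Routing::new(node, BTreeSet::from([a, b]));
+
    /// let result = routing.found_inventory(BTreeSet::from([b, c]));
+
    /// // `result.events` reports `c` as added and `a` as removed, while
+
    /// // `result.commands` carries the `Update` to apply:
+
    /// for command in result.commands {
+
    ///     routing.handle_command(command);
+
    /// }
+
    /// ```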
+
    pub fn found_inventory(&self, inventory: BTreeSet<RepoId>) -> RoutingResult {
+
        let mut result = RoutingResult::default();
+
        let existing = &self.inventory;
+
        let removed = existing
+
            .difference(&inventory)
+
            .copied()
+
            .collect::<BTreeSet<RepoId>>();
+
        let added = inventory
+
            .difference(existing)
+
            .copied()
+
            .collect::<BTreeSet<RepoId>>();
+

+
        result.update(inventory);
+
        result.added(self.node, added);
+
        result.removed(self.node, removed);
+
        result
+
    }
+

+
    /// Finalize the node's inventory and get the [`SetInventory`] for setting
+
    /// the inventory in the [`RoutingTable`].
+
    ///
+
    /// [`SetInventory`]: effects::SetInventory
+
    /// [`RoutingTable`]: effects::RoutingTable
+
    pub fn set_inventory(self, now: Timestamp) -> effects::SetInventory {
+
        effects::SetInventory {
+
            node: self.node,
+
            inventory: self.inventory,
+
            now,
+
        }
+
    }
+
}
+

+
impl Routing {
+
    /// Process a [`Command`] for the routing computation to update the state of
+
    /// the node's inventory.
+
    pub fn handle_command(&mut self, command: Command) {
+
        match command {
+
            Command::Update { inventory } => {
+
                self.inventory = inventory;
+
            }
+
        }
+
    }
+
}
+

+
/// Commands for advancing the [`Routing`] state.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Command {
+
    /// Update the [`Routing`] state with the new inventory.
+
    Update { inventory: BTreeSet<RepoId> },
+
}
+

+
/// Events that are emitted when evaluating the [`Routing`] state against new
+
/// input.
+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub enum Event {
+
    /// The set of repositories that were added to the node's routing table.
+
    Added {
+
        node: NodeId,
+
        repos: BTreeSet<RepoId>,
+
    },
+
    /// The set of repositories that were removed from the node's routing table.
+
    Removed {
+
        node: NodeId,
+
        repos: BTreeSet<RepoId>,
+
    },
+
}
+

+
/// The result of [`Routing::found_inventory`].
+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
+
pub struct RoutingResult {
+
    /// See [`Command`].
+
    pub commands: Vec<Command>,
+
    /// See [`Event`].
+
    pub events: Vec<Event>,
+
}
+

+
impl RoutingResult {
+
    fn update(&mut self, inventory: BTreeSet<RepoId>) {
+
        self.commands.push(Command::Update { inventory });
+
    }
+

+
    fn added(&mut self, node: NodeId, added: BTreeSet<RepoId>) {
+
        self.events.push(Event::Added { node, repos: added });
+
    }
+

+
    fn removed(&mut self, node: NodeId, removed: BTreeSet<RepoId>) {
+
        self.events.push(Event::Removed {
+
            node,
+
            repos: removed,
+
        });
+
    }
+
}
added crates/radicle-protocol/src/routing/effects.rs
@@ -0,0 +1,31 @@
+
use std::collections::BTreeSet;
+

+
use radicle::{
+
    node::{NodeId, Timestamp},
+
    prelude::RepoId,
+
};
+

+
pub struct SetInventory {
+
    /// The node we are setting the inventory for.
+
    pub node: NodeId,
+
    /// The inventory of the node.
+
    pub inventory: BTreeSet<RepoId>,
+
    /// When the inventory update was found.
+
    pub now: Timestamp,
+
}
+

+
/// An error occurred when setting the inventory for a node in the routing
+
/// table.
+
///
+
/// Note that there are no domain errors for setting the inventory, since we
+
/// expect the operation to always succeed.
+
pub enum SetInventoryError {
+
    /// An error occurred due to the underlying storage mechanism.
+
    Other(Box<dyn std::error::Error + Send + Sync + 'static>),
+
}
+

+
pub trait RoutingTable {
+
    /// Set the inventory for a node. The inventory is essentially the set of
+
    /// RIDs that the node is seeding and replicating.
+
    fn set_inventory(set: SetInventory) -> Result<(), SetInventoryError>;
+
}
added crates/radicle-protocol/src/tasks.rs
@@ -0,0 +1,838 @@
+
use localtime::{LocalDuration, LocalTime};
+
use nonempty::{nonempty, NonEmpty};
+

+
/// How often to run the "idle" task.
+
pub const IDLE_INTERVAL: LocalDuration = LocalDuration::from_secs(30);
+
/// How often to run the "gossip" task.
+
pub const GOSSIP_INTERVAL: LocalDuration = LocalDuration::from_secs(6);
+
/// How often to run the "announce" task.
+
pub const ANNOUNCE_INTERVAL: LocalDuration = LocalDuration::from_mins(60);
+
/// How often to run the "sync" task.
+
pub const SYNC_INTERVAL: LocalDuration = LocalDuration::from_secs(60);
+
/// How often to run the "prune" task.
+
pub const PRUNE_INTERVAL: LocalDuration = LocalDuration::from_mins(30);
+

+
/// The [`TaskTimers`] keeps track of the state of different timers and duration
+
/// intervals for when to emit a set of tasks.
+
///
+
/// [`TaskTimers::tick`] checks which intervals have elapsed and produces a
+
/// [`TaskTimerCommand`].
+
///
+
/// The [`TaskTimerCommand`] will provide a set of [`TimerEvent`]s which can be
+
/// used to [`TaskTimers::advance`] the clocks forward.
+
///
+
/// It also provides a set of [`TaskEvent`]s, which are tasks that the rest of the
+
/// system should execute.
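+
///
+
/// A sketch of the resulting driver loop (illustrative only; assumes
+
/// `LocalTime::now` as the clock source):
+
/// ```ignore
+
/// let mut timers = TaskTimers::new(LocalTime::now());
+
/// loop {
+
///     let now = LocalTime::now();
+
///     let TaskTimerCommand { tasks, timers: events } = timers.tick(now);
+
///     for task in tasks {
+
///         // Hand each `TaskEvent` to the rest of the system.
+
///     }
+
///     timers.advance(&events);
+
/// }
+
/// ```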
+
pub struct TaskTimers {
+
    last_idle: LocalTime,
+
    last_gossip: LocalTime,
+
    last_sync: LocalTime,
+
    last_announce: LocalTime,
+
    last_prune: LocalTime,
+
    intervals: Intervals,
+
}
+

+
/// Interval durations used in [`TaskTimers`] for deciding when to issue
+
/// [`TaskEvent`]s and [`TimerEvent`]s.
+
///
+
/// The default values for each interval are given by:
+
///   - Idle: 30s
+
///   - Gossip: 6s
+
///   - Sync: 60s
+
///   - Announce: 60m
+
///   - Prune: 30m
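+
///
+
/// A sketch of overriding a single interval via the builder methods
+
/// (illustrative only):
+
/// ```ignore
+
/// let intervals = Intervals::default().with_gossip(LocalDuration::from_secs(3));
+
/// ```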
+
pub struct Intervals {
+
    pub idle: LocalDuration,
+
    pub gossip: LocalDuration,
+
    pub sync: LocalDuration,
+
    pub announce: LocalDuration,
+
    pub prune: LocalDuration,
+
}
+

+
impl Default for Intervals {
+
    fn default() -> Self {
+
        Self {
+
            idle: IDLE_INTERVAL,
+
            gossip: GOSSIP_INTERVAL,
+
            sync: SYNC_INTERVAL,
+
            announce: ANNOUNCE_INTERVAL,
+
            prune: PRUNE_INTERVAL,
+
        }
+
    }
+
}
+

+
impl Intervals {
+
    /// The set of [`Intervals`] that never elapse.
+
    pub const NEVER: Intervals = Intervals {
+
        idle: LocalDuration::MAX,
+
        gossip: LocalDuration::MAX,
+
        sync: LocalDuration::MAX,
+
        announce: LocalDuration::MAX,
+
        prune: LocalDuration::MAX,
+
    };
+

+
    /// The set of [`Intervals`] that always elapse, even if no time has passed,
+
    /// since the elapsed checks are inclusive (`>=`).
+
    pub const ALWAYS: Intervals = Intervals {
+
        idle: LocalDuration::from_millis(0),
+
        gossip: LocalDuration::from_millis(0),
+
        sync: LocalDuration::from_millis(0),
+
        announce: LocalDuration::from_millis(0),
+
        prune: LocalDuration::from_millis(0),
+
    };
+

+
    /// Set the idle interval – issues tasks based on the node's idleness.
+
    pub fn with_idle(mut self, idle: LocalDuration) -> Self {
+
        self.idle = idle;
+
        self
+
    }
+

+
    /// Set the gossip interval – issues tasks for gossiping to the network.
+
    pub fn with_gossip(mut self, gossip: LocalDuration) -> Self {
+
        self.gossip = gossip;
+
        self
+
    }
+

+
    /// Set the sync interval – issues tasks for syncing with the network.
+
    pub fn with_sync(mut self, sync: LocalDuration) -> Self {
+
        self.sync = sync;
+
        self
+
    }
+

+
    /// Set the announce interval – issues tasks for announcing changes to the
+
    /// network.
+
    pub fn with_announce(mut self, announce: LocalDuration) -> Self {
+
        self.announce = announce;
+
        self
+
    }
+

+
    /// Set the prune interval – issues tasks for cleaning up stale data on the
+
    /// node.
+
    pub fn with_prune(mut self, prune: LocalDuration) -> Self {
+
        self.prune = prune;
+
        self
+
    }
+
}
+

+
/// Task events that are emitted by the [`TaskTimers`].
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum TaskEvent {
+
    /// Maintain any configured persistent connections.
+
    MaintainPersistentConnections,
+
    /// Idle tasks.
+
    Idle(Idle),
+
    /// Gossip tasks.
+
    Gossip(Gossip),
+
    /// Synchronization tasks.
+
    Sync(Sync),
+
    /// Announcement tasks.
+
    Announce(Announce),
+
    /// Prune tasks.
+
    Prune(Prune),
+
}
+

+
impl TaskEvent {
+
    /// Produce all [`TaskEvent`]s.
+
    pub fn all() -> NonEmpty<Self> {
+
        let mut events = nonempty![TaskEvent::MaintainPersistentConnections];
+
        events.extend(Idle::all().map(Self::from));
+
        events.extend(Gossip::all().map(Self::from));
+
        events.extend(Sync::all().map(Self::from));
+
        events.extend(Announce::all().map(Self::from));
+
        events.extend(Prune::all().map(Self::from));
+
        events
+
    }
+
}
+

+
/// Events that describe how timer state should change.
+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
+
pub enum TimerEvent {
+
    /// Advance the idle clock of [`TaskTimers`].
+
    AdvanceIdle(LocalTime),
+
    /// Advance the gossip clock of [`TaskTimers`].
+
    AdvanceGossip(LocalTime),
+
    /// Advance the sync clock of [`TaskTimers`].
+
    AdvanceSync(LocalTime),
+
    /// Advance the announce clock of [`TaskTimers`].
+
    AdvanceAnnounce(LocalTime),
+
    /// Advance the prune clock of [`TaskTimers`].
+
    AdvancePrune(LocalTime),
+
}
+

+
/// Tasks for the machine to perform while it is idling.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Idle {
+
    /// Maintain any existing connections.
+
    MaintainConnections,
+
    /// Issue a keep alive message to connections.
+
    KeepAlive,
+
    /// Disconnect any unresponsive peers.
+
    DisconnectUnresponsivePeers,
+
    /// Dequeue any queued fetches.
+
    DequeueFetches,
+
}
+

+
impl Idle {
+
    /// Return all the [`Idle`] variants.
+
    pub fn all() -> NonEmpty<Self> {
+
        nonempty![
+
            Self::MaintainConnections,
+
            Self::KeepAlive,
+
            Self::DisconnectUnresponsivePeers,
+
            Self::DequeueFetches,
+
        ]
+
    }
+
}
+

+
impl From<Idle> for TaskEvent {
+
    fn from(value: Idle) -> Self {
+
        TaskEvent::Idle(value)
+
    }
+
}
+

+
/// Gossip tasks for the machine to execute.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Gossip {
+
    /// Relay announcements to connections.
+
    RelayAnnouncements,
+
}
+

+
impl Gossip {
+
    /// Return all the [`Gossip`] variants.
+
    pub fn all() -> NonEmpty<Self> {
+
        nonempty![Self::RelayAnnouncements]
+
    }
+
}
+

+
impl From<Gossip> for TaskEvent {
+
    fn from(value: Gossip) -> Self {
+
        TaskEvent::Gossip(value)
+
    }
+
}
+

+
/// Synchronization tasks for the machine to execute.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Sync {
+
    /// Fetch missing repositories from connections. Missing repositories are
+
    /// repositories that are seeded, but not in storage.
+
    MissingRepositories,
+
}
+

+
impl Sync {
+
    /// Return all the [`Sync`] variants.
+
    pub fn all() -> NonEmpty<Self> {
+
        nonempty![Self::MissingRepositories]
+
    }
+
}
+

+
impl From<Sync> for TaskEvent {
+
    fn from(value: Sync) -> Self {
+
        TaskEvent::Sync(value)
+
    }
+
}
+

+
/// Announcement tasks for the machine to execute.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Announce {
+
    /// Announce inventory to connections.
+
    Inventory,
+
}
+

+
impl Announce {
+
    /// Return all the [`Announce`] variants.
+
    pub fn all() -> NonEmpty<Self> {
+
        nonempty![Self::Inventory]
+
    }
+
}
+

+
impl From<Announce> for TaskEvent {
+
    fn from(value: Announce) -> Self {
+
        TaskEvent::Announce(value)
+
    }
+
}
+

+
/// Pruning tasks for the machine to execute.
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+
pub enum Prune {
+
    /// Prune any routing entries that are no longer required.
+
    RoutingEntries,
+
    /// Prune any announcements that are no longer required.
+
    Announcements,
+
}
+

+
impl Prune {
+
    /// Return all the [`Prune`] variants.
+
    pub fn all() -> NonEmpty<Self> {
+
        nonempty![Self::RoutingEntries, Self::Announcements]
+
    }
+
}
+

+
impl From<Prune> for TaskEvent {
+
    fn from(value: Prune) -> Self {
+
        TaskEvent::Prune(value)
+
    }
+
}
+

+
/// The result of performing a [`TaskTimers::tick`].
+
pub struct TaskTimerCommand {
+
    /// The set of tasks that should be performed next.
+
    pub tasks: NonEmpty<TaskEvent>,
+
    /// The set of timers that should be used to advance the [`TaskTimers`]
+
    /// ([`TaskTimers::advance`]).
+
    pub timers: Vec<TimerEvent>,
+
}
+

+
impl TaskTimers {
+
    /// Construct the [`TaskTimers`], starting all timers at `now`.
+
    pub fn new(now: LocalTime) -> Self {
+
        Self {
+
            last_idle: now,
+
            last_gossip: now,
+
            last_sync: now,
+
            last_announce: now,
+
            last_prune: now,
+
            intervals: Intervals::default(),
+
        }
+
    }
+

+
    /// Set the [`Intervals`] for the [`TaskTimers`].
+
    pub fn with_intervals(mut self, intervals: Intervals) -> Self {
+
        self.intervals = intervals;
+
        self
+
    }
+

+
    /// Issue [`TaskEvent`]s and [`TimerEvent`]s based on the [`Intervals`] of
+
    /// the timers, and the time passed in `now`.
+
    ///
+
    /// The [`TimerEvent`]s can be used to advance the timers, using
+
    /// [`TaskTimers::advance`].
+
    pub fn tick(&self, now: LocalTime) -> TaskTimerCommand {
+
        let mut tasks = NonEmpty::new(TaskEvent::MaintainPersistentConnections);
+
        let mut timers = Vec::with_capacity(5);
+

+
        if self.has_idle_elapsed(now) {
+
            tasks.extend(Idle::all().map(TaskEvent::from));
+
            timers.push(TimerEvent::AdvanceIdle(now));
+
        }
+

+
        if self.has_gossip_elapsed(now) {
+
            tasks.extend(Gossip::all().map(TaskEvent::from));
+
            timers.push(TimerEvent::AdvanceGossip(now));
+
        }
+

+
        if self.has_sync_elapsed(now) {
+
            tasks.extend(Sync::all().map(TaskEvent::from));
+
            timers.push(TimerEvent::AdvanceSync(now));
+
        }
+

+
        if self.has_announce_elapsed(now) {
+
            tasks.extend(Announce::all().map(TaskEvent::from));
+
            timers.push(TimerEvent::AdvanceAnnounce(now));
+
        }
+

+
        if self.has_prune_elapsed(now) {
+
            tasks.extend(Prune::all().map(TaskEvent::from));
+
            timers.push(TimerEvent::AdvancePrune(now));
+
        }
+

+
        TaskTimerCommand { tasks, timers }
+
    }
+

+
    /// Apply [`TimerEvent`]s to advance their clocks.
+
    pub fn advance(&mut self, events: &[TimerEvent]) {
+
        for event in events {
+
            self.advance_timer(event);
+
        }
+
    }
+

+
    fn advance_timer(&mut self, event: &TimerEvent) {
+
        match event {
+
            TimerEvent::AdvanceIdle(time) => self.last_idle = *time,
+
            TimerEvent::AdvanceGossip(time) => self.last_gossip = *time,
+
            TimerEvent::AdvanceSync(time) => self.last_sync = *time,
+
            TimerEvent::AdvanceAnnounce(time) => self.last_announce = *time,
+
            TimerEvent::AdvancePrune(time) => self.last_prune = *time,
+
        }
+
    }
+

+
    fn has_idle_elapsed(&self, now: LocalTime) -> bool {
+
        now - self.last_idle >= self.intervals.idle
+
    }
+

+
    fn has_gossip_elapsed(&self, now: LocalTime) -> bool {
+
        now - self.last_gossip >= self.intervals.gossip
+
    }
+

+
    fn has_sync_elapsed(&self, now: LocalTime) -> bool {
+
        now - self.last_sync >= self.intervals.sync
+
    }
+

+
    fn has_announce_elapsed(&self, now: LocalTime) -> bool {
+
        now - self.last_announce >= self.intervals.announce
+
    }
+

+
    fn has_prune_elapsed(&self, now: LocalTime) -> bool {
+
        now - self.last_prune >= self.intervals.prune
+
    }
+
}
+

+
#[cfg(test)]
+
mod tests {
+
    use super::*;
+
    use localtime::{LocalDuration, LocalTime};
+
    use nonempty::nonempty;
+

+
    // Helper function to create a test time
+
    fn test_time(secs: u64) -> LocalTime {
+
        LocalTime::from_secs(secs)
+
    }
+

+
    // Helper function to create test duration
+
    fn test_duration(secs: u64) -> LocalDuration {
+
        LocalDuration::from_secs(secs)
+
    }
+

+
    fn assert_tasks(got: &NonEmpty<TaskEvent>, expected: NonEmpty<TaskEvent>) {
+
        assert_eq!(
+
            got.len(),
+
            expected.len(),
+
            "extra task(s) emitted: {:?}",
+
            got
+
        );
+
        for t in expected {
+
            assert!(got.contains(&t), "missing expected task: {t:?}")
+
        }
+
    }
+

+
    fn assert_timers(got: &Vec<TimerEvent>, expected: Vec<TimerEvent>) {
+
        assert_eq!(
+
            got.len(),
+
            expected.len(),
+
            "extra timer(s) emitted: {:?}",
+
            got
+
        );
+
        for t in expected {
+
            assert!(got.contains(&t), "missing expected timer: {t:?}")
+
        }
+
    }
+

+
    fn gen_duration(seed: usize) -> LocalDuration {
+
        LocalDuration::from_secs(radicle::test::arbitrary::gen(seed))
+
    }
+

+
    #[test]
+
    fn test_tick_at_start_time_only_persistent_connections() {
+
        let start_time = test_time(1000);
+
        let timers = TaskTimers::new(start_time).with_intervals(Intervals::NEVER);
+

+
        let command = timers.tick(start_time);
+

+
        assert_eq!(
+
            command.tasks,
+
            nonempty![TaskEvent::MaintainPersistentConnections]
+
        );
+
        assert_eq!(command.timers, vec![]);
+
    }
+

+
    #[test]
+
    fn test_tick_idle_interval_elapsed() {
+
        let start_time = test_time(1000);
+
        let timers = TaskTimers::new(start_time).with_intervals(Intervals {
+
            idle: LocalDuration::from_secs(1),
+
            ..Intervals::NEVER
+
        });
+

+
        // Advance time past the idle interval
+
        let tick_time = start_time + test_duration(2);
+
        let command = timers.tick(tick_time);
+

+
        let mut expected = nonempty![TaskEvent::MaintainPersistentConnections];
+
        expected.append(&mut Idle::all().map(TaskEvent::from).into());
+
        assert_tasks(&command.tasks, expected);
+
        assert_timers(&command.timers, vec![TimerEvent::AdvanceIdle(tick_time)]);
+
    }
+

+
    #[test]
+
    fn test_tick_gossip_only() {
+
        let start_time = test_time(1000);
+
        let timers = TaskTimers::new(start_time).with_intervals(Intervals {
+
            gossip: LocalDuration::from_secs(1),
+
            ..Intervals::NEVER
+
        });
+

+
        let tick_time = start_time + test_duration(2);
+
        let command = timers.tick(tick_time);
+

+
        let mut expected = nonempty![TaskEvent::MaintainPersistentConnections];
+
        expected.append(&mut Gossip::all().map(TaskEvent::from).into());
+
        assert_tasks(&command.tasks, expected);
+
        assert_timers(&command.timers, vec![TimerEvent::AdvanceGossip(tick_time)]);
+
    }
+

+
    #[test]
+
    fn test_tick_sync_interval_elapsed() {
+
        let start_time = test_time(1000);
+
        let timers = TaskTimers::new(start_time).with_intervals(Intervals {
+
            sync: LocalDuration::from_secs(1),
+
            ..Intervals::NEVER
+
        });
+

+
        let tick_time = start_time + test_duration(2);
+
        let command = timers.tick(tick_time);
+

+
        let mut expected = nonempty![TaskEvent::MaintainPersistentConnections];
+
        expected.append(&mut Sync::all().map(TaskEvent::from).into());
+
        assert_tasks(&command.tasks, expected);
+
        assert_timers(&command.timers, vec![TimerEvent::AdvanceSync(tick_time)]);
+
    }
+

+
    #[test]
+
    fn test_tick_announce_interval_elapsed() {
+
        let start_time = test_time(1000);
+
        let timers = TaskTimers::new(start_time).with_intervals(Intervals {
+
            announce: LocalDuration::from_secs(1),
+
            ..Intervals::NEVER
+
        });
+

+
        let tick_time = start_time + test_duration(2);
+
        let command = timers.tick(tick_time);
+

+
        let mut expected = nonempty![TaskEvent::MaintainPersistentConnections];
+
        expected.append(&mut Announce::all().map(TaskEvent::from).into());
+
        assert_tasks(&command.tasks, expected);
+
        assert_timers(
+
            &command.timers,
+
            vec![TimerEvent::AdvanceAnnounce(tick_time)],
+
        );
+
    }
+

+
    #[test]
+
    fn test_tick_prune_interval_elapsed() {
+
        let start_time = test_time(1000);
+
        let timers = TaskTimers::new(start_time).with_intervals(Intervals {
+
            prune: LocalDuration::from_secs(1),
+
            ..Intervals::NEVER
+
        });
+

+
        let tick_time = start_time + test_duration(2);
+
        let command = timers.tick(tick_time);
+

+
        let mut expected = nonempty![TaskEvent::MaintainPersistentConnections];
+
        expected.append(&mut Prune::all().map(TaskEvent::from).into());
+
        assert_tasks(&command.tasks, expected);
+
        assert_timers(&command.timers, vec![TimerEvent::AdvancePrune(tick_time)]);
+
    }
+

+
    #[test]
+
    fn test_tick_multiple_intervals_elapsed() {
+
        let start_time = test_time(1000);
+
        let timers = TaskTimers::new(start_time).with_intervals(Intervals {
+
            idle: LocalDuration::from_secs(1),
+
            gossip: LocalDuration::from_secs(1),
+
            sync: LocalDuration::from_secs(1),
+
            ..Intervals::NEVER
+
        });
+

+
        let tick_time = start_time + test_duration(2);
+
        let command = timers.tick(tick_time);
+

+
        let mut expected = nonempty![TaskEvent::MaintainPersistentConnections];
+
        expected.append(&mut Idle::all().map(TaskEvent::from).into());
+
        expected.append(&mut Gossip::all().map(TaskEvent::from).into());
+
        expected.append(&mut Sync::all().map(TaskEvent::from).into());
+
        assert_tasks(&command.tasks, expected);
+
        assert_timers(
+
            &command.timers,
+
            vec![
+
                TimerEvent::AdvanceIdle(tick_time),
+
                TimerEvent::AdvanceGossip(tick_time),
+
                TimerEvent::AdvanceSync(tick_time),
+
            ],
+
        );
+
    }
+

+
    #[test]
+
    fn test_tick_all_intervals_elapsed() {
+
        let start_time = test_time(1000);
+
        let timers = TaskTimers::new(start_time).with_intervals(Intervals::ALWAYS);
+

+
        let tick_time = start_time + test_duration(1);
+
        let command = timers.tick(tick_time);
+

+
        assert_tasks(&command.tasks, TaskEvent::all());
+
        assert_timers(
+
            &command.timers,
+
            vec![
+
                TimerEvent::AdvanceIdle(tick_time),
+
                TimerEvent::AdvanceGossip(tick_time),
+
                TimerEvent::AdvanceSync(tick_time),
+
                TimerEvent::AdvanceAnnounce(tick_time),
+
                TimerEvent::AdvancePrune(tick_time),
+
            ],
+
        );
+
    }
+

+
    #[test]
+
    fn test_advance_single_timer() {
+
        let start_time = test_time(1000);
+
        let mut timers = TaskTimers::new(start_time);
+

+
        let new_time = test_time(2000);
+
        let events = vec![TimerEvent::AdvanceIdle(new_time)];
+

+
        timers.advance(&events);
+

+
        assert_eq!(timers.last_idle, new_time);
+
        assert_eq!(timers.last_gossip, start_time);
+
        assert_eq!(timers.last_sync, start_time);
+
        assert_eq!(timers.last_announce, start_time);
+
        assert_eq!(timers.last_prune, start_time);
+
    }
+

+
    #[test]
+
    fn test_advance_multiple_timers() {
+
        let start_time = test_time(1000);
+
        let mut timers = TaskTimers::new(start_time);
+

+
        let new_time = test_time(2000);
+
        let events = vec![
+
            TimerEvent::AdvanceIdle(new_time),
+
            TimerEvent::AdvanceGossip(new_time),
+
            TimerEvent::AdvanceSync(new_time),
+
        ];
+

+
        timers.advance(&events);
+

+
        assert_eq!(timers.last_idle, new_time);
+
        assert_eq!(timers.last_gossip, new_time);
+
        assert_eq!(timers.last_sync, new_time);
+
        assert_eq!(timers.last_announce, start_time);
+
        assert_eq!(timers.last_prune, start_time);
+
    }
+

+
    #[test]
+
    fn test_advance_all_timers() {
+
        let start_time = test_time(1000);
+
        let mut timers = TaskTimers::new(start_time);
+

+
        let new_time = test_time(2000);
+
        let events = vec![
+
            TimerEvent::AdvanceIdle(new_time),
+
            TimerEvent::AdvanceGossip(new_time),
+
            TimerEvent::AdvanceSync(new_time),
+
            TimerEvent::AdvanceAnnounce(new_time),
+
            TimerEvent::AdvancePrune(new_time),
+
        ];
+

+
        timers.advance(&events);
+

+
        assert_eq!(timers.last_idle, new_time);
+
        assert_eq!(timers.last_gossip, new_time);
+
        assert_eq!(timers.last_sync, new_time);
+
        assert_eq!(timers.last_announce, new_time);
+
        assert_eq!(timers.last_prune, new_time);
+
    }
+

+
    #[test]
+
    fn test_advance_empty_events() {
+
        let start_time = test_time(1000);
+
        let mut timers = TaskTimers::new(start_time);
+

+
        timers.advance(&[]);
+

+
        // All timers should remain unchanged
+
        assert_eq!(timers.last_idle, start_time);
+
        assert_eq!(timers.last_gossip, start_time);
+
        assert_eq!(timers.last_sync, start_time);
+
        assert_eq!(timers.last_announce, start_time);
+
        assert_eq!(timers.last_prune, start_time);
+
    }
+

+
    #[test]
+
    fn test_exactly_at_interval_boundary() {
+
        let start_time = test_time(1000);
+
        let timers = TaskTimers::new(start_time).with_intervals(Intervals {
+
            idle: LocalDuration::from_secs(30),
+
            ..Intervals::NEVER
+
        });
+

+
        // Test exactly at the idle boundary
+
        let boundary_time = start_time + test_duration(30);
+
        let command = timers.tick(boundary_time);
+

+
        assert!(command.tasks.contains(&TaskEvent::Idle(Idle::KeepAlive)));
+
    }
+

+
    #[test]
+
    fn test_just_before_interval_boundary() {
+
        let start_time = test_time(1000);
+
        // Only set the idle interval, so no other task triggers at the boundary
+
        let intervals = Intervals {
+
            idle: test_duration(30),
+
            ..Intervals::NEVER
+
        };
+
        let timers = TaskTimers::new(start_time).with_intervals(intervals);
+

+
        // Test just before the idle boundary
+
        let almost_boundary = start_time + test_duration(29);
+
        let command = timers.tick(almost_boundary);
+

+
        // Should not trigger idle tasks, only persistent connections
+
        assert_tasks(
+
            &command.tasks,
+
            nonempty![TaskEvent::MaintainPersistentConnections],
+
        );
+
    }
+

+
    #[test]
+
    fn test_tick_after_advance() {
+
        let start_time = test_time(1000);
+
        let mut timers = TaskTimers::new(start_time).with_intervals(Intervals {
+
            idle: LocalDuration::from_secs(30),
+
            gossip: LocalDuration::from_secs(30),
+
            ..Intervals::NEVER
+
        });
+

+
        // First tick - should trigger idle and gossip
+
        let first_tick = start_time + test_duration(30);
+
        let command1 = timers.tick(first_tick);
+

+
        // Advance the timers
+
        timers.advance(&command1.timers);
+

+
        // Second tick at same time - should not trigger anything again
+
        let command2 = timers.tick(first_tick);
+

+
        assert!(!command2.tasks.contains(&TaskEvent::Idle(Idle::KeepAlive)));
+
        assert!(!command2
+
            .tasks
+
            .contains(&TaskEvent::Gossip(Gossip::RelayAnnouncements)));
+
        assert_eq!(
+
            command2.tasks,
+
            nonempty![TaskEvent::MaintainPersistentConnections]
+
        );
+
    }
+

+
    #[test]
+
    fn test_tick_advance_tick_cycle() {
+
        let start_time = test_time(1000);
+
        let mut timers = TaskTimers::new(start_time).with_intervals(Intervals {
+
            idle: LocalDuration::from_secs(30),
+
            ..Intervals::NEVER
+
        });
+

+
        // First cycle
+
        let tick1 = start_time + test_duration(30);
+
        let command1 = timers.tick(tick1);
+
        timers.advance(&command1.timers);
+

+
        // Second cycle - advance by another idle interval
+
        let tick2 = tick1 + test_duration(30);
+
        let command2 = timers.tick(tick2);
+

+
        // Should trigger idle tasks again
+
        assert!(command2.tasks.contains(&TaskEvent::Idle(Idle::KeepAlive)));
+
    }
+

+
    #[test]
+
    fn test_custom_intervals_behavior() {
+
        let start_time = test_time(1000);
+
        let custom_intervals = Intervals {
+
            idle: test_duration(1),   // Very short idle interval
+
            gossip: test_duration(2), // Short gossip interval
+
            ..Intervals::NEVER
+
        };
+
        let timers = TaskTimers::new(start_time).with_intervals(custom_intervals);
+

+
        // Test that custom intervals are respected
+
        let tick_time = start_time + test_duration(1);
+
        let command = timers.tick(tick_time);
+

+
        assert!(command.tasks.contains(&TaskEvent::Idle(Idle::KeepAlive)));
+
        // Gossip should not trigger yet (interval is 2s)
+
        assert!(!command
+
            .tasks
+
            .contains(&TaskEvent::Gossip(Gossip::RelayAnnouncements)));
+

+
        // Test gossip triggers at its interval
+
        let gossip_time = start_time + test_duration(2);
+
        let command2 = timers.tick(gossip_time);
+

+
        assert!(command2
+
            .tasks
+
            .contains(&TaskEvent::Gossip(Gossip::RelayAnnouncements)));
+
    }
+

+
    #[test]
+
    fn test_zero_duration_intervals() {
+
        let start_time = test_time(1000);
+
        let zero_intervals = Intervals {
+
            idle: LocalDuration::from_secs(0),
+
            gossip: LocalDuration::from_secs(0),
+
            ..Intervals::NEVER
+
        };
+
        let timers = TaskTimers::new(start_time).with_intervals(zero_intervals);
+

+
        // Even with zero duration, should trigger immediately
+
        let command = timers.tick(start_time);
+

+
        assert!(command.tasks.contains(&TaskEvent::Idle(Idle::KeepAlive)));
+
        assert!(command
+
            .tasks
+
            .contains(&TaskEvent::Gossip(Gossip::RelayAnnouncements)));
+
    }
+

+
    #[test]
+
    fn test_partial_timer_advance() {
+
        let start_time = test_time(1000);
+
        let mut timers = TaskTimers::new(start_time);
+

+
        let new_time = test_time(2000);
+

+
        // Only advance some timers (not all that might have been generated)
+
        let partial_events = vec![
+
            TimerEvent::AdvanceIdle(new_time),
+
            TimerEvent::AdvanceGossip(new_time),
+
            // Intentionally omit sync, announce, and prune
+
        ];
+

+
        timers.advance(&partial_events);
+

+
        // Only idle and gossip should be advanced
+
        assert_eq!(timers.last_idle, new_time);
+
        assert_eq!(timers.last_gossip, new_time);
+
        assert_eq!(timers.last_sync, start_time);
+
        assert_eq!(timers.last_announce, start_time);
+
        assert_eq!(timers.last_prune, start_time);
+
    }
+

+
    #[test]
+
    fn test_property_timer_monotonicity() {
+
        let start_time = test_time(1000);
+
        let intervals = Intervals {
+
            idle: gen_duration(0),
+
            gossip: gen_duration(1),
+
            sync: gen_duration(2),
+
            announce: gen_duration(3),
+
            prune: gen_duration(4),
+
        };
+
        let mut timers = TaskTimers::new(start_time).with_intervals(intervals);
+

+
        let mut current_time = start_time;
+

+
        for _ in 0..100 {
+
            current_time = current_time + test_duration(10);
+
            let command = timers.tick(current_time);
+

+
            // Advance all timers
+
            timers.advance(&command.timers);
+

+
            // Verify no timer ever falls behind the start time
+
            assert!(timers.last_idle >= start_time);
+
            assert!(timers.last_gossip >= start_time);
+
            assert!(timers.last_sync >= start_time);
+
            assert!(timers.last_announce >= start_time);
+
            assert!(timers.last_prune >= start_time);
+
        }
+
    }
+
}
modified crates/radicle/src/node.rs
@@ -898,7 +898,7 @@ impl From<Vec<Seed>> for Seeds {
    }
}

-
#[derive(Clone, Debug, Serialize, Deserialize)]
+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "camelCase")]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum FetchResult {