Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
protocol: connections subscribe commands
Fintan Halpenny committed 10 months ago
commit db6337e73043b1ef35e28548ee556d566cf5fda0
parent 1fde061794ea4dddb8beb8a3949c19bfcd3ab8c5
3 files changed +117 -3
modified crates/radicle-protocol/src/connections.rs
@@ -7,6 +7,7 @@ pub mod effects;
pub mod events;

use radicle::node::config::RateLimits;
+
use radicle::prelude::RepoId;
use session::{HasAttempts as _, Session, Sessions};
mod session;

@@ -18,7 +19,7 @@ use radicle::node::address;
use radicle::node::{Address, HostName, Link, NodeId, Severity};

use crate::service::limiter::RateLimiter;
-
use crate::service::DisconnectReason;
+
use crate::service::{message, DisconnectReason};

/// Minimum amount of time to wait before reconnecting to a peer.
pub const MIN_RECONNECTION_DELTA: LocalDuration = LocalDuration::from_secs(3);
@@ -65,10 +66,20 @@ impl Default for ReconnectionDelay {
}

pub enum CommandEvent {
-
    MissingSession { node: NodeId },
+
    MissingSession {
+
        node: NodeId,
+
    },
    Attempted(Session<session::Attempted>),
    Connected(Session<session::Connected>),
    Disconnected(Session<session::Disconnected>),
+
    Subscribed {
+
        node: NodeId,
+
        subscription: message::Subscribe,
+
    },
+
    SubscribedTo {
+
        node: NodeId,
+
        rid: RepoId,
+
    },
}

impl Connections {
@@ -77,6 +88,8 @@ impl Connections {
            commands::Command::Attempt(attempt) => self.attempted(attempt),
            commands::Command::Connect(connect) => self.connected(connect),
            commands::Command::Disconnect(disconnect) => self.disconnected(disconnect),
+
            commands::Command::Subscribe(subscribe) => self.subscribed(subscribe),
+
            commands::Command::SubscribeTo(subscribe) => self.subscribed_to(subscribe),
        }
    }

@@ -115,6 +128,28 @@ impl Connections {
            .map(CommandEvent::Disconnected)
            .unwrap_or(CommandEvent::MissingSession { node })
    }
+

+
    fn subscribed(
+
        &mut self,
+
        commands::Subscribe { node, subscription }: commands::Subscribe,
+
    ) -> CommandEvent {
+
        if self.sessions.subscribe(&node, subscription.clone()) {
+
            CommandEvent::Subscribed { node, subscription }
+
        } else {
+
            CommandEvent::MissingSession { node }
+
        }
+
    }
+

+
    fn subscribed_to(
+
        &mut self,
+
        commands::SubscribeTo { node, rid }: commands::SubscribeTo,
+
    ) -> CommandEvent {
+
        if self.sessions.subscribe_to(&node, &rid) {
+
            CommandEvent::SubscribedTo { node, rid }
+
        } else {
+
            CommandEvent::MissingSession { node }
+
        }
+
    }
}

impl Connections {
modified crates/radicle-protocol/src/connections/commands.rs
@@ -1,10 +1,17 @@
use localtime::LocalTime;
-
use radicle::node::{Link, NodeId};
+
use radicle::{
+
    node::{Link, NodeId},
+
    prelude::RepoId,
+
};
+

+
use crate::service::message;

pub enum Command {
    Attempt(Attempt),
    Connect(Connect),
    Disconnect(Disconnect),
+
    Subscribe(Subscribe),
+
    SubscribeTo(SubscribeTo),
}

impl From<Attempt> for Command {
@@ -25,6 +32,18 @@ impl From<Disconnect> for Command {
    }
}

+
impl From<Subscribe> for Command {
+
    fn from(v: Subscribe) -> Self {
+
        Self::Subscribe(v)
+
    }
+
}
+

+
impl From<SubscribeTo> for Command {
+
    fn from(v: SubscribeTo) -> Self {
+
        Self::SubscribeTo(v)
+
    }
+
}
+

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Attempt {
    pub node: NodeId,
@@ -44,3 +63,15 @@ pub struct Disconnect {
    pub since: LocalTime,
    pub retry_at: Option<LocalTime>,
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct Subscribe {
+
    pub node: NodeId,
+
    pub subscription: message::Subscribe,
+
}
+

+
#[derive(Clone, Debug, PartialEq, Eq)]
+
pub struct SubscribeTo {
+
    pub node: NodeId,
+
    pub rid: RepoId,
+
}
modified crates/radicle-protocol/src/connections/session.rs
@@ -203,6 +203,54 @@ impl Sessions {
            })
    }

+
    pub fn subscribe(&mut self, node: &NodeId, subscription: message::Subscribe) -> bool {
+
        if let Some(session) = self.connected.get_mut(node) {
+
            session.set_subscription(subscription);
+
            return true;
+
        }
+

+
        if let Some(session) = self.disconnected.get_mut(node) {
+
            session.set_subscription(subscription);
+
            return true;
+
        }
+

+
        if let Some(session) = self.attempted.get_mut(node) {
+
            session.set_subscription(subscription);
+
            return true;
+
        }
+

+
        if let Some(session) = self.initial.get_mut(node) {
+
            session.set_subscription(subscription);
+
            return true;
+
        }
+

+
        false
+
    }
+

+
    pub fn subscribe_to(&mut self, node: &NodeId, rid: &RepoId) -> bool {
+
        if let Some(session) = self.connected.get_mut(node) {
+
            session.subscribe_to(rid);
+
            return true;
+
        }
+

+
        if let Some(session) = self.disconnected.get_mut(node) {
+
            session.subscribe_to(rid);
+
            return true;
+
        }
+

+
        if let Some(session) = self.attempted.get_mut(node) {
+
            session.subscribe_to(rid);
+
            return true;
+
        }
+

+
        if let Some(session) = self.initial.get_mut(node) {
+
            session.subscribe_to(rid);
+
            return true;
+
        }
+

+
        false
+
    }
+

    fn remove_session(&mut self, node: &NodeId) -> Option<Session<State>> {
        self.initial
            .remove(node)