Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Take session reference when writing messages
Alexis Sellier committed 3 years ago
commit 0f81f8a7aabbee5a6bbd75d0a7004931fdd15dae
parent feedf345a36c0f39e67fbd5f3fabc65c5425ac16
3 files changed +16 -15
modified radicle-node/src/service.rs
@@ -519,7 +519,7 @@ where
            session::FetchResult::Ready(fetch) => {
                debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

-
                self.reactor.write(session.id, fetch);
+
                self.reactor.write(&session, fetch);
            }
            session::FetchResult::AlreadyFetching(other) => {
                if other == rid {
@@ -642,7 +642,7 @@ where

            if let Some(peer) = self.sessions.get_mut(&remote) {
                self.reactor.write_all(
-
                    remote,
+
                    peer,
                    gossip::handshake(
                        self.clock.as_millis(),
                        &self.storage,
@@ -969,9 +969,11 @@ where
                    );
                    return Err(session::Error::Misbehavior);
                }
+
                *initialized = true;
+

                if peer.link.is_inbound() {
                    self.reactor.write_all(
-
                        peer.id,
+
                        peer,
                        gossip::handshake(
                            self.clock.as_millis(),
                            &self.storage,
@@ -981,7 +983,6 @@ where
                        ),
                    );
                }
-
                *initialized = true;
                // Nb. we don't set the peer timestamp here, since it is going to be
                // set after the first message is received only. Setting it here would
                // mean that messages received right after the handshake could be ignored.
@@ -1015,7 +1016,7 @@ where
                    // Don't send announcements authored by the remote, back to the remote.
                    .filter(|ann| &ann.node != remote)
                {
-
                    self.reactor.write(peer.id, ann.into());
+
                    self.reactor.write(peer, ann.into());
                }
                peer.subscribe = Some(subscribe);
            }
@@ -1025,7 +1026,7 @@ where
                    return Ok(());
                }
                self.reactor.write(
-
                    peer.id,
+
                    &peer,
                    Message::Pong {
                        zeroes: ZeroBytes::new(ponglen),
                    },
@@ -1045,7 +1046,7 @@ where

                *protocol = Protocol::Fetch { rid };
                // Accept the request and instruct the transport to handover the socket to the worker.
-
                self.reactor.write(*remote, Message::FetchOk { rid });
+
                self.reactor.write(peer, Message::FetchOk { rid });
                self.reactor
                    .fetch(*remote, rid, Namespaces::default(), false);
            }
@@ -1201,8 +1202,8 @@ where
    fn announce_inventory(&mut self, inventory: Vec<Id>) -> Result<(), storage::Error> {
        let time = self.time();
        let inv = Message::inventory(gossip::inventory(time, inventory), &self.signer);
-
        for id in self.sessions.negotiated().map(|(id, _)| id) {
-
            self.reactor.write(*id, inv.clone());
+
        for (_, sess) in self.sessions.negotiated() {
+
            self.reactor.write(sess, inv.clone());
        }
        Ok(())
    }
modified radicle-node/src/service/reactor.rs
@@ -61,13 +61,13 @@ impl Reactor {
        self.io.push_back(Io::Disconnect(id, reason));
    }

-
    pub fn write(&mut self, remote: NodeId, msg: Message) {
+
    pub fn write(&mut self, remote: &Session, msg: Message) {
        debug!(target: "service", "Write {:?} to {}", &msg, remote);

-
        self.io.push_back(Io::Write(remote, vec![msg]));
+
        self.io.push_back(Io::Write(remote.id, vec![msg]));
    }

-
    pub fn write_all(&mut self, remote: NodeId, msgs: impl IntoIterator<Item = Message>) {
+
    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
        let msgs = msgs.into_iter().collect::<Vec<_>>();
        for (ix, msg) in msgs.iter().enumerate() {
            debug!(
@@ -79,7 +79,7 @@ impl Reactor {
                msgs.len()
            );
        }
-
        self.io.push_back(Io::Write(remote, msgs));
+
        self.io.push_back(Io::Write(remote.id, msgs));
    }

    pub fn wakeup(&mut self, after: LocalDuration) {
@@ -103,7 +103,7 @@ impl Reactor {
    ) {
        let msg = msg.into();
        for peer in peers {
-
            self.write(peer.id, msg.clone());
+
            self.write(peer, msg.clone());
        }
    }

modified radicle-node/src/service/session.rs
@@ -259,7 +259,7 @@ impl Session {
            let msg = message::Ping::new(&mut self.rng);
            *ping = PingState::AwaitingResponse(msg.ponglen);

-
            reactor.write(self.id, Message::Ping(msg));
+
            reactor.write(self, Message::Ping(msg));
        }
        Ok(())
    }