Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Subscribe on track-repo
Alexis Sellier committed 3 years ago
commit 89fcc6de1cc4ebdadf3b3c8542f0b6464090cfc0
parent 8cbc3c240359798f5617774fc7a1f798086ff1ff
3 files changed +42 -9
modified radicle-node/src/service.rs
@@ -456,12 +456,18 @@ where
                self.fetch_reqs.insert(rid, resp);
                self.fetch(rid, &seed);
            }
-
            Command::TrackRepo(id, resp) => {
+
            Command::TrackRepo(rid, resp) => {
                let tracked = self
-
                    .track_repo(&id, tracking::Scope::All)
+
                    .track_repo(&rid, tracking::Scope::All)
                    .expect("Service::command: error tracking repository");
                // TODO: Try to fetch project if we weren't tracking it before.
                resp.send(tracked).ok();
+

+
                // Let all our peers know that we're interested in this repo from now on.
+
                self.reactor.broadcast(
+
                    Message::subscribe(self.filter(), self.time(), Timestamp::MAX),
+
                    self.sessions.negotiated().map(|(_, s)| s),
+
                );
            }
            Command::UntrackRepo(id, resp) => {
                let untracked = self
@@ -716,7 +722,7 @@ where
                self.reactor
                    .disconnect(remote, DisconnectReason::Session(err));

-
                // FIXME: The peer should be set in a state such that we don'that
+
                // FIXME: The peer should be set in a state such that we don't
                // process further messages.
            }
            Ok(()) => {}
@@ -760,7 +766,7 @@ where
                // Discard inventory messages we've already seen, otherwise update
                // out last seen time.
                if !peer.inventory_announced(timestamp) {
-
                    debug!(target: "service", "Ignoring stale inventory announcement from {announcer} (t={})", self.clock.as_millis());
+
                    debug!(target: "service", "Ignoring stale inventory announcement from {announcer} (t={})", self.time());
                    return Ok(false);
                }

@@ -1077,7 +1083,7 @@ where
    /// Sync, and if needed, announce our local inventory.
    fn sync_and_announce_inventory(&mut self) -> Result<Vec<Id>, Error> {
        let inventory = self.storage.inventory()?;
-
        let updated = self.sync_routing(&inventory, self.node_id(), self.clock.as_millis())?;
+
        let updated = self.sync_routing(&inventory, self.node_id(), self.time())?;

        if !updated.is_empty() {
            self.announce_inventory(inventory)?;
@@ -1127,7 +1133,7 @@ where
    fn announce_refs(&mut self, rid: Id, namespaces: Namespaces) -> Result<(), storage::Error> {
        let repo = self.storage.repository(rid)?;
        let peers = self.sessions.negotiated().map(|(_, p)| p);
-
        let timestamp = self.clock.as_millis();
+
        let timestamp = self.time();
        let mut refs = BoundedVec::<_, REF_REMOTE_LIMIT>::new();

        match namespaces {
@@ -1182,13 +1188,18 @@ where
        }
    }

+
    /// Get the current time.
+
    fn time(&self) -> Timestamp {
+
        self.clock.as_millis()
+
    }
+

    ////////////////////////////////////////////////////////////////////////////
    // Periodic tasks
    ////////////////////////////////////////////////////////////////////////////

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self, inventory: Vec<Id>) -> Result<(), storage::Error> {
-
        let time = self.clock.as_millis();
+
        let time = self.time();
        let inv = Message::inventory(gossip::inventory(time, inventory), &self.signer);
        for id in self.sessions.negotiated().map(|(id, _)| id) {
            self.reactor.write(*id, inv.clone());
modified radicle-node/src/service/reactor.rs
@@ -98,11 +98,12 @@ impl Reactor {
    /// Broadcast a message to a list of peers.
    pub fn broadcast<'a>(
        &mut self,
-
        msg: Announcement,
+
        msg: impl Into<Message>,
        peers: impl IntoIterator<Item = &'a Session>,
    ) {
+
        let msg = msg.into();
        for peer in peers {
-
            self.write(peer.id, msg.clone().into());
+
            self.write(peer.id, msg.clone());
        }
    }

modified radicle-node/src/tests.rs
@@ -836,6 +836,27 @@ fn test_maintain_connections() {
}

#[test]
+
fn test_track_repo_subscribe() {
+
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
+
    let bob = Peer::new("bob", [8, 8, 8, 8]);
+
    let rid = arbitrary::gen::<Id>(1);
+
    let (send, recv) = chan::bounded(1);
+

+
    alice.connect_to(&bob);
+
    alice.command(Command::TrackRepo(rid, send));
+
    assert!(recv.recv().unwrap());
+

+
    assert_matches!(
+
        alice.messages(bob.id).next(),
+
        Some(Message::Subscribe(Subscribe {
+
            filter,
+
            since,
+
            ..
+
        })) if since == alice.clock().as_millis() && filter.contains(&rid)
+
    );
+
}
+

+
#[test]
fn test_push_and_pull() {
    let tempdir = tempfile::tempdir().unwrap();