Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Fix filter construction and policy code
Alexis Sellier committed 3 years ago
commit 2f1a18470e39e3aaef5083017e9524955a285588
parent 27d0c8f23589c75117c7a6a185f48ad3fde8aa2a
4 files changed +64 -43
modified radicle-node/src/service.rs
@@ -249,6 +249,12 @@ where
        }
    }

+
    /// Return the next i/o action to execute.
+
    #[allow(clippy::should_implement_trait)]
+
    pub fn next(&mut self) -> Option<reactor::Io> {
+
        self.reactor.next()
+
    }
+

    /// Track a repository.
    /// Returns whether or not the tracking policy was updated.
    pub fn track_repo(&mut self, id: &Id, scope: tracking::Scope) -> Result<bool, tracking::Error> {
@@ -267,8 +273,14 @@ where
        // Nb. This is potentially slow if we have lots of projects. We should probably
        // only re-compute the filter when we've untracked a certain amount of projects
        // and the filter is really out of date.
-
        self.filter = Filter::new(self.tracking.repo_entries()?.map(|(e, _)| e));
-

+
        //
+
        // TODO: Share this code with initialization code.
+
        self.filter = Filter::new(
+
            self.tracking
+
                .repo_entries()?
+
                .filter(|(_, _, policy)| *policy == tracking::Policy::Track)
+
                .map(|(e, _, _)| e),
+
        );
        Ok(updated)
    }

@@ -345,7 +357,12 @@ where
            self.routing.insert(id, self.node_id(), time.as_secs())?;
        }
        // Setup subscription filter for tracked repos.
-
        self.filter = Filter::new(self.tracking.repo_entries()?.map(|(e, _)| e));
+
        self.filter = Filter::new(
+
            self.tracking
+
                .repo_entries()?
+
                .filter(|(_, _, policy)| *policy == tracking::Policy::Track)
+
                .map(|(e, _, _)| e),
+
        );

        Ok(())
    }
@@ -586,6 +603,8 @@ where
        // For outbound connections, we are the first to say "Hello".
        // For inbound connections, we wait for the remote to say "Hello" first.
        if link.is_outbound() {
+
            let filter = self.filter();
+

            if let Some(peer) = self.sessions.get_mut(&remote) {
                self.reactor.write_all(
                    remote,
@@ -593,7 +612,7 @@ where
                        self.clock.as_secs(),
                        &self.storage,
                        &self.signer,
-
                        self.filter.clone(),
+
                        filter,
                        &self.config,
                    ),
                );
@@ -833,6 +852,7 @@ where
        remote: &NodeId,
        message: Message,
    ) -> Result<(), session::Error> {
+
        let filter = self.filter(); // TODO: Don't call this if it's not used.
        let Some(peer) = self.sessions.get_mut(remote) else {
            return Err(session::Error::NotFound(*remote));
        };
@@ -871,7 +891,7 @@ where
                            self.clock.as_secs(),
                            &self.storage,
                            &self.signer,
-
                            self.filter.clone(),
+
                            filter,
                            &self.config,
                        ),
                    );
@@ -1053,6 +1073,16 @@ where
        false
    }

+
    /// Return a new filter object, based on our tracking policy.
+
    fn filter(&self) -> Filter {
+
        if self.config.policy == tracking::Policy::Track {
+
            // TODO: Remove bits for blocked repos.
+
            Filter::default()
+
        } else {
+
            self.filter.clone()
+
        }
+
    }
+

    ////////////////////////////////////////////////////////////////////////////
    // Periodic tasks
    ////////////////////////////////////////////////////////////////////////////
@@ -1242,14 +1272,6 @@ impl fmt::Display for DisconnectReason {
    }
}

-
impl<R, A, S, G> Iterator for Service<R, A, S, G> {
-
    type Item = reactor::Io;
-

-
    fn next(&mut self) -> Option<Self::Item> {
-
        self.reactor.next()
-
    }
-
}
-

/// Result of a project lookup.
#[derive(Debug)]
pub struct Lookup {
modified radicle-node/src/service/tracking.rs
@@ -62,34 +62,30 @@ impl Config {

    /// Check if a repository is tracked.
    pub fn is_repo_tracked(&self, id: &Id) -> Result<bool, Error> {
-
        if self.default == Policy::Track {
-
            return Ok(true);
-
        }
-
        self.store.is_repo_tracked(id)
+
        self.repo_policy(id).map(|policy| policy == Policy::Track)
    }

    /// Check if a node is tracked.
    pub fn is_node_tracked(&self, id: &NodeId) -> Result<bool, Error> {
-
        if self.default == Policy::Track {
-
            return Ok(true);
-
        }
-
        self.store.is_node_tracked(id)
+
        self.node_policy(id).map(|policy| policy == Policy::Track)
    }

    /// Get a node's tracking information.
-
    pub fn node_entry(&self, id: &NodeId) -> Result<(Option<Alias>, Policy), Error> {
-
        if let Some(result) = self.store.node_entry(id)? {
-
            return Ok(result);
+
    /// Returns the default policy if the node isn't found.
+
    pub fn node_policy(&self, id: &NodeId) -> Result<Policy, Error> {
+
        if let Some((_, policy)) = self.store.node_entry(id)? {
+
            return Ok(policy);
        }
-
        Ok((None, self.default))
+
        Ok(self.default)
    }

    /// Get a repository's tracking information.
-
    pub fn repo_entry(&self, id: &Id) -> Result<(Scope, Policy), Error> {
-
        if let Some(result) = self.store.repo_entry(id)? {
-
            return Ok(result);
+
    /// Returns the default policy if the repo isn't found.
+
    pub fn repo_policy(&self, id: &Id) -> Result<Policy, Error> {
+
        if let Some((_, policy)) = self.store.repo_entry(id)? {
+
            return Ok(policy);
        }
-
        Ok((Scope::All, self.default))
+
        Ok(self.default)
    }
}

modified radicle-node/src/service/tracking/store.rs
@@ -1,3 +1,4 @@
+
#![allow(clippy::type_complexity)]
use std::path::Path;
use std::str::FromStr;
use std::{fmt, io};
@@ -247,35 +248,37 @@ impl Config {
    }

    /// Get node tracking entries.
-
    pub fn node_entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, Alias)>>, Error> {
+
    pub fn node_entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, Alias, Policy)>>, Error> {
        let mut stmt = self
            .db
-
            .prepare("SELECT id, alias FROM `node-policies`")?
+
            .prepare("SELECT id, alias, policy FROM `node-policies`")?
            .into_iter();
        let mut entries = Vec::new();

        while let Some(Ok(row)) = stmt.next() {
            let id = row.read("id");
            let alias = row.read::<&str, _>("alias");
+
            let policy = row.read::<Policy, _>("policy");

-
            entries.push((id, alias.to_owned()));
+
            entries.push((id, alias.to_owned(), policy));
        }
        Ok(Box::new(entries.into_iter()))
    }

    /// Get repository tracking entries.
-
    pub fn repo_entries(&self) -> Result<Box<dyn Iterator<Item = (Id, Scope)>>, Error> {
+
    pub fn repo_entries(&self) -> Result<Box<dyn Iterator<Item = (Id, Scope, Policy)>>, Error> {
        let mut stmt = self
            .db
-
            .prepare("SELECT id, scope FROM `repo-policies`")?
+
            .prepare("SELECT id, scope, policy FROM `repo-policies`")?
            .into_iter();
        let mut entries = Vec::new();

        while let Some(Ok(row)) = stmt.next() {
            let id = row.read("id");
            let scope = row.read("scope");
+
            let policy = row.read::<Policy, _>("policy");

-
            entries.push((id, scope));
+
            entries.push((id, scope, policy));
        }
        Ok(Box::new(entries.into_iter()))
    }
@@ -321,9 +324,9 @@ mod test {
            assert!(db.track_node(id, None).unwrap());
        }
        let mut entries = db.node_entries().unwrap();
-
        assert_matches!(entries.next(), Some((id, _)) if id == ids[0]);
-
        assert_matches!(entries.next(), Some((id, _)) if id == ids[1]);
-
        assert_matches!(entries.next(), Some((id, _)) if id == ids[2]);
+
        assert_matches!(entries.next(), Some((id, _, _)) if id == ids[0]);
+
        assert_matches!(entries.next(), Some((id, _, _)) if id == ids[1]);
+
        assert_matches!(entries.next(), Some((id, _, _)) if id == ids[2]);
    }

    #[test]
@@ -335,9 +338,9 @@ mod test {
            assert!(db.track_repo(id, Scope::All).unwrap());
        }
        let mut entries = db.repo_entries().unwrap();
-
        assert_matches!(entries.next(), Some((id, _)) if id == ids[0]);
-
        assert_matches!(entries.next(), Some((id, _)) if id == ids[1]);
-
        assert_matches!(entries.next(), Some((id, _)) if id == ids[2]);
+
        assert_matches!(entries.next(), Some((id, _, _)) if id == ids[0]);
+
        assert_matches!(entries.next(), Some((id, _, _)) if id == ids[1]);
+
        assert_matches!(entries.next(), Some((id, _, _)) if id == ids[2]);
    }

    #[test]
modified radicle-node/src/test/simulator.rs
@@ -346,7 +346,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
        for peer in nodes.values_mut() {
            let id = peer.id();

-
            for o in peer.by_ref() {
+
            while let Some(o) = peer.next() {
                self.schedule(&id, o);
            }
        }
@@ -417,7 +417,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        p.fetched(f, result);
                    }
                }
-
                for o in p.by_ref() {
+
                while let Some(o) = p.next() {
                    self.schedule(&node, o);
                }
            } else {