Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: add block command to control socket
✗ CI failure Adrian Duke committed 2 months ago
commit cd239a48fcd3c569958f8c7462de840d9fddf26c
parent 03bbe52417f131ec721a7c1cc57185019f0298a3
1 failed (1 total) View logs
7 files changed +78 -5
modified crates/radicle-node/src/control.rs
@@ -170,6 +170,14 @@ where
                return Err(CommandError::Runtime(e));
            }
        },
+
        Command::Block { nid } => match handle.block(nid) {
+
            Ok(result) => {
+
                CommandResult::updated(result).to_writer(writer)?;
+
            }
+
            Err(e) => {
+
                return Err(CommandError::Runtime(e));
+
            }
+
        },
        Command::Unfollow { nid } => match handle.unfollow(nid) {
            Ok(result) => {
                CommandResult::updated(result).to_writer(writer)?;
modified crates/radicle-node/src/runtime/handle.rs
@@ -247,6 +247,12 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

+
    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::Block(id, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
        let (sender, receiver) = chan::bounded(1);
        self.command(service::Command::Seed(id, scope, sender))?;
modified crates/radicle-node/src/test/handle.rs
@@ -18,6 +18,7 @@ pub struct Handle {
    pub updates: Arc<Mutex<Vec<(RepoId, PublicKey)>>>,
    pub seeding: Arc<Mutex<HashSet<RepoId>>>,
    pub following: Arc<Mutex<HashSet<NodeId>>>,
+
    pub blocked: Arc<Mutex<HashSet<NodeId>>>,
}

impl radicle::node::Handle for Handle {
@@ -85,6 +86,7 @@ impl radicle::node::Handle for Handle {
    }

    fn follow(&mut self, id: NodeId, _alias: Option<Alias>) -> Result<bool, Self::Error> {
+
        self.blocked.lock().unwrap().remove(&id);
        Ok(self.following.lock().unwrap().insert(id))
    }

@@ -93,7 +95,14 @@ impl radicle::node::Handle for Handle {
    }

    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error> {
-
        Ok(self.following.lock().unwrap().remove(&id))
+
        let f = self.following.lock().unwrap().remove(&id);
+
        let b = self.blocked.lock().unwrap().remove(&id);
+
        Ok(f || b)
+
    }
+

+
    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
+
        self.following.lock().unwrap().remove(&id);
+
        Ok(self.blocked.lock().unwrap().insert(id))
    }

    fn announce_refs_for(
modified crates/radicle-protocol/src/service.rs
@@ -271,6 +271,8 @@ pub enum Command {
    Follow(NodeId, Option<Alias>, chan::Sender<bool>),
    /// Unfollow the given node.
    Unfollow(NodeId, chan::Sender<bool>),
+
    /// Block the given node.
+
    Block(NodeId, chan::Sender<bool>),
    /// Query the internal service state.
    QueryState(Arc<QueryState>, chan::Sender<Result<(), CommandError>>),
}
@@ -291,6 +293,7 @@ impl fmt::Debug for Command {
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
+
            Self::Block(id, _) => write!(f, "Block({id})"),
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
        }
    }
@@ -922,6 +925,16 @@ where
                    .expect("Service::command: error unfollowing node");
                resp.send(updated).ok();
            }
+
            Command::Block(id, resp) => {
+
                let updated = self
+
                    .policies
+
                    .set_follow_policy(&id, policy::Policy::Block)
+
                    .expect("Service::command: error blocking node");
+
                if updated {
+
                    self.outbox.disconnect(id, DisconnectReason::Policy);
+
                }
+
                resp.send(updated).ok();
+
            }
            Command::AnnounceRefs(id, namespaces, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
@@ -1265,6 +1278,12 @@ where
        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

+
        if let Ok(true) = self.policies.is_blocked(&remote) {
+
            debug!(target: "service", "Disconnecting blocked peer {remote}");
+
            self.outbox.disconnect(remote, DisconnectReason::Policy);
+
            return;
+
        }
+

        let msgs = self.initial(link);

        if link.is_outbound() {
@@ -1397,6 +1416,7 @@ where
                DisconnectReason::Session(e) => e.severity(),
                DisconnectReason::Command
                | DisconnectReason::Conflict
+
                | DisconnectReason::Policy
                | DisconnectReason::SelfConnection => Severity::Low,
            };

@@ -2430,6 +2450,7 @@ where
                    .filter(|entry| !entry.address.banned)
                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
                    .filter(|entry| !self.sessions.contains_key(&entry.node))
+
                    .filter(|entry| !self.policies.is_blocked(&entry.node).unwrap_or(false))
                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
                    .filter(|entry| &entry.node != self.nid())
                    .filter(|entry| self.is_supported_address(&entry.address.addr))
@@ -2597,6 +2618,9 @@ where

        for (nid, session) in self.sessions.iter_mut() {
            if self.config.is_persistent(nid) {
+
                if self.policies.is_blocked(nid).unwrap_or(false) {
+
                    continue;
+
                }
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
                    // even a successful attempt means that we're unlikely to be able to reconnect.
@@ -2728,6 +2752,8 @@ pub enum DisconnectReason {
    Conflict,
    /// Connection to self.
    SelfConnection,
+
    /// Peer is blocked by policy
+
    Policy,
    /// User requested disconnect
    Command,
}
@@ -2756,6 +2782,7 @@ impl fmt::Display for DisconnectReason {
            Self::Command => write!(f, "command"),
            Self::SelfConnection => write!(f, "self-connection"),
            Self::Conflict => write!(f, "conflict"),
+
            Self::Policy => write!(f, "policy"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
modified crates/radicle/src/node.rs
@@ -907,6 +907,7 @@ pub trait Handle: Clone + Sync + Send {
    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Self::Error>;
    /// Start following the given peer.
    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Self::Error>;
+
    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error>;
    /// Un-seed the given repo and delete it from storage.
    fn unseed(&mut self, id: RepoId) -> Result<bool, Self::Error>;
    /// Unfollow the given peer.
@@ -1191,6 +1192,13 @@ impl Handle for Node {
        Ok(response.updated)
    }

+
    fn block(&mut self, nid: NodeId) -> Result<bool, Error> {
+
        let mut lines = self.call::<Success>(Command::Block { nid }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;
+

+
        Ok(response.updated)
+
    }
+

    fn seed(&mut self, rid: RepoId, scope: policy::Scope) -> Result<bool, Error> {
        let mut lines = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
        let response = lines.next().ok_or(Error::EmptyResponse)??;
modified crates/radicle/src/node/command.rs
@@ -120,6 +120,10 @@ pub enum Command {
    #[serde(rename_all = "camelCase")]
    Unfollow { nid: NodeId },

+
    /// Block the given node.
+
    #[serde(rename_all = "camelCase")]
+
    Block { nid: NodeId },
+

    /// Get the node's status.
    Status,

modified crates/radicle/src/node/policy/store.rs
@@ -119,14 +119,15 @@ impl Store<Write> {
    /// Follow a node.
    pub fn follow(&mut self, id: &NodeId, alias: Option<&Alias>) -> Result<bool, Error> {
        let mut stmt = self.db.prepare(
-
            "INSERT INTO `following` (id, alias)
-
             VALUES (?1, ?2)
-
             ON CONFLICT DO UPDATE
-
             SET alias = ?2 WHERE alias != ?2",
+
            "INSERT INTO `following` (id, alias, policy)
+
             VALUES (?1, ?2, ?3)
+
             ON CONFLICT (id) DO UPDATE
+
             SET alias = ?2, policy = ?3 WHERE alias != ?2 OR policy != ?3",
        )?;

        stmt.bind((1, id))?;
        stmt.bind((2, alias.map_or("", |alias| alias.as_str())))?;
+
        stmt.bind((3, Policy::Allow))?;
        stmt.next()?;

        Ok(self.db.change_count() > 0)
@@ -248,6 +249,16 @@ impl<T> Store<T> {
        ))
    }

+
    pub fn is_blocked(&self, id: &NodeId) -> Result<bool, Error> {
+
        Ok(matches!(
+
            self.follow_policy(id)?,
+
            Some(FollowPolicy {
+
                policy: Policy::Block,
+
                ..
+
            })
+
        ))
+
    }
+

    /// Get a node's follow policy.
    pub fn follow_policy(&self, id: &NodeId) -> Result<Option<FollowPolicy>, Error> {
        let mut stmt = self