Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
node: add block command to control socket
Merged ade opened 2 months ago

Introduces the ability to explicitly block a peer via the node control socket. Previously, the node only exposed follow and unfollow commands. While the underlying policy database schema supported a Block variant, there was no mechanism to trigger this state via the client handle.

The new block command:

  1. Updates the node’s follow policy to Block.
  2. Immediately disconnects the peer if a session is active.
  3. Prevents future inbound and outbound connections to that peer.
  4. Filters the peer out of the available peers list.
11 files changed +173 -5 4286590f d88ef3fa
modified CHANGELOG.md
@@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## New Features

+
- The block policy for `NodeId`'s is used for limiting the namespaces fetched
+
  from other nodes. It is now also extended to block connections to the blocked
+
  `NodeId`.
- The set of references returned by `references_of` were restricted to `heads`,
  `tags`, `notes`, `rad`, and `cobs`. The restriction is lifted, and the only
  references filtered out are `refs/tmp/heads` – used by `radicle-remote-helper`
modified crates/radicle-node/src/control.rs
@@ -170,6 +170,14 @@ where
                return Err(CommandError::Runtime(e));
            }
        },
+
        Command::Block { nid } => match handle.block(nid) {
+
            Ok(result) => {
+
                CommandResult::updated(result).to_writer(writer)?;
+
            }
+
            Err(e) => {
+
                return Err(CommandError::Runtime(e));
+
            }
+
        },
        Command::Unfollow { nid } => match handle.unfollow(nid) {
            Ok(result) => {
                CommandResult::updated(result).to_writer(writer)?;
modified crates/radicle-node/src/runtime/handle.rs
@@ -247,6 +247,12 @@ impl radicle::node::Handle for Handle {
        Ok(receiver.recv()??)
    }

+
    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::Block(id, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
        let (responder, receiver) = service::command::Responder::oneshot();
        self.command(service::Command::Seed(id, scope, responder))?;
modified crates/radicle-node/src/test/handle.rs
@@ -18,6 +18,7 @@ pub struct Handle {
    pub updates: Arc<Mutex<Vec<(RepoId, PublicKey)>>>,
    pub seeding: Arc<Mutex<HashSet<RepoId>>>,
    pub following: Arc<Mutex<HashSet<NodeId>>>,
+
    pub blocked: Arc<Mutex<HashSet<NodeId>>>,
}

impl radicle::node::Handle for Handle {
@@ -85,6 +86,7 @@ impl radicle::node::Handle for Handle {
    }

    fn follow(&mut self, id: NodeId, _alias: Option<Alias>) -> Result<bool, Self::Error> {
+
        self.blocked.lock().unwrap().remove(&id);
        Ok(self.following.lock().unwrap().insert(id))
    }

@@ -93,7 +95,14 @@ impl radicle::node::Handle for Handle {
    }

    fn unfollow(&mut self, id: NodeId) -> Result<bool, Self::Error> {
-
        Ok(self.following.lock().unwrap().remove(&id))
+
        let f = self.following.lock().unwrap().remove(&id);
+
        let b = self.blocked.lock().unwrap().remove(&id);
+
        Ok(f || b)
+
    }
+

+
    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
+
        self.following.lock().unwrap().remove(&id);
+
        Ok(self.blocked.lock().unwrap().insert(id))
    }

    fn announce_refs_for(
modified crates/radicle-node/src/tests/e2e.rs
@@ -1663,3 +1663,85 @@ fn test_non_fastforward_identity_doc() {
    assert_eq!(identity.current, next);
    assert_eq!(identity.parent, Some(prev));
}
+

+
#[test]
+
fn test_block_active_connection() {
+
    let tmp = tempfile::tempdir().unwrap();
+
    let alice = Node::init(tmp.path(), config::relay("alice"));
+
    let bob = Node::init(tmp.path(), config::relay("bob"));
+

+
    let mut alice = alice.spawn();
+
    let bob = bob.spawn();
+

+
    alice.connect(&bob);
+
    converge([&alice, &bob]);
+

+
    let events = alice.handle.events();
+
    assert!(alice.handle.block(bob.id).unwrap());
+

+
    events
+
        .wait(
+
            |e| matches!(e, Event::PeerDisconnected { nid, .. } if *nid == bob.id).then_some(()),
+
            DEFAULT_TIMEOUT,
+
        )
+
        .unwrap();
+

+
    let sessions = alice.handle.sessions().unwrap();
+
    assert!(sessions.iter().all(|s| s.nid != bob.id));
+
}
+

+
#[test]
+
fn test_block_prevents_connection() {
+
    let tmp = tempfile::tempdir().unwrap();
+
    let alice = Node::init(tmp.path(), config::relay("alice"));
+
    let bob = Node::init(tmp.path(), config::relay("bob"));
+

+
    let mut alice = alice.spawn();
+
    let mut bob = bob.spawn();
+

+
    assert!(alice.handle.block(bob.id).unwrap());
+

+
    let result = alice
+
        .handle
+
        .connect(bob.id, bob.addr.into(), ConnectOptions::default())
+
        .unwrap();
+

+
    assert_matches!(result, ConnectResult::Disconnected { .. });
+

+
    let events = alice.handle.events();
+
    bob.connect(&alice);
+

+
    // Alice should acknowledge the connection but drop it immediately.
+
    // We wait for `PeerConnected` because `PeerDisconnected` might not be
+
    // emitted if the session wasn't fully established. The important part
+
    // is that the session is NOT in the active sessions list.
+
    events
+
        .wait(
+
            |e| matches!(e, Event::PeerConnected { nid } if *nid == bob.id).then_some(()),
+
            time::Duration::from_secs(10),
+
        )
+
        .unwrap();
+

+
    let sessions = alice.handle.sessions().unwrap();
+
    assert!(sessions.iter().all(|s| s.nid != bob.id));
+
}
+

+
#[test]
+
fn test_block_prevents_fetch() {
+
    let tmp = tempfile::tempdir().unwrap();
+
    let alice = Node::init(tmp.path(), config::relay("alice"));
+
    let mut bob = Node::init(tmp.path(), config::relay("bob"));
+
    let rid = bob.project("acme", "");
+

+
    let mut alice = alice.spawn();
+
    let bob = bob.spawn();
+

+
    assert!(alice.handle.block(bob.id).unwrap());
+

+
    let result = alice
+
        .handle
+
        .fetch(rid, bob.id, time::Duration::from_secs(5))
+
        .unwrap();
+

+
    assert_matches!(result, FetchResult::Failed { .. });
+
}
modified crates/radicle-protocol/src/service.rs
@@ -230,6 +230,8 @@ pub enum ConnectError {
        "attempted connection to {nid}, via {addr} but addresses of this kind are not supported"
    )]
    UnsupportedAddress { nid: NodeId, addr: Address },
+
    #[error("attempted connection with blocked peer {nid}")]
+
    Blocked { nid: NodeId },
}

/// A store for all node data.
@@ -856,6 +858,16 @@ where
                    .expect("Service::command: error unfollowing node");
                resp.ok(updated).ok();
            }
+
            Command::Block(id, resp) => {
+
                let updated = self
+
                    .policies
+
                    .set_follow_policy(&id, policy::Policy::Block)
+
                    .expect("Service::command: error blocking node");
+
                if updated {
+
                    self.outbox.disconnect(id, DisconnectReason::Policy);
+
                }
+
                resp.send(updated).ok();
+
            }
            Command::AnnounceRefs(id, namespaces, resp) => {
                let doc = match self.storage.get(id) {
                    Ok(Some(doc)) => doc,
@@ -1212,6 +1224,12 @@ where
        info!(target: "service", "Connected to {remote} ({addr}) ({link:?})");
        self.emitter.emit(Event::PeerConnected { nid: remote });

+
        if let Ok(true) = self.policies.is_blocked(&remote) {
+
            info!(target: "service", "Disconnecting blocked peer {remote}");
+
            self.outbox.disconnect(remote, DisconnectReason::Policy);
+
            return;
+
        }
+

        let msgs = self.initial(link);

        if link.is_outbound() {
@@ -1344,6 +1362,7 @@ where
                DisconnectReason::Session(e) => e.severity(),
                DisconnectReason::Command
                | DisconnectReason::Conflict
+
                | DisconnectReason::Policy
                | DisconnectReason::SelfConnection => Severity::Low,
            };

@@ -2153,6 +2172,9 @@ where
        if nid == self.node_id() {
            return Err(ConnectError::SelfConnection);
        }
+
        if let Ok(true) = self.policies.is_blocked(&nid) {
+
            return Err(ConnectError::Blocked { nid });
+
        }
        if !self.is_supported_address(&addr) {
            return Err(ConnectError::UnsupportedAddress { nid, addr });
        }
@@ -2386,6 +2408,7 @@ where
                    .filter(|entry| !entry.address.banned)
                    .filter(|entry| !entry.penalty.is_connect_threshold_reached())
                    .filter(|entry| !self.sessions.contains_key(&entry.node))
+
                    .filter(|entry| !self.policies.is_blocked(&entry.node).unwrap_or(false))
                    .filter(|entry| !self.config.external_addresses.contains(&entry.address.addr))
                    .filter(|entry| &entry.node != self.nid())
                    .filter(|entry| self.is_supported_address(&entry.address.addr))
@@ -2553,6 +2576,9 @@ where

        for (nid, session) in self.sessions.iter_mut() {
            if self.config.is_persistent(nid) {
+
                if self.policies.is_blocked(nid).unwrap_or(false) {
+
                    continue;
+
                }
                if let session::State::Disconnected { retry_at, .. } = &mut session.state {
                    // TODO: Try to reconnect only if the peer was attempted. A disconnect without
                    // even a successful attempt means that we're unlikely to be able to reconnect.
@@ -2684,6 +2710,8 @@ pub enum DisconnectReason {
    Conflict,
    /// Connection to self.
    SelfConnection,
+
    /// Peer is blocked by policy
+
    Policy,
    /// User requested disconnect
    Command,
}
@@ -2712,6 +2740,7 @@ impl fmt::Display for DisconnectReason {
            Self::Command => write!(f, "command"),
            Self::SelfConnection => write!(f, "self-connection"),
            Self::Conflict => write!(f, "conflict"),
+
            Self::Policy => write!(f, "policy"),
            Self::Session(err) => write!(f, "{err}"),
            Self::Fetch(err) => write!(f, "fetch: {err}"),
        }
modified crates/radicle-protocol/src/service/command.rs
@@ -99,6 +99,8 @@ pub enum Command {
    Follow(NodeId, Option<Alias>, Responder<bool>),
    /// Unfollow the given node.
    Unfollow(NodeId, Responder<bool>),
+
    /// Block the given node.
+
    Block(NodeId, Sender<bool>),
    /// Query the internal service state.
    QueryState(Arc<QueryState>, Sender<Result<()>>),
}
@@ -194,6 +196,7 @@ impl fmt::Debug for Command {
            Self::Unseed(id, _) => write!(f, "Unseed({id})"),
            Self::Follow(id, _, _) => write!(f, "Follow({id})"),
            Self::Unfollow(id, _) => write!(f, "Unfollow({id})"),
+
            Self::Block(id, _) => write!(f, "Block({id})"),
            Self::QueryState { .. } => write!(f, "QueryState(..)"),
        }
    }
modified crates/radicle/CHANGELOG.md
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

+
- `radicle::node::Handle` added a `block` method to allow setting the follow
+
  policy to `Policy::Block`.
- `radicle::node::Handle::announce_refs_for` now allows specifying for which
  namespaces changes should be announced. A corresponding enum variant
  `radicle::node::Command::AnnounceRefsFor` is added.
modified crates/radicle/src/node.rs
@@ -907,6 +907,8 @@ pub trait Handle: Clone + Sync + Send {
    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Self::Error>;
    /// Start following the given peer.
    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Self::Error>;
+
    /// Set the following policy to block for the given peer.
+
    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error>;
    /// Un-seed the given repo and delete it from storage.
    fn unseed(&mut self, id: RepoId) -> Result<bool, Self::Error>;
    /// Unfollow the given peer.
@@ -1191,6 +1193,13 @@ impl Handle for Node {
        Ok(response.updated)
    }

+
    fn block(&mut self, nid: NodeId) -> Result<bool, Error> {
+
        let mut lines = self.call::<Success>(Command::Block { nid }, DEFAULT_TIMEOUT)?;
+
        let response = lines.next().ok_or(Error::EmptyResponse)??;
+

+
        Ok(response.updated)
+
    }
+

    fn seed(&mut self, rid: RepoId, scope: policy::Scope) -> Result<bool, Error> {
        let mut lines = self.call::<Success>(Command::Seed { rid, scope }, DEFAULT_TIMEOUT)?;
        let response = lines.next().ok_or(Error::EmptyResponse)??;
modified crates/radicle/src/node/command.rs
@@ -120,6 +120,10 @@ pub enum Command {
    #[serde(rename_all = "camelCase")]
    Unfollow { nid: NodeId },

+
    /// Block the given node.
+
    #[serde(rename_all = "camelCase")]
+
    Block { nid: NodeId },
+

    /// Get the node's status.
    Status,

modified crates/radicle/src/node/policy/store.rs
@@ -119,14 +119,15 @@ impl Store<Write> {
    /// Follow a node.
    pub fn follow(&mut self, id: &NodeId, alias: Option<&Alias>) -> Result<bool, Error> {
        let mut stmt = self.db.prepare(
-
            "INSERT INTO `following` (id, alias)
-
             VALUES (?1, ?2)
-
             ON CONFLICT DO UPDATE
-
             SET alias = ?2 WHERE alias != ?2",
+
            "INSERT INTO `following` (id, alias, policy)
+
             VALUES (?1, ?2, ?3)
+
             ON CONFLICT (id) DO UPDATE
+
             SET alias = ?2, policy = ?3 WHERE alias != ?2 OR policy != ?3",
        )?;

        stmt.bind((1, id))?;
        stmt.bind((2, alias.map_or("", |alias| alias.as_str())))?;
+
        stmt.bind((3, Policy::Allow))?;
        stmt.next()?;

        Ok(self.db.change_count() > 0)
@@ -248,6 +249,18 @@ impl<T> Store<T> {
        ))
    }

+
    /// Returns `true` if there is a follow policy for the given node, and that
+
    /// policy is [`Policy::Block`].
+
    pub fn is_blocked(&self, id: &NodeId) -> Result<bool, Error> {
+
        Ok(matches!(
+
            self.follow_policy(id)?,
+
            Some(FollowPolicy {
+
                policy: Policy::Block,
+
                ..
+
            })
+
        ))
+
    }
+

    /// Get a node's follow policy.
    pub fn follow_policy(&self, id: &NodeId) -> Result<Option<FollowPolicy>, Error> {
        let mut stmt = self