Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Add tracking configuration store
Alexis Sellier committed 3 years ago
commit 1007756238a18909c95d00bf7484101e87097683
parent db3568db2fd364f04c5cc4545c9a660765e80649
12 files changed +605 -145
modified radicle-crypto/src/lib.rs
@@ -369,7 +369,7 @@ impl TryFrom<&sqlite::Value> for PublicKey {
            }),
            _ => Err(sqlite::Error {
                code: None,
-
                message: None,
+
                message: Some("sql: invalid type for public key".to_owned()),
            }),
        }
    }
modified radicle-node/src/address/store.rs
@@ -31,7 +31,7 @@ pub struct Book {

impl fmt::Debug for Book {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-
        write!(f, "Cache(..)")
+
        write!(f, "Book(..)")
    }
}

@@ -234,13 +234,13 @@ impl TryFrom<&sql::Value> for Address {

    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
        match value {
-
            sql::Value::String(s) => Address::from_str(s.as_str()).map_err(|_| sql::Error {
+
            sql::Value::String(s) => Address::from_str(s.as_str()).map_err(|e| sql::Error {
                code: None,
-
                message: None,
+
                message: Some(e.to_string()),
            }),
            _ => Err(sql::Error {
                code: None,
-
                message: None,
+
                message: Some("sql: invalid type for address".to_owned()),
            }),
        }
    }
@@ -256,20 +256,18 @@ impl TryFrom<&sql::Value> for Source {
    type Error = sql::Error;

    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        let err = sql::Error {
+
            code: None,
+
            message: Some("sql: invalid source".to_owned()),
+
        };
        match value {
            sql::Value::String(s) => match s.as_str() {
                "dns" => Ok(Source::Dns),
                "peer" => Ok(Source::Peer),
                "imported" => Ok(Source::Imported),
-
                _ => Err(sql::Error {
-
                    code: None,
-
                    message: None,
-
                }),
+
                _ => Err(err),
            },
-
            _ => Err(sql::Error {
-
                code: None,
-
                message: None,
-
            }),
+
            _ => Err(err),
        }
    }
}
@@ -288,21 +286,19 @@ impl TryFrom<&sql::Value> for AddressType {
    type Error = sql::Error;

    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        let err = sql::Error {
+
            code: None,
+
            message: Some("sql: invalid address type".to_owned()),
+
        };
        match value {
            sql::Value::String(s) => match s.as_str() {
                "ipv4" => Ok(AddressType::Ipv4),
                "ipv6" => Ok(AddressType::Ipv6),
                "hostname" => Ok(AddressType::Hostname),
                "onion" => Ok(AddressType::Onion),
-
                _ => Err(sql::Error {
-
                    code: None,
-
                    message: None,
-
                }),
+
                _ => Err(err),
            },
-
            _ => Err(sql::Error {
-
                code: None,
-
                message: None,
-
            }),
+
            _ => Err(err),
        }
    }
}
modified radicle-node/src/client.rs
@@ -8,7 +8,7 @@ use radicle::crypto::Signer;

use crate::clock::RefClock;
use crate::profile::Profile;
-
use crate::service::routing;
+
use crate::service::{routing, tracking};
use crate::wire::transcode::NoHandshake;
use crate::wire::Wire;
use crate::{address, service};
@@ -21,6 +21,8 @@ pub const NODE_DIR: &str = "node";
pub const ROUTING_DB_FILE: &str = "routing.db";
/// Filename of address database under [`NODE_DIR`].
pub const ADDRESS_DB_FILE: &str = "addresses.db";
+
/// Filename of tracking table database under [`NODE_DIR`].
+
pub const TRACKING_DB_FILE: &str = "tracking.db";

/// A client error.
#[derive(Error, Debug)]
@@ -31,6 +33,9 @@ pub enum Error {
    /// An address database error.
    #[error("address database error: {0}")]
    Addresses(#[from] address::Error),
+
    /// A tracking database error.
+
    #[error("tracking database error: {0}")]
+
    Tracking(#[from] tracking::Error),
    /// An I/O error.
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
@@ -111,6 +116,7 @@ impl<R: Reactor> Client<R> {
        let node_dir = profile.home.join(NODE_DIR);
        let address_db = node_dir.join(ADDRESS_DB_FILE);
        let routing_db = node_dir.join(ROUTING_DB_FILE);
+
        let tracking_db = node_dir.join(TRACKING_DB_FILE);

        log::info!("Opening address book {}..", address_db.display());
        let addresses = address::Book::open(address_db)?;
@@ -118,6 +124,9 @@ impl<R: Reactor> Client<R> {
        log::info!("Opening routing table {}..", routing_db.display());
        let routing = routing::Table::open(routing_db)?;

+
        log::info!("Opening tracking policy table {}..", tracking_db.display());
+
        let tracking = tracking::Config::open(tracking_db)?;
+

        log::info!("Initializing client ({:?})..", network);

        let service = service::Service::new(
@@ -126,6 +135,7 @@ impl<R: Reactor> Client<R> {
            routing,
            storage,
            addresses,
+
            tracking,
            signer,
            rng,
        );
modified radicle-node/src/service.rs
@@ -1,9 +1,11 @@
+
#![allow(clippy::too_many_arguments)]
pub mod config;
pub mod filter;
pub mod message;
pub mod reactor;
pub mod routing;
pub mod session;
+
pub mod tracking;

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
@@ -30,7 +32,6 @@ use crate::git;
use crate::identity::{Doc, Id};
use crate::node;
use crate::prelude::*;
-
use crate::service::config::ProjectTracking;
use crate::service::message::{Address, Announcement, AnnouncementMessage, Ping};
use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::storage;
@@ -190,6 +191,8 @@ pub struct Service<R, A, S, G> {
    routing: R,
    /// Node address manager.
    addresses: A,
+
    /// Tracking policy configuration.
+
    tracking: tracking::Config,
    /// State relating to gossip.
    gossip: Gossip,
    /// Peer sessions, currently or recently connected.
@@ -204,6 +207,8 @@ pub struct Service<R, A, S, G> {
    rng: Rng,
    /// Whether our local inventory no long represents what we have announced to the network.
    out_of_sync: bool,
+
    /// Current tracked repository bloom filter.
+
    filter: Filter,
    /// Last time the service was idle.
    last_idle: LocalTime,
    /// Last time the service synced.
@@ -244,6 +249,7 @@ where
        routing: R,
        storage: S,
        addresses: A,
+
        tracking: tracking::Config,
        signer: G,
        rng: Rng,
    ) -> Self {
@@ -253,6 +259,7 @@ where
            config,
            storage,
            addresses,
+
            tracking,
            signer,
            rng,
            clock,
@@ -263,6 +270,7 @@ where
            reactor: Reactor::default(),
            sessions,
            out_of_sync: false,
+
            filter: Filter::empty(),
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
            last_prune: LocalTime::default(),
@@ -282,34 +290,30 @@ where
        }
    }

-
    pub fn tracked(&self) -> Result<Vec<Id>, storage::Error> {
-
        let tracked = match &self.config.project_tracking {
-
            ProjectTracking::All { blocked } => self
-
                .storage
-
                .inventory()?
-
                .into_iter()
-
                .filter(|id| !blocked.contains(id))
-
                .collect(),
-

-
            ProjectTracking::Allowed(projs) => projs.iter().cloned().collect(),
-
        };
-

-
        Ok(tracked)
-
    }
-

    /// Track a project.
    /// Returns whether or not the tracking policy was updated.
-
    pub fn track(&mut self, id: Id) -> bool {
-
        self.out_of_sync = self.config.track(id);
-
        self.out_of_sync
+
    pub fn track(&mut self, id: &Id, scope: tracking::Scope) -> Result<bool, tracking::Error> {
+
        self.out_of_sync = self.tracking.track_repo(id, scope)?;
+
        self.filter.insert(id);
+

+
        Ok(self.out_of_sync)
    }

    /// Untrack a project.
    /// Returns whether or not the tracking policy was updated.
    /// Note that when untracking, we don't announce anything to the network. This is because by
    /// simply not announcing it anymore, it will eventually be pruned by nodes.
-
    pub fn untrack(&mut self, id: Id) -> bool {
-
        self.config.untrack(id)
+
    pub fn untrack(&mut self, id: &Id) -> Result<bool, tracking::Error> {
+
        // Nb. This is potentially slow if we have lots of projects. We should probably
+
        // only re-compute the filter when we've untracked a certain amount of projects
+
        // and the filter is really out of date.
+
        self.filter = Filter::new(self.tracking.repo_entries()?.map(|(e, _)| e));
+
        self.tracking.untrack_repo(id)
+
    }
+

+
    /// Check whether we are tracking a certain repository.
+
    pub fn is_tracking(&self, id: &Id) -> Result<bool, tracking::Error> {
+
        self.tracking.is_repo_tracked(id)
    }

    /// Find the closest `n` peers by proximity in tracking graphs.
@@ -340,6 +344,11 @@ where
        &mut self.storage
    }

+
    /// Get the tracking policy.
+
    pub fn tracking(&self) -> &tracking::Config {
+
        &self.tracking
+
    }
+

    /// Get the local signer.
    pub fn signer(&self) -> &G {
        &self.signer
@@ -425,7 +434,11 @@ where
        match cmd {
            Command::Connect(addr) => self.reactor.connect(addr),
            Command::Fetch(id, resp) => {
-
                if !self.config.is_tracking(&id) {
+
                if !self
+
                    .tracking
+
                    .is_repo_tracked(&id)
+
                    .expect("Service::command: error accessing tracking configuration")
+
                {
                    resp.send(FetchLookup::NotTracking).ok();
                    return;
                }
@@ -479,10 +492,16 @@ where
                }
            }
            Command::Track(id, resp) => {
-
                resp.send(self.track(id)).ok();
+
                let tracked = self
+
                    .track(&id, tracking::Scope::All)
+
                    .expect("Service::command: error tracking repository");
+
                resp.send(tracked).ok();
            }
            Command::Untrack(id, resp) => {
-
                resp.send(self.untrack(id)).ok();
+
                let untracked = self
+
                    .untrack(&id)
+
                    .expect("Service::command: error untracking repository");
+
                resp.send(untracked).ok();
            }
            Command::AnnounceRefs(id) => {
                if let Err(err) = self.announce_refs(id) {
@@ -531,6 +550,7 @@ where
                            self.clock.timestamp(),
                            &self.storage,
                            &self.signer,
+
                            self.filter.clone(),
                            &self.config,
                        ),
                    );
@@ -668,7 +688,11 @@ where
            AnnouncementMessage::Refs(message) => {
                // TODO: Buffer/throttle fetches.
                // TODO: Check that we're tracking this user as well.
-
                if self.config.is_tracking(&message.id) {
+
                if self
+
                    .tracking
+
                    .is_repo_tracked(&message.id)
+
                    .expect("Service::handle_announcement: error accessing tracking configuration")
+
                {
                    // Discard inventory messages we've already seen, otherwise update
                    // out last seen time.
                    if !peer.refs_announced(message.id, timestamp) {
@@ -705,6 +729,11 @@ where
                    if is_updated {
                        return Ok(relay);
                    }
+
                } else {
+
                    log::debug!(
+
                        "Ignoring refs announcement from {announcer}: repository {} isn't tracked",
+
                        message.id
+
                    );
                }
            }
            AnnouncementMessage::Node(
@@ -795,6 +824,7 @@ where
                            self.clock.timestamp(),
                            &self.storage,
                            &self.signer,
+
                            self.filter.clone(),
                            &self.config,
                        ),
                    );
@@ -890,7 +920,11 @@ where
        let mut included = HashSet::new();
        for proj_id in inventory {
            included.insert(proj_id);
-
            if self.routing.insert(*proj_id, from, *timestamp)? && self.config.is_tracking(proj_id)
+
            if self.routing.insert(*proj_id, from, *timestamp)?
+
                && self
+
                    .tracking
+
                    .is_repo_tracked(proj_id)
+
                    .expect("Service::process_inventory: error accessing tracking configuration")
            {
                log::info!("Routing table updated for {} with seed {}", proj_id, from);
            }
@@ -1264,6 +1298,7 @@ mod gossip {
        timestamp: Timestamp,
        storage: &S,
        signer: &G,
+
        filter: Filter,
        config: &Config,
    ) -> Vec<Message> {
        let inventory = match storage.inventory() {
@@ -1286,7 +1321,7 @@ mod gossip {
                    .expect("external addresses are within the limit"),
            ),
            Message::inventory(gossip::inventory(timestamp, inventory), signer),
-
            Message::subscribe(config.filter(), timestamp, Timestamp::MAX),
+
            Message::subscribe(filter, timestamp, Timestamp::MAX),
        ];
        if let Some(m) = gossip::node(timestamp, config) {
            msgs.push(Message::node(m, signer));
modified radicle-node/src/service/config.rs
@@ -1,8 +1,5 @@
use super::nakamoto::LocalDuration;

-
use crate::collections::HashSet;
-
use crate::identity::{Id, PublicKey};
-
use crate::service::filter::Filter;
use crate::service::message::Address;

/// Peer-to-peer network.
@@ -13,35 +10,6 @@ pub enum Network {
    Test,
}

-
/// Project tracking policy.
-
#[derive(Debug, Clone)]
-
pub enum ProjectTracking {
-
    /// Track all projects we come across.
-
    All { blocked: HashSet<Id> },
-
    /// Track a static list of projects.
-
    Allowed(HashSet<Id>),
-
}
-

-
impl Default for ProjectTracking {
-
    fn default() -> Self {
-
        Self::All {
-
            blocked: HashSet::default(),
-
        }
-
    }
-
}
-

-
/// Project remote tracking policy.
-
#[derive(Debug, Default, Clone)]
-
pub enum RemoteTracking {
-
    /// Only track remotes of project delegates.
-
    #[default]
-
    DelegatesOnly,
-
    /// Track all remotes.
-
    All { blocked: HashSet<PublicKey> },
-
    /// Track a specific list of users as well as the project delegates.
-
    Allowed(HashSet<PublicKey>),
-
}
-

/// Configuration parameters defining attributes of minima and maxima.
#[derive(Debug, Clone)]
pub struct Limits {
@@ -70,14 +38,11 @@ pub struct Config {
    pub external_addresses: Vec<Address>,
    /// Peer-to-peer network.
    pub network: Network,
-
    /// Project tracking policy.
-
    pub project_tracking: ProjectTracking,
-
    /// Project remote tracking policy.
-
    pub remote_tracking: RemoteTracking,
    /// Whether or not our node should relay inventories.
    pub relay: bool,
    /// List of addresses to listen on for protocol connections.
    pub listen: Vec<Address>,
+
    /// Configured service limits.
    pub limits: Limits,
}

@@ -87,8 +52,6 @@ impl Default for Config {
            connect: Vec::default(),
            external_addresses: Vec::default(),
            network: Network::default(),
-
            project_tracking: ProjectTracking::default(),
-
            remote_tracking: RemoteTracking::default(),
            relay: true,
            listen: vec![],
            limits: Limits::default(),
@@ -97,38 +60,15 @@ impl Default for Config {
}

impl Config {
-
    pub fn is_persistent(&self, addr: &Address) -> bool {
-
        self.connect.contains(addr)
-
    }
-

-
    pub fn is_tracking(&self, id: &Id) -> bool {
-
        match &self.project_tracking {
-
            ProjectTracking::All { blocked } => !blocked.contains(id),
-
            ProjectTracking::Allowed(ids) => ids.contains(id),
-
        }
-
    }
-

-
    /// Track a project. Returns whether the policy was updated.
-
    pub fn track(&mut self, id: Id) -> bool {
-
        match &mut self.project_tracking {
-
            ProjectTracking::All { .. } => false,
-
            ProjectTracking::Allowed(ids) => ids.insert(id),
-
        }
-
    }
-

-
    /// Untrack a project. Returns whether the policy was updated.
-
    pub fn untrack(&mut self, id: Id) -> bool {
-
        match &mut self.project_tracking {
-
            ProjectTracking::All { blocked } => blocked.insert(id),
-
            ProjectTracking::Allowed(ids) => ids.remove(&id),
+
    pub fn new(network: Network) -> Self {
+
        Self {
+
            network,
+
            ..Self::default()
        }
    }

-
    pub fn filter(&self) -> Filter {
-
        match &self.project_tracking {
-
            ProjectTracking::All { .. } => Filter::default(),
-
            ProjectTracking::Allowed(ids) => Filter::new(ids.iter()),
-
        }
+
    pub fn is_persistent(&self, addr: &Address) -> bool {
+
        self.connect.contains(addr)
    }

    pub fn alias(&self) -> [u8; 32] {
modified radicle-node/src/service/filter.rs
@@ -40,7 +40,7 @@ impl Filter {
    /// Create a new filter with the given items.
    ///
    /// Uses the iterator's size hint to determine the size of the filter.
-
    pub fn new<'a>(ids: impl IntoIterator<Item = &'a Id>) -> Self {
+
    pub fn new(ids: impl IntoIterator<Item = Id>) -> Self {
        let iterator = ids.into_iter();
        let (min, _) = iterator.size_hint();
        let size = bloomy::bloom::optimal_bits(min, FILTER_FP_RATE) / 8;
@@ -54,11 +54,16 @@ impl Filter {
        let mut bloom = BloomFilter::with_size(size);

        for id in iterator {
-
            bloom.insert(id);
+
            bloom.insert(&id);
        }
        Self(bloom)
    }

+
    /// Empty filter with nothing set.
+
    pub fn empty() -> Self {
+
        Self(BloomFilter::from(vec![0x0; FILTER_SIZE_S]))
+
    }
+

    /// Size in bytes.
    pub fn size(&self) -> usize {
        self.0.bits() / 8
@@ -132,13 +137,13 @@ mod test {
    #[test]
    fn test_sizes() {
        let ids = arbitrary::vec::<Id>(3420);
-
        let f = Filter::new(ids.iter().take(10));
+
        let f = Filter::new(ids.iter().cloned().take(10));
        assert_eq!(f.size(), FILTER_SIZE_S);

-
        let f = Filter::new(ids.iter().take(1000));
+
        let f = Filter::new(ids.iter().cloned().take(1000));
        assert_eq!(f.size(), FILTER_SIZE_M);

-
        let f = Filter::new(ids.iter());
+
        let f = Filter::new(ids.iter().cloned());
        assert_eq!(f.size(), FILTER_SIZE_L);

        // Just checking that iterators over hash sets give correct size hints.
added radicle-node/src/service/tracking.rs
@@ -0,0 +1,38 @@
+
mod store;
+

+
use std::str::FromStr;
+

+
pub use store::{Config, Error};
+

+
/// Tracking policy.
+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+
pub enum Policy {
+
    /// The resource is tracked.
+
    Track,
+
    /// The resource is blocked.
+
    Block,
+
}
+

+
/// Tracking scope of a repository tracking policy.
+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+
pub enum Scope {
+
    /// Track remotes of nodes that are already tracked.
+
    Trusted,
+
    /// Track remotes of repository delegates.
+
    DelegatesOnly,
+
    /// Track all remotes.
+
    All,
+
}
+

+
impl FromStr for Scope {
+
    type Err = ();
+

+
    fn from_str(s: &str) -> Result<Self, Self::Err> {
+
        match s {
+
            "trusted" => Ok(Self::Trusted),
+
            "delegates-only" => Ok(Self::DelegatesOnly),
+
            "all" => Ok(Self::All),
+
            _ => Err(()),
+
        }
+
    }
+
}
added radicle-node/src/service/tracking/schema.sql
@@ -0,0 +1,32 @@
+
--
+
-- Service configuration schema.
+
--
+

+
-- Node tracking policy.
+
create table if not exists "node-policies" (
+
  -- Node ID.
+
  "id"                 text      primary key not null,
+
  -- Node alias. May override the alias announced by the node.
+
  "alias"              text      default '',
+
  -- Tracking policy for this node.
+
  "policy"             text      default 'track'
+
  --
+
) strict;
+

+
-- Repository tracking policy.
+
create table if not exists "repo-policies" (
+
  -- Repository ID.
+
  "id"                 text      primary key not null,
+
  -- Tracking scope for this repository.
+
  --
+
  -- Valid values are:
+
  --
+
  -- "trusted"         track repository delegates and remotes in the `node-policies` table.
+
  -- "delegates-only"  only track repository delegates.
+
  -- "all"             track all remotes.
+
  --
+
  "scope"              text      default 'trusted',
+
  -- Tracking policy for this repository.
+
  "policy"             text      default 'track'
+
  --
+
) strict;
added radicle-node/src/service/tracking/store.rs
@@ -0,0 +1,397 @@
+
use std::path::Path;
+
use std::str::FromStr;
+
use std::{fmt, io};
+

+
use sqlite as sql;
+
use thiserror::Error;
+

+
use crate::prelude::Id;
+
use crate::service::NodeId;
+

+
use super::{Policy, Scope};
+

+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// I/O error.
+
    #[error("i/o error: {0}")]
+
    Io(#[from] io::Error),
+
    /// An Internal error.
+
    #[error("internal error: {0}")]
+
    Internal(#[from] sql::Error),
+
}
+

+
pub type Alias = String;
+

+
impl sqlite::BindableWithIndex for Scope {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        let s = match self {
+
            Self::Trusted => "trusted",
+
            Self::DelegatesOnly => "delegates-only",
+
            Self::All => "all",
+
        };
+
        s.bind(stmt, i)
+
    }
+
}
+

+
impl TryFrom<&sql::Value> for Scope {
+
    type Error = sql::Error;
+

+
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        let message = Some("invalid remote scope".to_owned());
+

+
        match value {
+
            sql::Value::String(scope) => Scope::from_str(scope).map_err(|_| sql::Error {
+
                code: None,
+
                message,
+
            }),
+
            _ => Err(sql::Error {
+
                code: None,
+
                message,
+
            }),
+
        }
+
    }
+
}
+

+
impl sqlite::BindableWithIndex for Policy {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        match self {
+
            Self::Track => "track",
+
            Self::Block => "block",
+
        }
+
        .bind(stmt, i)
+
    }
+
}
+

+
impl TryFrom<&sql::Value> for Policy {
+
    type Error = sql::Error;
+

+
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        let message = Some("sql: invalid policy".to_owned());
+

+
        match value {
+
            sql::Value::String(s) if s == "track" => Ok(Policy::Track),
+
            sql::Value::String(s) if s == "block" => Ok(Policy::Block),
+
            _ => Err(sql::Error {
+
                code: None,
+
                message,
+
            }),
+
        }
+
    }
+
}
+

+
/// Tracking configuration.
+
pub struct Config {
+
    db: sql::Connection,
+
}
+

+
impl fmt::Debug for Config {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        write!(f, "Config(..)")
+
    }
+
}
+

+
impl Config {
+
    const SCHEMA: &str = include_str!("schema.sql");
+

+
    /// Open a policy store at the given path. Creates a new store if it
+
    /// doesn't exist.
+
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+
        let db = sql::Connection::open(path)?;
+
        db.execute(Self::SCHEMA)?;
+

+
        Ok(Self { db })
+
    }
+

+
    /// Create a new in-memory address book.
+
    pub fn memory() -> Result<Self, Error> {
+
        let db = sql::Connection::open(":memory:")?;
+
        db.execute(Self::SCHEMA)?;
+

+
        Ok(Self { db })
+
    }
+

+
    /// Track a node.
+
    pub fn track_node(&mut self, id: &NodeId, alias: Option<&str>) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO `node-policies` (id, alias)
+
             VALUES (?1, ?2)
+
             ON CONFLICT DO UPDATE
+
             SET alias = ?2 WHERE alias != ?2",
+
        )?;
+

+
        stmt.bind((1, id))?;
+
        stmt.bind((2, alias.unwrap_or_default()))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    /// Track a repository.
+
    pub fn track_repo(&mut self, id: &Id, scope: Scope) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO `repo-policies` (id, scope)
+
             VALUES (?1, ?2)
+
             ON CONFLICT DO UPDATE
+
             SET scope = ?2 WHERE scope != ?2",
+
        )?;
+

+
        stmt.bind((1, id))?;
+
        stmt.bind((2, scope))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    /// Set a node's tracking policy.
+
    pub fn set_node_policy(&mut self, id: &NodeId, policy: Policy) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO `node-policies` (id, policy)
+
             VALUES (?1, ?2)
+
             ON CONFLICT DO UPDATE
+
             SET policy = ?2 WHERE policy != ?2",
+
        )?;
+

+
        stmt.bind((1, id))?;
+
        stmt.bind((2, policy))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    /// Set a repository's tracking policy.
+
    pub fn set_repo_policy(&mut self, id: &Id, policy: Policy) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO `repo-policies` (id, policy)
+
             VALUES (?1, ?2)
+
             ON CONFLICT DO UPDATE
+
             SET policy = ?2 WHERE policy != ?2",
+
        )?;
+

+
        stmt.bind((1, id))?;
+
        stmt.bind((2, policy))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    /// Untrack a node.
+
    pub fn untrack_node(&mut self, id: &NodeId) -> Result<bool, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("DELETE FROM `node-policies` WHERE id = ?")?;
+

+
        stmt.bind((1, id))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    /// Untrack a repository.
+
    pub fn untrack_repo(&mut self, id: &Id) -> Result<bool, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("DELETE FROM `repo-policies` WHERE id = ?")?;
+

+
        stmt.bind((1, id))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    /// Check if a node is tracked.
+
    pub fn is_node_tracked(&self, id: &NodeId) -> Result<bool, Error> {
+
        Ok(matches!(self.node_entry(id)?, Some((_, Policy::Track))))
+
    }
+

+
    /// Check if a repository is tracked.
+
    pub fn is_repo_tracked(&self, id: &Id) -> Result<bool, Error> {
+
        Ok(matches!(self.repo_entry(id)?, Some((_, Policy::Track))))
+
    }
+

+
    /// Get a node's tracking information.
+
    pub fn node_entry(&self, id: &NodeId) -> Result<Option<(Option<Alias>, Policy)>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT alias, policy FROM `node-policies` WHERE id = ?")?;
+

+
        stmt.bind((1, id))?;
+

+
        if let Some(Ok(row)) = stmt.into_iter().next() {
+
            let alias = row.read::<&str, _>("alias");
+

+
            return Ok(Some((
+
                if alias.is_empty() {
+
                    None
+
                } else {
+
                    Some(alias.to_owned())
+
                },
+
                row.read::<Policy, _>("policy"),
+
            )));
+
        }
+
        Ok(None)
+
    }
+

+
    /// Get a repository's tracking information.
+
    pub fn repo_entry(&self, id: &Id) -> Result<Option<(Scope, Policy)>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT scope, policy FROM `repo-policies` WHERE id = ?")?;
+

+
        stmt.bind((1, id))?;
+

+
        if let Some(Ok(row)) = stmt.into_iter().next() {
+
            return Ok(Some((
+
                row.read::<Scope, _>("scope"),
+
                row.read::<Policy, _>("policy"),
+
            )));
+
        }
+
        Ok(None)
+
    }
+

+
    /// Get node tracking entries.
+
    pub fn node_entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, Alias)>>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT id, alias FROM `node-policies`")?
+
            .into_iter();
+
        let mut entries = Vec::new();
+

+
        while let Some(Ok(row)) = stmt.next() {
+
            let id = row.read("id");
+
            let alias = row.read::<&str, _>("alias");
+

+
            entries.push((id, alias.to_owned()));
+
        }
+
        Ok(Box::new(entries.into_iter()))
+
    }
+

+
    /// Get repository tracking entries.
+
    pub fn repo_entries(&self) -> Result<Box<dyn Iterator<Item = (Id, Scope)>>, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("SELECT id, scope FROM `repo-policies`")?
+
            .into_iter();
+
        let mut entries = Vec::new();
+

+
        while let Some(Ok(row)) = stmt.next() {
+
            let id = row.read("id");
+
            let scope = row.read("scope");
+

+
            entries.push((id, scope));
+
        }
+
        Ok(Box::new(entries.into_iter()))
+
    }
+
}
+

+
#[cfg(test)]
+
mod test {
+
    use radicle::assert_matches;
+

+
    use super::*;
+
    use crate::test::arbitrary;
+

+
    #[test]
+
    fn test_track_and_untrack_node() {
+
        let id = arbitrary::gen::<NodeId>(1);
+
        let mut db = Config::open(":memory:").unwrap();
+

+
        assert!(db.track_node(&id, Some("eve")).unwrap());
+
        assert!(db.is_node_tracked(&id).unwrap());
+
        assert!(!db.track_node(&id, Some("eve")).unwrap());
+
        assert!(db.untrack_node(&id).unwrap());
+
        assert!(!db.is_node_tracked(&id).unwrap());
+
    }
+

+
    #[test]
+
    fn test_track_and_untrack_repo() {
+
        let id = arbitrary::gen::<Id>(1);
+
        let mut db = Config::open(":memory:").unwrap();
+

+
        assert!(db.track_repo(&id, Scope::All).unwrap());
+
        assert!(db.is_repo_tracked(&id).unwrap());
+
        assert!(!db.track_repo(&id, Scope::All).unwrap());
+
        assert!(db.untrack_repo(&id).unwrap());
+
        assert!(!db.is_repo_tracked(&id).unwrap());
+
    }
+

+
    #[test]
+
    fn test_node_entries() {
+
        let ids = arbitrary::vec::<NodeId>(3);
+
        let mut db = Config::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            assert!(db.track_node(id, None).unwrap());
+
        }
+
        let mut entries = db.node_entries().unwrap();
+
        assert_matches!(entries.next(), Some((id, _)) if id == ids[0]);
+
        assert_matches!(entries.next(), Some((id, _)) if id == ids[1]);
+
        assert_matches!(entries.next(), Some((id, _)) if id == ids[2]);
+
    }
+

+
    #[test]
+
    fn test_repo_entries() {
+
        let ids = arbitrary::vec::<Id>(3);
+
        let mut db = Config::open(":memory:").unwrap();
+

+
        for id in &ids {
+
            assert!(db.track_repo(id, Scope::All).unwrap());
+
        }
+
        let mut entries = db.repo_entries().unwrap();
+
        assert_matches!(entries.next(), Some((id, _)) if id == ids[0]);
+
        assert_matches!(entries.next(), Some((id, _)) if id == ids[1]);
+
        assert_matches!(entries.next(), Some((id, _)) if id == ids[2]);
+
    }
+

+
    #[test]
+
    fn test_update_alias() {
+
        let id = arbitrary::gen::<NodeId>(1);
+
        let mut db = Config::open(":memory:").unwrap();
+

+
        assert!(db.track_node(&id, Some("eve")).unwrap());
+
        assert_eq!(
+
            db.node_entry(&id).unwrap().unwrap().0,
+
            Some(String::from("eve"))
+
        );
+
        assert!(db.track_node(&id, None).unwrap());
+
        assert_eq!(db.node_entry(&id).unwrap().unwrap().0, None);
+
        assert!(!db.track_node(&id, None).unwrap());
+
        assert!(db.track_node(&id, Some("alice")).unwrap());
+
        assert_eq!(
+
            db.node_entry(&id).unwrap().unwrap().0,
+
            Some(String::from("alice"))
+
        );
+
    }
+

+
    #[test]
+
    fn test_update_scope() {
+
        let id = arbitrary::gen::<Id>(1);
+
        let mut db = Config::open(":memory:").unwrap();
+

+
        assert!(db.track_repo(&id, Scope::All).unwrap());
+
        assert_eq!(db.repo_entry(&id).unwrap().unwrap().0, Scope::All);
+
        assert!(db.track_repo(&id, Scope::DelegatesOnly).unwrap());
+
        assert_eq!(db.repo_entry(&id).unwrap().unwrap().0, Scope::DelegatesOnly);
+
    }
+

+
    #[test]
+
    fn test_repo_policy() {
+
        let id = arbitrary::gen::<Id>(1);
+
        let mut db = Config::open(":memory:").unwrap();
+

+
        assert!(db.track_repo(&id, Scope::All).unwrap());
+
        assert_eq!(db.repo_entry(&id).unwrap().unwrap().1, Policy::Track);
+
        assert!(db.set_repo_policy(&id, Policy::Block).unwrap());
+
        assert_eq!(db.repo_entry(&id).unwrap().unwrap().1, Policy::Block);
+
    }
+

+
    #[test]
+
    fn test_node_policy() {
+
        let id = arbitrary::gen::<NodeId>(1);
+
        let mut db = Config::open(":memory:").unwrap();
+

+
        assert!(db.track_node(&id, None).unwrap());
+
        assert_eq!(db.node_entry(&id).unwrap().unwrap().1, Policy::Track);
+
        assert!(db.set_node_policy(&id, Policy::Block).unwrap());
+
        assert_eq!(db.node_entry(&id).unwrap().unwrap().1, Policy::Block);
+
    }
+
}
modified radicle-node/src/test/peer.rs
@@ -98,7 +98,17 @@ where
        let local_time = LocalTime::now();
        let clock = RefClock::from(local_time);
        let routing = routing::Table::memory().unwrap();
-
        let service = Service::new(config, clock, routing, storage, addrs, signer, rng.clone());
+
        let tracking = tracking::Config::memory().unwrap();
+
        let service = Service::new(
+
            config,
+
            clock,
+
            routing,
+
            storage,
+
            addrs,
+
            tracking,
+
            signer,
+
            rng.clone(),
+
        );
        let ip = ip.into();
        let local_addr = net::SocketAddr::new(ip, rng.u16(..));

modified radicle-node/src/tests.rs
@@ -343,10 +343,7 @@ fn test_inventory_pruning() {
fn test_tracking() {
    let mut alice = Peer::config(
        "alice",
-
        Config {
-
            project_tracking: ProjectTracking::Allowed(HashSet::default()),
-
            ..Config::default()
-
        },
+
        Config::default(),
        [7, 7, 7, 7],
        MockStorage::empty(),
        address::Book::memory().unwrap(),
@@ -362,7 +359,7 @@ fn test_tracking() {
        .map_err(client::handle::Error::from)
        .unwrap();
    assert!(policy_change);
-
    assert!(alice.config().is_tracking(&proj_id));
+
    assert!(alice.tracking().is_repo_tracked(&proj_id).unwrap());

    let (sender, receiver) = chan::bounded(1);
    alice.command(Command::Untrack(proj_id, sender));
@@ -371,7 +368,7 @@ fn test_tracking() {
        .map_err(client::handle::Error::from)
        .unwrap();
    assert!(policy_change);
-
    assert!(!alice.config().is_tracking(&proj_id));
+
    assert!(!alice.tracking().is_repo_tracked(&proj_id).unwrap());
}

#[test]
@@ -559,9 +556,9 @@ fn test_refs_announcement_relay() {
    };
    let bob_inv = bob.inventory().unwrap();

-
    alice.track(bob_inv[0]);
-
    alice.track(bob_inv[1]);
-
    alice.track(bob_inv[2]);
+
    alice.track(&bob_inv[0], tracking::Scope::All).unwrap();
+
    alice.track(&bob_inv[1], tracking::Scope::All).unwrap();
+
    alice.track(&bob_inv[2], tracking::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.receive(&eve.addr(), Message::Subscribe(Subscribe::all()));
@@ -601,7 +598,7 @@ fn test_refs_announcement_no_subscribe() {
    let eve = Peer::new("eve", [9, 9, 9, 9], MockStorage::empty());
    let id = arbitrary::gen(1);

-
    alice.track(id);
+
    alice.track(&id, tracking::Scope::All).unwrap();
    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.receive(&bob.addr(), bob.refs_announcement(id));
@@ -847,16 +844,6 @@ fn test_push_and_pull() {
    alice.command(service::Command::Connect(eve.addr()));
    bob.command(service::Command::Connect(eve.addr()));

-
    let mut sim = Simulation::new(
-
        LocalTime::now(),
-
        alice.rng.clone(),
-
        simulator::Options::default(),
-
    )
-
    .initialize([&mut alice, &mut bob, &mut eve]);
-

-
    // Here we expect Alice to connect to Eve.
-
    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());
-

    // Alice creates a new project.
    let (proj_id, _, _) = rad::init(
        &repo,
@@ -876,7 +863,17 @@ fn test_push_and_pull() {
    let (sender, _) = chan::bounded(1);
    eve.command(service::Command::Track(proj_id, sender));

-
    // Neither of them have it in the beginning.
+
    let mut sim = Simulation::new(
+
        LocalTime::now(),
+
        alice.rng.clone(),
+
        simulator::Options::default(),
+
    )
+
    .initialize([&mut alice, &mut bob, &mut eve]);
+

+
    // Here we expect Alice to connect to Eve.
+
    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());
+

+
    // Neither Eve nor Bob have Alice's project for now.
    assert!(eve.get(proj_id).unwrap().is_none());
    assert!(bob.get(proj_id).unwrap().is_none());

modified radicle/src/sql.rs
@@ -18,7 +18,7 @@ impl TryFrom<&Value> for Id {
            }),
            _ => Err(sql::Error {
                code: None,
-
                message: None,
+
                message: Some("sql: invalid type for id".to_owned()),
            }),
        }
    }
@@ -44,7 +44,7 @@ impl TryFrom<&Value> for node::Features {
            Value::Integer(bits) => Ok(node::Features::from(*bits as u64)),
            _ => Err(sql::Error {
                code: None,
-
                message: None,
+
                message: Some("sql: invalid type for node features".to_owned()),
            }),
        }
    }