Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Persist historical gossip messages
cloudhead committed 2 years ago
commit f9df360171b64ffa7e3cbe96a3145330dabc3d86
parent 823a7e7c52a21d4450a1326df8a5ed315163c811
10 files changed +506 -216
modified radicle-crypto/src/lib.rs
@@ -442,6 +442,44 @@ impl sqlite::BindableWithIndex for &PublicKey {
    }
}

+
#[cfg(feature = "sqlite")]
+
impl From<&Signature> for sqlite::Value {
+
    fn from(sig: &Signature) -> Self {
+
        sqlite::Value::Binary(sig.to_vec())
+
    }
+
}
+

+
#[cfg(feature = "sqlite")]
+
impl TryFrom<&sqlite::Value> for Signature {
+
    type Error = sqlite::Error;
+

+
    fn try_from(value: &sqlite::Value) -> Result<Self, Self::Error> {
+
        match value {
+
            sqlite::Value::Binary(s) => ed25519::Signature::from_slice(s)
+
                .map_err(|e| sqlite::Error {
+
                    code: None,
+
                    message: Some(e.to_string()),
+
                })
+
                .map(Self),
+
            _ => Err(sqlite::Error {
+
                code: None,
+
                message: Some("sql: invalid column type for signature".to_owned()),
+
            }),
+
        }
+
    }
+
}
+

+
#[cfg(feature = "sqlite")]
+
impl sqlite::BindableWithIndex for &Signature {
+
    fn bind<I: sqlite::ParameterIndex>(
+
        self,
+
        stmt: &mut sqlite::Statement<'_>,
+
        i: I,
+
    ) -> sqlite::Result<()> {
+
        sqlite::Value::from(self).bind(stmt, i)
+
    }
+
}
+

pub mod keypair {
    use super::*;

modified radicle-node/src/runtime.rs
@@ -27,7 +27,7 @@ use crate::control;
use crate::crypto::Signer;
use crate::node::{routing, NodeId};
use crate::service::message::NodeAnnouncement;
-
use crate::service::{tracking, Event};
+
use crate::service::{gossip, tracking, Event};
use crate::wire::Wire;
use crate::wire::{self, Decode};
use crate::worker;
@@ -48,6 +48,9 @@ pub enum Error {
    /// A tracking database error.
    #[error("tracking database error: {0}")]
    Tracking(#[from] tracking::Error),
+
    /// A gossip database error.
+
    #[error("gossip database error: {0}")]
+
    Gossip(#[from] gossip::Error),
    /// An I/O error.
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
@@ -141,7 +144,10 @@ impl Runtime {
        let policy = config.policy;

        log::info!(target: "node", "Opening address book {}..", address_db.display());
-
        let mut addresses = address::Book::open(address_db)?;
+
        let mut addresses = address::Book::open(address_db.clone())?;
+

+
        log::info!(target: "node", "Opening gossip store from {}..", address_db.display());
+
        let gossip = gossip::Store::open(address_db)?; // Nb. same database as address book.

        log::info!(target: "node", "Opening routing table {}..", routing_db.display());
        let routing = routing::Table::open(routing_db)?;
@@ -204,6 +210,7 @@ impl Runtime {
            routing,
            storage.clone(),
            addresses,
+
            gossip,
            tracking,
            signer.clone(),
            rng,
modified radicle-node/src/service.rs
@@ -1,14 +1,16 @@
#![allow(clippy::too_many_arguments)]
#![allow(clippy::collapsible_match)]
#![allow(clippy::collapsible_if)]
+
#![warn(clippy::unwrap_used)]
pub mod filter;
+
pub mod gossip;
pub mod io;
pub mod limitter;
pub mod message;
pub mod session;

use std::collections::hash_map::Entry;
-
use std::collections::{BTreeMap, HashMap, HashSet};
+
use std::collections::{HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, time};
@@ -50,7 +52,6 @@ pub use crate::service::session::Session;

pub use radicle::node::tracking::config as tracking;

-
use self::gossip::Gossip;
use self::io::Outbox;
use self::limitter::RateLimiter;
use self::message::InventoryAnnouncement;
@@ -203,7 +204,7 @@ pub struct Service<R, A, S, G> {
    /// Tracking policy configuration.
    tracking: tracking::Config<Write>,
    /// State relating to gossip.
-
    gossip: Gossip,
+
    gossip: gossip::Store,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
    /// Clock. Tells the time.
@@ -262,6 +263,7 @@ where
        routing: R,
        storage: S,
        addresses: A,
+
        gossip: gossip::Store,
        tracking: tracking::Config<Write>,
        signer: G,
        rng: Rng,
@@ -280,7 +282,7 @@ where
            node,
            clock,
            routing,
-
            gossip: Gossip::default(),
+
            gossip,
            outbox: Outbox::default(),
            limiter: RateLimiter::default(),
            sessions,
@@ -483,7 +485,7 @@ where
                .inventory()
                .and_then(|i| self.announce_inventory(i))
            {
-
                error!(target: "service", "Error announcing inventory: {}", err);
+
                error!(target: "service", "Error announcing inventory: {err}");
            }
            self.outbox.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
@@ -492,8 +494,15 @@ where
            trace!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
-
                error!("Error pruning routing entries: {}", err);
+
                error!(target: "service", "Error pruning routing entries: {err}");
            }
+
            if let Err(err) = self
+
                .gossip
+
                .prune((now - self.config.limits.gossip_max_age).as_millis())
+
            {
+
                error!(target: "service", "Error pruning gossip entries: {err}");
+
            }
+

            self.outbox.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }
@@ -886,26 +895,36 @@ where
        let now = self.clock;
        let timestamp = message.timestamp();
        let relay = self.config.relay;
-
        let peer = self
-
            .gossip
-
            .nodes
-
            .entry(*announcer)
-
            .or_insert_with(Node::default);

        // Don't allow messages from too far in the future.
        if timestamp.saturating_sub(now.as_millis()) > MAX_TIME_DELTA.as_millis() as u64 {
            return Err(session::Error::InvalidTimestamp(timestamp));
        }
-

-
        match message {
-
            AnnouncementMessage::Inventory(message) => {
-
                // Discard inventory messages we've already seen, otherwise update
-
                // out last seen time.
-
                if !peer.inventory_announced(announcement.clone()) {
+
        // Check ref signatures validity.
+
        if let AnnouncementMessage::Refs(message) = message {
+
            for theirs in message.refs.iter() {
+
                if theirs.verify(&theirs.id).is_err() {
+
                    warn!(target: "service", "Peer {relayer} relayed refs announcement with invalid signature for {}", theirs.id);
+
                    return Err(session::Error::Misbehavior);
+
                }
+
            }
+
        }
+
        // Discard announcement messages we've already seen, otherwise update out last seen time.
+
        match self.gossip.announced(announcer, announcement) {
+
            Ok(fresh) => {
+
                if !fresh {
                    trace!(target: "service", "Ignoring stale inventory announcement from {announcer} (t={})", self.time());
                    return Ok(false);
                }
+
            }
+
            Err(e) => {
+
                error!(target: "service", "Error updating gossip entry from {announcer}: {e}");
+
                return Ok(false);
+
            }
+
        }

+
        match message {
+
            AnnouncementMessage::Inventory(message) => {
                match self.sync_routing(&message.inventory, *announcer, message.timestamp) {
                    Ok(synced) => {
                        if synced.is_empty() {
@@ -914,7 +933,7 @@ where
                        }
                    }
                    Err(e) => {
-
                        error!(target: "service", "Error processing inventory from {}: {}", announcer, e);
+
                        error!(target: "service", "Error processing inventory from {announcer}: {e}");
                        return Ok(false);
                    }
                }
@@ -957,13 +976,6 @@ where
            }
            // Process a peer inventory update announcement by (maybe) fetching.
            AnnouncementMessage::Refs(message) => {
-
                for theirs in message.refs.iter() {
-
                    if theirs.verify(&theirs.id).is_err() {
-
                        warn!(target: "service", "Peer {relayer} relayed refs announcement with invalid signature for {}", theirs.id);
-
                        return Err(session::Error::Misbehavior);
-
                    }
-
                }
-

                // We update inventories when receiving ref announcements, as these could come
                // from a new repository being initialized.
                if let Ok(result) =
@@ -978,12 +990,6 @@ where
                        info!(target: "service", "Routing table updated for {} with seed {announcer}", message.rid);
                    }
                }
-
                // Discard announcement messages we've already seen, otherwise update
-
                // our last seen time.
-
                if !peer.refs_announced(message.rid, announcement.clone()) {
-
                    trace!(target: "service", "Ignoring stale refs announcement from {announcer} (time={timestamp})");
-
                    return Ok(false);
-
                }

                // Check if the announcer is in sync with our own refs, and if so emit an event.
                // This event is used for showing sync progress to users.
@@ -1042,13 +1048,6 @@ where
                    ..
                },
            ) => {
-
                // Discard node messages we've already seen, otherwise update
-
                // our last seen time.
-
                if !peer.node_announced(announcement.clone()) {
-
                    trace!(target: "service", "Ignoring stale node announcement from {announcer}");
-
                    return Ok(false);
-
                }
-

                // If this node isn't a seed, we're not interested in adding it
                // to our address book, but other nodes may be, so we relay the message anyway.
                if !features.has(Features::SEED) {
@@ -1174,14 +1173,30 @@ where
                }
            }
            (session::State::Connected { .. }, Message::Subscribe(subscribe)) => {
-
                for ann in self
+
                // Filter announcements by interest.
+
                match self
                    .gossip
-
                    // Filter announcements by interest.
                    .filtered(&subscribe.filter, subscribe.since, subscribe.until)
-
                    // Don't send announcements authored by the remote, back to the remote.
-
                    .filter(|ann| &ann.node != remote)
                {
-
                    self.outbox.write(peer, ann.into());
+
                    Ok(anns) => {
+
                        for ann in anns {
+
                            let ann = match ann {
+
                                Ok(a) => a,
+
                                Err(e) => {
+
                                    error!(target: "service", "Error reading gossip message from store: {e}");
+
                                    continue;
+
                                }
+
                            };
+
                            // Don't send announcements authored by the remote, back to the remote.
+
                            if ann.node == *remote {
+
                                continue;
+
                            }
+
                            self.outbox.write(peer, ann.into());
+
                        }
+
                    }
+
                    Err(e) => {
+
                        error!(target: "service", "Error querying gossip messages from store: {e}");
+
                    }
                }
                peer.subscribe = Some(subscribe);
            }
@@ -1720,75 +1735,6 @@ pub enum LookupError {
    Repository(#[from] RepositoryError),
}

-
/// Keeps track of the most recent announcements of a node.
-
#[derive(Default, Debug)]
-
pub struct Node {
-
    /// Last ref announcements (per project).
-
    pub last_refs: HashMap<Id, Announcement>,
-
    /// Last inventory announcement.
-
    pub last_inventory: Option<Announcement>,
-
    /// Last node announcement.
-
    pub last_node: Option<Announcement>,
-
}
-

-
impl Node {
-
    /// Process a refs announcement for the given node.
-
    /// Returns `true` if the timestamp was updated.
-
    pub fn refs_announced(&mut self, id: Id, ann: Announcement) -> bool {
-
        match self.last_refs.entry(id) {
-
            Entry::Vacant(e) => {
-
                e.insert(ann);
-
                return true;
-
            }
-
            Entry::Occupied(mut e) => {
-
                let last = e.get_mut();
-

-
                if ann.timestamp() > last.timestamp() {
-
                    *last = ann;
-
                    return true;
-
                }
-
            }
-
        }
-
        false
-
    }
-

-
    /// Process an inventory announcement for the given node.
-
    /// Returns `true` if the timestamp was updated.
-
    pub fn inventory_announced(&mut self, ann: Announcement) -> bool {
-
        match &mut self.last_inventory {
-
            Some(last) => {
-
                if ann.timestamp() > last.timestamp() {
-
                    *last = ann;
-
                    return true;
-
                }
-
            }
-
            None => {
-
                self.last_inventory = Some(ann);
-
                return true;
-
            }
-
        }
-
        false
-
    }
-

-
    /// Process a node announcement for the given node.
-
    /// Returns `true` if the timestamp was updated.
-
    pub fn node_announced(&mut self, ann: Announcement) -> bool {
-
        match &mut self.last_node {
-
            Some(last) => {
-
                if ann.timestamp() > last.timestamp() {
-
                    *last = ann;
-
                    return true;
-
                }
-
            }
-
            None => {
-
                self.last_node = Some(ann);
-
                return true;
-
            }
-
        }
-
        false
-
    }
-
}
-

#[derive(Debug, Clone)]
/// Holds currently (or recently) connected peers.
pub struct Sessions(AddressBook<NodeId, Session>);
@@ -1842,100 +1788,3 @@ impl DerefMut for Sessions {
        &mut self.0
    }
}
-

-
pub mod gossip {
-
    use super::*;
-
    use crate::service::filter::Filter;
-

-
    #[derive(Default, Debug)]
-
    pub struct Gossip {
-
        // FIXME: This should be loaded from the address store.
-
        /// Keeps track of node announcements.
-
        pub nodes: BTreeMap<NodeId, Node>,
-
    }
-

-
    impl Gossip {
-
        pub fn filtered<'a>(
-
            &'a self,
-
            filter: &'a Filter,
-
            start: Timestamp,
-
            end: Timestamp,
-
        ) -> impl Iterator<Item = Announcement> + '_ {
-
            self.nodes
-
                .values()
-
                .flat_map(|n| {
-
                    [&n.last_node, &n.last_inventory]
-
                        .into_iter()
-
                        .flatten()
-
                        .chain(n.last_refs.values())
-
                        .cloned()
-
                        .collect::<Vec<_>>()
-
                })
-
                .filter(move |ann| ann.timestamp() >= start && ann.timestamp() < end)
-
                .filter(move |ann| ann.matches(filter))
-
        }
-
    }
-

-
    pub fn handshake<G: Signer, S: ReadStorage>(
-
        node: NodeAnnouncement,
-
        now: Timestamp,
-
        storage: &S,
-
        signer: &G,
-
        filter: Filter,
-
    ) -> Vec<Message> {
-
        let inventory = match storage.inventory() {
-
            Ok(i) => i,
-
            Err(e) => {
-
                error!("Error getting local inventory for handshake: {}", e);
-
                // Other than crashing the node completely, there's nothing we can do
-
                // here besides returning an empty inventory and logging an error.
-
                vec![]
-
            }
-
        };
-

-
        vec![
-
            Message::node(node, signer),
-
            Message::inventory(gossip::inventory(now, inventory), signer),
-
            Message::subscribe(
-
                filter,
-
                now - SUBSCRIBE_BACKLOG_DELTA.as_millis() as u64,
-
                Timestamp::MAX,
-
            ),
-
        ]
-
    }
-

-
    pub fn node(config: &Config, timestamp: Timestamp) -> NodeAnnouncement {
-
        let features = config.features();
-
        let alias = config.alias.clone();
-
        let addresses: BoundedVec<_, ADDRESS_LIMIT> = config
-
            .external_addresses
-
            .clone()
-
            .try_into()
-
            .expect("external addresses are within the limit");
-

-
        NodeAnnouncement {
-
            features,
-
            timestamp,
-
            alias,
-
            addresses,
-
            nonce: 0,
-
        }
-
    }
-

-
    pub fn inventory(timestamp: Timestamp, inventory: Vec<Id>) -> InventoryAnnouncement {
-
        type Inventory = BoundedVec<Id, INVENTORY_LIMIT>;
-

-
        if inventory.len() > Inventory::max() {
-
            error!(
-
                target: "service",
-
                "inventory announcement limit ({}) exceeded, other nodes will see only some of your projects",
-
                inventory.len()
-
            );
-
        }
-

-
        InventoryAnnouncement {
-
            inventory: BoundedVec::truncate(inventory),
-
            timestamp,
-
        }
-
    }
-
}
added radicle-node/src/service/gossip.rs
@@ -0,0 +1,70 @@
+
pub mod store;
+

+
use super::*;
+
use crate::service::filter::Filter;
+

+
pub use store::Error;
+
pub use store::GossipStore as Store;
+

+
pub fn handshake<G: Signer, S: ReadStorage>(
+
    node: NodeAnnouncement,
+
    now: Timestamp,
+
    storage: &S,
+
    signer: &G,
+
    filter: Filter,
+
) -> Vec<Message> {
+
    let inventory = match storage.inventory() {
+
        Ok(i) => i,
+
        Err(e) => {
+
            error!("Error getting local inventory for handshake: {}", e);
+
            // Other than crashing the node completely, there's nothing we can do
+
            // here besides returning an empty inventory and logging an error.
+
            vec![]
+
        }
+
    };
+

+
    vec![
+
        Message::node(node, signer),
+
        Message::inventory(gossip::inventory(now, inventory), signer),
+
        Message::subscribe(
+
            filter,
+
            now - SUBSCRIBE_BACKLOG_DELTA.as_millis() as u64,
+
            Timestamp::MAX,
+
        ),
+
    ]
+
}
+

+
pub fn node(config: &Config, timestamp: Timestamp) -> NodeAnnouncement {
+
    let features = config.features();
+
    let alias = config.alias.clone();
+
    let addresses: BoundedVec<_, ADDRESS_LIMIT> = config
+
        .external_addresses
+
        .clone()
+
        .try_into()
+
        .expect("external addresses are within the limit");
+

+
    NodeAnnouncement {
+
        features,
+
        timestamp,
+
        alias,
+
        addresses,
+
        nonce: 0,
+
    }
+
}
+

+
pub fn inventory(timestamp: Timestamp, inventory: Vec<Id>) -> InventoryAnnouncement {
+
    type Inventory = BoundedVec<Id, INVENTORY_LIMIT>;
+

+
    if inventory.len() > Inventory::max() {
+
        error!(
+
            target: "service",
+
            "inventory announcement limit ({}) exceeded, other nodes will see only some of your projects",
+
            inventory.len()
+
        );
+
    }
+

+
    InventoryAnnouncement {
+
        inventory: BoundedVec::truncate(inventory),
+
        timestamp,
+
    }
+
}
added radicle-node/src/service/gossip/store.rs
@@ -0,0 +1,279 @@
+
use std::{fmt, io, path::Path};
+

+
use radicle::crypto::Signature;
+
use sqlite as sql;
+
use thiserror::Error;
+

+
use crate::node::NodeId;
+
use crate::prelude::{Filter, Timestamp};
+
use crate::service::message::{
+
    Announcement, AnnouncementMessage, InventoryAnnouncement, NodeAnnouncement, RefsAnnouncement,
+
};
+
use crate::wire;
+
use crate::wire::Decode;
+

+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// I/O error.
+
    #[error("i/o error: {0}")]
+
    Io(#[from] io::Error),
+
    /// An Internal error.
+
    #[error("internal error: {0}")]
+
    Internal(#[from] sql::Error),
+
}
+

+
/// Keeps track of the latest received gossip messages for each node.
+
/// Grows linearly with the number of nodes on the network.
+
pub struct GossipStore {
+
    db: sql::Connection,
+
}
+

+
impl fmt::Debug for GossipStore {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        f.debug_struct("GossipStore").finish()
+
    }
+
}
+

+
impl GossipStore {
+
    /// Open a gossip store at the given path. Creates a new store if it doesn't exist.
+
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+
        let db = sql::Connection::open_with_flags(
+
            path,
+
            sqlite::OpenFlags::new().set_read_write().set_full_mutex(),
+
        )?;
+

+
        Ok(Self { db })
+
    }
+

+
    /// Prune announcements older than the cutoff time.
+
    pub fn prune(&mut self, cutoff: Timestamp) -> Result<usize, Error> {
+
        let mut stmt = self
+
            .db
+
            .prepare("DELETE FROM `announcements` WHERE timestamp < ?1")?;
+

+
        stmt.bind((1, cutoff.try_into().unwrap_or(i64::MAX)))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count())
+
    }
+

+
    /// Process an announcement for the given node.
+
    /// Returns `true` if the timestamp was updated or the announcement wasn't there before.
+
    pub fn announced(&mut self, nid: &NodeId, ann: &Announcement) -> Result<bool, Error> {
+
        let mut stmt = self.db.prepare(
+
            "INSERT INTO `announcements` (node, repo, type, message, signature, timestamp)
+
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
+
             ON CONFLICT DO UPDATE
+
             SET message = ?4, signature = ?5, timestamp = ?6
+
             WHERE timestamp < ?6",
+
        )?;
+
        stmt.bind((1, nid))?;
+

+
        match &ann.message {
+
            AnnouncementMessage::Node(msg) => {
+
                stmt.bind((2, sql::Value::String(String::new())))?;
+
                stmt.bind((3, &GossipType::Node))?;
+
                stmt.bind((4, msg))?;
+
            }
+
            AnnouncementMessage::Refs(msg) => {
+
                stmt.bind((2, &msg.rid))?;
+
                stmt.bind((3, &GossipType::Refs))?;
+
                stmt.bind((4, msg))?;
+
            }
+
            AnnouncementMessage::Inventory(msg) => {
+
                stmt.bind((2, sql::Value::String(String::new())))?;
+
                stmt.bind((3, &GossipType::Inventory))?;
+
                stmt.bind((4, msg))?;
+
            }
+
        }
+
        stmt.bind((5, &ann.signature))?;
+
        stmt.bind((6, ann.message.timestamp().try_into().unwrap_or(i64::MAX)))?;
+
        stmt.next()?;
+

+
        Ok(self.db.change_count() > 0)
+
    }
+

+
    /// Get all the latest gossip messages of all nodes, filtered by inventory filter and
+
    /// announcement timestamps.
+
    ///
+
    /// # Panics
+
    ///
+
    /// Panics if `from` > `to`.
+
    ///
+
    pub fn filtered<'a>(
+
        &'a self,
+
        filter: &'a Filter,
+
        from: Timestamp,
+
        to: Timestamp,
+
    ) -> Result<impl Iterator<Item = Result<Announcement, Error>> + 'a, Error> {
+
        let mut stmt = self.db.prepare(
+
            "SELECT node, type, message, signature, timestamp
+
             FROM announcements
+
             WHERE timestamp >= ?1 and timestamp < ?2
+
             ORDER BY timestamp",
+
        )?;
+
        assert!(from <= to);
+

+
        stmt.bind((1, i64::try_from(from).unwrap_or(i64::MAX)))?;
+
        stmt.bind((2, i64::try_from(to).unwrap_or(i64::MAX)))?;
+

+
        Ok(stmt
+
            .into_iter()
+
            .map(|row| {
+
                let row = row?;
+
                let node = row.read::<NodeId, _>("node");
+
                let gt = row.read::<GossipType, _>("type");
+
                let message = match gt {
+
                    GossipType::Refs => {
+
                        let ann = row.read::<RefsAnnouncement, _>("message");
+
                        AnnouncementMessage::Refs(ann)
+
                    }
+
                    GossipType::Inventory => {
+
                        let ann = row.read::<InventoryAnnouncement, _>("message");
+
                        AnnouncementMessage::Inventory(ann)
+
                    }
+
                    GossipType::Node => {
+
                        let ann = row.read::<NodeAnnouncement, _>("message");
+
                        AnnouncementMessage::Node(ann)
+
                    }
+
                };
+
                let signature = row.read::<Signature, _>("signature");
+
                let timestamp = row.read::<i64, _>("timestamp");
+

+
                debug_assert_eq!(timestamp, message.timestamp() as i64);
+

+
                Ok(Announcement {
+
                    node,
+
                    message,
+
                    signature,
+
                })
+
            })
+
            .filter(|ann| match ann {
+
                Ok(a) => a.matches(filter),
+
                Err(_) => true,
+
            }))
+
    }
+
}
+

+
impl TryFrom<&sql::Value> for NodeAnnouncement {
+
    type Error = sql::Error;
+

+
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        match value {
+
            sql::Value::Binary(bytes) => {
+
                let mut reader = io::Cursor::new(bytes);
+
                NodeAnnouncement::decode(&mut reader).map_err(wire::Error::into)
+
            }
+
            _ => Err(sql::Error {
+
                code: None,
+
                message: Some("sql: invalid type for node announcement".to_owned()),
+
            }),
+
        }
+
    }
+
}
+

+
impl sql::BindableWithIndex for &NodeAnnouncement {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        wire::serialize(self).bind(stmt, i)
+
    }
+
}
+

+
impl TryFrom<&sql::Value> for RefsAnnouncement {
+
    type Error = sql::Error;
+

+
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        match value {
+
            sql::Value::Binary(bytes) => {
+
                let mut reader = io::Cursor::new(bytes);
+
                RefsAnnouncement::decode(&mut reader).map_err(wire::Error::into)
+
            }
+
            _ => Err(sql::Error {
+
                code: None,
+
                message: Some("sql: invalid type for refs announcement".to_owned()),
+
            }),
+
        }
+
    }
+
}
+

+
impl sql::BindableWithIndex for &RefsAnnouncement {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        wire::serialize(self).bind(stmt, i)
+
    }
+
}
+

+
impl TryFrom<&sql::Value> for InventoryAnnouncement {
+
    type Error = sql::Error;
+

+
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        match value {
+
            sql::Value::Binary(bytes) => {
+
                let mut reader = io::Cursor::new(bytes);
+
                InventoryAnnouncement::decode(&mut reader).map_err(wire::Error::into)
+
            }
+
            _ => Err(sql::Error {
+
                code: None,
+
                message: Some("sql: invalid type for inventory announcement".to_owned()),
+
            }),
+
        }
+
    }
+
}
+

+
impl sql::BindableWithIndex for &InventoryAnnouncement {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        wire::serialize(self).bind(stmt, i)
+
    }
+
}
+

+
impl From<wire::Error> for sql::Error {
+
    fn from(other: wire::Error) -> Self {
+
        sql::Error {
+
            code: None,
+
            message: Some(other.to_string()),
+
        }
+
    }
+
}
+

+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
enum GossipType {
+
    Refs,
+
    Node,
+
    Inventory,
+
}
+

+
impl fmt::Display for GossipType {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match self {
+
            Self::Refs => write!(f, "refs"),
+
            Self::Node => write!(f, "node"),
+
            Self::Inventory => write!(f, "inventory"),
+
        }
+
    }
+
}
+

+
impl sql::BindableWithIndex for &GossipType {
+
    fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
+
        self.to_string().as_str().bind(stmt, i)
+
    }
+
}
+

+
impl TryFrom<&sql::Value> for GossipType {
+
    type Error = sql::Error;
+

+
    fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
+
        match value {
+
            sql::Value::String(s) => match s.as_str() {
+
                "refs" => Ok(Self::Refs),
+
                "node" => Ok(Self::Node),
+
                "inventory" => Ok(Self::Inventory),
+
                other => Err(sql::Error {
+
                    code: None,
+
                    message: Some(format!("unknown gossip type '{other}'")),
+
                }),
+
            },
+
            _ => Err(sql::Error {
+
                code: None,
+
                message: Some("sql: invalid type for gossip type".to_owned()),
+
            }),
+
        }
+
    }
+
}
modified radicle-node/src/service/message.rs
@@ -525,6 +525,7 @@ impl ZeroBytes {
}

#[cfg(test)]
+
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::prelude::*;
modified radicle-node/src/test/peer.rs
@@ -99,26 +99,33 @@ where
pub struct Config<G: Signer + 'static> {
    pub config: service::Config,
    pub addrs: address::Book,
+
    pub gossip: gossip::Store,
    pub local_time: LocalTime,
    pub policy: Policy,
    pub scope: Scope,
    pub signer: G,
    pub rng: fastrand::Rng,
+
    pub tmp: tempfile::TempDir,
}

impl Default for Config<MockSigner> {
    fn default() -> Self {
        let mut rng = fastrand::Rng::new();
        let signer = MockSigner::new(&mut rng);
+
        let tmp = tempfile::TempDir::new().unwrap();
+
        let addrs = address::Book::open(tmp.path().join("addresses.db")).unwrap();
+
        let gossip = gossip::Store::open(tmp.path().join("addresses.db")).unwrap();

        Config {
            config: service::Config::test(Alias::from_str("mocky").unwrap()),
-
            addrs: address::Book::memory().unwrap(),
+
            addrs,
+
            gossip,
            local_time: LocalTime::now(),
            policy: Policy::default(),
            scope: Scope::default(),
            signer,
            rng,
+
            tmp,
        }
    }
}
@@ -157,7 +164,6 @@ where
        let routing = routing::Table::memory().unwrap();
        let tracking = tracking::Store::<tracking::store::Write>::memory().unwrap();
        let mut tracking = tracking::Config::new(config.policy, config.scope, tracking);
-
        let tempdir = tempfile::tempdir().unwrap();
        let id = *config.signer.public_key();
        let ip = ip.into();
        let local_addr = net::SocketAddr::new(ip, config.rng.u16(..));
@@ -176,6 +182,7 @@ where
            routing,
            storage,
            config.addrs,
+
            config.gossip,
            tracking,
            config.signer,
            config.rng.clone(),
@@ -191,7 +198,7 @@ where
            local_addr,
            rng: config.rng,
            initialized: false,
-
            tempdir,
+
            tempdir: config.tmp,
        }
    }

modified radicle/src/node/address/schema.sql
@@ -37,3 +37,26 @@ create table if not exists "addresses" (
  unique ("node", "type", "value")
  --
) strict;
+

+
create table if not exists "announcements" (
+
  -- Node ID.
+
  "node"               text      not null references "nodes" ("id"),
+
  -- Repo ID, if any, for example in ref announcements.
+
  "repo"               text      not null,
+
  -- Announcement type.
+
  --
+
  -- Valid values are:
+
  --
+
  -- "refs"
+
  -- "node"
+
  -- "inventory"
+
  "type"               text      not null,
+
  -- Announcement message in wire format (binary).
+
  "message"            blob      not null,
+
  -- Signature over message.
+
  "signature"          blob      not null,
+
  -- Announcement timestamp.
+
  "timestamp"          integer   not null,
+
  --
+
  unique ("node", "repo", "type")
+
) strict;
modified radicle/src/node/address/store.rs
@@ -38,13 +38,25 @@ impl fmt::Debug for Book {
    }
}

+
impl From<sql::Connection> for Book {
+
    fn from(db: sql::Connection) -> Self {
+
        Self { db }
+
    }
+
}
+

impl Book {
    const SCHEMA: &str = include_str!("schema.sql");

    /// Open an address book at the given path. Creates a new address book if it
    /// doesn't exist.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
-
        let db = sql::Connection::open(path)?;
+
        let db = sql::Connection::open_with_flags(
+
            path,
+
            sqlite::OpenFlags::new()
+
                .set_create()
+
                .set_read_write()
+
                .set_full_mutex(),
+
        )?;
        db.execute(Self::SCHEMA)?;

        Ok(Self { db })
modified radicle/src/node/config.rs
@@ -56,6 +56,9 @@ pub struct Limits {
    /// How long to keep a routing table entry before being pruned.
    #[serde(with = "crate::serde_ext::localtime::duration")]
    pub routing_max_age: LocalDuration,
+
    /// How long to keep a gossip message entry before pruning it.
+
    #[serde(with = "crate::serde_ext::localtime::duration")]
+
    pub gossip_max_age: LocalDuration,
    /// Maximum number of concurrent fetches per per connection.
    pub fetch_concurrency: usize,
    /// Rate limitter settings.
@@ -67,7 +70,8 @@ impl Default for Limits {
    fn default() -> Self {
        Self {
            routing_max_size: 1000,
-
            routing_max_age: LocalDuration::from_mins(7 * 24 * 60),
+
            routing_max_age: LocalDuration::from_mins(7 * 24 * 60), // One week
+
            gossip_max_age: LocalDuration::from_mins(2 * 7 * 24 * 60), // Two weeks
            fetch_concurrency: 1,
            rate: RateLimits::default(),
        }